Introduces new features

Among them: the addition of faster-whisper, subtitle generation, and integration with Gemini and OpenRouter.
LeoMortari
2025-10-17 09:27:50 -03:00
commit 0c0a9c3b5c
15 changed files with 997 additions and 0 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

1
__init__.py Normal file

@@ -0,0 +1 @@
"""Top-level package for the video processing pipeline."""

Binary files not shown.

35
docker-compose.yml Normal file

@@ -0,0 +1,35 @@
services:
video-render-new:
restart: unless-stopped
build: .
container_name: video-render-new
environment:
# RabbitMQ credentials
- RABBITMQ_PASS=${RABBITMQ_PASS}
- RABBITMQ_HOST=${RABBITMQ_HOST}
- RABBITMQ_USER=${RABBITMQ_USER}
- RABBITMQ_PORT=${RABBITMQ_PORT}
- RABBITMQ_QUEUE=${RABBITMQ_QUEUE}
- RABBITMQ_UPLOAD_QUEUE=${RABBITMQ_UPLOAD_QUEUE}
# API keys for the LLMs
- GEMINI_API_KEY=${GEMINI_API_KEY}
- OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
- OPENROUTER_MODEL=${OPENROUTER_MODEL}
# Optional whisper settings
- WHISPER_MODEL=${WHISPER_MODEL}
- WHISPER_DEVICE=${WHISPER_DEVICE}
- WHISPER_COMPUTE_TYPE=${WHISPER_COMPUTE_TYPE}
volumes:
# Mount host directories into the container so that videos can be
# provided and outputs collected. These paths can be customised when
# deploying the stack. The defaults assume /root/videos and
# /root/outputs on the host.
- "/root/videos:/app/videos"
- "/root/outputs:/app/outputs"
command: "python -u main.py"
networks:
- dokploy-network
networks:
dokploy-network:
external: true

45
dockerfile Normal file

@@ -0,0 +1,45 @@
FROM python:3.11-slim
# Create and set the working directory
WORKDIR /app
# Prevent some interactive prompts during package installation
ENV DEBIAN_FRONTEND=noninteractive
# Install ffmpeg and other system dependencies. The list largely mirrors
# the original project but omits PostgreSQL development headers which are
# unused here. We include libgl1 and libglib2.0-0 so that MoviePy
# (through its dependencies) can find OpenGL and GLib when using the
# Pillow and numpy backends.
RUN apt-get update && \
apt-get install -y --no-install-recommends \
ffmpeg \
libgl1 \
libglib2.0-0 \
build-essential \
xvfb \
xdg-utils \
wget \
unzip \
libmagick++-dev \
imagemagick \
fonts-liberation \
sox \
bc \
gsfonts && \
rm -rf /var/lib/apt/lists/*
# Copy dependency specification and install Python dependencies
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
# Copy the rest of the application code
COPY . .
# Declare volumes for videos and outputs. These paths correspond to the
# mount points defined in the docker-compose file. Using VOLUME here
# documents the intended persistent storage locations.
VOLUME ["/app/videos", "/app/outputs"]
# The default command starts the consumer loop
CMD ["python", "-u", "main.py"]

234
llm.py Normal file

@@ -0,0 +1,234 @@
"""High-level helpers for interacting with the Gemini and OpenRouter APIs.
This module encapsulates all of the logic needed to call the LLM endpoints
used throughout the application. It uses the OpenAI Python client under the
hood because both Gemini and OpenRouter expose OpenAI-compatible APIs.
Two functions are exposed:
* ``select_highlights`` takes an SRT-like string (the transcription of a
video) and returns a list of highlight objects with start and end
timestamps and their corresponding text. It uses the Gemini model to
identify which parts of the video are most likely to engage viewers on
social media.
* ``generate_titles`` takes a list of highlight objects and returns a list
of the same objects enriched with a ``topText`` field, which contains a
sensational title for the clip. It uses the OpenRouter API with a model
specified via the ``OPENROUTER_MODEL`` environment variable.
Both functions are resilient to malformed outputs from the models. They try
to extract the first JSON array found in the model responses; if that
fails, a descriptive exception is raised. These exceptions should be
handled by callers to post appropriate error messages back to the queue.
"""
from __future__ import annotations
import json
import os
import re
from typing import Any, Dict, List
import openai
class LLMError(Exception):
"""Raised when the LLM response cannot be parsed into the expected format."""
def _extract_json_array(text: str) -> Any:
"""Extract the first JSON array from a string.
LLMs sometimes return explanatory text before or after the JSON. This
helper uses a regular expression to find the first substring that
resembles a JSON array (i.e. starts with '[' and ends with ']'). It
returns the corresponding Python object if successful, otherwise
raises a ``LLMError``.
"""
# Remove Markdown code fences and other formatting noise
cleaned = text.replace("`", "").replace("json", "")
# Find the first [ ... ] block
match = re.search(r"\[.*\]", cleaned, re.DOTALL)
if not match:
raise LLMError("Não foi possível encontrar um JSON válido na resposta da IA.")
json_str = match.group(0)
try:
return json.loads(json_str)
except json.JSONDecodeError as exc:
raise LLMError(f"Erro ao decodificar JSON: {exc}")
def select_highlights(srt_text: str) -> List[Dict[str, Any]]:
"""Call the Gemini API to select highlight segments from a transcription.
The input ``srt_text`` should be a string containing the transcription
formatted like an SRT file, with lines of the form
``00:00:10,140 --> 00:01:00,990`` followed by the spoken text.
Returns a list of dictionaries, each with ``start``, ``end`` and
``text`` keys. On failure to parse the response, a ``LLMError`` is
raised.
"""
api_key = os.environ.get("GEMINI_API_KEY")
if not api_key:
raise ValueError("GEMINI_API_KEY não definido no ambiente")
model = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")
# Initialise client for Gemini. The base_url points to the
# generativelanguage API; see the official docs for details.
client = openai.OpenAI(api_key=api_key, base_url="https://generativelanguage.googleapis.com/v1beta/openai/")
# System prompt: instructs Gemini how to behave.
system_prompt = (
"Você é um assistente especializado em selecionar **HIGHLIGHTS** de vídeo "
"a partir da transcrição com timestamps.\n"
"Sua única função é **selecionar os trechos** conforme solicitado.\n"
"- **Não resuma, não interprete, não gere comentários ou textos complementares.**\n"
"- **Retorne a resposta exatamente no formato proposto pelo usuário**, sem adicionar ou remover nada além do pedido.\n"
"- Cada trecho selecionado deve ter **no mínimo 60 segundos e no máximo 120 segundos** de duração.\n"
"- Sempre responda **em português (PT-BR)**."
)
# Base prompt: describes how to select highlights and the format to return.
base_prompt = (
"Você assumirá o papel de um especialista em Marketing e Social Media, "
"sua tarefa é selecionar as melhores partes de uma transcrição que irei fornecer.\n\n"
"## Critérios de Seleção\n\n"
"- Escolha trechos baseando-se em:\n"
" - **Picos de emoção ou impacto**\n"
" - **Viradas de assunto**\n"
" - **Punchlines** (frases de efeito, momentos de virada)\n"
" - **Informações-chave**\n\n"
"## Regras Rápidas\n\n"
"- Sempre devolver pelo menos 3 trechos, não possui limite máximo\n"
"- Garanta que cada trecho fique com no MÍNIMO 60 segundos e no MÁXIMO 120 segundos.\n"
"- Nenhum outro texto além do JSON final.\n\n"
"## Restrições de Duração\n\n"
"- **Duração mínima do trecho escolhido:** 60 segundos\n"
"- **Duração máxima do trecho escolhido:** 90 a 120 segundos\n\n"
"## Tarefa\n\n"
"- Proponha o **máximo de trechos** com potencial, mas **sempre devolva no mínimo 3 trechos**.\n"
"- Extraia os trechos **apenas** da transcrição fornecida abaixo.\n\n"
"## IMPORTANTE\n"
"- Cada trecho deve ter no mínimo 60 segundos, e no máximo 120 segundos. Isso é indiscutível\n\n"
"## Entrada\n\n"
"- Transcrição:\n\n"
f"{srt_text}\n\n"
"## Saída\n\n"
"- Retorne **somente** a lista de trechos selecionados em formato JSON, conforme o exemplo abaixo.\n"
"- **Não escreva comentários ou qualquer texto extra.**\n"
"- No atributo \"text\", inclua o texto presente no trecho escolhido.\n\n"
"### Exemplo de Conversão\n\n"
"#### De SRT:\n"
"00:00:10,140 --> 00:01:00,990\n"
"Exemplo de escrita presente no trecho\n\n"
"#### Para JSON:\n"
"[\n"
" {\n"
" \"start\": \"00:00:10,140\",\n"
" \"end\": \"00:01:00,990\",\n"
" \"text\": \"Exemplo de escrita presente no trecho\"\n"
" }\n"
"]\n"
)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": base_prompt},
]
try:
response = client.chat.completions.create(model=model, messages=messages)
except Exception as exc:
raise LLMError(f"Erro ao chamar a API Gemini: {exc}")
# Extract message content
content = response.choices[0].message.content if response.choices else None
if not content:
raise LLMError("A resposta da Gemini veio vazia.")
result = _extract_json_array(content)
if not isinstance(result, list):
raise LLMError("O JSON retornado pela Gemini não é uma lista.")
return result
def generate_titles(highlights: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Call the OpenRouter API to generate a title (topText) for each highlight.
The ``highlights`` argument should be a list of dictionaries as returned
by ``select_highlights``, each containing ``start``, ``end`` and ``text``.
This function adds a ``topText`` field to each dictionary using the
OpenRouter model specified via the ``OPENROUTER_MODEL`` environment
variable. If parsing fails, an ``LLMError`` is raised.
"""
api_key = os.environ.get("OPENROUTER_API_KEY")
if not api_key:
raise ValueError("OPENROUTER_API_KEY não definido no ambiente")
model = os.environ.get("OPENROUTER_MODEL")
if not model:
raise ValueError("OPENROUTER_MODEL não definido no ambiente")
# Create client for OpenRouter
client = openai.OpenAI(api_key=api_key, base_url="https://openrouter.ai/api/v1")
# Compose prompt: instruct to generate titles only
prompt_header = (
"Você é um especialista em Marketing Digital e Criação de Conteúdo Viral.\n\n"
"Sua tarefa é criar **títulos sensacionalistas** (*topText*) para cada trecho "
"de transcrição recebido em formato JSON.\n\n"
"## Instruções\n\n"
"- O texto deve ser **chamativo, impactante** e com alto potencial de viralização "
"em redes sociais, **mas sem sair do contexto do trecho**.\n"
"- Use expressões fortes e curiosas, mas **nunca palavras de baixo calão**.\n"
"- Cada *topText* deve ter **no máximo 2 linhas**.\n"
"- Utilize **exclusivamente** o conteúdo do trecho; não invente fatos.\n"
"- Não adicione comentários, explicações, ou qualquer texto extra na resposta.\n"
"- Responda **apenas** no seguinte formato (mantendo as chaves e colchetes):\n\n"
"[\n {\n \"start\": \"00:00:10,140\",\n \"end\": \"00:01:00,990\",\n \"topText\": \"Título impactante\"\n }\n]\n\n"
"## Observações:\n\n"
"- Nunca fuja do contexto do trecho.\n"
"- Não invente informações.\n"
"- Não utilize palavrões.\n"
"- Não escreva nada além do JSON de saída.\n\n"
"Aqui estão os trechos em JSON:\n"
)
# Compose input JSON for the model
json_input = json.dumps(highlights, ensure_ascii=False)
full_message = prompt_header + json_input
messages = [
{
"role": "system",
"content": "Você é um assistente útil e objetivo."
},
{
"role": "user",
"content": full_message
},
]
try:
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0.7,
)
except Exception as exc:
raise LLMError(f"Erro ao chamar a API OpenRouter: {exc}")
content = response.choices[0].message.content if response.choices else None
if not content:
raise LLMError("A resposta da OpenRouter veio vazia.")
result = _extract_json_array(content)
if not isinstance(result, list):
raise LLMError("O JSON retornado pela OpenRouter não é uma lista.")
# Merge topText back into highlights
    # Results are matched back to the inputs by their (start, end) timestamps
enriched: List[Dict[str, Any]] = []
input_map = {(item["start"], item["end"]): item for item in highlights}
for item in result:
key = (item.get("start"), item.get("end"))
original = input_map.get(key)
if original is None:
# If the model returns unexpected entries, skip them
continue
enriched_item = original.copy()
# Only topText is expected
enriched_item["topText"] = item.get("topText", "").strip()
enriched.append(enriched_item)
return enriched
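
A minimal usage sketch for the two helpers above, assuming the absolute-import layout used by main.py and that GEMINI_API_KEY, OPENROUTER_API_KEY and OPENROUTER_MODEL are set; the SRT snippet is a placeholder.

# Illustrative only: chain the llm.py helpers on a tiny SRT-style string.
from llm import LLMError, select_highlights, generate_titles

srt_text = (
    "00:00:10,140 --> 00:01:00,990\n"
    "Exemplo de escrita presente no trecho"
)

try:
    highlights = select_highlights(srt_text)   # Gemini picks the segments
    enriched = generate_titles(highlights)     # OpenRouter adds a topText to each one
    for clip in enriched:
        print(clip["start"], clip["end"], clip["topText"])
except LLMError as exc:
    # main.py reports failures like this back to the upload queue.
    print(f"Falha nas LLMs: {exc}")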

266
main.py Normal file

@@ -0,0 +1,266 @@
"""Entry point for the video processing pipeline.
This script listens to a RabbitMQ queue for new video processing tasks. When
a message arrives, it performs the following steps:
1. Creates a working directory for the video based on its filename.
2. Extracts the audio track with FFMPEG and runs Faster-Whisper to produce
a transcription with word-level timestamps.
3. Uses the Gemini model to determine which parts of the video have the
highest potential for engagement. These highlight segments are
represented as a list of objects containing start/end timestamps and
text.
4. Uses the OpenRouter model to generate a sensational title for each
highlight. Only the ``topText`` field is kept; the description is
intentionally omitted since the caption will be burned into the video.
5. Cuts the original video into individual clips corresponding to each
highlight and renders them vertically with a title above and a dynamic
caption below.
6. Publishes a message to the upload queue with information about the
generated clips. On success, this message contains the list of output
files. On failure, ``hasError`` will be set to ``True`` and the
``error`` field will describe what went wrong.
7. Cleans up temporary files (audio, transcript, working directory) and
deletes the original source video from the ``videos`` directory to
conserve disk space.
The queue names and RabbitMQ credentials are configured via environment
variables. See the accompanying ``docker-compose.yml`` for defaults.
"""
from __future__ import annotations
import json
import os
import shutil
import time
import traceback
from typing import Any, Dict, List
import pika
# Absolute imports: main.py is executed directly (python -u main.py), so relative imports would fail.
from utils import sanitize_filename, seconds_to_timestamp, timestamp_to_seconds
from transcribe import transcribe
from llm import LLMError, select_highlights, generate_titles
from render import render_clip
# Environment variables with sensible defaults
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "rabbitmq")
RABBITMQ_PORT = int(os.environ.get("RABBITMQ_PORT", 5672))
RABBITMQ_USER = os.environ.get("RABBITMQ_USER", "admin")
RABBITMQ_PASS = os.environ.get("RABBITMQ_PASS")
RABBITMQ_QUEUE = os.environ.get("RABBITMQ_QUEUE", "to-render")
RABBITMQ_UPLOAD_QUEUE = os.environ.get("RABBITMQ_UPLOAD_QUEUE", "to-upload")
if not RABBITMQ_PASS:
raise RuntimeError("RABBITMQ_PASS não definido no ambiente")
def get_next_message() -> Any:
"""Retrieve a single message from the RABBITMQ_QUEUE.
Returns ``None`` if no messages are available. This helper opens a new
connection for each call to avoid keeping stale connections alive.
"""
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
credentials=credentials,
heartbeat=60,
blocked_connection_timeout=300,
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
method_frame, _, body = channel.basic_get(RABBITMQ_QUEUE)
if method_frame:
channel.basic_ack(method_frame.delivery_tag)
connection.close()
return body
connection.close()
return None
def publish_to_queue(payload: Dict[str, Any]) -> None:
"""Publish a JSON-serialisable payload to the RABBITMQ_UPLOAD_QUEUE."""
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
credentials=credentials,
heartbeat=60,
blocked_connection_timeout=300,
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=RABBITMQ_UPLOAD_QUEUE, durable=True)
channel.basic_publish(
exchange="",
routing_key=RABBITMQ_UPLOAD_QUEUE,
body=json.dumps(payload),
properties=pika.BasicProperties(delivery_mode=2),
)
connection.close()
def build_srt(segments: List[Dict[str, Any]]) -> str:
"""Build an SRT-like string from a list of segments.
Each segment should have ``start``, ``end`` and ``text`` fields. The
timestamps are converted to the ``HH:MM:SS,mmm`` format expected by
the Gemini prompt. Segments are separated by a blank line.
"""
lines = []
for seg in segments:
start_ts = seconds_to_timestamp(seg["start"])
end_ts = seconds_to_timestamp(seg["end"])
lines.append(f"{start_ts} --> {end_ts}\n{seg['text']}")
return "\n\n".join(lines)
def process_message(data: Dict[str, Any]) -> Dict[str, Any]:
"""Process a single video task described in ``data``.
Returns the payload to be sent to the upload queue. Raises an
exception on failure; the caller is responsible for catching it and
posting an error payload.
"""
filename = data.get("filename")
if not filename:
raise ValueError("Campo 'filename' ausente na mensagem")
url = data.get("url")
video_id = data.get("videoId")
# Determine source video path; n8n stores videos in the 'videos' directory
video_path = os.path.join("videos", filename)
if not os.path.exists(video_path):
raise FileNotFoundError(f"Arquivo de vídeo não encontrado: {video_path}")
# Sanitize the filename to use as directory name
base_no_ext = os.path.splitext(filename)[0]
sanitized = sanitize_filename(base_no_ext)
work_dir = os.path.join("app", "videos", sanitized)
# Transcribe video
segments, words = transcribe(video_path, work_dir)
# Build SRT string
srt_str = build_srt(segments)
# Call Gemini to select highlights
highlights = select_highlights(srt_str)
    # Normalise the start/end timestamp strings before passing them to OpenRouter
for item in highlights:
item["start"] = item["start"].strip()
item["end"] = item["end"].strip()
# Generate titles
titles = generate_titles(highlights)
# Render clips
output_dir = os.path.join("outputs", sanitized)
processed_files: List[str] = []
for idx, item in enumerate(titles, start=1):
start_sec = timestamp_to_seconds(item.get("start"))
end_sec = timestamp_to_seconds(item.get("end"))
# Extract relative words for caption
relative_words = []
for w in words:
# Word must overlap clip interval
if w["end"] <= start_sec or w["start"] >= end_sec:
continue
rel_start = max(0.0, w["start"] - start_sec)
rel_end = max(0.0, w["end"] - start_sec)
relative_words.append({
"start": rel_start,
"end": rel_end,
"word": w["word"],
})
# If no words found (e.g. silence), create a dummy word to avoid errors
if not relative_words:
relative_words.append({"start": 0.0, "end": end_sec - start_sec, "word": ""})
out_path = render_clip(
video_path=video_path,
start=start_sec,
end=end_sec,
top_text=item.get("topText", ""),
words=relative_words,
out_dir=output_dir,
base_name=sanitized,
idx=idx,
)
processed_files.append(out_path)
# Compose payload
payload = {
"videosProcessedQuantity": len(processed_files),
"filename": filename,
"processedFiles": processed_files,
"url": url,
"videoId": video_id,
"hasError": False,
"error": None,
}
# Clean up working directory and original video
shutil.rmtree(work_dir, ignore_errors=True)
try:
os.remove(video_path)
except FileNotFoundError:
pass
return payload
def main():
print(" [*] Esperando mensagens. Para sair: CTRL+C")
while True:
body = get_next_message()
if body is None:
time.sleep(5)
continue
try:
data = json.loads(body)
except Exception:
print("⚠️ Mensagem inválida recebida (não é JSON)")
continue
try:
result = process_message(data)
except Exception as exc:
# Print stack trace for debugging
traceback.print_exc()
# Attempt to clean up any directories based on filename
filename = data.get("filename")
sanitized = sanitize_filename(os.path.splitext(filename or "")[0]) if filename else ""
work_dir = os.path.join("app", "videos", sanitized) if sanitized else None
output_dir = os.path.join("outputs", sanitized) if sanitized else None
# Remove working and output directories
if work_dir:
shutil.rmtree(work_dir, ignore_errors=True)
if output_dir:
shutil.rmtree(output_dir, ignore_errors=True)
# Remove original video if present
video_path = os.path.join("videos", filename) if filename else None
if video_path and os.path.exists(video_path):
try:
os.remove(video_path)
except Exception:
pass
# Build error payload
error_payload = {
"videosProcessedQuantity": 0,
"filename": filename,
"processedFiles": [],
"url": data.get("url"),
"videoId": data.get("videoId"),
"hasError": True,
"error": str(exc),
}
try:
publish_to_queue(error_payload)
print(f"Mensagem de erro publicada na fila '{RABBITMQ_UPLOAD_QUEUE}'.")
except Exception as publish_err:
print(f"Erro ao publicar mensagem de erro: {publish_err}")
continue
# On success publish payload
try:
publish_to_queue(result)
print(f"Mensagem publicada na fila '{RABBITMQ_UPLOAD_QUEUE}'.")
except Exception as publish_err:
print(f"Erro ao publicar na fila '{RABBITMQ_UPLOAD_QUEUE}': {publish_err}")
# Loop continues
if __name__ == "__main__":
main()
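
For reference, a hedged sketch of a producer that enqueues one task in the shape process_message expects (filename, url, videoId). Queue and connection defaults mirror the ones above; the task values are placeholders, and declaring the queue durable here is an assumption about how it was created.

# Illustrative producer for the queue consumed by main.py.
import json
import os

import pika

credentials = pika.PlainCredentials(
    os.environ.get("RABBITMQ_USER", "admin"),
    os.environ["RABBITMQ_PASS"],
)
parameters = pika.ConnectionParameters(
    host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
    port=int(os.environ.get("RABBITMQ_PORT", 5672)),
    credentials=credentials,
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue="to-render", durable=True)

task = {
    "filename": "exemplo.mp4",              # must exist under the mounted videos/ directory
    "url": "https://example.com/exemplo",   # passed through to the upload payload
    "videoId": "abc123",                    # passed through as well
}
channel.basic_publish(
    exchange="",
    routing_key="to-render",
    body=json.dumps(task),
    properties=pika.BasicProperties(delivery_mode=2),  # persistent message
)
connection.close()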

205
render.py Normal file

@@ -0,0 +1,205 @@
"""Rendering logic for producing vertical clips with dynamic captions.
This module defines a single function ``render_clip`` which takes a video
segment and produces a vertical clip suitable for social media. Each clip
contains three regions:
* A top region (480px high) showing a title generated by an LLM.
* A middle region (960px high) containing the original video, scaled to
fit horizontally while preserving aspect ratio and centred vertically.
* A bottom region (480px high) showing a dynamic caption. The caption
displays a sliding window of three to five words from the transcript,
colouring the currently spoken word differently to draw the viewer's
attention.
The function uses the MoviePy library to compose the various elements and
writes the resulting video to disk. It returns the path to the created
file.
"""
from __future__ import annotations
import os
from typing import Dict, List
import numpy as np
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.video.VideoClip import ColorClip, VideoClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.VideoClip import TextClip
from PIL import Image, ImageDraw, ImageFont
from utils import wrap_text
def render_clip(
video_path: str,
start: float,
end: float,
top_text: str,
words: List[Dict[str, float]],
out_dir: str,
base_name: str,
idx: int,
# Use a widely available system font by default. DejaVuSans is installed
# in most Debian-based containers. The caller can override this path.
font_path: str = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
final_width: int = 1080,
final_height: int = 1920,
top_h: int = 480,
middle_h: int = 960,
bottom_h: int = 480,
video_codec: str = "libx264",
bitrate: str = "3000k",
) -> str:
"""Render a single clip with title and dynamic caption.
Parameters
----------
video_path: str
Path to the source video file.
start: float
Start time of the clip in seconds.
end: float
End time of the clip in seconds.
top_text: str
The title to display in the top region.
words: List[Dict[str, float]]
List of word-level timestamps for this clip. Each dict must have
``start``, ``end`` and ``word`` keys. The start and end values
should be relative to the beginning of this clip (i.e. start at 0).
out_dir: str
Directory where the output file should be saved. The function
creates this directory if it doesn't exist.
base_name: str
Base name of the original video (sanitized). Used to build the
output filename.
idx: int
Index of the clip. Output will be named ``clip_{idx}.mp4``.
font_path: str
Path to the TrueType font to use for both title and caption.
final_width: int
Width of the final video in pixels.
final_height: int
Height of the final video in pixels.
top_h: int
Height of the title area in pixels.
middle_h: int
Height of the video area in pixels.
bottom_h: int
Height of the caption area in pixels.
video_codec: str
FFmpeg codec to use when writing the video.
bitrate: str
Bitrate for the output video.
Returns
-------
str
The path to the rendered video file.
"""
os.makedirs(out_dir, exist_ok=True)
# Extract the segment from the source video
with VideoFileClip(video_path) as clip:
segment = clip.subclip(start, end)
dur = segment.duration
# Background
bg = ColorClip(size=(final_width, final_height), color=(0, 0, 0), duration=dur)
# Resize video to fit width
video_resized = segment.resize(width=final_width)
# Compute vertical position to centre in the middle region
y = top_h + (middle_h - video_resized.h) // 2
video_resized = video_resized.set_position((0, y))
# Build title clip
# Wrap the title to avoid overflow
wrapped_lines = wrap_text(top_text, max_chars=40)
wrapped_title = "\n".join(wrapped_lines)
title_clip = TextClip(
wrapped_title,
font=font_path,
fontsize=70,
color="white",
method="caption",
size=(final_width, top_h),
align="center",
).set_duration(dur).set_position((0, 0))
# Prepare font for caption rendering
pil_font = ImageFont.truetype(font_path, size=60)
default_color = (255, 255, 255) # white
highlight_color = (255, 215, 0) # gold-like yellow
# Precompute widths of a space and bounding box height for vertical centering
space_width = pil_font.getbbox(" ")[2] - pil_font.getbbox(" ")[0]
bbox = pil_font.getbbox("A")
text_height = bbox[3] - bbox[1]
def make_caption_frame(t: float):
"""Generate an image for the caption at time t."""
# Determine current word index
idx_cur = 0
for i, w in enumerate(words):
if w["start"] <= t < w["end"]:
idx_cur = i
break
if t >= w["end"]:
idx_cur = i
# Define window of words to display: show up to 5 words
start_idx = max(0, idx_cur - 2)
end_idx = min(len(words), idx_cur + 3)
window = words[start_idx:end_idx]
# Compute widths for each word
word_sizes = []
for w in window:
bbox = pil_font.getbbox(w["word"])
word_width = bbox[2] - bbox[0]
word_sizes.append(word_width)
total_width = sum(word_sizes) + space_width * (len(window) - 1 if window else 0)
# Create blank image for caption area
img = Image.new("RGB", (final_width, bottom_h), color=(0, 0, 0))
draw = ImageDraw.Draw(img)
x = int((final_width - total_width) / 2)
y_pos = int((bottom_h - text_height) / 2)
for j, w in enumerate(window):
color = highlight_color if (start_idx + j) == idx_cur else default_color
draw.text((x, y_pos), w["word"], font=pil_font, fill=color)
x += word_sizes[j] + space_width
return np.array(img)
caption_clip = VideoClip(make_frame=make_caption_frame, duration=dur)
caption_clip = caption_clip.set_position((0, final_height - bottom_h))
# Compose final clip
final = CompositeVideoClip([
bg,
video_resized,
title_clip,
caption_clip,
], size=(final_width, final_height))
# Use the original audio from the video segment
final_audio = segment.audio
if final_audio is not None:
final = final.set_audio(final_audio)
# Define output path
out_path = os.path.join(out_dir, f"clip_{idx}.mp4")
# Write to disk
final.write_videofile(
out_path,
codec=video_codec,
fps=30,
bitrate=bitrate,
audio_codec="aac",
preset="ultrafast",
ffmpeg_params=[
"-tune", "zerolatency",
"-pix_fmt", "yuv420p",
"-profile:v", "high",
"-level", "4.1",
],
threads=4,
)
# Close clips to free resources
final.close()
segment.close()
return out_path
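
A hedged example of calling render_clip directly. The word timings are hand-made and relative to the clip start, and the paths are placeholders; in the pipeline these values come from transcribe() and the LLM step.

# Illustrative only: render a single 60-second vertical clip.
from render import render_clip

words = [
    {"start": 0.0, "end": 0.8, "word": "Exemplo"},
    {"start": 0.8, "end": 1.4, "word": "de"},
    {"start": 1.4, "end": 2.1, "word": "legenda"},
]
out = render_clip(
    video_path="videos/exemplo.mp4",   # placeholder source file
    start=10.14,                       # seconds, as produced by timestamp_to_seconds
    end=70.14,
    top_text="Título impactante",
    words=words,                       # timestamps relative to `start`
    out_dir="outputs/exemplo",
    base_name="exemplo",
    idx=1,
)
print(out)  # outputs/exemplo/clip_1.mp4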

7
requirements.txt Normal file

@@ -0,0 +1,7 @@
pika==1.3.2
moviepy==1.0.3  # pinned to the 1.x API used by render.py (subclip, resize, set_position, set_audio)
faster-whisper==1.2.0
openai==1.16.0
numpy==1.26.4
Pillow==10.1.0
unidecode==1.3.6

111
transcribe.py Normal file

@@ -0,0 +1,111 @@
"""Utilities for extracting audio from video and generating transcriptions.
This module handles two tasks:
1. Use FFMPEG to extract the audio track from a video file into a WAV file
suitable for consumption by the Whisper model. The audio is resampled to
16 kHz mono PCM as required by Whisper.
2. Use the Faster-Whisper implementation to generate a transcription with
word-level timestamps. The transcription is returned both as a list of
segments (for building an SRT) and as a flattened list of words (for
building dynamic subtitles).
If FFMPEG is not installed or fails, a ``RuntimeError`` is raised. The caller
is responsible for cleaning up the temporary files created in the working
directory.
"""
from __future__ import annotations
import os
import subprocess
from typing import Dict, List, Tuple
from faster_whisper import WhisperModel
def extract_audio_ffmpeg(video_path: str, audio_path: str) -> None:
"""Use FFMPEG to extract audio from ``video_path`` into ``audio_path``.
The output will be a 16 kHz mono WAV file in PCM S16LE format. Any
existing file at ``audio_path`` will be overwritten. If ffmpeg returns
a non-zero exit code, a ``RuntimeError`` is raised with the stderr.
"""
cmd = [
"ffmpeg",
"-y", # overwrite output
"-i",
video_path,
"-vn", # disable video recording
"-acodec",
"pcm_s16le",
"-ar",
"16000",
"-ac",
"1",
audio_path,
]
proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if proc.returncode != 0:
raise RuntimeError(f"FFMPEG error: {proc.stderr.decode(errors='ignore')}")
def load_whisper_model() -> WhisperModel:
"""Instantiate and cache a Faster-Whisper model.
The model name and device can be configured via the ``WHISPER_MODEL`` and
``WHISPER_DEVICE`` environment variables. The default model is
``large-v3`` for best accuracy. The device can be ``cuda`` or ``cpu``.
A module-level cache is used to prevent loading the model multiple times.
"""
if hasattr(load_whisper_model, "_cache"):
return load_whisper_model._cache # type: ignore[attr-defined]
model_name = os.environ.get("WHISPER_MODEL", "large-v3")
device = os.environ.get("WHISPER_DEVICE", "cpu")
# Compute type can be set via WHISPER_COMPUTE_TYPE; default to float16 on GPU
compute_type = os.environ.get("WHISPER_COMPUTE_TYPE")
# If not explicitly set, choose sensible defaults
if compute_type is None:
compute_type = "float16" if device == "cuda" else "int8"
model = WhisperModel(model_name, device=device, compute_type=compute_type)
load_whisper_model._cache = model # type: ignore[attr-defined]
return model
def transcribe(video_path: str, work_dir: str) -> Tuple[List[Dict[str, float]], List[Dict[str, float]]]:
"""Transcribe a video file using Faster-Whisper.
``video_path`` is the path to the video to transcribe. ``work_dir`` is a
directory where temporary files will be stored (audio file and
transcription). The function returns a tuple ``(segments, words)`` where
``segments`` is a list of dictionaries with ``start``, ``end`` and
``text`` fields, and ``words`` is a flat list of dictionaries with
``start``, ``end`` and ``word`` fields covering the entire video.
The timestamps are expressed in seconds as floats.
"""
os.makedirs(work_dir, exist_ok=True)
audio_path = os.path.join(work_dir, "audio.wav")
# Extract audio
extract_audio_ffmpeg(video_path, audio_path)
# Load Whisper model
model = load_whisper_model()
# Run transcription with word-level timestamps
segments, info = model.transcribe(audio_path, word_timestamps=True)
seg_list: List[Dict[str, float]] = []
words_list: List[Dict[str, float]] = []
for seg in segments:
seg_list.append({
"start": float(seg.start),
"end": float(seg.end),
"text": seg.text.strip(),
})
# Each segment may contain words attribute
for w in getattr(seg, "words", []) or []:
words_list.append({
"start": float(w.start),
"end": float(w.end),
"word": w.word,
})
# Sort words by start time to be safe
words_list.sort(key=lambda d: d["start"])
return seg_list, words_list
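
A small sketch of how main.py consumes this module; the paths are placeholders, and the first call will download the configured Whisper model.

# Illustrative only: transcribe a local file and inspect the returned structures.
from transcribe import transcribe

segments, words = transcribe("videos/exemplo.mp4", "videos/exemplo")
print(segments[0])  # e.g. {"start": 0.0, "end": 4.2, "text": "Olá, pessoal"}
print(words[0])     # e.g. {"start": 0.0, "end": 0.4, "word": " Olá"}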

93
utils.py Normal file

@@ -0,0 +1,93 @@
import re
import unicodedata
from typing import List, Tuple
def sanitize_filename(name: str) -> str:
"""Return a sanitized version of a filename.
This helper removes accents, converts to lowercase, replaces spaces
with underscores and removes any non alphanumeric characters except
underscores and dots. This makes the directory names safe to use on
most filesystems and matches the behaviour described in the spec.
"""
if not name:
return ""
# Decompose Unicode characters and strip accents
nfkd_form = unicodedata.normalize("NFKD", name)
no_accents = "".join(c for c in nfkd_form if not unicodedata.combining(c))
# Replace spaces with underscores
no_spaces = no_accents.replace(" ", "_")
# Lowercase and remove any character that is not a letter, digit, dot or underscore
    sanitized = re.sub(r"[^A-Za-z0-9_.]+", "", no_spaces).lower()
return sanitized
def timestamp_to_seconds(ts: str) -> float:
"""Convert a timestamp in HH:MM:SS,mmm format to seconds.
The Gemini and OpenRouter prompts use timestamps formatted with a comma
as the decimal separator. This helper splits the string into hours,
minutes and seconds and returns a float expressed in seconds.
"""
if ts is None:
return 0.0
ts = ts.strip()
if not ts:
return 0.0
# Replace comma by dot for decimal seconds
ts = ts.replace(",", ".")
parts = ts.split(":")
parts = [float(p) for p in parts]
if len(parts) == 3:
h, m, s = parts
return h * 3600 + m * 60 + s
elif len(parts) == 2:
m, s = parts
return m * 60 + s
else:
# only seconds
return parts[0]
def seconds_to_timestamp(seconds: float) -> str:
"""Convert a time in seconds to HH:MM:SS,mmm format expected by SRT."""
if seconds < 0:
seconds = 0
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = seconds % 60
# Format with comma as decimal separator and three decimal places
return f"{h:02d}:{m:02d}:{s:06.3f}".replace(".", ",")
def wrap_text(text: str, max_chars: int = 80) -> List[str]:
"""Simple word-wrap for a string.
Splits ``text`` into a list of lines, each at most ``max_chars``
    characters long. This does not attempt to hyphenate words; a word
longer than ``max_chars`` will occupy its own line. The return value
is a list of lines without trailing whitespace.
"""
if not text:
return []
words = text.split()
lines: List[str] = []
current: List[str] = []
current_len = 0
for word in words:
# If adding this word would exceed the max, flush current line
if current and current_len + 1 + len(word) > max_chars:
lines.append(" ".join(current))
current = [word]
current_len = len(word)
else:
# Add to current line
if current:
current_len += 1 + len(word)
else:
current_len = len(word)
current.append(word)
if current:
lines.append(" ".join(current))
return lines
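
A few illustrative calls showing the expected behaviour of these helpers (example values only; the lowercasing matches the sanitize_filename docstring).

# Illustrative only: expected behaviour of the helpers above.
from utils import sanitize_filename, seconds_to_timestamp, timestamp_to_seconds, wrap_text

print(sanitize_filename("Meu Vídeo Épico.mp4"))        # meu_video_epico.mp4
print(round(timestamp_to_seconds("00:01:00,990"), 3))  # 60.99
print(seconds_to_timestamp(60.99))                     # 00:01:00,990
print(wrap_text("um dois tres quatro", max_chars=9))   # ['um dois', 'tres', 'quatro']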