commit 0c0a9c3b5c5e6e50ee7308bbff077b85621e3a02 Author: LeoMortari Date: Fri Oct 17 09:27:50 2025 -0300 Inicia novos recursos Dentre eles estão recurso de adicao do faster-whisper, geração de legenda e integracao com Gemini e Open Router diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..38734ca Binary files /dev/null and b/.DS_Store differ diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..b437409 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +"""Top-level package for the video processing pipeline.""" \ No newline at end of file diff --git a/__pycache__/llm.cpython-311.pyc b/__pycache__/llm.cpython-311.pyc new file mode 100644 index 0000000..36d44a6 Binary files /dev/null and b/__pycache__/llm.cpython-311.pyc differ diff --git a/__pycache__/main.cpython-311.pyc b/__pycache__/main.cpython-311.pyc new file mode 100644 index 0000000..02fec65 Binary files /dev/null and b/__pycache__/main.cpython-311.pyc differ diff --git a/__pycache__/render.cpython-311.pyc b/__pycache__/render.cpython-311.pyc new file mode 100644 index 0000000..634cd2e Binary files /dev/null and b/__pycache__/render.cpython-311.pyc differ diff --git a/__pycache__/transcribe.cpython-311.pyc b/__pycache__/transcribe.cpython-311.pyc new file mode 100644 index 0000000..cac6337 Binary files /dev/null and b/__pycache__/transcribe.cpython-311.pyc differ diff --git a/__pycache__/utils.cpython-311.pyc b/__pycache__/utils.cpython-311.pyc new file mode 100644 index 0000000..3c4f202 Binary files /dev/null and b/__pycache__/utils.cpython-311.pyc differ diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..5d575cc --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,35 @@ +services: + video-render-new: + restart: unless-stopped + build: . + container_name: video-render-new + environment: + # RabbitMQ credentials + - RABBITMQ_PASS=${RABBITMQ_PASS} + - RABBITMQ_HOST=${RABBITMQ_HOST} + - RABBITMQ_USER=${RABBITMQ_USER} + - RABBITMQ_PORT=${RABBITMQ_PORT} + - RABBITMQ_QUEUE=${RABBITMQ_QUEUE} + - RABBITMQ_UPLOAD_QUEUE=${RABBITMQ_UPLOAD_QUEUE} + # API keys for the LLMs + - GEMINI_API_KEY=${GEMINI_API_KEY} + - OPENROUTER_API_KEY=${OPENROUTER_API_KEY} + - OPENROUTER_MODEL=${OPENROUTER_MODEL} + # Optional whisper settings + - WHISPER_MODEL=${WHISPER_MODEL} + - WHISPER_DEVICE=${WHISPER_DEVICE} + - WHISPER_COMPUTE_TYPE=${WHISPER_COMPUTE_TYPE} + volumes: + # Mount host directories into the container so that videos can be + # provided and outputs collected. These paths can be customised when + # deploying the stack. The defaults assume /root/videos and + # /root/outputs on the host. + - "/root/videos:/app/videos" + - "/root/outputs:/app/outputs" + command: "python -u main.py" + networks: + - dokploy-network + +networks: + dokploy-network: + external: true \ No newline at end of file diff --git a/dockerfile b/dockerfile new file mode 100644 index 0000000..dc30f99 --- /dev/null +++ b/dockerfile @@ -0,0 +1,45 @@ +FROM python:3.11-slim + +# Create and set the working directory +WORKDIR /app + +# Prevent some interactive prompts during package installation +ENV DEBIAN_FRONTEND=noninteractive + +# Install ffmpeg and other system dependencies. The list largely mirrors +# the original project but omits PostgreSQL development headers which are +# unused here. We include libgl1 and libglib2.0-0 so that MoviePy +# (through its dependencies) can find OpenGL and GLib when using the +# Pillow and numpy backends. 
+RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + ffmpeg \ + libgl1 \ + libglib2.0-0 \ + build-essential \ + xvfb \ + xdg-utils \ + wget \ + unzip \ + libmagick++-dev \ + imagemagick \ + fonts-liberation \ + sox \ + bc \ + gsfonts && \ + rm -rf /var/lib/apt/lists/* + +# Copy dependency specification and install Python dependencies +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + +# Copy the rest of the application code +COPY . . + +# Declare volumes for videos and outputs. These paths correspond to the +# mount points defined in the docker-compose file. Using VOLUME here +# documents the intended persistent storage locations. +VOLUME ["/app/videos", "/app/outputs"] + +# The default command starts the consumer loop +CMD ["python", "-u", "main.py"] \ No newline at end of file diff --git a/llm.py b/llm.py new file mode 100644 index 0000000..f0a5a2a --- /dev/null +++ b/llm.py @@ -0,0 +1,234 @@ +"""High-level helpers for interacting with the Gemini and OpenRouter APIs. + +This module encapsulates all of the logic needed to call the LLM endpoints +used throughout the application. It uses the OpenAI Python client under the +hood because both Gemini and OpenRouter expose OpenAI-compatible APIs. + +Two functions are exposed: + +* ``select_highlights`` takes an SRT-like string (the transcription of a + video) and returns a list of highlight objects with start and end + timestamps and their corresponding text. It uses the Gemini model to + identify which parts of the video are most likely to engage viewers on + social media. +* ``generate_titles`` takes a list of highlight objects and returns a list + of the same objects enriched with a ``topText`` field, which contains a + sensational title for the clip. It uses the OpenRouter API with a model + specified via the ``OPENROUTER_MODEL`` environment variable. + +Both functions are resilient to malformed outputs from the models. They try +to extract the first JSON array found in the model responses; if that +fails, a descriptive exception is raised. These exceptions should be +handled by callers to post appropriate error messages back to the queue. +""" + +from __future__ import annotations + +import json +import os +import re +from typing import Any, Dict, List + +import openai + + +class LLMError(Exception): + """Raised when the LLM response cannot be parsed into the expected format.""" + + +def _extract_json_array(text: str) -> Any: + """Extract the first JSON array from a string. + + LLMs sometimes return explanatory text before or after the JSON. This + helper uses a regular expression to find the first substring that + resembles a JSON array (i.e. starts with '[' and ends with ']'). It + returns the corresponding Python object if successful, otherwise + raises a ``LLMError``. + """ + # Remove Markdown code fences and other formatting noise + cleaned = text.replace("`", "").replace("json", "") + # Find the first [ ... ] block + match = re.search(r"\[.*\]", cleaned, re.DOTALL) + if not match: + raise LLMError("Não foi possível encontrar um JSON válido na resposta da IA.") + json_str = match.group(0) + try: + return json.loads(json_str) + except json.JSONDecodeError as exc: + raise LLMError(f"Erro ao decodificar JSON: {exc}") + + +def select_highlights(srt_text: str) -> List[Dict[str, Any]]: + """Call the Gemini API to select highlight segments from a transcription. 
+ + The input ``srt_text`` should be a string containing the transcription + formatted like an SRT file, with lines of the form + ``00:00:10,140 --> 00:01:00,990`` followed by the spoken text. + + Returns a list of dictionaries, each with ``start``, ``end`` and + ``text`` keys. On failure to parse the response, a ``LLMError`` is + raised. + """ + api_key = os.environ.get("GEMINI_API_KEY") + if not api_key: + raise ValueError("GEMINI_API_KEY não definido no ambiente") + + model = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash") + + # Initialise client for Gemini. The base_url points to the + # generativelanguage API; see the official docs for details. + client = openai.OpenAI(api_key=api_key, base_url="https://generativelanguage.googleapis.com/v1beta/openai/") + + # System prompt: instructs Gemini how to behave. + system_prompt = ( + "Você é um assistente especializado em selecionar **HIGHLIGHTS** de vídeo " + "a partir da transcrição com timestamps.\n" + "Sua única função é **selecionar os trechos** conforme solicitado.\n" + "- **Não resuma, não interprete, não gere comentários ou textos complementares.**\n" + "- **Retorne a resposta exatamente no formato proposto pelo usuário**, sem adicionar ou remover nada além do pedido.\n" + "- Cada trecho selecionado deve ter **no mínimo 60 segundos e no máximo 120 segundos** de duração.\n" + "- Sempre responda **em português (PT-BR)**." + ) + + # Base prompt: describes how to select highlights and the format to return. + base_prompt = ( + "Você assumirá o papel de um especialista em Marketing e Social Media, " + "sua tarefa é selecionar as melhores partes de uma transcrição que irei fornecer.\n\n" + "## Critérios de Seleção\n\n" + "- Escolha trechos baseando-se em:\n" + " - **Picos de emoção ou impacto**\n" + " - **Viradas de assunto**\n" + " - **Punchlines** (frases de efeito, momentos de virada)\n" + " - **Informações-chave**\n\n" + "## Regras Rápidas\n\n" + "- Sempre devolver pelo menos 3 trechos, não possui limite máximo\n" + "- Garanta que cada trecho fique com no MÍNIMO 60 segundos e no MÁXIMO 120 segundos.\n" + "- Nenhum outro texto além do JSON final.\n\n" + "## Restrições de Duração\n\n" + "- **Duração mínima do trecho escolhido:** 60 segundos\n" + "- **Duração máxima do trecho escolhido:** 90 a 120 segundos\n\n" + "## Tarefa\n\n" + "- Proponha o **máximo de trechos** com potencial, mas **sempre devolva no mínimo 3 trechos**.\n" + "- Extraia os trechos **apenas** da transcrição fornecida abaixo.\n\n" + "## IMPORTANTE\n" + "- Cada trecho deve ter no mínimo 60 segundos, e no máximo 120 segundos. 
Isso é indiscutível\n\n" + "## Entrada\n\n" + "- Transcrição:\n\n" + f"{srt_text}\n\n" + "## Saída\n\n" + "- Retorne **somente** a lista de trechos selecionados em formato JSON, conforme o exemplo abaixo.\n" + "- **Não escreva comentários ou qualquer texto extra.**\n" + "- No atributo \"text\", inclua o texto presente no trecho escolhido.\n\n" + "### Exemplo de Conversão\n\n" + "#### De SRT:\n" + "00:00:10,140 --> 00:01:00,990\n" + "Exemplo de escrita presente no trecho\n\n" + "#### Para JSON:\n" + "[\n" + " {\n" + " \"start\": \"00:00:10,140\",\n" + " \"end\": \"00:01:00,990\",\n" + " \"text\": \"Exemplo de escrita presente no trecho\"\n" + " }\n" + "]\n" + ) + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": base_prompt}, + ] + try: + response = client.chat.completions.create(model=model, messages=messages) + except Exception as exc: + raise LLMError(f"Erro ao chamar a API Gemini: {exc}") + # Extract message content + content = response.choices[0].message.content if response.choices else None + if not content: + raise LLMError("A resposta da Gemini veio vazia.") + result = _extract_json_array(content) + if not isinstance(result, list): + raise LLMError("O JSON retornado pela Gemini não é uma lista.") + return result + + +def generate_titles(highlights: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Call the OpenRouter API to generate a title (topText) for each highlight. + + The ``highlights`` argument should be a list of dictionaries as returned + by ``select_highlights``, each containing ``start``, ``end`` and ``text``. + This function adds a ``topText`` field to each dictionary using the + OpenRouter model specified via the ``OPENROUTER_MODEL`` environment + variable. If parsing fails, an ``LLMError`` is raised. 
+ """ + api_key = os.environ.get("OPENROUTER_API_KEY") + if not api_key: + raise ValueError("OPENROUTER_API_KEY não definido no ambiente") + model = os.environ.get("OPENROUTER_MODEL") + if not model: + raise ValueError("OPENROUTER_MODEL não definido no ambiente") + # Create client for OpenRouter + client = openai.OpenAI(api_key=api_key, base_url="https://openrouter.ai/api/v1") + + # Compose prompt: instruct to generate titles only + prompt_header = ( + "Você é um especialista em Marketing Digital e Criação de Conteúdo Viral.\n\n" + "Sua tarefa é criar **títulos sensacionalistas** (*topText*) para cada trecho " + "de transcrição recebido em formato JSON.\n\n" + "## Instruções\n\n" + "- O texto deve ser **chamativo, impactante** e com alto potencial de viralização " + "em redes sociais, **mas sem sair do contexto do trecho**.\n" + "- Use expressões fortes e curiosas, mas **nunca palavras de baixo calão**.\n" + "- Cada *topText* deve ter **no máximo 2 linhas**.\n" + "- Utilize **exclusivamente** o conteúdo do trecho; não invente fatos.\n" + "- Não adicione comentários, explicações, ou qualquer texto extra na resposta.\n" + "- Responda **apenas** no seguinte formato (mantendo as chaves e colchetes):\n\n" + "[\n {\n \"start\": \"00:00:10,140\",\n \"end\": \"00:01:00,990\",\n \"topText\": \"Título impactante\"\n }\n]\n\n" + "## Observações:\n\n" + "- Nunca fuja do contexto do trecho.\n" + "- Não invente informações.\n" + "- Não utilize palavrões.\n" + "- Não escreva nada além do JSON de saída.\n\n" + "Aqui estão os trechos em JSON:\n" + ) + # Compose input JSON for the model + json_input = json.dumps(highlights, ensure_ascii=False) + full_message = prompt_header + json_input + messages = [ + { + "role": "system", + "content": "Você é um assistente útil e objetivo." + }, + { + "role": "user", + "content": full_message + }, + ] + try: + response = client.chat.completions.create( + model=model, + messages=messages, + temperature=0.7, + ) + except Exception as exc: + raise LLMError(f"Erro ao chamar a API OpenRouter: {exc}") + content = response.choices[0].message.content if response.choices else None + if not content: + raise LLMError("A resposta da OpenRouter veio vazia.") + result = _extract_json_array(content) + if not isinstance(result, list): + raise LLMError("O JSON retornado pela OpenRouter não é uma lista.") + # Merge topText back into highlights + # We assume the result list has the same order and length as input highlights + enriched: List[Dict[str, Any]] = [] + input_map = {(item["start"], item["end"]): item for item in highlights} + for item in result: + key = (item.get("start"), item.get("end")) + original = input_map.get(key) + if original is None: + # If the model returns unexpected entries, skip them + continue + enriched_item = original.copy() + # Only topText is expected + enriched_item["topText"] = item.get("topText", "").strip() + enriched.append(enriched_item) + return enriched \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..32fd1d1 --- /dev/null +++ b/main.py @@ -0,0 +1,266 @@ +"""Entry point for the video processing pipeline. + +This script listens to a RabbitMQ queue for new video processing tasks. When +a message arrives, it performs the following steps: + +1. Creates a working directory for the video based off of its filename. +2. Extracts the audio track with FFMPEG and runs Faster-Whisper to produce + a transcription with word-level timestamps. +3. 
Uses the Gemini model to determine which parts of the video have the
+   highest potential for engagement. These highlight segments are
+   represented as a list of objects containing start/end timestamps and
+   text.
+4. Uses the OpenRouter model to generate a sensational title for each
+   highlight. Only the ``topText`` field is kept; the description is
+   intentionally omitted since the caption will be burned into the video.
+5. Cuts the original video into individual clips corresponding to each
+   highlight and renders them vertically with a title above and a dynamic
+   caption below.
+6. Publishes a message to the upload queue with information about the
+   generated clips. On success, this message contains the list of output
+   files. On failure, ``hasError`` will be set to ``True`` and the
+   ``error`` field will describe what went wrong.
+7. Cleans up temporary files (audio, transcript, working directory) and
+   deletes the original source video from the ``videos`` directory to
+   conserve disk space.
+
+The queue names and RabbitMQ credentials are configured via environment
+variables. See the accompanying ``docker-compose.yml`` for defaults.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import shutil
+import time
+import traceback
+from typing import Any, Dict, List
+
+import pika
+
+# Absolute imports: this file is executed directly ("python -u main.py"), so the
+# sibling modules must be imported as top-level modules, not via relative imports.
+from utils import sanitize_filename, seconds_to_timestamp, timestamp_to_seconds
+from transcribe import transcribe
+from llm import LLMError, select_highlights, generate_titles
+from render import render_clip
+
+
+# Environment variables with sensible defaults
+RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "rabbitmq")
+RABBITMQ_PORT = int(os.environ.get("RABBITMQ_PORT", 5672))
+RABBITMQ_USER = os.environ.get("RABBITMQ_USER", "admin")
+RABBITMQ_PASS = os.environ.get("RABBITMQ_PASS")
+RABBITMQ_QUEUE = os.environ.get("RABBITMQ_QUEUE", "to-render")
+RABBITMQ_UPLOAD_QUEUE = os.environ.get("RABBITMQ_UPLOAD_QUEUE", "to-upload")
+
+if not RABBITMQ_PASS:
+    raise RuntimeError("RABBITMQ_PASS não definido no ambiente")
+
+
+def get_next_message() -> Any:
+    """Retrieve a single message from the RABBITMQ_QUEUE.
+
+    Returns ``None`` if no messages are available. This helper opens a new
+    connection for each call to avoid keeping stale connections alive.
+ """ + credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) + parameters = pika.ConnectionParameters( + host=RABBITMQ_HOST, + port=RABBITMQ_PORT, + credentials=credentials, + heartbeat=60, + blocked_connection_timeout=300, + ) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + method_frame, _, body = channel.basic_get(RABBITMQ_QUEUE) + if method_frame: + channel.basic_ack(method_frame.delivery_tag) + connection.close() + return body + connection.close() + return None + + +def publish_to_queue(payload: Dict[str, Any]) -> None: + """Publish a JSON-serialisable payload to the RABBITMQ_UPLOAD_QUEUE.""" + credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) + parameters = pika.ConnectionParameters( + host=RABBITMQ_HOST, + port=RABBITMQ_PORT, + credentials=credentials, + heartbeat=60, + blocked_connection_timeout=300, + ) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + channel.queue_declare(queue=RABBITMQ_UPLOAD_QUEUE, durable=True) + channel.basic_publish( + exchange="", + routing_key=RABBITMQ_UPLOAD_QUEUE, + body=json.dumps(payload), + properties=pika.BasicProperties(delivery_mode=2), + ) + connection.close() + + +def build_srt(segments: List[Dict[str, Any]]) -> str: + """Build an SRT-like string from a list of segments. + + Each segment should have ``start``, ``end`` and ``text`` fields. The + timestamps are converted to the ``HH:MM:SS,mmm`` format expected by + the Gemini prompt. Segments are separated by a blank line. + """ + lines = [] + for seg in segments: + start_ts = seconds_to_timestamp(seg["start"]) + end_ts = seconds_to_timestamp(seg["end"]) + lines.append(f"{start_ts} --> {end_ts}\n{seg['text']}") + return "\n\n".join(lines) + + +def process_message(data: Dict[str, Any]) -> Dict[str, Any]: + """Process a single video task described in ``data``. + + Returns the payload to be sent to the upload queue. Raises an + exception on failure; the caller is responsible for catching it and + posting an error payload. 
+ """ + filename = data.get("filename") + if not filename: + raise ValueError("Campo 'filename' ausente na mensagem") + url = data.get("url") + video_id = data.get("videoId") + # Determine source video path; n8n stores videos in the 'videos' directory + video_path = os.path.join("videos", filename) + if not os.path.exists(video_path): + raise FileNotFoundError(f"Arquivo de vídeo não encontrado: {video_path}") + # Sanitize the filename to use as directory name + base_no_ext = os.path.splitext(filename)[0] + sanitized = sanitize_filename(base_no_ext) + work_dir = os.path.join("app", "videos", sanitized) + # Transcribe video + segments, words = transcribe(video_path, work_dir) + # Build SRT string + srt_str = build_srt(segments) + # Call Gemini to select highlights + highlights = select_highlights(srt_str) + # Convert start/end times to floats and keep original strings for openrouter + for item in highlights: + item["start"] = item["start"].strip() + item["end"] = item["end"].strip() + # Generate titles + titles = generate_titles(highlights) + # Render clips + output_dir = os.path.join("outputs", sanitized) + processed_files: List[str] = [] + for idx, item in enumerate(titles, start=1): + start_sec = timestamp_to_seconds(item.get("start")) + end_sec = timestamp_to_seconds(item.get("end")) + # Extract relative words for caption + relative_words = [] + for w in words: + # Word must overlap clip interval + if w["end"] <= start_sec or w["start"] >= end_sec: + continue + rel_start = max(0.0, w["start"] - start_sec) + rel_end = max(0.0, w["end"] - start_sec) + relative_words.append({ + "start": rel_start, + "end": rel_end, + "word": w["word"], + }) + # If no words found (e.g. silence), create a dummy word to avoid errors + if not relative_words: + relative_words.append({"start": 0.0, "end": end_sec - start_sec, "word": ""}) + out_path = render_clip( + video_path=video_path, + start=start_sec, + end=end_sec, + top_text=item.get("topText", ""), + words=relative_words, + out_dir=output_dir, + base_name=sanitized, + idx=idx, + ) + processed_files.append(out_path) + # Compose payload + payload = { + "videosProcessedQuantity": len(processed_files), + "filename": filename, + "processedFiles": processed_files, + "url": url, + "videoId": video_id, + "hasError": False, + "error": None, + } + # Clean up working directory and original video + shutil.rmtree(work_dir, ignore_errors=True) + try: + os.remove(video_path) + except FileNotFoundError: + pass + return payload + + +def main(): + print(" [*] Esperando mensagens. 
Para sair: CTRL+C")
+    while True:
+        body = get_next_message()
+        if body is None:
+            time.sleep(5)
+            continue
+        try:
+            data = json.loads(body)
+        except Exception:
+            print("⚠️ Mensagem inválida recebida (não é JSON)")
+            continue
+        try:
+            result = process_message(data)
+        except Exception as exc:
+            # Print stack trace for debugging
+            traceback.print_exc()
+            # Attempt to clean up any directories based on filename
+            filename = data.get("filename")
+            sanitized = sanitize_filename(os.path.splitext(filename or "")[0]) if filename else ""
+            work_dir = os.path.join("app", "videos", sanitized) if sanitized else None
+            output_dir = os.path.join("outputs", sanitized) if sanitized else None
+            # Remove working and output directories
+            if work_dir:
+                shutil.rmtree(work_dir, ignore_errors=True)
+            if output_dir:
+                shutil.rmtree(output_dir, ignore_errors=True)
+            # Remove original video if present
+            video_path = os.path.join("videos", filename) if filename else None
+            if video_path and os.path.exists(video_path):
+                try:
+                    os.remove(video_path)
+                except Exception:
+                    pass
+            # Build error payload
+            error_payload = {
+                "videosProcessedQuantity": 0,
+                "filename": filename,
+                "processedFiles": [],
+                "url": data.get("url"),
+                "videoId": data.get("videoId"),
+                "hasError": True,
+                "error": str(exc),
+            }
+            try:
+                publish_to_queue(error_payload)
+                print(f"Mensagem de erro publicada na fila '{RABBITMQ_UPLOAD_QUEUE}'.")
+            except Exception as publish_err:
+                print(f"Erro ao publicar mensagem de erro: {publish_err}")
+            continue
+        # On success publish payload
+        try:
+            publish_to_queue(result)
+            print(f"Mensagem publicada na fila '{RABBITMQ_UPLOAD_QUEUE}'.")
+        except Exception as publish_err:
+            print(f"Erro ao publicar na fila '{RABBITMQ_UPLOAD_QUEUE}': {publish_err}")
+        # Loop continues
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/render.py b/render.py
new file mode 100644
index 0000000..539324e
--- /dev/null
+++ b/render.py
@@ -0,0 +1,205 @@
+"""Rendering logic for producing vertical clips with dynamic captions.
+
+This module defines a single function ``render_clip`` which takes a video
+segment and produces a vertical clip suitable for social media. Each clip
+contains three regions:
+
+* A top region (480px high) showing a title generated by an LLM.
+* A middle region (960px high) containing the original video, scaled to
+  fit horizontally while preserving aspect ratio and centred vertically.
+* A bottom region (480px high) showing a dynamic caption. The caption
+  displays a sliding window of three to five words from the transcript,
+  colouring the currently spoken word differently to draw the viewer's
+  attention.
+
+The function uses the MoviePy library to compose the various elements and
+writes the resulting video to disk. It returns the path to the created
+file.
+"""
+
+from __future__ import annotations
+
+import os
+from typing import Dict, List
+
+import numpy as np
+from moviepy.video.io.VideoFileClip import VideoFileClip
+from moviepy.video.VideoClip import ColorClip, VideoClip
+from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
+from moviepy.video.VideoClip import TextClip
+from PIL import Image, ImageDraw, ImageFont
+
+# Absolute import: render.py is loaded as a top-level module by main.py.
+from utils import wrap_text
+
+
+def render_clip(
+    video_path: str,
+    start: float,
+    end: float,
+    top_text: str,
+    words: List[Dict[str, float]],
+    out_dir: str,
+    base_name: str,
+    idx: int,
+    # Use a widely available system font by default. DejaVuSans is installed
+    # in most Debian-based containers.
The caller can override this path. + font_path: str = "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", + final_width: int = 1080, + final_height: int = 1920, + top_h: int = 480, + middle_h: int = 960, + bottom_h: int = 480, + video_codec: str = "libx264", + bitrate: str = "3000k", +) -> str: + """Render a single clip with title and dynamic caption. + + Parameters + ---------- + video_path: str + Path to the source video file. + start: float + Start time of the clip in seconds. + end: float + End time of the clip in seconds. + top_text: str + The title to display in the top region. + words: List[Dict[str, float]] + List of word-level timestamps for this clip. Each dict must have + ``start``, ``end`` and ``word`` keys. The start and end values + should be relative to the beginning of this clip (i.e. start at 0). + out_dir: str + Directory where the output file should be saved. The function + creates this directory if it doesn't exist. + base_name: str + Base name of the original video (sanitized). Used to build the + output filename. + idx: int + Index of the clip. Output will be named ``clip_{idx}.mp4``. + font_path: str + Path to the TrueType font to use for both title and caption. + final_width: int + Width of the final video in pixels. + final_height: int + Height of the final video in pixels. + top_h: int + Height of the title area in pixels. + middle_h: int + Height of the video area in pixels. + bottom_h: int + Height of the caption area in pixels. + video_codec: str + FFmpeg codec to use when writing the video. + bitrate: str + Bitrate for the output video. + + Returns + ------- + str + The path to the rendered video file. + """ + os.makedirs(out_dir, exist_ok=True) + # Extract the segment from the source video + with VideoFileClip(video_path) as clip: + segment = clip.subclip(start, end) + dur = segment.duration + # Background + bg = ColorClip(size=(final_width, final_height), color=(0, 0, 0), duration=dur) + # Resize video to fit width + video_resized = segment.resize(width=final_width) + # Compute vertical position to centre in the middle region + y = top_h + (middle_h - video_resized.h) // 2 + video_resized = video_resized.set_position((0, y)) + + # Build title clip + # Wrap the title to avoid overflow + wrapped_lines = wrap_text(top_text, max_chars=40) + wrapped_title = "\n".join(wrapped_lines) + title_clip = TextClip( + wrapped_title, + font=font_path, + fontsize=70, + color="white", + method="caption", + size=(final_width, top_h), + align="center", + ).set_duration(dur).set_position((0, 0)) + + # Prepare font for caption rendering + pil_font = ImageFont.truetype(font_path, size=60) + default_color = (255, 255, 255) # white + highlight_color = (255, 215, 0) # gold-like yellow + + # Precompute widths of a space and bounding box height for vertical centering + space_width = pil_font.getbbox(" ")[2] - pil_font.getbbox(" ")[0] + bbox = pil_font.getbbox("A") + text_height = bbox[3] - bbox[1] + + def make_caption_frame(t: float): + """Generate an image for the caption at time t.""" + # Determine current word index + idx_cur = 0 + for i, w in enumerate(words): + if w["start"] <= t < w["end"]: + idx_cur = i + break + if t >= w["end"]: + idx_cur = i + # Define window of words to display: show up to 5 words + start_idx = max(0, idx_cur - 2) + end_idx = min(len(words), idx_cur + 3) + window = words[start_idx:end_idx] + # Compute widths for each word + word_sizes = [] + for w in window: + bbox = pil_font.getbbox(w["word"]) + word_width = bbox[2] - bbox[0] + 
word_sizes.append(word_width)
+        total_width = sum(word_sizes) + space_width * (len(window) - 1 if window else 0)
+        # Create blank image for caption area
+        img = Image.new("RGB", (final_width, bottom_h), color=(0, 0, 0))
+        draw = ImageDraw.Draw(img)
+        x = int((final_width - total_width) / 2)
+        y_pos = int((bottom_h - text_height) / 2)
+        for j, w in enumerate(window):
+            color = highlight_color if (start_idx + j) == idx_cur else default_color
+            draw.text((x, y_pos), w["word"], font=pil_font, fill=color)
+            x += word_sizes[j] + space_width
+        return np.array(img)
+
+    caption_clip = VideoClip(make_frame=make_caption_frame, duration=dur)
+    caption_clip = caption_clip.set_position((0, final_height - bottom_h))
+
+    # Compose final clip
+    final = CompositeVideoClip([
+        bg,
+        video_resized,
+        title_clip,
+        caption_clip,
+    ], size=(final_width, final_height))
+    # Use the original audio from the video segment
+    final_audio = segment.audio
+    if final_audio is not None:
+        final = final.set_audio(final_audio)
+    # Define output path
+    out_path = os.path.join(out_dir, f"clip_{idx}.mp4")
+    # Write to disk
+    final.write_videofile(
+        out_path,
+        codec=video_codec,
+        fps=30,
+        bitrate=bitrate,
+        audio_codec="aac",
+        preset="ultrafast",
+        ffmpeg_params=[
+            "-tune", "zerolatency",
+            "-pix_fmt", "yuv420p",
+            "-profile:v", "high",
+            "-level", "4.1",
+        ],
+        threads=4,
+    )
+    # Close clips to free resources
+    final.close()
+    segment.close()
+    return out_path
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..f5ce0c5
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,9 @@
+pika==1.3.2
+# render.py uses the MoviePy 1.x API (subclip, set_position, fontsize), so pin the 1.x series.
+# Pillow is kept below 10 because moviepy 1.0.3 still relies on Image.ANTIALIAS.
+moviepy==1.0.3
+faster-whisper==1.2.0
+openai==1.16.0
+numpy==1.26.4
+Pillow==9.5.0
+unidecode==1.3.6
\ No newline at end of file
diff --git a/transcribe.py b/transcribe.py
new file mode 100644
index 0000000..8cb4739
--- /dev/null
+++ b/transcribe.py
@@ -0,0 +1,111 @@
+"""Utilities for extracting audio from video and generating transcriptions.
+
+This module handles two tasks:
+
+1. Use FFMPEG to extract the audio track from a video file into a WAV file
+   suitable for consumption by the Whisper model. The audio is resampled to
+   16 kHz mono PCM as required by Whisper.
+2. Use the Faster-Whisper implementation to generate a transcription with
+   word-level timestamps. The transcription is returned both as a list of
+   segments (for building an SRT) and as a flattened list of words (for
+   building dynamic subtitles).
+
+If FFMPEG is not installed or fails, a ``RuntimeError`` is raised. The caller
+is responsible for cleaning up the temporary files created in the working
+directory.
+"""
+
+from __future__ import annotations
+
+import os
+import subprocess
+from typing import Dict, List, Tuple
+
+from faster_whisper import WhisperModel
+
+
+def extract_audio_ffmpeg(video_path: str, audio_path: str) -> None:
+    """Use FFMPEG to extract audio from ``video_path`` into ``audio_path``.
+
+    The output will be a 16 kHz mono WAV file in PCM S16LE format. Any
+    existing file at ``audio_path`` will be overwritten. If ffmpeg returns
+    a non-zero exit code, a ``RuntimeError`` is raised with the stderr.
+ """ + cmd = [ + "ffmpeg", + "-y", # overwrite output + "-i", + video_path, + "-vn", # disable video recording + "-acodec", + "pcm_s16le", + "-ar", + "16000", + "-ac", + "1", + audio_path, + ] + proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if proc.returncode != 0: + raise RuntimeError(f"FFMPEG error: {proc.stderr.decode(errors='ignore')}") + + +def load_whisper_model() -> WhisperModel: + """Instantiate and cache a Faster-Whisper model. + + The model name and device can be configured via the ``WHISPER_MODEL`` and + ``WHISPER_DEVICE`` environment variables. The default model is + ``large-v3`` for best accuracy. The device can be ``cuda`` or ``cpu``. + A module-level cache is used to prevent loading the model multiple times. + """ + if hasattr(load_whisper_model, "_cache"): + return load_whisper_model._cache # type: ignore[attr-defined] + model_name = os.environ.get("WHISPER_MODEL", "large-v3") + device = os.environ.get("WHISPER_DEVICE", "cpu") + # Compute type can be set via WHISPER_COMPUTE_TYPE; default to float16 on GPU + compute_type = os.environ.get("WHISPER_COMPUTE_TYPE") + # If not explicitly set, choose sensible defaults + if compute_type is None: + compute_type = "float16" if device == "cuda" else "int8" + model = WhisperModel(model_name, device=device, compute_type=compute_type) + load_whisper_model._cache = model # type: ignore[attr-defined] + return model + + +def transcribe(video_path: str, work_dir: str) -> Tuple[List[Dict[str, float]], List[Dict[str, float]]]: + """Transcribe a video file using Faster-Whisper. + + ``video_path`` is the path to the video to transcribe. ``work_dir`` is a + directory where temporary files will be stored (audio file and + transcription). The function returns a tuple ``(segments, words)`` where + ``segments`` is a list of dictionaries with ``start``, ``end`` and + ``text`` fields, and ``words`` is a flat list of dictionaries with + ``start``, ``end`` and ``word`` fields covering the entire video. + The timestamps are expressed in seconds as floats. + """ + os.makedirs(work_dir, exist_ok=True) + audio_path = os.path.join(work_dir, "audio.wav") + # Extract audio + extract_audio_ffmpeg(video_path, audio_path) + # Load Whisper model + model = load_whisper_model() + # Run transcription with word-level timestamps + segments, info = model.transcribe(audio_path, word_timestamps=True) + seg_list: List[Dict[str, float]] = [] + words_list: List[Dict[str, float]] = [] + for seg in segments: + seg_list.append({ + "start": float(seg.start), + "end": float(seg.end), + "text": seg.text.strip(), + }) + # Each segment may contain words attribute + for w in getattr(seg, "words", []) or []: + words_list.append({ + "start": float(w.start), + "end": float(w.end), + "word": w.word, + }) + # Sort words by start time to be safe + words_list.sort(key=lambda d: d["start"]) + return seg_list, words_list \ No newline at end of file diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..c8f9dbc --- /dev/null +++ b/utils.py @@ -0,0 +1,93 @@ +import re +import unicodedata +from typing import List, Tuple + + +def sanitize_filename(name: str) -> str: + """Return a sanitized version of a filename. + + This helper removes accents, converts to lowercase, replaces spaces + with underscores and removes any non alphanumeric characters except + underscores and dots. This makes the directory names safe to use on + most filesystems and matches the behaviour described in the spec. 
+ """ + if not name: + return "" + # Decompose Unicode characters and strip accents + nfkd_form = unicodedata.normalize("NFKD", name) + no_accents = "".join(c for c in nfkd_form if not unicodedata.combining(c)) + # Replace spaces with underscores + no_spaces = no_accents.replace(" ", "_") + # Lowercase and remove any character that is not a letter, digit, dot or underscore + sanitized = re.sub(r"[^A-Za-z0-9_.]+", "", no_spaces) + return sanitized + + +def timestamp_to_seconds(ts: str) -> float: + """Convert a timestamp in HH:MM:SS,mmm format to seconds. + + The Gemini and OpenRouter prompts use timestamps formatted with a comma + as the decimal separator. This helper splits the string into hours, + minutes and seconds and returns a float expressed in seconds. + """ + if ts is None: + return 0.0 + ts = ts.strip() + if not ts: + return 0.0 + # Replace comma by dot for decimal seconds + ts = ts.replace(",", ".") + parts = ts.split(":") + parts = [float(p) for p in parts] + if len(parts) == 3: + h, m, s = parts + return h * 3600 + m * 60 + s + elif len(parts) == 2: + m, s = parts + return m * 60 + s + else: + # only seconds + return parts[0] + + +def seconds_to_timestamp(seconds: float) -> str: + """Convert a time in seconds to HH:MM:SS,mmm format expected by SRT.""" + if seconds < 0: + seconds = 0 + h = int(seconds // 3600) + m = int((seconds % 3600) // 60) + s = seconds % 60 + # Format with comma as decimal separator and three decimal places + return f"{h:02d}:{m:02d}:{s:06.3f}".replace(".", ",") + + +def wrap_text(text: str, max_chars: int = 80) -> List[str]: + """Simple word-wrap for a string. + + Splits ``text`` into a list of lines, each at most ``max_chars`` + characters long. This does not attempt to hyphenate words – a word + longer than ``max_chars`` will occupy its own line. The return value + is a list of lines without trailing whitespace. + """ + if not text: + return [] + words = text.split() + lines: List[str] = [] + current: List[str] = [] + current_len = 0 + for word in words: + # If adding this word would exceed the max, flush current line + if current and current_len + 1 + len(word) > max_chars: + lines.append(" ".join(current)) + current = [word] + current_len = len(word) + else: + # Add to current line + if current: + current_len += 1 + len(word) + else: + current_len = len(word) + current.append(word) + if current: + lines.append(" ".join(current)) + return lines \ No newline at end of file