"""Entry point for the video processing pipeline. This script listens to a RabbitMQ queue for new video processing tasks. When a message arrives, it performs the following steps: 1. Creates a working directory for the video based off of its filename. 2. Extracts the audio track with FFMPEG and runs Faster-Whisper to produce a transcription with word-level timestamps. 3. Uses the Gemini model to determine which parts of the video have the highest potential for engagement. These highlight segments are represented as a list of objects containing start/end timestamps and text. 4. Uses the OpenRouter model to generate a sensational title for each highlight. Only the ``topText`` field is kept; the description is intentionally omitted since the caption will be burned into the video. 5. Cuts the original video into individual clips corresponding to each highlight and renders them vertically with a title above and a dynamic caption below. 6. Publishes a message to the upload queue with information about the generated clips. On success, this message contains the list of output files. On failure, ``hasError`` will be set to ``True`` and the ``error`` field will describe what went wrong. 7. Cleans up temporary files (audio, transcript, working directory) and deletes the original source video from the ``videos`` directory to conserve disk space. The queue names and RabbitMQ credentials are configured via environment variables. See the accompanying ``docker-compose.yml`` for defaults. """ from __future__ import annotations import json import os import shutil import time import traceback from typing import Any, Dict, List import pika from .utils import sanitize_filename, seconds_to_timestamp, timestamp_to_seconds from .transcribe import transcribe from .llm import LLMError, select_highlights, generate_titles from .render import render_clip # Environment variables with sensible defaults RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "rabbitmq") RABBITMQ_PORT = int(os.environ.get("RABBITMQ_PORT", 5672)) RABBITMQ_USER = os.environ.get("RABBITMQ_USER", "admin") RABBITMQ_PASS = os.environ.get("RABBITMQ_PASS") RABBITMQ_QUEUE = os.environ.get("RABBITMQ_QUEUE", "to-render") RABBITMQ_UPLOAD_QUEUE = os.environ.get("RABBITMQ_UPLOAD_QUEUE", "to-upload") if not RABBITMQ_PASS: raise RuntimeError("RABBITMQ_PASS não definido no ambiente") def get_next_message() -> Any: """Retrieve a single message from the RABBITMQ_QUEUE. Returns ``None`` if no messages are available. This helper opens a new connection for each call to avoid keeping stale connections alive. 
""" credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) parameters = pika.ConnectionParameters( host=RABBITMQ_HOST, port=RABBITMQ_PORT, credentials=credentials, heartbeat=60, blocked_connection_timeout=300, ) connection = pika.BlockingConnection(parameters) channel = connection.channel() method_frame, _, body = channel.basic_get(RABBITMQ_QUEUE) if method_frame: channel.basic_ack(method_frame.delivery_tag) connection.close() return body connection.close() return None def publish_to_queue(payload: Dict[str, Any]) -> None: """Publish a JSON-serialisable payload to the RABBITMQ_UPLOAD_QUEUE.""" credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) parameters = pika.ConnectionParameters( host=RABBITMQ_HOST, port=RABBITMQ_PORT, credentials=credentials, heartbeat=60, blocked_connection_timeout=300, ) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue=RABBITMQ_UPLOAD_QUEUE, durable=True) channel.basic_publish( exchange="", routing_key=RABBITMQ_UPLOAD_QUEUE, body=json.dumps(payload), properties=pika.BasicProperties(delivery_mode=2), ) connection.close() def build_srt(segments: List[Dict[str, Any]]) -> str: """Build an SRT-like string from a list of segments. Each segment should have ``start``, ``end`` and ``text`` fields. The timestamps are converted to the ``HH:MM:SS,mmm`` format expected by the Gemini prompt. Segments are separated by a blank line. """ lines = [] for seg in segments: start_ts = seconds_to_timestamp(seg["start"]) end_ts = seconds_to_timestamp(seg["end"]) lines.append(f"{start_ts} --> {end_ts}\n{seg['text']}") return "\n\n".join(lines) def process_message(data: Dict[str, Any]) -> Dict[str, Any]: """Process a single video task described in ``data``. Returns the payload to be sent to the upload queue. Raises an exception on failure; the caller is responsible for catching it and posting an error payload. """ filename = data.get("filename") if not filename: raise ValueError("Campo 'filename' ausente na mensagem") url = data.get("url") video_id = data.get("videoId") # Determine source video path; n8n stores videos in the 'videos' directory video_path = os.path.join("videos", filename) if not os.path.exists(video_path): raise FileNotFoundError(f"Arquivo de vídeo não encontrado: {video_path}") # Sanitize the filename to use as directory name base_no_ext = os.path.splitext(filename)[0] sanitized = sanitize_filename(base_no_ext) work_dir = os.path.join("app", "videos", sanitized) # Transcribe video segments, words = transcribe(video_path, work_dir) # Build SRT string srt_str = build_srt(segments) # Call Gemini to select highlights highlights = select_highlights(srt_str) # Convert start/end times to floats and keep original strings for openrouter for item in highlights: item["start"] = item["start"].strip() item["end"] = item["end"].strip() # Generate titles titles = generate_titles(highlights) # Render clips output_dir = os.path.join("outputs", sanitized) processed_files: List[str] = [] for idx, item in enumerate(titles, start=1): start_sec = timestamp_to_seconds(item.get("start")) end_sec = timestamp_to_seconds(item.get("end")) # Extract relative words for caption relative_words = [] for w in words: # Word must overlap clip interval if w["end"] <= start_sec or w["start"] >= end_sec: continue rel_start = max(0.0, w["start"] - start_sec) rel_end = max(0.0, w["end"] - start_sec) relative_words.append({ "start": rel_start, "end": rel_end, "word": w["word"], }) # If no words found (e.g. 


def process_message(data: Dict[str, Any]) -> Dict[str, Any]:
    """Process a single video task described in ``data``.

    Returns the payload to be sent to the upload queue. Raises an exception
    on failure; the caller is responsible for catching it and posting an
    error payload.
    """
    filename = data.get("filename")
    if not filename:
        raise ValueError("Missing 'filename' field in message")
    url = data.get("url")
    video_id = data.get("videoId")

    # Determine source video path; n8n stores videos in the 'videos' directory
    video_path = os.path.join("videos", filename)
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")

    # Sanitize the filename to use as directory name
    base_no_ext = os.path.splitext(filename)[0]
    sanitized = sanitize_filename(base_no_ext)
    work_dir = os.path.join("app", "videos", sanitized)

    # Transcribe video
    segments, words = transcribe(video_path, work_dir)

    # Build SRT string
    srt_str = build_srt(segments)

    # Call Gemini to select highlights
    highlights = select_highlights(srt_str)

    # Normalise the start/end timestamp strings before passing them on to
    # the OpenRouter title step
    for item in highlights:
        item["start"] = item["start"].strip()
        item["end"] = item["end"].strip()

    # Generate titles
    titles = generate_titles(highlights)

    # Render clips
    output_dir = os.path.join("outputs", sanitized)
    processed_files: List[str] = []
    for idx, item in enumerate(titles, start=1):
        start_sec = timestamp_to_seconds(item.get("start"))
        end_sec = timestamp_to_seconds(item.get("end"))

        # Extract words overlapping the clip, rebased so their timestamps are
        # relative to the clip start (used for the burned-in caption)
        relative_words = []
        for w in words:
            # Word must overlap clip interval
            if w["end"] <= start_sec or w["start"] >= end_sec:
                continue
            rel_start = max(0.0, w["start"] - start_sec)
            rel_end = max(0.0, w["end"] - start_sec)
            relative_words.append({
                "start": rel_start,
                "end": rel_end,
                "word": w["word"],
            })

        # If no words were found (e.g. silence), create a dummy word to avoid
        # errors downstream
        if not relative_words:
            relative_words.append({"start": 0.0, "end": end_sec - start_sec, "word": ""})

        out_path = render_clip(
            video_path=video_path,
            start=start_sec,
            end=end_sec,
            top_text=item.get("topText", ""),
            words=relative_words,
            out_dir=output_dir,
            base_name=sanitized,
            idx=idx,
        )
        processed_files.append(out_path)

    # Compose payload
    payload = {
        "videosProcessedQuantity": len(processed_files),
        "filename": filename,
        "processedFiles": processed_files,
        "url": url,
        "videoId": video_id,
        "hasError": False,
        "error": None,
    }

    # Clean up working directory and original video
    shutil.rmtree(work_dir, ignore_errors=True)
    try:
        os.remove(video_path)
    except FileNotFoundError:
        pass

    return payload


def main():
    print(" [*] Waiting for messages. To exit, press CTRL+C")
    while True:
        body = get_next_message()
        if body is None:
            time.sleep(5)
            continue

        try:
            data = json.loads(body)
        except Exception:
            print("⚠️ Invalid message received (not JSON)")
            continue

        try:
            result = process_message(data)
        except Exception as exc:
            # Print stack trace for debugging
            traceback.print_exc()

            # Attempt to clean up any directories based on the filename
            filename = data.get("filename")
            sanitized = sanitize_filename(os.path.splitext(filename or "")[0]) if filename else ""
            work_dir = os.path.join("app", "videos", sanitized) if sanitized else None
            output_dir = os.path.join("outputs", sanitized) if sanitized else None

            # Remove working and output directories
            if work_dir:
                shutil.rmtree(work_dir, ignore_errors=True)
            if output_dir:
                shutil.rmtree(output_dir, ignore_errors=True)

            # Remove original video if present
            video_path = os.path.join("videos", filename) if filename else None
            if video_path and os.path.exists(video_path):
                try:
                    os.remove(video_path)
                except Exception:
                    pass

            # Build error payload
            error_payload = {
                "videosProcessedQuantity": 0,
                "filename": filename,
                "processedFiles": [],
                "url": data.get("url"),
                "videoId": data.get("videoId"),
                "hasError": True,
                "error": str(exc),
            }
            try:
                publish_to_queue(error_payload)
                print(f"Error message published to queue '{RABBITMQ_UPLOAD_QUEUE}'.")
            except Exception as publish_err:
                print(f"Failed to publish error message: {publish_err}")
            continue

        # On success, publish the payload
        try:
            publish_to_queue(result)
            print(f"Message published to queue '{RABBITMQ_UPLOAD_QUEUE}'.")
        except Exception as publish_err:
            print(f"Failed to publish to queue '{RABBITMQ_UPLOAD_QUEUE}': {publish_err}")
        # Loop continues


if __name__ == "__main__":
    main()
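
# A hypothetical local invocation, sketched for reference (the module path
# and password below are illustrative, not defined by this file; real values
# come from the environment / docker-compose.yml):
#
#   RABBITMQ_HOST=localhost RABBITMQ_PASS=secret python -m app.main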