# Among them: the faster-whisper addition, caption generation, and
# integration with Gemini and OpenRouter.
"""Entry point for the video processing pipeline.
|
|
|
|
This script listens to a RabbitMQ queue for new video processing tasks. When
|
|
a message arrives, it performs the following steps:
|
|
|
|
1. Creates a working directory for the video based off of its filename.
|
|
2. Extracts the audio track with FFMPEG and runs Faster-Whisper to produce
|
|
a transcription with word-level timestamps.
|
|
3. Uses the Gemini model to determine which parts of the video have the
|
|
highest potential for engagement. These highlight segments are
|
|
represented as a list of objects containing start/end timestamps and
|
|
text.
|
|
4. Uses the OpenRouter model to generate a sensational title for each
|
|
highlight. Only the ``topText`` field is kept; the description is
|
|
intentionally omitted since the caption will be burned into the video.
|
|
5. Cuts the original video into individual clips corresponding to each
|
|
highlight and renders them vertically with a title above and a dynamic
|
|
caption below.
|
|
6. Publishes a message to the upload queue with information about the
|
|
generated clips. On success, this message contains the list of output
|
|
files. On failure, ``hasError`` will be set to ``True`` and the
|
|
``error`` field will describe what went wrong.
|
|
7. Cleans up temporary files (audio, transcript, working directory) and
|
|
deletes the original source video from the ``videos`` directory to
|
|
conserve disk space.
|
|
|
|
The queue names and RabbitMQ credentials are configured via environment
|
|
variables. See the accompanying ``docker-compose.yml`` for defaults.
|
|
"""

from __future__ import annotations

import json
import os
import shutil
import time
import traceback
from typing import Any, Dict, List

import pika

from .utils import sanitize_filename, seconds_to_timestamp, timestamp_to_seconds
from .transcribe import transcribe
from .llm import LLMError, select_highlights, generate_titles
from .render import render_clip


# Environment variables with sensible defaults
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "rabbitmq")
RABBITMQ_PORT = int(os.environ.get("RABBITMQ_PORT", 5672))
RABBITMQ_USER = os.environ.get("RABBITMQ_USER", "admin")
RABBITMQ_PASS = os.environ.get("RABBITMQ_PASS")
RABBITMQ_QUEUE = os.environ.get("RABBITMQ_QUEUE", "to-render")
RABBITMQ_UPLOAD_QUEUE = os.environ.get("RABBITMQ_UPLOAD_QUEUE", "to-upload")

if not RABBITMQ_PASS:
    raise RuntimeError("RABBITMQ_PASS is not set in the environment")
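
# For local runs outside of docker-compose, the expected environment might
# look like this (values illustrative):
#
#   export RABBITMQ_HOST=localhost
#   export RABBITMQ_PORT=5672
#   export RABBITMQ_USER=admin
#   export RABBITMQ_PASS=changeme
#   export RABBITMQ_QUEUE=to-render
#   export RABBITMQ_UPLOAD_QUEUE=to-upload

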
def get_next_message() -> bytes | None:
    """Retrieve a single message from the RABBITMQ_QUEUE.

    Returns ``None`` if no messages are available. This helper opens a new
    connection for each call to avoid keeping stale connections alive. Note
    that messages are acknowledged as soon as they are fetched, so a crash
    during processing will not requeue the task.
    """
    credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
    parameters = pika.ConnectionParameters(
        host=RABBITMQ_HOST,
        port=RABBITMQ_PORT,
        credentials=credentials,
        heartbeat=60,
        blocked_connection_timeout=300,
    )
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    method_frame, _, body = channel.basic_get(RABBITMQ_QUEUE)
    if method_frame:
        channel.basic_ack(method_frame.delivery_tag)
        connection.close()
        return body
    connection.close()
    return None


def publish_to_queue(payload: Dict[str, Any]) -> None:
    """Publish a JSON-serialisable payload to the RABBITMQ_UPLOAD_QUEUE."""
    credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
    parameters = pika.ConnectionParameters(
        host=RABBITMQ_HOST,
        port=RABBITMQ_PORT,
        credentials=credentials,
        heartbeat=60,
        blocked_connection_timeout=300,
    )
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=RABBITMQ_UPLOAD_QUEUE, durable=True)
    channel.basic_publish(
        exchange="",
        routing_key=RABBITMQ_UPLOAD_QUEUE,
        body=json.dumps(payload),
        # delivery_mode=2 marks the message as persistent
        properties=pika.BasicProperties(delivery_mode=2),
    )
    connection.close()
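
# Usage sketch (hypothetical payload; see ``process_message`` for the real
# shape):
#
#   publish_to_queue({"hasError": False, "processedFiles": []})
#
# The queue is declared durable and messages are marked persistent
# (delivery_mode=2), so queued payloads survive a broker restart.

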
def build_srt(segments: List[Dict[str, Any]]) -> str:
    """Build an SRT-like string from a list of segments.

    Each segment should have ``start``, ``end`` and ``text`` fields. The
    timestamps are converted to the ``HH:MM:SS,mmm`` format expected by
    the Gemini prompt. Segments are separated by a blank line.
    """
    lines: List[str] = []
    for seg in segments:
        start_ts = seconds_to_timestamp(seg["start"])
        end_ts = seconds_to_timestamp(seg["end"])
        lines.append(f"{start_ts} --> {end_ts}\n{seg['text']}")
    return "\n\n".join(lines)
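
# A quick illustration of ``build_srt`` (hypothetical segments; assumes
# ``seconds_to_timestamp`` renders the ``HH:MM:SS,mmm`` format):
#
#   build_srt([
#       {"start": 0.0, "end": 2.5, "text": "hello"},
#       {"start": 2.5, "end": 4.0, "text": "world"},
#   ])
#
# returns:
#
#   00:00:00,000 --> 00:00:02,500
#   hello
#
#   00:00:02,500 --> 00:00:04,000
#   world

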
def process_message(data: Dict[str, Any]) -> Dict[str, Any]:
    """Process a single video task described in ``data``.

    Returns the payload to be sent to the upload queue. Raises an
    exception on failure; the caller is responsible for catching it and
    posting an error payload.
    """
    filename = data.get("filename")
    if not filename:
        raise ValueError("Missing 'filename' field in the message")
    url = data.get("url")
    video_id = data.get("videoId")
    # Determine source video path; n8n stores videos in the 'videos' directory
    video_path = os.path.join("videos", filename)
    if not os.path.exists(video_path):
        raise FileNotFoundError(f"Video file not found: {video_path}")
    # Sanitize the filename to use as directory name
    base_no_ext = os.path.splitext(filename)[0]
    sanitized = sanitize_filename(base_no_ext)
    work_dir = os.path.join("app", "videos", sanitized)
    # Transcribe video
    segments, words = transcribe(video_path, work_dir)
    # Build SRT string
    srt_str = build_srt(segments)
    # Call Gemini to select highlights
    highlights = select_highlights(srt_str)
    # Normalise the highlight timestamps; the original strings are kept for
    # the OpenRouter prompt and converted to seconds at render time
    for item in highlights:
        item["start"] = item["start"].strip()
        item["end"] = item["end"].strip()
    # Generate titles
    titles = generate_titles(highlights)
    # Render clips
    output_dir = os.path.join("outputs", sanitized)
    processed_files: List[str] = []
    for idx, item in enumerate(titles, start=1):
        start_sec = timestamp_to_seconds(item.get("start"))
        end_sec = timestamp_to_seconds(item.get("end"))
        # Extract the words that fall inside this clip, re-based to the
        # clip's own timeline for the caption renderer
        relative_words = []
        for w in words:
            # Word must overlap clip interval
            if w["end"] <= start_sec or w["start"] >= end_sec:
                continue
            rel_start = max(0.0, w["start"] - start_sec)
            rel_end = max(0.0, w["end"] - start_sec)
            relative_words.append({
                "start": rel_start,
                "end": rel_end,
                "word": w["word"],
            })
        # If no words were found (e.g. silence), create a dummy word to avoid errors
        if not relative_words:
            relative_words.append({"start": 0.0, "end": end_sec - start_sec, "word": ""})
        out_path = render_clip(
            video_path=video_path,
            start=start_sec,
            end=end_sec,
            top_text=item.get("topText", ""),
            words=relative_words,
            out_dir=output_dir,
            base_name=sanitized,
            idx=idx,
        )
        processed_files.append(out_path)
    # Compose payload
    payload = {
        "videosProcessedQuantity": len(processed_files),
        "filename": filename,
        "processedFiles": processed_files,
        "url": url,
        "videoId": video_id,
        "hasError": False,
        "error": None,
    }
    # Clean up working directory and original video
    shutil.rmtree(work_dir, ignore_errors=True)
    try:
        os.remove(video_path)
    except FileNotFoundError:
        pass
    return payload
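
# Caption timing example (hypothetical numbers): for a clip cut at
# start_sec = 10.0, a word spoken from 11.2s to 11.6s in the source video is
# re-based to the clip's own timeline as
#
#   {"start": 1.2, "end": 1.6, "word": "..."}
#
# Words that end before the cut or start after it are skipped entirely.

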
def main() -> None:
    print(" [*] Waiting for messages. To exit press CTRL+C")
    while True:
        body = get_next_message()
        if body is None:
            time.sleep(5)
            continue
        try:
            data = json.loads(body)
        except Exception:
            print("⚠️ Invalid message received (not JSON)")
            continue
        try:
            result = process_message(data)
        except Exception as exc:
            # Print stack trace for debugging
            traceback.print_exc()
            # Attempt to clean up any directories based on filename
            filename = data.get("filename")
            sanitized = sanitize_filename(os.path.splitext(filename or "")[0]) if filename else ""
            work_dir = os.path.join("app", "videos", sanitized) if sanitized else None
            output_dir = os.path.join("outputs", sanitized) if sanitized else None
            # Remove working and output directories
            if work_dir:
                shutil.rmtree(work_dir, ignore_errors=True)
            if output_dir:
                shutil.rmtree(output_dir, ignore_errors=True)
            # Remove original video if present
            video_path = os.path.join("videos", filename) if filename else None
            if video_path and os.path.exists(video_path):
                try:
                    os.remove(video_path)
                except Exception:
                    pass
            # Build error payload
            error_payload = {
                "videosProcessedQuantity": 0,
                "filename": filename,
                "processedFiles": [],
                "url": data.get("url"),
                "videoId": data.get("videoId"),
                "hasError": True,
                "error": str(exc),
            }
            try:
                publish_to_queue(error_payload)
                print(f"Error message published to queue '{RABBITMQ_UPLOAD_QUEUE}'.")
            except Exception as publish_err:
                print(f"Failed to publish error message: {publish_err}")
            continue
        # On success publish the payload
        try:
            publish_to_queue(result)
            print(f"Message published to queue '{RABBITMQ_UPLOAD_QUEUE}'.")
        except Exception as publish_err:
            print(f"Failed to publish to queue '{RABBITMQ_UPLOAD_QUEUE}': {publish_err}")
        # Loop continues


if __name__ == "__main__":
    main()