22 Commits

Author SHA1 Message Date
LeoMortari
f496663b63 Adjust render presets 2026-01-04 03:34:48 -03:00
LeoMortari
e4c5c6adfe Adjust RabbitMQ heartbeat 2026-01-03 23:13:27 -03:00
LeoMortari
21d2d19435 Adjust RabbitMQ config 2026-01-03 19:51:31 -03:00
LeoMortari
3f7329869d Adjust context, speech and focus, video jitter, and other bugs 2026-01-03 19:42:23 -03:00
LeoMortari
c1914dad00 Add exception return 2026-01-02 11:26:26 -03:00
LeoMortari
07d301f110 Make several adjustments to improve tracking and video rendering 2025-12-18 02:26:25 -03:00
LeoMortari
78e35d65fd Merge branch 'feat' 2025-11-12 11:43:49 -03:00
LeoMortari
c5d3e83a5f #v2 - Start v2 testing
- Add object tracking
- Facial detection
- Interactive captions
- More precise cuts
- Prompt refinement
2025-11-12 11:38:09 -03:00
LeoMortari
87c6a5e27c Add file cleanup after success or failure 2025-10-29 23:58:06 -03:00
LeoMortari
ae8b228ea1 Add gemini api key env 2025-10-29 08:34:57 -03:00
LeoMortari
8abb8001d7 Adjust compose configs 2025-10-29 08:27:02 -03:00
LeoMortari
c18884e778 Finish video render adjustments 2025-10-28 17:34:13 -03:00
LeoMortari
b5a27fa938 Gemini adjustments 2025-10-27 14:08:10 -03:00
LeoMortari
2692cc4dfd Adjust gitignore 2025-10-27 09:15:43 -03:00
LeoMortari
8caa849148 Rendering adjustments 2025-10-27 09:15:12 -03:00
LeoMortari
ba768cf093 Adjust remaining parts of the project 2025-10-25 00:54:30 -03:00
LeoMortari
b9e1dcd1e2 Dockerfile adjustments 2025-10-22 13:14:56 -03:00
LeoMortari
c641fd6331 Adjust Docker setup 2025-10-22 12:02:38 -03:00
LeoMortari
b090f7c2cb Create new components 2025-10-20 17:56:36 -03:00
2b99d2ad78 Remove .DS_Store 2025-10-17 14:32:36 +02:00
LeoMortari
7ccc745a5d Add git ignore 2025-10-17 09:32:06 -03:00
LeoMortari
0c0a9c3b5c Start new features
Among them: faster-whisper integration, caption generation, and integration with Gemini and OpenRouter
2025-10-17 09:27:50 -03:00
32 changed files with 4425 additions and 146 deletions

47
.env.example Normal file

@@ -0,0 +1,47 @@
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_USER=admin
RABBITMQ_PASS=your_password_here
RABBITMQ_QUEUE=to-render
RABBITMQ_UPLOAD_QUEUE=to-upload
RABBITMQ_PREFETCH=1
RABBITMQ_HEARTBEAT=60
RABBITMQ_BLOCKED_TIMEOUT=300
OPENROUTER_API_URL=https://openrouter.ai/api/v1/chat/completions
OPENROUTER_API_KEY=your_openrouter_api_key_here
# Model selection - Recommended options:
# - openai/gpt-oss-20b:free (Free tier, good quality)
# - qwen/qwen-2.5-72b-instruct:free (Free, excellent reasoning)
# - google/gemini-pro-1.5 (Best cost-benefit for podcasts)
# - anthropic/claude-3.5-sonnet (Premium quality, best reasoning)
OPENROUTER_MODEL=qwen/qwen-2.5-72b-instruct:free
OPENROUTER_TEMPERATURE=0.6
OPENROUTER_PROMPT_PATH=prompts/generate.txt
FASTER_WHISPER_MODEL_SIZE=medium
FASTER_WHISPER_DEVICE=auto
RENDER_WIDTH=1080
RENDER_HEIGHT=1920
RENDER_FPS=30
RENDER_CODEC=libx264
RENDER_AUDIO_CODEC=aac
RENDER_BITRATE=5000k
RENDER_PRESET=faster
SUBTITLE_HIGHLIGHT_COLOR=#00FF00
SUBTITLE_BASE_COLOR=#FFFFFF
RENDER_FONT_PATH=./Montserrat.ttf
RENDER_TITLE_FONT_SIZE=110
RENDER_SUBTITLE_FONT_SIZE=64
CAPTION_MIN_WORDS=2
CAPTION_MAX_WORDS=2
ENABLE_SMART_FRAMING=true
SMART_FRAMING_MIN_CONFIDENCE=0.5
SMART_FRAMING_SMOOTHING_WINDOW=20
SMART_FRAMING_FRAME_SKIP=2
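
For local runs outside Docker Compose these variables must already be in `os.environ` before `video_render/config.py` is imported, because the dataclass defaults read the environment at import time. A minimal sketch, assuming `.env.example` is copied to `.env` and `python-dotenv` is installed for local development only (it is not in `requirements.txt`):

```python
# local_env_check.py - hypothetical helper, not part of this repository
import os

from dotenv import load_dotenv  # assumption: python-dotenv installed locally

# Populate os.environ from .env so the defaults in video_render/config.py
# see the same values Docker Compose would inject.
load_dotenv(".env")

missing = [name for name in ("RABBITMQ_PASS", "OPENROUTER_API_KEY") if not os.environ.get(name)]
if missing:
    raise SystemExit(f"Missing required variables: {', '.join(missing)}")

print("RabbitMQ host:", os.environ.get("RABBITMQ_HOST", "rabbitmq"))
print("OpenRouter model:", os.environ.get("OPENROUTER_MODEL", "openai/gpt-oss-20b:free"))
```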

38
.gitignore vendored

@@ -1,4 +1,34 @@
/videos
/outputs
/temp
/components/__pycache__
# Ignore Python files
*.pyc
*.pyo
*.pyd
/__pycache__/
*.egg-info/
.eggs/
dist/
build/
doc/
videos/
outputs/
.DS_STORE
# Ignore virtual envs
venv/
env/
.claude
# Ignore editor files
.idea/
*.swp
*.swo
# Ignore project files
*.tmproj
*.sublime-project
*.sublime-workspace
# Ignore git itself
.git
# Ignore mypy and pylint cache
.mypy_cache/
.pylint.d/
CLAUDE.MD


@@ -40,14 +40,17 @@ def cut_video_new_clip(input_path: str, start: float, end: float, output_path: s
segment = clip.subclipped(start, end)
fps = clip.fps or 30
if segment.h < 720:
segment = segment.resized(height=720)
segment.write_videofile(
output_path,
codec=video_codec,
remove_temp=True,
fps=fps,
bitrate="3000k",
bitrate="5000k",
ffmpeg_params=[
"-preset", "ultrafast",
"-preset", "fast",
"-tune", "zerolatency",
"-pix_fmt", "yuv420p",
"-profile:v", "high",
@@ -98,9 +101,9 @@ def process_segment(input_path: str, top_text: str = "", bottom_text: str = "",
codec=video_codec,
remove_temp=True,
fps=30,
bitrate="3000k",
bitrate="5000k",
ffmpeg_params=[
"-preset", "ultrafast",
"-preset", "fast",
"-tune", "zerolatency",
"-pix_fmt", "yuv420p",
"-profile:v", "high",


@@ -1,32 +1,32 @@
services:
video-render-api:
video-render:
restart: unless-stopped
build: .
container_name: video-render
build:
context: .
no_cache: true
dockerfile: dockerfile
environment:
- RABBITMQ_PASS=${RABBITMQ_PASS}
ports:
- "5000:5000"
- OPENROUTER_API_URL=${OPENROUTER_API_URL:-https://openrouter.ai/api/v1/chat/completions}
- OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
- OPENROUTER_MODEL=${OPENROUTER_MODEL:-mistralai/mistral-small-3.1-24b-instruct:free}
- OPENROUTER_PROMPT_PATH=${OPENROUTER_PROMPT_PATH:-prompts/generate.txt}
- FASTER_WHISPER_MODEL_SIZE=${FASTER_WHISPER_MODEL_SIZE:-medium}
- SMART_FRAMING_SMOOTHING_WINDOW=${SMART_FRAMING_SMOOTHING_WINDOW:-30}
- SMART_FRAMING_MAX_VELOCITY=${SMART_FRAMING_MAX_VELOCITY:-40}
- SMART_FRAMING_FRAME_SKIP=${SMART_FRAMING_FRAME_SKIP:-2}
- SMART_FRAMING_PERSON_SWITCH_COOLDOWN=${SMART_FRAMING_PERSON_SWITCH_COOLDOWN:-60}
volumes:
- "/root/videos:/app/videos"
- "/root/temp:/app/temp"
- "/root/outputs:/app/outputs"
# gpus: all
# environment:
# - NVIDIA_VISIBLE_DEVICES=all
# - NVIDIA_DRIVER_CAPABILITIES=compute,video,utility
- "/root/prompts:/app/prompts"
# - "./videos:/app/videos"
# - "./outputs:/app/outputs"
# - "./prompts:/app/prompts"
command: "python -u main.py"
# runtime: nvidia
networks:
- dokploy-network
# deploy:
# resources:
# reservations:
# devices:
# - driver: nvidia
# count: all
# capabilities: [gpu]
networks:
dokploy-network:
external: true

dockerfile

@@ -2,35 +2,42 @@ FROM python:3.11-slim
WORKDIR /app
EXPOSE 5000
ENV DEBIAN_FRONTEND=noninteractive
COPY requirements.txt Montserrat.ttf ./
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1
RUN apt-get update && \
apt-get install -qq -y \
build-essential \
xvfb \
xdg-utils \
wget \
unzip \
apt-get install -y --no-install-recommends \
ffmpeg \
libpq-dev \
vim \
libavcodec-dev \
libavdevice-dev \
libavfilter-dev \
libavformat-dev \
libavutil-dev \
libswresample-dev \
libswscale-dev \
libgl1 \
libglib2.0-0 \
libgomp1 \
libmagick++-dev \
imagemagick \
fonts-liberation \
sox \
bc \
gsfonts && \
fc-cache -fv && \
rm -rf /var/lib/apt/lists/*
wget \
libsm6 \
libxext6 \
libxrender-dev \
&& rm -rf /var/lib/apt/lists/*
RUN pip install --no-cache-dir -r requirements.txt
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir setuptools wheel && \
pip install --no-cache-dir -r requirements.txt
COPY . .
VOLUME ["/app"]
RUN mkdir -p /app/videos /app/outputs
VOLUME ["/app/videos", "/app/outputs"]
CMD ["python", "-u", "main.py"]

110
main.py

@@ -1,103 +1,31 @@
import os
import pika
import json
import time
from components.video import process_full_video
import warnings
RABBITMQ_HOST = os.environ.get('RABBITMQ_HOST', 'rabbitmq')
RABBITMQ_PORT = int(os.environ.get('RABBITMQ_PORT', 5672))
RABBITMQ_USER = os.environ.get('RABBITMQ_USER', 'admin')
RABBITMQ_PASS = os.environ.get('RABBITMQ_PASS')
RABBITMQ_QUEUE = os.environ.get('RABBITMQ_QUEUE', 'to-render')
RABBITMQ_UPLOAD_QUEUE = os.environ.get('RABBITMQ_UPLOAD_QUEUE', 'to-upload')
# Suppress FFmpeg/AV1 warnings for cleaner logs
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'loglevel;quiet'
os.environ['OPENCV_LOG_LEVEL'] = 'ERROR'
if not RABBITMQ_PASS:
raise RuntimeError("RABBITMQ_PASS não definido no ambiente")
# Suppress MoviePy verbose logging
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1'
def get_next_message():
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
credentials=credentials,
heartbeat=60,
blocked_connection_timeout=300
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
method_frame, header_frame, body = channel.basic_get(RABBITMQ_QUEUE)
if method_frame:
channel.basic_ack(method_frame.delivery_tag)
connection.close()
return body
else:
connection.close()
return None
# Filter deprecation warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
warnings.filterwarnings('ignore', category=UserWarning, module='moviepy')
def publish_to_queue(payload):
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
parameters = pika.ConnectionParameters(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
credentials=credentials,
heartbeat=60,
blocked_connection_timeout=300
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=RABBITMQ_UPLOAD_QUEUE, durable=True)
channel.basic_publish(
exchange='',
routing_key=RABBITMQ_UPLOAD_QUEUE,
body=json.dumps(payload),
properties=pika.BasicProperties(
delivery_mode=2, # persistente
)
)
connection.close()
from video_render.config import load_settings
from video_render.logging_utils import setup_logging
from video_render.messaging import RabbitMQWorker
from video_render.pipeline import VideoPipeline
def main():
print(' [*] Esperando mensagens. Para sair: CTRL+C')
while True:
body = get_next_message()
if body is None:
time.sleep(5)
continue
try:
data = json.loads(body)
filename = data.get("filename")
times = data.get("times", [])
url = data.get("url")
video_id = data.get("videoId")
print(f"Processando vídeo: {filename}")
def main() -> None:
setup_logging()
settings = load_settings()
processed_files = process_full_video(filename, times)
pipeline = VideoPipeline(settings)
worker = RabbitMQWorker(settings)
worker.consume_forever(pipeline.process_message)
payload = {
"videosProcessedQuantity": len(processed_files),
"filename": filename,
"processedFiles": processed_files,
"url": url,
"videoId": video_id,
"error": False,
}
except Exception as e:
payload = {
"videosProcessedQuantity": 0,
"filename": filename if 'filename' in locals() else None,
"processedFiles": [],
"url": url if 'url' in locals() else None,
"videoId": video_id if 'video_id' in locals() else None,
"error": str(e),
}
print(f"Erro no processamento: {e}")
try:
publish_to_queue(payload)
print(f"Mensagem publicada na fila '{RABBITMQ_UPLOAD_QUEUE}'.")
except Exception as publish_err:
print(f"Erro ao publicar na fila '{RABBITMQ_UPLOAD_QUEUE}': {publish_err}")
if __name__ == "__main__":
main()

111
prompts/generate.txt Normal file

@@ -0,0 +1,111 @@
# TAREFA: Extrair clips virais de uma transcrição de vídeo
Você é um especialista em conteúdo viral para TikTok, Instagram Reels e YouTube Shorts.
## REGRA MAIS IMPORTANTE - DURAÇÃO DOS CLIPS
**CADA CLIP DEVE TER ENTRE 60 E 120 SEGUNDOS DE DURAÇÃO.**
- MÍNIMO ABSOLUTO: 60 segundos (end - start >= 60)
- MÁXIMO: 120 segundos (end - start <= 120)
- IDEAL: 60-90 segundos
**CLIPS COM MENOS DE 60 SEGUNDOS SERÃO REJEITADOS PELO SISTEMA.**
Antes de incluir um clip, SEMPRE calcule: end - start >= 60
## QUANTIDADE DE CLIPS
Baseado na duração total do vídeo:
- Até 10 min: 2-4 clips
- 10-20 min: 4-6 clips
- 20-30 min: 6-10 clips
- 30+ min: 8-15 clips
## CRITÉRIOS DE SELEÇÃO
Um bom clip viral possui:
1. GANCHO FORTE nos primeiros 3 segundos (pergunta, afirmação chocante, promessa)
2. EMOÇÃO (humor, surpresa, indignação, curiosidade)
3. VALOR (ensina algo, revela segredo, dá dica prática)
4. ESTRUTURA (início, meio e fim coerentes)
5. RITMO (sem pausas longas, dinâmico)
## O QUE EVITAR
- Introduções genéricas ("oi pessoal", "então", "bem")
- Trechos com pausas longas (> 3 segundos de silêncio)
- Segmentos sem contexto ou conclusão
- Explicações técnicas monótonas
## FORMATO DE RESPOSTA
Retorne APENAS um JSON válido, sem texto antes ou depois:
```json
{
"highlights": [
{
"start": 0.0,
"end": 75.0,
"summary": "Descrição do que acontece neste trecho"
},
{
"start": 120.5,
"end": 195.0,
"summary": "Descrição do que acontece neste trecho"
}
]
}
```
## REGRAS DO JSON
- "start" e "end" são números decimais (float) em SEGUNDOS
- Use ponto como separador decimal (60.5, não 60,5)
- "summary" é uma descrição breve do conteúdo (1-2 frases)
- Clips em ordem cronológica (start crescente)
- Clips não podem se sobrepor
## CHECKLIST ANTES DE RESPONDER
Para CADA clip, verifique:
- [ ] end - start >= 60 segundos?
- [ ] end - start <= 120 segundos?
- [ ] Tem gancho forte no início?
- [ ] Faz sentido isolado do resto do vídeo?
- [ ] JSON está válido?
## EXEMPLO
Se o vídeo tem 15 minutos e você encontrou 4 momentos virais:
```json
{
"highlights": [
{
"start": 60.0,
"end": 120.0,
"summary": "Revelação sobre como economizar 50% nas compras"
},
{
"start": 180.0,
"end": 255.0,
"summary": "História engraçada sobre cliente que tentou enganar a loja"
},
{
"start": 400.0,
"end": 480.0,
"summary": "Dica prática de negociação com fornecedores"
},
{
"start": 600.0,
"end": 690.0,
"summary": "Conclusão motivacional sobre empreendedorismo"
}
]
}
```
Agora analise a transcrição fornecida e extraia os clips virais seguindo estas instruções.
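
The same 60-120 s contract is enforced again on the consumer side in `video_render/llm.py` further down. A standalone sketch of that validation, handy for checking a model's raw answer by hand; `validate_highlights` is illustrative and not part of the repository:

```python
import json

def validate_highlights(raw_response: str) -> list[dict]:
    """Keep only clips that satisfy the 60-120 s duration rule from the prompt."""
    data = json.loads(raw_response)
    valid = []
    for clip in data.get("highlights", []):
        start = float(clip.get("start", 0))
        end = float(clip.get("end", 0))
        summary = str(clip.get("summary", "")).strip()
        duration = end - start
        if start < 0 or end <= start or not summary:
            continue
        if not (60 <= duration <= 120):
            continue
        valid.append({"start": start, "end": end, "summary": summary})
    return valid

sample = '{"highlights": [{"start": 60.0, "end": 120.0, "summary": "Example clip"}]}'
print(validate_highlights(sample))  # -> one clip of exactly 60 s
```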

requirements.txt

@@ -1,4 +1,9 @@
moviepy==2.2.0
pillow==9.5.0
numpy>=1.26.0
requests
pika
faster-whisper==1.2.0
mediapipe==0.10.18
opencv-python==4.10.0.84
scipy>=1.11.0

4
video_render/__init__.py Normal file

@@ -0,0 +1,4 @@
"""
Core package for the revamped video rendering pipeline.
"""

11 binary files not shown.

99
video_render/config.py Normal file

@@ -0,0 +1,99 @@
from __future__ import annotations
import os
from dataclasses import dataclass
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent
VIDEOS_ROOT = BASE_DIR / "videos"
OUTPUTS_ROOT = BASE_DIR / "outputs"
TEMP_ROOT = BASE_DIR / "temp"
@dataclass(frozen=True)
class RabbitMQSettings:
# host: str = os.environ.get("RABBITMQ_HOST", "154.12.229.181")
# port: int = int(os.environ.get("RABBITMQ_PORT", 32790))
host: str = os.environ.get("RABBITMQ_HOST", "rabbitmq")
port: int = int(os.environ.get("RABBITMQ_PORT", 5672))
user: str = os.environ.get("RABBITMQ_USER", "admin")
password: str = os.environ.get("RABBITMQ_PASS")
consume_queue: str = os.environ.get("RABBITMQ_QUEUE", "to-render")
publish_queue: str = os.environ.get("RABBITMQ_UPLOAD_QUEUE", "to-upload")
prefetch_count: int = int(os.environ.get("RABBITMQ_PREFETCH", 1))
heartbeat: int = int(os.environ.get("RABBITMQ_HEARTBEAT", 600))
blocked_timeout: int = int(os.environ.get("RABBITMQ_BLOCKED_TIMEOUT", 7200))
@dataclass(frozen=True)
class OpenRouterSettings:
api_key: str = os.environ.get("OPENROUTER_API_KEY", "")
model: str = os.environ.get(
"OPENROUTER_MODEL", "openai/gpt-oss-20b:free"
)
temperature: float = float(os.environ.get("OPENROUTER_TEMPERATURE", 0.6))
prompt_path: str = os.environ.get("OPENROUTER_PROMPT_PATH", "prompts/generate.txt")
@dataclass(frozen=True)
class WhisperSettings:
model_size: str = os.environ.get("FASTER_WHISPER_MODEL_SIZE", "medium")
device: str | None = os.environ.get("FASTER_WHISPER_DEVICE")
compute_type: str | None = os.environ.get("FASTER_WHISPER_COMPUTE_TYPE")
download_root: Path = Path(
os.environ.get("FASTER_WHISPER_DOWNLOAD_ROOT", str(BASE_DIR / ".whisper"))
)
@dataclass(frozen=True)
class RenderingSettings:
frame_width: int = int(os.environ.get("RENDER_WIDTH", 1080))
frame_height: int = int(os.environ.get("RENDER_HEIGHT", 1920))
fps: int = int(os.environ.get("RENDER_FPS", 30))
video_codec: str = os.environ.get("RENDER_CODEC", "libx264")
audio_codec: str = os.environ.get("RENDER_AUDIO_CODEC", "aac")
bitrate: str = os.environ.get("RENDER_BITRATE", "5000k")
preset: str = os.environ.get("RENDER_PRESET", "faster")
highlight_color: str = os.environ.get("SUBTITLE_HIGHLIGHT_COLOR", "#00FF00")
base_color: str = os.environ.get("SUBTITLE_BASE_COLOR", "#FFFFFF")
font_path: Path = Path(os.environ.get("RENDER_FONT_PATH", "./Montserrat.ttf"))
title_font_size: int = int(os.environ.get("RENDER_TITLE_FONT_SIZE", 110))
subtitle_font_size: int = int(os.environ.get("RENDER_SUBTITLE_FONT_SIZE", 64))
caption_min_words: int = int(os.environ.get("CAPTION_MIN_WORDS", 2))
caption_max_words: int = int(os.environ.get("CAPTION_MAX_WORDS", 2))
enable_smart_framing: bool = os.environ.get("ENABLE_SMART_FRAMING", "true").lower() in ("true", "1", "yes")
smart_framing_min_confidence: float = float(os.environ.get("SMART_FRAMING_MIN_CONFIDENCE", 0.3))
smart_framing_smoothing_window: int = int(os.environ.get("SMART_FRAMING_SMOOTHING_WINDOW", 30))
smart_framing_frame_skip: int = int(os.environ.get("SMART_FRAMING_FRAME_SKIP", 1))
smart_framing_max_velocity: int = int(os.environ.get("SMART_FRAMING_MAX_VELOCITY", 25))
smart_framing_person_switch_cooldown: int = int(os.environ.get("SMART_FRAMING_PERSON_SWITCH_COOLDOWN", 30))
smart_framing_response_time: float = float(os.environ.get("SMART_FRAMING_RESPONSE_TIME", 0.6))
smart_framing_group_padding: float = float(os.environ.get("SMART_FRAMING_GROUP_PADDING", 0.15))
smart_framing_max_zoom_out: float = float(os.environ.get("SMART_FRAMING_MAX_ZOOM_OUT", 2.0))
smart_framing_dead_zone: int = int(os.environ.get("SMART_FRAMING_DEAD_ZONE", 60))
@dataclass(frozen=True)
class Settings:
rabbitmq: RabbitMQSettings = RabbitMQSettings()
openrouter: OpenRouterSettings = OpenRouterSettings()
whisper: WhisperSettings = WhisperSettings()
rendering: RenderingSettings = RenderingSettings()
videos_dir: Path = VIDEOS_ROOT
outputs_dir: Path = OUTPUTS_ROOT
temp_dir: Path = TEMP_ROOT
def load_settings() -> Settings:
settings = Settings()
if not settings.rabbitmq.password:
raise RuntimeError("RABBITMQ_PASS must be provided")
settings.videos_dir.mkdir(parents=True, exist_ok=True)
settings.outputs_dir.mkdir(parents=True, exist_ok=True)
settings.temp_dir.mkdir(parents=True, exist_ok=True)
return settings
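
A minimal usage sketch for the settings above. Because every field default calls `os.environ.get` in the class body, the environment has to be populated before `video_render.config` is imported; the password below is a placeholder:

```python
import os

os.environ.setdefault("RABBITMQ_PASS", "local-dev-password")  # placeholder, assumption for local testing

from video_render.config import load_settings

settings = load_settings()  # raises RuntimeError if RABBITMQ_PASS is empty; creates videos/outputs/temp dirs
print(settings.rabbitmq.host, settings.rabbitmq.consume_queue)
print(f"{settings.rendering.frame_width}x{settings.rendering.frame_height} @ {settings.rendering.fps}fps")
```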


@@ -0,0 +1,844 @@
"""
Context detection module for video analysis.
This module provides functionality to detect faces, track people,
and identify who is speaking in video content using MediaPipe and audio analysis.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
import cv2
import mediapipe as mp
import numpy as np
from scipy import signal
logger = logging.getLogger(__name__)
@dataclass
class FaceDetection:
"""Represents a detected face in a frame."""
x: int
y: int
width: int
height: int
confidence: float
center_x: int
center_y: int
landmarks: Optional[List[Tuple[int, int]]] = None
@dataclass
class PersonTracking:
"""Tracks a person across frames."""
person_id: int
face: FaceDetection
is_speaking: bool
speaking_confidence: float
frame_number: int
@dataclass
class GroupBoundingBox:
"""Bounding box containing all tracked faces."""
x: int
y: int
width: int
height: int
center_x: int
center_y: int
face_count: int
@dataclass
class FrameContext:
"""Context information for a video frame."""
frame_number: int
timestamp: float
detected_faces: List[FaceDetection]
active_speakers: List[int] # indices of speaking faces
primary_focus: Optional[Tuple[int, int]] # (x, y) center point
layout_mode: str # "single", "dual_split", "grid"
selected_people: List[int] = field(default_factory=list) # indices of people selected for display
group_bounds: Optional[GroupBoundingBox] = None # bounding box for all detected faces
class MediaPipeDetector:
"""Face and pose detection using MediaPipe with OpenCV Haar Cascade fallback."""
def __init__(self, min_detection_confidence: float = 0.3, min_tracking_confidence: float = 0.3):
self.min_detection_confidence = min_detection_confidence
self.min_tracking_confidence = min_tracking_confidence
self.mp_face_detection = mp.solutions.face_detection
self.mp_face_mesh = mp.solutions.face_mesh
# MediaPipe detectors with lower confidence for better cartoon detection
self.face_detection = self.mp_face_detection.FaceDetection(
min_detection_confidence=min_detection_confidence,
model_selection=0 # Changed to 0 for better detection of varied faces (including cartoons)
)
self.face_mesh = self.mp_face_mesh.FaceMesh(
max_num_faces=5,
min_detection_confidence=min_detection_confidence,
min_tracking_confidence=min_tracking_confidence,
static_image_mode=False
)
# OpenCV Haar Cascade as fallback for cartoon/anime faces
self.haar_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
# Alternative cascade for profile/side faces
self.haar_cascade_profile = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_profileface.xml')
logger.info(f"Hybrid detector initialized (MediaPipe confidence={min_detection_confidence}, OpenCV Haar Cascade enabled)")
def detect_faces(self, frame: np.ndarray) -> List[FaceDetection]:
"""
Detect faces in a frame using hybrid approach (MediaPipe + OpenCV Haar Cascade).
Args:
frame: RGB image array
Returns:
List of detected faces
"""
height, width = frame.shape[:2]
if len(frame.shape) == 2:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
elif frame.shape[2] == 4:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
else:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Try MediaPipe first
results = self.face_detection.process(frame_rgb)
faces = []
if results.detections:
for detection in results.detections:
bbox = detection.location_data.relative_bounding_box
x = int(bbox.xmin * width)
y = int(bbox.ymin * height)
w = int(bbox.width * width)
h = int(bbox.height * height)
x = max(0, min(x, width - 1))
y = max(0, min(y, height - 1))
w = min(w, width - x)
h = min(h, height - y)
center_x = x + w // 2
center_y = y + h // 2
confidence = detection.score[0] if detection.score else 0.0
faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=confidence,
center_x=center_x,
center_y=center_y
))
# Fallback to OpenCV Haar Cascade if MediaPipe found nothing
if not faces:
faces = self._detect_faces_haar_cascade(frame, width, height)
return faces
def _detect_faces_haar_cascade(self, frame: np.ndarray, width: int, height: int) -> List[FaceDetection]:
"""
Detect faces using OpenCV Haar Cascade (works better with cartoons/anime).
Args:
frame: Image frame (BGR format)
width: Frame width
height: Frame height
Returns:
List of detected faces
"""
# Convert to grayscale for Haar Cascade
if len(frame.shape) == 3:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
else:
gray = frame
# Detect frontal faces with more sensitive parameters
frontal_faces = self.haar_cascade.detectMultiScale(
gray,
scaleFactor=1.05, # More sensitive to size variations
minNeighbors=3, # Lower threshold for detection (more permissive)
minSize=(30, 30), # Smaller minimum size
flags=cv2.CASCADE_SCALE_IMAGE
)
# Also try profile faces
profile_faces = self.haar_cascade_profile.detectMultiScale(
gray,
scaleFactor=1.1,
minNeighbors=3,
minSize=(30, 30),
flags=cv2.CASCADE_SCALE_IMAGE
)
# Combine frontal and profile detections
all_faces = []
for (x, y, w, h) in frontal_faces:
x = max(0, min(x, width - 1))
y = max(0, min(y, height - 1))
w = min(w, width - x)
h = min(h, height - y)
center_x = x + w // 2
center_y = y + h // 2
all_faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=0.7, # Haar Cascade doesn't provide confidence, use fixed value
center_x=center_x,
center_y=center_y
))
for (x, y, w, h) in profile_faces:
# Check if this face overlaps significantly with any frontal face
overlap = False
for existing_face in all_faces:
# Calculate IoU (Intersection over Union)
x1_overlap = max(x, existing_face.x)
y1_overlap = max(y, existing_face.y)
x2_overlap = min(x + w, existing_face.x + existing_face.width)
y2_overlap = min(y + h, existing_face.y + existing_face.height)
if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)
face_area = w * h
if overlap_area / face_area > 0.3: # 30% overlap threshold
overlap = True
break
if not overlap:
x = max(0, min(x, width - 1))
y = max(0, min(y, height - 1))
w = min(w, width - x)
h = min(h, height - y)
center_x = x + w // 2
center_y = y + h // 2
all_faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=0.6, # Slightly lower confidence for profile
center_x=center_x,
center_y=center_y
))
if all_faces:
logger.debug(f"Haar Cascade detected {len(all_faces)} faces (MediaPipe failed)")
return all_faces
def detect_face_landmarks(self, frame: np.ndarray) -> List[FaceDetection]:
"""
Detect faces with landmarks for lip sync detection.
Args:
frame: RGB image array
Returns:
List of detected faces with landmark information
"""
height, width = frame.shape[:2]
if len(frame.shape) == 2:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
elif frame.shape[2] == 4:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
else:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(frame_rgb)
faces = []
if results.multi_face_landmarks:
for face_landmarks in results.multi_face_landmarks:
xs = [lm.x for lm in face_landmarks.landmark]
ys = [lm.y for lm in face_landmarks.landmark]
x_min, x_max = min(xs), max(xs)
y_min, y_max = min(ys), max(ys)
x = int(x_min * width)
y = int(y_min * height)
w = int((x_max - x_min) * width)
h = int((y_max - y_min) * height)
center_x = x + w // 2
center_y = y + h // 2
lip_landmarks = []
for idx in [13, 14, 78, 308]:
lm = face_landmarks.landmark[idx]
lip_landmarks.append((int(lm.x * width), int(lm.y * height)))
faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=1.0,
center_x=center_x,
center_y=center_y,
landmarks=lip_landmarks
))
return faces
def close(self):
"""Release MediaPipe resources."""
self.face_detection.close()
self.face_mesh.close()
class AudioActivityDetector:
"""Detects speech activity in audio."""
def __init__(self, sample_rate: int = 44100, frame_duration_ms: int = 30):
self.sample_rate = sample_rate
self.frame_duration_ms = frame_duration_ms
self.frame_size = int(sample_rate * frame_duration_ms / 1000)
logger.info(f"Audio activity detector initialized (sr={sample_rate}, frame={frame_duration_ms}ms)")
def detect_speaking_periods(
self,
audio_samples: np.ndarray,
threshold: float = 0.01, # Reduced from 0.02 for better speech detection
min_speech_duration: float = 0.05 # Reduced from 0.1 to catch shorter utterances
) -> List[Tuple[float, float]]:
"""
Detect periods of speech in audio.
Args:
audio_samples: Audio samples array
threshold: Energy threshold for speech detection
min_speech_duration: Minimum duration of speech in seconds
Returns:
List of (start_time, end_time) tuples in seconds
"""
if audio_samples.ndim > 1:
audio_samples = audio_samples.mean(axis=1)
energies = []
for i in range(0, len(audio_samples), self.frame_size):
frame = audio_samples[i:i + self.frame_size]
if len(frame) > 0:
energy = np.sqrt(np.mean(frame ** 2))
energies.append(energy)
speaking_frames = [e > threshold for e in energies]
periods = []
start_frame = None
for i, is_speaking in enumerate(speaking_frames):
if is_speaking and start_frame is None:
start_frame = i
elif not is_speaking and start_frame is not None:
start_time = start_frame * self.frame_duration_ms / 1000
end_time = i * self.frame_duration_ms / 1000
if end_time - start_time >= min_speech_duration:
periods.append((start_time, end_time))
start_frame = None
if start_frame is not None:
start_time = start_frame * self.frame_duration_ms / 1000
end_time = len(speaking_frames) * self.frame_duration_ms / 1000
if end_time - start_time >= min_speech_duration:
periods.append((start_time, end_time))
# Log detected speech periods for debugging
if periods:
total_speech_time = sum(end - start for start, end in periods)
logger.info(f"Audio speech detection: {len(periods)} periods found, "
f"total {total_speech_time:.1f}s of speech (threshold={threshold})")
else:
max_energy = max(energies) if energies else 0
logger.warning(f"No speech detected! Max energy={max_energy:.4f}, threshold={threshold} "
f"(try lowering threshold if speech should be present)")
return periods
def is_speaking_at_time(self, speaking_periods: List[Tuple[float, float]], time: float) -> bool:
"""Check if there is speech activity at a given time."""
for start, end in speaking_periods:
if start <= time <= end:
return True
return False
class ContextAnalyzer:
"""Analyzes video context to determine focus and layout."""
def __init__(self, person_switch_cooldown: int = 30, min_face_confidence: float = 0.3):
self.detector = MediaPipeDetector()
self.audio_detector = AudioActivityDetector()
self.previous_faces: List[FaceDetection] = []
self.min_face_confidence = min_face_confidence
# Person tracking state
self.current_selected_people: List[int] = [] # Indices of people currently on screen
self.last_switch_frame: int = -999 # Frame when we last switched people
self.person_switch_cooldown = person_switch_cooldown # Minimum frames before switching
# Stability tracking to prevent flip-flopping
self.desired_people_history: List[List[int]] = [] # Track recent desired selections
self.stability_threshold = 20 # Frames needed to confirm a switch (increased for more stability)
self.last_switched_people: List[int] = [] # People we just switched FROM
self.focus_history: List[Tuple[int, int]] = []
self.focus_history_size: int = 20
self.focus_dead_zone: int = 60
# Debug logging
self.frame_log_interval = 30 # Log every N frames
logger.info(f"Context analyzer initialized (cooldown={person_switch_cooldown} frames, focus_smoothing={self.focus_history_size})")
def analyze_frame(
self,
frame: np.ndarray,
timestamp: float,
frame_number: int,
speaking_periods: Optional[List[Tuple[float, float]]] = None
) -> FrameContext:
"""
Analyze a single frame to extract context information.
Args:
frame: Video frame (BGR format from OpenCV)
timestamp: Frame timestamp in seconds
frame_number: Frame index
speaking_periods: List of (start, end) times where speech is detected
Returns:
FrameContext with detection results
"""
faces = self.detector.detect_face_landmarks(frame)
faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else []
if not faces:
faces = self.detector.detect_faces(frame)
faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else []
# Determine who is speaking
active_speakers = []
has_audio_speech = speaking_periods and self.audio_detector.is_speaking_at_time(speaking_periods, timestamp)
for i, face in enumerate(faces):
is_speaking = False
# Prefer visual cues when multiple faces are present.
if face.landmarks and len(self.previous_faces) > i:
is_speaking = self._detect_lip_movement(face, self.previous_faces[i])
# Audio can confirm speech when there's only one face.
if has_audio_speech and len(faces) == 1:
is_speaking = True
if is_speaking:
active_speakers.append(i)
# Debug: Log speech detection
if frame_number % 30 == 0: # Every second at 30fps
logger.info(f"Speech detection - Frame {frame_number}: audio_active={has_audio_speech}, "
f"speakers={active_speakers}, total_faces={len(faces)}")
if active_speakers:
selected_people = active_speakers[:4]
if len(selected_people) == 1:
layout_mode = "single"
elif len(selected_people) == 2:
layout_mode = "dual_split"
else:
layout_mode = "grid"
else:
# Select THE person to focus on (always single person)
# Priority: 1) Who is speaking, 2) Who is most centered
selected_people = self._select_person_to_focus(
faces,
active_speakers,
frame_number,
frame.shape[1], # frame width for center calculation
frame.shape[0] # frame height for center calculation
)
layout_mode = "single"
# Calculate group bounding box for ALL detected faces (multi-person support)
group_bounds = self._calculate_group_bounding_box(faces)
# For multi-person mode, use group center as primary focus
if group_bounds and group_bounds.face_count > 1:
primary_focus = (group_bounds.center_x, group_bounds.center_y)
else:
primary_focus = self._calculate_focus_point(faces, selected_people)
# Debug logging every N frames
if frame_number % self.frame_log_interval == 0:
focus_reason = "speaker" if active_speakers else "no_speech_detected"
group_info = f", group={group_bounds.face_count} faces" if group_bounds else ""
logger.info(f"Frame {frame_number}: {len(faces)} faces, "
f"{len(active_speakers)} speakers, focus={selected_people}, reason={focus_reason}{group_info}")
self.previous_faces = faces
return FrameContext(
frame_number=frame_number,
timestamp=timestamp,
detected_faces=faces,
active_speakers=active_speakers,
primary_focus=primary_focus,
layout_mode=layout_mode,
selected_people=selected_people,
group_bounds=group_bounds
)
def _detect_lip_movement(self, current_face: FaceDetection, previous_face: FaceDetection) -> bool:
"""
Detect lip movement by comparing landmarks between frames.
Args:
current_face: Current frame face detection
previous_face: Previous frame face detection
Returns:
True if significant lip movement detected
"""
if not current_face.landmarks or not previous_face.landmarks:
return False
def lip_distance(landmarks):
if len(landmarks) < 4:
return 0
upper = np.array(landmarks[0:2])
lower = np.array(landmarks[2:4])
return np.linalg.norm(upper.mean(axis=0) - lower.mean(axis=0))
current_dist = lip_distance(current_face.landmarks)
previous_dist = lip_distance(previous_face.landmarks)
threshold = 2.0
return abs(current_dist - previous_dist) > threshold
def _select_person_to_focus(
self,
faces: List[FaceDetection],
active_speakers: List[int],
frame_number: int,
frame_width: int,
frame_height: int
) -> List[int]:
"""
Select THE single person to focus on.
Priority: 1) Who is speaking, 2) Who is most centered in frame
Args:
faces: List of detected faces
active_speakers: Indices of people currently speaking
frame_number: Current frame number
frame_width: Frame width for center calculation
frame_height: Frame height for center calculation
Returns:
List with single person index [idx], or empty list if no faces
"""
if not faces:
self.current_selected_people = []
return []
if len(faces) == 1:
self.current_selected_people = [0]
return [0]
frames_since_last_switch = frame_number - self.last_switch_frame
can_switch = frames_since_last_switch >= self.person_switch_cooldown
desired_person_idx = None
if active_speakers:
if self.current_selected_people and self.current_selected_people[0] in active_speakers:
desired_person_idx = self.current_selected_people[0]
else:
if can_switch or not self.current_selected_people:
desired_person_idx = active_speakers[0]
if self.current_selected_people and desired_person_idx != self.current_selected_people[0]:
logger.info(f"Switching focus to speaker: {desired_person_idx}")
self.last_switch_frame = frame_number
else:
desired_person_idx = self.current_selected_people[0] if self.current_selected_people else active_speakers[0]
else:
if self.current_selected_people and len(self.current_selected_people) > 0:
current_idx = self.current_selected_people[0]
if current_idx < len(faces):
desired_person_idx = current_idx
else:
if self.previous_faces and current_idx < len(self.previous_faces):
prev_face = self.previous_faces[current_idx]
best_match_idx = None
best_match_score = float('inf')
for idx, face in enumerate(faces):
dx = face.center_x - prev_face.center_x
dy = face.center_y - prev_face.center_y
dist = np.sqrt(dx**2 + dy**2)
size_diff = abs(face.width - prev_face.width) + abs(face.height - prev_face.height)
score = dist + size_diff * 0.5
if score < best_match_score:
best_match_score = score
best_match_idx = idx
if best_match_idx is not None and best_match_score < 1000:
desired_person_idx = best_match_idx
else:
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
face_confidences.sort(key=lambda x: x[1], reverse=True)
desired_person_idx = face_confidences[0][0]
else:
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
face_confidences.sort(key=lambda x: x[1], reverse=True)
desired_person_idx = face_confidences[0][0]
else:
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
face_confidences.sort(key=lambda x: x[1], reverse=True)
desired_person_idx = face_confidences[0][0]
desired_people = [desired_person_idx] if desired_person_idx is not None else []
if not self.current_selected_people:
self.current_selected_people = desired_people
self.last_switch_frame = frame_number
logger.info(f"Frame {frame_number}: Locked on person {desired_people}")
else:
self.current_selected_people = desired_people
return self.current_selected_people.copy()
def _ensure_distinct_people(
self,
faces: List[FaceDetection],
people_indices: List[int]
) -> List[int]:
"""
Ensure selected people are distinct by checking minimum distance between them.
Prevents showing the same person twice due to duplicate detection.
Args:
faces: List of detected faces
people_indices: Indices of people to validate
Returns:
List of distinct people indices (max 2)
"""
if len(people_indices) <= 1:
return people_indices
distinct_people = []
for idx in people_indices:
if idx >= len(faces):
continue
current_face = faces[idx]
is_distinct = True
# Check if this person is too close to any already selected person
for selected_idx in distinct_people:
selected_face = faces[selected_idx]
# Calculate distance between face centers
dx = current_face.center_x - selected_face.center_x
dy = current_face.center_y - selected_face.center_y
distance = np.sqrt(dx**2 + dy**2)
# Also check overlap via IoU (Intersection over Union)
x1_overlap = max(current_face.x, selected_face.x)
y1_overlap = max(current_face.y, selected_face.y)
x2_overlap = min(current_face.x + current_face.width, selected_face.x + selected_face.width)
y2_overlap = min(current_face.y + current_face.height, selected_face.y + selected_face.height)
overlap_area = 0
if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)
# Calculate areas
area1 = current_face.width * current_face.height
area2 = selected_face.width * selected_face.height
min_area = min(area1, area2)
# If faces are very close OR significantly overlapping, they're likely the same person
# Minimum distance: 1/4 of average face width
min_distance = (current_face.width + selected_face.width) / 8
overlap_threshold = 0.3 # 30% overlap
if distance < min_distance or (min_area > 0 and overlap_area / min_area > overlap_threshold):
is_distinct = False
logger.debug(f"Person {idx} too similar to person {selected_idx} (dist={distance:.1f}, overlap={overlap_area/min_area if min_area > 0 else 0:.2%})")
break
if is_distinct:
distinct_people.append(idx)
# Stop at 2 distinct people
if len(distinct_people) >= 2:
break
# If we couldn't find 2 distinct people, return at most 1
if len(distinct_people) < 2 and len(people_indices) >= 2:
logger.debug(f"Only {len(distinct_people)} distinct person(s) found from {len(people_indices)} detections")
return distinct_people
def _calculate_focus_point(
self,
faces: List[FaceDetection],
selected_people: List[int]
) -> Optional[Tuple[int, int]]:
"""
Calculate the primary focus point based on selected people with temporal smoothing.
Args:
faces: List of detected faces
selected_people: Indices of people selected for display
Returns:
(x, y) tuple of focus center, or None if no faces
"""
if not faces or not selected_people:
return None
# Calculate raw focus point
raw_focus_x = 0
raw_focus_y = 0
if len(selected_people) == 1:
# Single person - focus on them
if selected_people[0] < len(faces):
primary = faces[selected_people[0]]
raw_focus_x = primary.center_x
raw_focus_y = primary.center_y
else:
# Fallback
most_confident = max(faces, key=lambda f: f.confidence)
raw_focus_x = most_confident.center_x
raw_focus_y = most_confident.center_y
else:
# Multiple people - focus on the CENTER between them for stability
# This prevents jarring movements when switching focus between people
valid_people = [idx for idx in selected_people if idx < len(faces)]
if valid_people:
centers_x = [faces[idx].center_x for idx in valid_people]
centers_y = [faces[idx].center_y for idx in valid_people]
raw_focus_x = int(np.mean(centers_x))
raw_focus_y = int(np.mean(centers_y))
else:
# Fallback
most_confident = max(faces, key=lambda f: f.confidence)
raw_focus_x = most_confident.center_x
raw_focus_y = most_confident.center_y
if self.focus_history:
last_x, last_y = self.focus_history[-1]
dx = abs(raw_focus_x - last_x)
dy = abs(raw_focus_y - last_y)
if dx < self.focus_dead_zone and dy < self.focus_dead_zone:
return self.focus_history[-1]
self.focus_history.append((raw_focus_x, raw_focus_y))
if len(self.focus_history) > self.focus_history_size:
self.focus_history.pop(0)
if len(self.focus_history) >= 5:
xs = [x for x, y in self.focus_history]
ys = [y for x, y in self.focus_history]
median_x = int(np.median(xs))
median_y = int(np.median(ys))
return (median_x, median_y)
else:
return (raw_focus_x, raw_focus_y)
def _calculate_group_bounding_box(
self,
faces: List[FaceDetection],
padding_percent: float = 0.15,
max_faces: int = 6
) -> Optional[GroupBoundingBox]:
"""
Calculate bounding box containing all detected faces with padding.
Args:
faces: List of detected faces
padding_percent: Padding around group as percentage of bbox dimensions
max_faces: Maximum faces to include (use most confident if exceeded)
Returns:
GroupBoundingBox or None if no faces
"""
if not faces:
return None
# If too many faces, use most confident ones
if len(faces) > max_faces:
faces = sorted(faces, key=lambda f: f.confidence, reverse=True)[:max_faces]
# Calculate bounding box containing all faces
min_x = min(f.x for f in faces)
max_x = max(f.x + f.width for f in faces)
min_y = min(f.y for f in faces)
max_y = max(f.y + f.height for f in faces)
# Add padding
width = max_x - min_x
height = max_y - min_y
pad_x = int(width * padding_percent)
pad_y = int(height * padding_percent)
final_x = max(0, min_x - pad_x)
final_y = max(0, min_y - pad_y)
final_width = width + 2 * pad_x
final_height = height + 2 * pad_y
return GroupBoundingBox(
x=final_x,
y=final_y,
width=final_width,
height=final_height,
center_x=final_x + final_width // 2,
center_y=final_y + final_height // 2,
face_count=len(faces)
)
def close(self):
"""Release resources."""
self.detector.close()
# Clear tracking state to free memory
self.previous_faces.clear()
self.current_selected_people.clear()
self.focus_history.clear()
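
A sketch of how this analyzer might be driven frame by frame with OpenCV. The import path and the input file are assumptions (this hunk does not show the module's filename), and the real integration with the renderer is not part of this diff:

```python
import cv2

# Assumption: the module above is importable under this name.
from video_render.context import ContextAnalyzer

analyzer = ContextAnalyzer(person_switch_cooldown=30, min_face_confidence=0.3)
capture = cv2.VideoCapture("videos/sample.mp4")  # placeholder path
fps = capture.get(cv2.CAP_PROP_FPS) or 30.0

frame_number = 0
try:
    while True:
        ok, frame = capture.read()
        if not ok:
            break
        if frame_number % 2 == 0:  # mirrors SMART_FRAMING_FRAME_SKIP=2
            ctx = analyzer.analyze_frame(
                frame,                      # BGR frame, as analyze_frame expects
                timestamp=frame_number / fps,
                frame_number=frame_number,
                speaking_periods=None,      # or AudioActivityDetector.detect_speaking_periods(...)
            )
            if ctx.primary_focus:
                print(frame_number, ctx.layout_mode, ctx.primary_focus)
        frame_number += 1
finally:
    capture.release()
    analyzer.close()
```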

54
video_render/ffmpeg.py Normal file

@@ -0,0 +1,54 @@
from __future__ import annotations
import logging
import shlex
import subprocess
from pathlib import Path
from typing import Sequence
logger = logging.getLogger(__name__)
def _run_ffmpeg(args: Sequence[str]) -> None:
cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", *args]
logger.debug("Executando ffmpeg: %s", " ".join(shlex.quote(part) for part in cmd))
completed = subprocess.run(cmd, check=False)
if completed.returncode != 0:
raise RuntimeError(f"ffmpeg falhou com exit code {completed.returncode}")
def extract_audio_to_wav(input_video: Path, output_wav: Path) -> Path:
_run_ffmpeg(
[
"-y",
"-i",
str(input_video),
"-ac",
"1",
"-ar",
"16000",
"-vn",
str(output_wav),
]
)
return output_wav
def create_video_segment(input_video: Path, start: float, end: float, output_path: Path) -> Path:
duration = max(0.01, end - start)
_run_ffmpeg(
[
"-y",
"-i",
str(input_video),
"-ss",
f"{start:.3f}",
"-t",
f"{duration:.3f}",
"-c",
"copy",
str(output_path),
]
)
return output_path
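
A short usage sketch for the two helpers above, assuming `ffmpeg` is on PATH (the Docker image installs it); the paths are placeholders:

```python
from pathlib import Path

from video_render.ffmpeg import create_video_segment, extract_audio_to_wav

source = Path("videos/sample.mp4")  # placeholder input
workdir = Path("temp/sample")
workdir.mkdir(parents=True, exist_ok=True)

# 16 kHz mono WAV, the format fed to faster-whisper.
audio = extract_audio_to_wav(source, workdir / "audio.wav")

# Stream copy (-c copy) of an 80 s window: fast, but cut points snap to keyframes.
clip = create_video_segment(source, start=60.0, end=140.0, output_path=workdir / "clip_01.mp4")
print(audio, clip)
```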

258
video_render/llm.py Normal file

@@ -0,0 +1,258 @@
from __future__ import annotations
import json
import logging
import time
import os
from pathlib import Path
from typing import Dict, List
import requests
from video_render.config import BASE_DIR, Settings
from video_render.transcription import TranscriptionResult
logger = logging.getLogger(__name__)
OPENROUTER_ENDPOINT = os.environ.get("OPENROUTER_API_URL", "https://openrouter.ai/api/v1/chat/completions")
class OpenRouterCopywriter:
def __init__(self, settings: Settings) -> None:
if not settings.openrouter.api_key:
raise RuntimeError("OPENROUTER_API_KEY nao foi definido")
self.settings = settings
prompt_path = Path(settings.openrouter.prompt_path)
if not prompt_path.is_absolute():
prompt_path = BASE_DIR / prompt_path
if not prompt_path.exists():
raise FileNotFoundError(f"Prompt nao encontrado: {prompt_path}")
self.highlights_prompt_template = prompt_path.read_text(encoding="utf-8")
def generate_highlights(self, transcription: TranscriptionResult) -> List[Dict]:
"""Generate video highlights using OpenRouter GPT-OSS with retry logic."""
payload = {
"transcript": transcription.full_text,
"segments": [
{
"start": segment.start,
"end": segment.end,
"text": segment.text,
}
for segment in transcription.segments
],
}
body = {
"model": self.settings.openrouter.model,
"temperature": self.settings.openrouter.temperature,
"messages": [
{"role": "system", "content": self.highlights_prompt_template},
{
"role": "user",
"content": json.dumps(payload, ensure_ascii=False),
},
],
}
headers = {
"Authorization": f"Bearer {self.settings.openrouter.api_key}",
"Content-Type": "application/json",
"X-Title": "Video Render - Highlights Detection"
}
logger.info(f"Calling OpenRouter with model: {self.settings.openrouter.model}")
logger.debug(f"Request payload keys: transcript_length={len(payload['transcript'])}, segments_count={len(payload['segments'])}")
# Retry configuration for rate limits (especially free tier)
max_retries = 5
base_delay = 5 # Start with 5s delay
for attempt in range(max_retries):
try:
response = requests.post(
url=OPENROUTER_ENDPOINT,
data=json.dumps(body),
headers=headers,
timeout=120,
)
response.raise_for_status()
data = response.json()
break
except requests.exceptions.HTTPError as exc:
if exc.response.status_code == 429:
if attempt < max_retries - 1:
# Exponential backoff: 5s, 10s, 20s, 40s, 80s
delay = base_delay * (2 ** attempt)
logger.warning(f"Rate limit atingido (429). Aguardando {delay}s antes de tentar novamente (tentativa {attempt + 1}/{max_retries})")
time.sleep(delay)
continue
else:
logger.error("Rate limit atingido apos todas as tentativas")
logger.error("Solucao: Use um modelo pago ou adicione creditos na OpenRouter")
raise RuntimeError("OpenRouter rate limit excedido") from exc
else:
logger.error(f"OpenRouter API request falhou com status {exc.response.status_code}: {exc}")
raise RuntimeError("OpenRouter API request falhou") from exc
except Exception as exc:
logger.error("OpenRouter API request falhou: %s", exc)
raise RuntimeError("OpenRouter API request falhou") from exc
# Debug: log response structure
logger.info(f"OpenRouter response keys: {list(data.keys())}")
if "error" in data:
logger.error(f"OpenRouter API error: {data.get('error')}")
raise RuntimeError(f"OpenRouter API error: {data.get('error')}")
choices = data.get("choices") or []
if not choices:
logger.error(f"OpenRouter response completa: {json.dumps(data, indent=2)}")
raise RuntimeError("OpenRouter nao retornou escolhas")
message = choices[0].get("message", {}).get("content")
if not message:
raise RuntimeError("Resposta do OpenRouter sem conteudo")
parsed = self._extract_json(message)
highlights = parsed.get("highlights")
if not isinstance(highlights, list):
raise ValueError("Resposta do OpenRouter invalida: campo 'highlights' ausente")
valid_highlights = []
for highlight in highlights:
try:
start = float(highlight.get("start", 0))
end = float(highlight.get("end", 0))
summary = str(highlight.get("summary", "")).strip()
if start < 0 or end < 0:
logger.warning(f"Highlight ignorado: timestamps negativos (start={start}, end={end})")
continue
if end <= start:
logger.warning(f"Highlight ignorado: end <= start (start={start}, end={end})")
continue
duration = end - start
if duration < 60:
logger.warning(f"Highlight ignorado: muito curto ({duration}s, minimo 45s)")
continue
if duration > 120:
logger.warning(f"Highlight ignorado: muito longo ({duration}s, maximo 90s)")
continue
if not summary:
logger.warning(f"Highlight ignorado: summary vazio")
continue
valid_highlights.append({
"start": start,
"end": end,
"summary": summary
})
except (TypeError, ValueError) as e:
logger.warning(f"Highlight invalido ignorado: {highlight} - {e}")
continue
if not valid_highlights:
logger.warning("Nenhum highlight valido retornado pelo OpenRouter")
total_duration = 75.0
if transcription.segments:
total_duration = max(seg.end for seg in transcription.segments)
fallback_end = min(75.0, total_duration)
if fallback_end < 60.0:
fallback_end = min(60.0, total_duration)
return [{
"start": 0.0,
"end": fallback_end,
"summary": "Trecho inicial do video (fallback automatico)"
}]
logger.info(f"OpenRouter retornou {len(valid_highlights)} highlights validos")
return valid_highlights
def generate_titles(self, highlights: List[Dict]) -> List[str]:
if not highlights:
return []
prompt = (
"Voce e um copywriter especializado em titulos curtos e virais para reels.\n"
"Recebera uma lista de trechos destacados de um video com resumo e tempo.\n"
"Produza um titulo envolvente (ate 60 caracteres) para cada item.\n"
"Responda apenas em JSON com a seguinte estrutura:\n"
'{"titles": ["titulo 1", "titulo 2"]}\n'
"Titulos devem ser em portugues, usar verbos fortes e refletir o resumo."
)
user_payload = {
"highlights": [
{
"start": item.get("start"),
"end": item.get("end"),
"summary": item.get("summary"),
}
for item in highlights
]
}
body = {
"model": self.settings.openrouter.model,
"temperature": self.settings.openrouter.temperature,
"messages": [
{"role": "system", "content": prompt},
{
"role": "user",
"content": json.dumps(user_payload, ensure_ascii=False),
},
],
}
headers = {
"Authorization": f"Bearer {self.settings.openrouter.api_key}",
"Content-Type": "application/json",
}
response = requests.post(
url=OPENROUTER_ENDPOINT,
data=json.dumps(body),
headers=headers,
timeout=120,
)
response.raise_for_status()
data = response.json()
choices = data.get("choices") or []
if not choices:
raise RuntimeError("OpenRouter nao retornou escolhas")
message = choices[0].get("message", {}).get("content")
if not message:
raise RuntimeError("Resposta do OpenRouter sem conteudo")
parsed = self._extract_json(message)
titles = parsed.get("titles")
if not isinstance(titles, list):
raise ValueError("Resposta do OpenRouter invalida: campo 'titles'")
return [str(title) for title in titles]
@staticmethod
def _extract_json(response_text: str) -> Dict:
try:
return json.loads(response_text)
except json.JSONDecodeError:
start = response_text.find("{")
end = response_text.rfind("}")
if start == -1 or end == -1:
raise
subset = response_text[start : end + 1]
return json.loads(subset)
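
`generate_titles` only needs the highlight dictionaries, so it can be exercised on its own once the environment is set (`load_settings` requires RABBITMQ_PASS; the constructor requires OPENROUTER_API_KEY and the prompt file). A small sketch; it performs a real API call:

```python
from video_render.config import load_settings
from video_render.llm import OpenRouterCopywriter

copywriter = OpenRouterCopywriter(load_settings())
highlights = [
    {"start": 60.0, "end": 120.0, "summary": "Dica pratica de negociacao com fornecedores"},
]
titles = copywriter.generate_titles(highlights)  # one pt-BR title (<= 60 chars) per highlight
print(titles)
```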

video_render/logging_utils.py Normal file

@@ -0,0 +1,13 @@
from __future__ import annotations
import logging
import os
def setup_logging() -> None:
log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
level=log_level,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)

88
video_render/media.py Normal file

@@ -0,0 +1,88 @@
from __future__ import annotations
import logging
import shutil
from dataclasses import dataclass
from pathlib import Path
from video_render.config import Settings
from video_render.ffmpeg import extract_audio_to_wav
from video_render.utils import ensure_workspace, remove_paths, sanitize_filename
logger = logging.getLogger(__name__)
@dataclass
class VideoWorkspace:
original_filename: str
sanitized_name: str
workspace_dir: Path
output_dir: Path
source_path: Path
working_video_path: Path
audio_path: Path
class MediaPreparer:
def __init__(self, settings: Settings) -> None:
self.settings = settings
def prepare(self, filename: str) -> VideoWorkspace:
source_path = self.settings.videos_dir / filename
if not source_path.exists():
raise FileNotFoundError(f"Arquivo de vídeo não encontrado: {source_path}")
sanitized_name = sanitize_filename(Path(filename).stem)
workspace_dir = ensure_workspace(self.settings.videos_dir, sanitized_name)
transcription_json = workspace_dir / "transcription.json"
transcription_txt = workspace_dir / "transcription.txt"
temp_transcription_json = None
temp_transcription_txt = None
if transcription_json.exists():
temp_transcription_json = workspace_dir.parent / f".{sanitized_name}_transcription.json.tmp"
shutil.copy2(transcription_json, temp_transcription_json)
if transcription_txt.exists():
temp_transcription_txt = workspace_dir.parent / f".{sanitized_name}_transcription.txt.tmp"
shutil.copy2(transcription_txt, temp_transcription_txt)
existing_children = list(workspace_dir.iterdir())
if existing_children:
logger.info("Limpando workspace existente para %s", sanitized_name)
try:
remove_paths(existing_children)
except Exception as e:
logger.warning(f"Não foi possível limpar workspace (não crítico): {e}")
if temp_transcription_json and temp_transcription_json.exists():
shutil.move(str(temp_transcription_json), str(transcription_json))
logger.info("Transcrição preservada em %s", transcription_json)
if temp_transcription_txt and temp_transcription_txt.exists():
shutil.move(str(temp_transcription_txt), str(transcription_txt))
destination_name = f"{sanitized_name}{source_path.suffix.lower()}"
working_video_path = workspace_dir / destination_name
shutil.copy2(source_path, working_video_path)
logger.info("Cópia do vídeo criada em %s", working_video_path)
output_dir = ensure_workspace(self.settings.outputs_dir, sanitized_name)
existing_outputs = list(output_dir.iterdir())
if existing_outputs:
try:
remove_paths(existing_outputs)
except Exception as e:
logger.warning(f"Não foi possível limpar outputs antigos (não crítico): {e}")
audio_path = workspace_dir / "audio.wav"
extract_audio_to_wav(working_video_path, audio_path)
return VideoWorkspace(
original_filename=filename,
sanitized_name=sanitized_name,
workspace_dir=workspace_dir,
output_dir=output_dir,
source_path=source_path,
working_video_path=working_video_path,
audio_path=audio_path,
)
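
A usage sketch for the preparer above; `podcast.mp4` is a placeholder and must already exist under `settings.videos_dir`:

```python
from video_render.config import load_settings
from video_render.media import MediaPreparer

settings = load_settings()
workspace = MediaPreparer(settings).prepare("podcast.mp4")

print(workspace.working_video_path)  # sanitized copy inside videos/<name>/
print(workspace.audio_path)          # 16 kHz mono WAV extracted for transcription
```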

136
video_render/messaging.py Normal file

@@ -0,0 +1,136 @@
from __future__ import annotations
import json
import logging
from typing import Any, Callable, Dict
import pika
from video_render.config import Settings
logger = logging.getLogger(__name__)
MessageHandler = Callable[[Dict[str, Any]], Dict[str, Any]]
def _safe_ack(
channel: pika.adapters.blocking_connection.BlockingChannel, delivery_tag
) -> bool:
if not channel.is_open:
logger.warning(
"Canal fechado antes do ACK; mensagem sera reprocessada apos reconexao"
)
return False
try:
channel.basic_ack(delivery_tag=delivery_tag)
return True
except Exception:
logger.exception("Falha ao confirmar mensagem")
return False
class RabbitMQWorker:
def __init__(self, settings: Settings) -> None:
self.settings = settings
self._params = pika.ConnectionParameters(
host=settings.rabbitmq.host,
port=settings.rabbitmq.port,
credentials=pika.PlainCredentials(
settings.rabbitmq.user, settings.rabbitmq.password
),
heartbeat=settings.rabbitmq.heartbeat,
blocked_connection_timeout=settings.rabbitmq.blocked_timeout,
)
def consume_forever(self, handler: MessageHandler) -> None:
while True:
try:
with pika.BlockingConnection(self._params) as connection:
channel = connection.channel()
channel.queue_declare(
queue=self.settings.rabbitmq.consume_queue, durable=True
)
channel.queue_declare(
queue=self.settings.rabbitmq.publish_queue, durable=True
)
channel.basic_qos(
prefetch_count=self.settings.rabbitmq.prefetch_count
)
def _on_message(
ch: pika.adapters.blocking_connection.BlockingChannel,
method,
properties,
body,
) -> None:
"""Consume message, ACK immediately, then process."""
try:
message = json.loads(body)
except json.JSONDecodeError:
logger.error("Mensagem invalida recebida: %s", body)
_safe_ack(ch, method.delivery_tag)
return
if not _safe_ack(ch, method.delivery_tag):
logger.warning(
"Nao foi possivel confirmar mensagem; abortando processamento"
)
return
logger.info(
"Mensagem recebida: %s",
message.get("filename", "<sem_nome>"),
)
try:
response = handler(message)
except Exception:
logger.exception("Erro nao tratado durante o processamento")
response = {
"hasError": True,
"error": "Erro nao tratado no pipeline",
"filename": message.get("filename"),
"videoId": message.get("videoId"),
"url": message.get("url"),
"processedFiles": [],
}
self._publish_response(response)
channel.basic_consume(
queue=self.settings.rabbitmq.consume_queue,
on_message_callback=_on_message,
auto_ack=False,
)
logger.info("Consumidor iniciado. Aguardando mensagens...")
channel.start_consuming()
except pika.exceptions.AMQPConnectionError:
logger.exception(
"Conexao com RabbitMQ perdida. Tentando reconectar..."
)
except pika.exceptions.AMQPError:
logger.exception("Erro AMQP inesperado. Reiniciando consumo...")
except KeyboardInterrupt:
logger.info("Encerrando consumidor por interrupcao do usuario.")
break
def _publish_response(self, response: Dict[str, Any]) -> None:
payload = json.dumps(response)
try:
with pika.BlockingConnection(self._params) as publish_connection:
publish_channel = publish_connection.channel()
publish_channel.queue_declare(
queue=self.settings.rabbitmq.publish_queue, durable=True
)
publish_channel.basic_publish(
exchange="",
routing_key=self.settings.rabbitmq.publish_queue,
body=payload,
properties=pika.BasicProperties(delivery_mode=2),
)
logger.info(
"Resposta publicada para '%s'",
self.settings.rabbitmq.publish_queue,
)
except Exception:
logger.exception("Falha ao publicar a resposta na fila de upload apos ACK")

260
video_render/pipeline.py Normal file
View File

@@ -0,0 +1,260 @@
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from video_render.config import Settings
from video_render.llm import OpenRouterCopywriter
from video_render.media import MediaPreparer, VideoWorkspace
from video_render.transcription import TranscriptionResult, TranscriptionService
from video_render.utils import remove_paths, sanitize_filename
from video_render.rendering import VideoRenderer
logger = logging.getLogger(__name__)
@dataclass
class JobMessage:
filename: str
url: Optional[str]
video_id: Optional[str]
extras: Dict[str, Any] = field(default_factory=dict)
@dataclass
class HighlightWindow:
start: float
end: float
summary: str
title: Optional[str] = None
@dataclass
class RenderedClip:
path: Path
start: float
end: float
title: str
summary: str
index: int
@dataclass
class PipelineContext:
job: JobMessage
workspace: Optional[VideoWorkspace] = None
transcription: Optional[TranscriptionResult] = None
highlight_windows: List[HighlightWindow] = field(default_factory=list)
rendered_clips: List[RenderedClip] = field(default_factory=list)
class VideoPipeline:
def __init__(self, settings: Settings) -> None:
self.settings = settings
self.media_preparer = MediaPreparer(settings)
self.transcriber = TranscriptionService(settings)
self.llm_service = OpenRouterCopywriter(settings) # Using OpenRouter for both highlights and titles
self.renderer = VideoRenderer(settings)
def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
context = PipelineContext(job=self._parse_job(message))
try:
self._prepare_workspace(context)
self._generate_transcription(context)
self._determine_highlights(context)
self._render_clips(context)
return self._build_success_payload(context)
except Exception as exc:
logger.exception("Falha ao processar vídeo %s", context.job.filename)
return self._handle_failure(context, exc)
def _parse_job(self, message: Dict[str, Any]) -> JobMessage:
filename = message.get("filename")
if not filename:
raise ValueError("Mensagem inválida: 'filename' é obrigatório")
url = message.get("url")
video_id = message.get("videoId") or message.get("video_id")
extras = {
key: value
for key, value in message.items()
if key not in {"filename", "url", "videoId", "video_id"}
}
return JobMessage(filename=filename, url=url, video_id=video_id, extras=extras)
def _prepare_workspace(self, context: PipelineContext) -> None:
context.workspace = self.media_preparer.prepare(context.job.filename)
def _generate_transcription(self, context: PipelineContext) -> None:
if not context.workspace:
raise RuntimeError("Workspace não preparado")
existing = TranscriptionService.load(context.workspace.workspace_dir)
if existing:
logger.info(
"Transcricao existente encontrada em %s; reutilizando resultado",
context.workspace.workspace_dir,
)
context.transcription = existing
return
transcription = self.transcriber.transcribe(
context.workspace.audio_path,
output_dir=context.workspace.workspace_dir
)
TranscriptionService.persist(transcription, context.workspace.workspace_dir)
context.transcription = transcription
# Unload Whisper model immediately after transcription to free memory (1-3GB)
self.transcriber.unload_model()
def _determine_highlights(self, context: PipelineContext) -> None:
if not context.transcription:
raise RuntimeError("Transcricao nao disponivel")
try:
highlights_raw = self.llm_service.generate_highlights(context.transcription)
except Exception:
logger.exception(
"Falha ao gerar destaques com OpenRouter; aplicando fallback padrao."
)
context.highlight_windows = [self._build_fallback_highlight(context)]
return
windows: List[HighlightWindow] = []
for item in highlights_raw:
try:
start = float(item.get("start", 0)) # type: ignore[arg-type]
end = float(item.get("end", start)) # type: ignore[arg-type]
except (TypeError, ValueError):
logger.warning("Highlight invalido ignorado: %s", item)
continue
summary = str(item.get("summary", "")).strip()
title = str(item.get("title", summary[:60])).strip()
if end <= start:
logger.debug("Highlight com intervalo invalido ignorado: %s", item)
continue
windows.append(HighlightWindow(start=start, end=end, summary=summary, title=title))
if not windows:
windows.append(self._build_fallback_highlight(context))
context.highlight_windows = windows
def _generate_titles(self, context: PipelineContext) -> None:
"""DEPRECATED: Titles are now generated together with highlights.
This method is kept for backwards compatibility but does nothing.
Titles are extracted from highlights in _determine_highlights().
"""
pass
def _build_fallback_highlight(self, context: PipelineContext) -> HighlightWindow:
if not context.transcription:
raise RuntimeError("Transcricao nao disponivel para criar fallback")
last_end = (
context.transcription.segments[-1].end
if context.transcription.segments
else 0.0
)
return HighlightWindow(
start=0.0,
end=max(last_end, 10.0),
summary="Sem destaque identificado; fallback automatico.",
title="Confira este momento",
)
def _render_clips(self, context: PipelineContext) -> None:
if not context.workspace or not context.highlight_windows or not context.transcription:
return
titles = [
window.title or window.summary for window in context.highlight_windows
]
render_results = self.renderer.render(
workspace_path=str(context.workspace.working_video_path),
highlight_windows=context.highlight_windows,
transcription=context.transcription,
titles=titles,
output_dir=context.workspace.output_dir,
)
context.rendered_clips = [
RenderedClip(
path=Path(path),
start=start,
end=end,
title=title,
summary=summary,
index=index,
)
for path, start, end, title, summary, index in render_results
]
def _build_success_payload(self, context: PipelineContext) -> Dict[str, Any]:
return {
"hasError": False,
"videosProcessedQuantity": len(context.rendered_clips),
"filename": context.job.filename,
"videoId": context.job.video_id,
"url": context.job.url,
"workspaceFolder": context.workspace.sanitized_name if context.workspace else None,
"outputDirectory": self._relative_path(context.workspace.output_dir) if context.workspace else None,
"processedFiles": [
{
"path": self._relative_path(clip.path),
"start": clip.start,
"end": clip.end,
"title": clip.title,
"summary": clip.summary,
"clipIndex": clip.index,
}
for clip in context.rendered_clips
],
}
def _handle_failure(self, context: PipelineContext, exc: Exception) -> Dict[str, Any]:
logger.error("Erro na pipeline: %s", exc)
cleanup_targets: List[Path] = []
if context.workspace:
cleanup_targets.append(context.workspace.workspace_dir)
cleanup_targets.append(context.workspace.output_dir)
original_path = context.workspace.source_path
if original_path.exists():
cleanup_targets.append(original_path)
else:
sanitized = sanitize_filename(Path(context.job.filename).stem)
job_output_dir = self.settings.outputs_dir / sanitized
if job_output_dir.exists():
cleanup_targets.append(job_output_dir)
original_path = self.settings.videos_dir / context.job.filename
if original_path.exists():
cleanup_targets.append(original_path)
remove_paths(cleanup_targets)
return {
"hasError": True,
"error": str(exc),
"filename": context.job.filename,
"videoId": context.job.video_id,
"url": context.job.url,
"processedFiles": [],
}
def _relative_path(self, path: Path) -> str:
base = self.settings.videos_dir.parent
try:
return str(path.relative_to(base))
except ValueError:
return str(path)
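
For reference, a sketch of the message contract implied by _parse_job and _build_success_payload; field names come from the code above, the values are invented:

# Incoming job consumed from the render queue
job_message = {
    "filename": "episodio_42.mp4",                 # required
    "url": "https://example.com/episodio_42.mp4",  # optional
    "videoId": "abc123",                           # optional, "video_id" also accepted
    # any other keys are kept in JobMessage.extras
}

# Success payload published to the upload queue
success_payload = {
    "hasError": False,
    "videosProcessedQuantity": 1,
    "filename": "episodio_42.mp4",
    "videoId": "abc123",
    "url": "https://example.com/episodio_42.mp4",
    "workspaceFolder": "episodio_42",
    "outputDirectory": "outputs/episodio_42",
    "processedFiles": [
        {
            "path": "outputs/episodio_42/clip_01.mp4",
            "start": 12.0,
            "end": 57.5,
            "title": "Confira este momento",
            "summary": "Trecho de exemplo",
            "clipIndex": 1,
        }
    ],
}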

826
video_render/rendering.py Normal file
View File

@@ -0,0 +1,826 @@
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from typing import Dict, Iterable, List, Sequence, Tuple, Optional
import numpy as np
from moviepy.audio.AudioClip import AudioArrayClip, AudioClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.VideoClip import ColorClip, ImageClip, TextClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from PIL import Image, ImageColor, ImageDraw, ImageFont
from video_render.config import Settings
from video_render.transcription import TranscriptionResult, WordTiming
from video_render.smart_framing import SmartFramer, extract_audio_samples
logger = logging.getLogger(__name__)
def clamp_time(value: float, minimum: float = 0.0) -> float:
return max(minimum, float(value))
@dataclass
class CaptionClipSet:
base: ImageClip
highlights: List[ImageClip]
class CaptionBuilder:
def __init__(self, settings: Settings) -> None:
self.settings = settings
self.font_path = settings.rendering.font_path
if not self.font_path.exists():
raise FileNotFoundError(f"Fonte nao encontrada: {self.font_path}")
self.font = ImageFont.truetype(
str(self.font_path), settings.rendering.subtitle_font_size
)
self.base_color = ImageColor.getrgb(settings.rendering.base_color)
self.highlight_color = ImageColor.getrgb(settings.rendering.highlight_color)
self.canvas_width = settings.rendering.frame_width - 160
self.canvas_height = int(settings.rendering.subtitle_font_size * 2.2)
self.min_words = settings.rendering.caption_min_words
self.max_words = settings.rendering.caption_max_words
bbox = self.font.getbbox("Ay")
self.text_height = bbox[3] - bbox[1]
self.baseline = (self.canvas_height - self.text_height) // 2 - bbox[1]
self.space_width = self.font.getbbox(" ")[2] - self.font.getbbox(" ")[0]
def build(self, words: Sequence[WordTiming], clip_start: float) -> List[CaptionClipSet]:
# Filter out empty, whitespace-only, or very short words (likely noise)
valid_words = [
w for w in words
if w.word
and w.word.strip()
and len(w.word.strip()) >= 2 # At least 2 characters
and w.word.strip() not in ['...', '..', '.', ',', '-', 'hmm', 'hm', 'ah', 'eh', 'uh'] # Not just punctuation or filler
]
# Note: We don't filter out words based on gaps here
# Gap detection is handled in _group_words_with_gaps
# This ensures captions disappear during silence naturally
filtered_words = valid_words
# Calculate speech density (words per second)
# If density is too low, it's likely just noise/silence being misinterpreted
if filtered_words:
first_word_time = filtered_words[0].start
last_word_time = filtered_words[-1].end
duration = last_word_time - first_word_time
if duration > 0:
words_per_second = len(filtered_words) / duration
# Typical speech is 2-3 words per second
# If less than 0.5 words/second, it's probably silence/noise
if words_per_second < 0.5:
logger.debug(f"Captions suprimidas: densidade muito baixa ({words_per_second:.2f} palavras/seg)")
return []
# Only show captions if we have at least 3 valid words (reduced from 5 for 2-word groups)
# This prevents showing captions for noise/mumbling
if len(filtered_words) < 3:
return []
grouped = self._group_words_with_gaps(filtered_words)
clip_sets: List[CaptionClipSet] = []
for group in grouped:
group_start = clamp_time(group[0].start, minimum=clip_start)
group_end = clamp_time(group[-1].end, minimum=group_start + 0.05)
duration = max(0.05, group_end - group_start)
start_offset = group_start - clip_start
base_image, highlight_images = self._render_group(group)
base_clip = (
ImageClip(np.array(base_image))
.with_start(start_offset)
.with_duration(duration)
)
highlight_clips: List[ImageClip] = []
for word, image in zip(group, highlight_images):
h_start = clamp_time(word.start, minimum=clip_start) - clip_start
h_end = clamp_time(word.end, minimum=word.start + 0.02) - clip_start
h_duration = max(0.05, h_end - h_start)
highlight_clip = (
ImageClip(np.array(image))
.with_start(h_start)
.with_duration(h_duration)
)
highlight_clips.append(highlight_clip)
clip_sets.append(CaptionClipSet(base=base_clip, highlights=highlight_clips))
return clip_sets
def _render_group(self, group: Sequence[WordTiming]) -> Tuple[Image.Image, List[Image.Image]]:
texts = [self._clean_word(word.word) for word in group]
widths = []
for text in texts:
bbox = self.font.getbbox(text)
widths.append(bbox[2] - bbox[0])
total_width = sum(widths)
if len(widths) > 1:
total_width += self.space_width * (len(widths) - 1)
# Check if text needs to wrap to multiple lines
# If total width exceeds canvas width, break into 2 lines
needs_wrap = total_width > self.canvas_width
if needs_wrap:
# Split into 2 lines - try to balance the lines
mid_point = len(texts) // 2
line1_texts = texts[:mid_point]
line2_texts = texts[mid_point:]
line1_widths = widths[:mid_point]
line2_widths = widths[mid_point:]
# Calculate widths for each line
line1_width = sum(line1_widths)
if len(line1_widths) > 1:
line1_width += self.space_width * (len(line1_widths) - 1)
line2_width = sum(line2_widths)
if len(line2_widths) > 1:
line2_width += self.space_width * (len(line2_widths) - 1)
# Double the canvas height for 2 lines
canvas_height = self.canvas_height * 2
base_image = Image.new("RGBA", (self.canvas_width, canvas_height), (0, 0, 0, 0))
base_draw = ImageDraw.Draw(base_image)
highlight_images: List[Image.Image] = []
# Stroke settings: 8px black stroke for better readability
stroke_width = 8
stroke_color = (0, 0, 0, 255) # Black
# Draw line 1
x = max(0, (self.canvas_width - line1_width) // 2)
y = self.baseline
for i, (text, width) in enumerate(zip(line1_texts, line1_widths)):
base_draw.text(
(x, y),
text,
font=self.font,
fill=self.base_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
highlight_draw = ImageDraw.Draw(highlight_image)
highlight_draw.text(
(x, y),
text,
font=self.font,
fill=self.highlight_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
highlight_images.append(highlight_image)
x += width + self.space_width
# Draw line 2
x = max(0, (self.canvas_width - line2_width) // 2)
y = self.baseline + self.text_height + 5 # 5px spacing between lines
for i, (text, width) in enumerate(zip(line2_texts, line2_widths)):
base_draw.text(
(x, y),
text,
font=self.font,
fill=self.base_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
highlight_draw = ImageDraw.Draw(highlight_image)
highlight_draw.text(
(x, y),
text,
font=self.font,
fill=self.highlight_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
highlight_images.append(highlight_image)
x += width + self.space_width
return base_image, highlight_images
# Single line rendering (original code)
start_x = max(0, (self.canvas_width - total_width) // 2)
base_image = Image.new("RGBA", (self.canvas_width, self.canvas_height), (0, 0, 0, 0))
base_draw = ImageDraw.Draw(base_image)
highlight_images: List[Image.Image] = []
x = start_x
# Stroke settings: 8px black stroke for better readability
stroke_width = 8
stroke_color = (0, 0, 0, 255) # Black
for text, width in zip(texts, widths):
# Draw base text with stroke
base_draw.text(
(x, self.baseline),
text,
font=self.font,
fill=self.base_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
# Draw highlight text with stroke
highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
highlight_draw = ImageDraw.Draw(highlight_image)
highlight_draw.text(
(x, self.baseline),
text,
font=self.font,
fill=self.highlight_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
highlight_images.append(highlight_image)
x += width + self.space_width
return base_image, highlight_images
def _group_words(self, words: Sequence[WordTiming]) -> List[List[WordTiming]]:
if not words:
return []
grouped: List[List[WordTiming]] = []
buffer: List[WordTiming] = []
for word in words:
buffer.append(word)
if len(buffer) == self.max_words:
grouped.append(buffer)
buffer = []
if buffer:
if len(buffer) == 1 and grouped:
grouped[-1].extend(buffer)
else:
grouped.append(buffer)
for idx, group in enumerate(grouped[:-1]):
if len(group) < self.min_words and len(grouped[idx + 1]) > self.min_words:
deficit = self.min_words - len(group)
transfer = grouped[idx + 1][:deficit]
grouped[idx] = group + transfer
grouped[idx + 1] = grouped[idx + 1][deficit:]
grouped = [grp for grp in grouped if grp]
return grouped
def _group_words_with_gaps(self, words: Sequence[WordTiming]) -> List[List[WordTiming]]:
"""
Group words into 2-word chunks, respecting silence gaps.
Creates natural breaks where there are pauses > 1.5s
"""
if not words:
return []
grouped: List[List[WordTiming]] = []
buffer: List[WordTiming] = []
for i, word in enumerate(words):
# Check if there's a long pause before this word
if i > 0:
gap = word.start - words[i-1].end
# If gap > 1.5s, finish current buffer and start new group
if gap > 1.5:
if buffer:
grouped.append(buffer)
buffer = []
buffer.append(word)
# Group into 2 words maximum
if len(buffer) == 2:
grouped.append(buffer)
buffer = []
# Handle remaining words
if buffer:
if len(buffer) == 1 and grouped:
# Add single remaining word to last group
grouped[-1].append(buffer[0])
else:
grouped.append(buffer)
return [grp for grp in grouped if grp]
@staticmethod
def _clean_word(text: str) -> str:
text = text.strip()
text = re.sub(r"\s+", " ", text)
return text or "..."
class VideoRenderer:
def __init__(self, settings: Settings) -> None:
self.settings = settings
self.captions = CaptionBuilder(settings)
self.smart_framer = SmartFramer(
target_width=settings.rendering.frame_width,
target_height=settings.rendering.frame_height,
frame_skip=settings.rendering.smart_framing_frame_skip,
smoothing_window=settings.rendering.smart_framing_smoothing_window,
max_velocity=settings.rendering.smart_framing_max_velocity,
person_switch_cooldown=settings.rendering.smart_framing_person_switch_cooldown,
response_time=settings.rendering.smart_framing_response_time,
group_padding=settings.rendering.smart_framing_group_padding,
max_zoom_out=settings.rendering.smart_framing_max_zoom_out,
dead_zone=settings.rendering.smart_framing_dead_zone,
min_face_confidence=settings.rendering.smart_framing_min_confidence
)
def render(
self,
workspace_path: str,
highlight_windows: Sequence,
transcription: TranscriptionResult,
titles: Sequence[str],
output_dir,
) -> List[Tuple[str, float, float, str, str, int]]:
results: List[Tuple[str, float, float, str, str, int]] = []
with VideoFileClip(workspace_path) as base_clip:
video_duration = base_clip.duration or 0
for index, window in enumerate(highlight_windows, start=1):
start = clamp_time(window.start)
end = clamp_time(window.end)
start = min(start, video_duration)
end = min(end, video_duration)
if end <= start:
logger.info("Janela ignorada por intervalo invalido: %s", window)
continue
subclip = base_clip.subclipped(start, end)
try:
rendered_path = self._render_single_clip(
subclip=subclip,
start=start,
end=end,
title=titles[index - 1] if index - 1 < len(titles) else window.summary,
summary=window.summary,
index=index,
transcription=transcription,
output_dir=output_dir,
source_path=workspace_path,
)
finally:
subclip.close()
results.append(
(
rendered_path,
float(start),
float(end),
titles[index - 1] if index - 1 < len(titles) else window.summary,
window.summary,
index,
)
)
return results
def _render_single_clip(
self,
subclip: VideoFileClip,
start: float,
end: float,
title: str,
summary: str,
index: int,
transcription: TranscriptionResult,
output_dir,
source_path: str,
) -> str:
duration = end - start
frame_w = self.settings.rendering.frame_width
frame_h = self.settings.rendering.frame_height
# Removed top panel - no longer showing title
bottom_h = int(frame_h * 0.20)
# Use smart framing to create intelligent 9:16 video (if enabled)
if self.settings.rendering.enable_smart_framing:
logger.info(f"Creating smart framing plan for clip {index} ({start:.2f}s - {end:.2f}s)")
try:
# Extract audio for speech detection
audio_samples = extract_audio_samples(source_path, start, end)
# Create framing plan
framing_plan = self.smart_framer.create_framing_plan(
video_path=source_path,
start_time=start,
end_time=end,
audio_samples=audio_samples
)
# Apply smart framing (always single-person focus)
video_clip = self.smart_framer.apply_framing(
video_clip=subclip,
framing_plan=framing_plan
)
logger.info(f"Smart framing applied: layout={framing_plan.layout_mode}, "
f"faces_detected={len(framing_plan.frame_contexts[0].detected_faces) if framing_plan.frame_contexts else 0}")
except Exception as exc:
logger.warning(f"Smart framing failed for clip {index}, falling back to center crop: {exc}", exc_info=True)
# Fallback to center crop (maintains aspect ratio, crops to fit)
video_area_h = max(1, frame_h - bottom_h)
# Use MAX to ensure video covers entire area (will crop excess)
scale_factor = max(
frame_w / subclip.w,
video_area_h / subclip.h,
)
# Resize to cover area
resized_clip = subclip.resized(scale_factor)
# Calculate crop region (center crop)
crop_x1 = max(0, (resized_clip.w - frame_w) // 2)
crop_y1 = max(0, (resized_clip.h - video_area_h) // 2)
crop_x2 = crop_x1 + frame_w
crop_y2 = crop_y1 + video_area_h
# Crop to fit target dimensions using MoviePy crop(x1, y1, x2, y2)
cropped_clip = resized_clip.cropped(
x1=crop_x1,
y1=crop_y1,
x2=crop_x2,
y2=crop_y2
)
video_clip = cropped_clip.with_position((0, 0))
resized_clip.close()
else:
# Use center crop (smart framing disabled)
logger.info(f"Using center crop for clip {index} (smart framing disabled)")
video_area_h = max(1, frame_h - bottom_h)
# Use MAX to ensure video covers entire area (will crop excess)
scale_factor = max(
frame_w / subclip.w,
video_area_h / subclip.h,
)
# Resize to cover area
resized_clip = subclip.resized(scale_factor)
# Calculate crop region (center crop)
crop_x1 = max(0, (resized_clip.w - frame_w) // 2)
crop_y1 = max(0, (resized_clip.h - video_area_h) // 2)
crop_x2 = crop_x1 + frame_w
crop_y2 = crop_y1 + video_area_h
# Crop to fit target dimensions using MoviePy crop(x1, y1, x2, y2)
cropped_clip = resized_clip.cropped(
x1=crop_x1,
y1=crop_y1,
x2=crop_x2,
y2=crop_y2
)
video_clip = cropped_clip.with_position((0, 0))
resized_clip.close()
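# Worked example of the cover-and-crop math above, assuming the default 1080x1920
# portrait frame and a 1920x1080 landscape source: bottom_h = 384, video_area_h = 1536,
# scale_factor = max(1080/1920, 1536/1080) ~= 1.42, the resized clip is ~2731x1536,
# crop_x1 = (2731 - 1080) // 2 = 825 and crop_y1 = 0, so the central 1080x1536
# region is kept and positioned at the top of the frame, above the bottom panel.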
background = ColorClip(size=(frame_w, frame_h), color=(0, 0, 0)).with_duration(duration)
# Removed top panel and title - no longer needed
bottom_panel = (
ColorClip(size=(frame_w, bottom_h), color=(12, 12, 12))
.with_position((0, frame_h - bottom_h))
.with_duration(duration)
.with_opacity(0.85)
)
words = self._collect_words(transcription, start, end)
# Calculate speech coverage: how much of the clip has actual speech?
# If less than 30% of the clip has speech, don't show captions
clip_duration = end - start
if words and clip_duration > 0:
# Calculate total time with speech
total_speech_time = sum(w.end - w.start for w in words)
speech_coverage = total_speech_time / clip_duration
if speech_coverage < 0.3: # Less than 30% speech
logger.debug(f"Captions suprimidas: cobertura de fala baixa ({speech_coverage:.1%})")
words = [] # Clear words to prevent captions
# Only build captions if there are actual words to display
# This prevents empty/placeholder captions from appearing
caption_sets = self.captions.build(words, clip_start=start) if words else []
caption_clips = []
caption_resources: List[ImageClip] = []
# Position captions 120px below center (for 1920px height, center is 960px, so 1080px)
# This ensures they're visible, well-positioned, and don't interfere with faces
# Range: 100-150px as requested, using 120px for optimal positioning
center_y = frame_h // 2
caption_y = center_y + 120
caption_margin = 20
# Ensure captions stay within reasonable bounds (no top panel now)
min_caption_y = caption_margin
max_caption_y = frame_h - bottom_h - self.captions.canvas_height - caption_margin
if max_caption_y < min_caption_y:
caption_y = min_caption_y
else:
caption_y = min(max(caption_y, min_caption_y), max_caption_y)
for clip_set in caption_sets:
base_positioned = clip_set.base.with_position(("center", caption_y))
caption_clips.append(base_positioned)
caption_resources.append(clip_set.base)
for highlight in clip_set.highlights:
positioned = highlight.with_position(("center", caption_y))
caption_clips.append(positioned)
caption_resources.append(highlight)
# No fallback captions - if there are no dynamic captions, show nothing
# This matches Opus Clip behavior where captions only appear when there's actual speech
audio_clip, audio_needs_close = self._materialize_audio(
source_path=source_path,
start=start,
end=end,
duration=duration,
fallback_audio=video_clip.audio or subclip.audio,
)
# Composite with background, bottom panel, video, and captions only (no top panel or title)
composite = CompositeVideoClip(
[background, bottom_panel, video_clip, *caption_clips],
size=(frame_w, frame_h),
)
if audio_clip is not None:
composite = self._with_audio(composite, audio_clip)
output_path = output_dir / f"clip_{index:02d}.mp4"
self._write_with_fallback(
composite=composite,
output_path=output_path,
index=index,
output_dir=output_dir,
)
composite.close()
video_clip.close()
background.close()
bottom_panel.close()
for clip in caption_clips:
clip.close()
for clip in caption_resources:
clip.close()
if audio_clip is not None and audio_needs_close:
audio_clip.close()
# Force garbage collection to free memory after rendering
import gc
gc.collect()
return str(output_path)
def _materialize_audio(
self,
*,
source_path: str,
start: float,
end: float,
duration: float,
fallback_audio,
) -> Tuple[Optional[AudioClip], bool]:
try:
with AudioFileClip(source_path) as audio_file:
segment = audio_file.subclipped(start, end)
fps = (
getattr(segment, "fps", None)
or getattr(audio_file, "fps", None)
or 44100
)
samples = segment.to_soundarray(fps=fps)
except Exception:
logger.warning(
"Falha ao carregar audio independente; utilizando fluxo original",
exc_info=True,
)
return fallback_audio, False
audio_clip = AudioArrayClip(samples, fps=fps).with_duration(duration)
return audio_clip, True
def _collect_words(
self, transcription: TranscriptionResult, start: float, end: float
) -> List[WordTiming]:
collected: List[WordTiming] = []
for segment in transcription.segments:
if segment.end < start or segment.start > end:
continue
if segment.words:
for word in segment.words:
if word.end < start or word.start > end:
continue
collected.append(
WordTiming(
start=max(start, word.start),
end=min(end, word.end),
word=word.word,
)
)
else:
collected.extend(self._fallback_words(segment.text, segment.start, segment.end, start, end))
collected.sort(key=lambda w: w.start)
return collected
def _fallback_words(
self,
text: str,
segment_start: float,
segment_end: float,
window_start: float,
window_end: float,
) -> Iterable[WordTiming]:
words = [w for w in re.split(r"\s+", text.strip()) if w]
if not words:
return []
seg_start = max(segment_start, window_start)
seg_end = min(segment_end, window_end)
duration = max(0.01, seg_end - seg_start)
step = duration / len(words)
timings: List[WordTiming] = []
for idx, word in enumerate(words):
w_start = seg_start + idx * step
w_end = min(seg_end, w_start + step)
timings.append(WordTiming(start=w_start, end=w_end, word=word))
return timings
@staticmethod
def _wrap_text(text: str, max_width: int) -> str:
text = text.strip()
if not text:
return ""
words = text.split()
lines: List[str] = []
current: List[str] = []
for word in words:
current.append(word)
if len(" ".join(current)) > max_width // 18 and len(current) > 1:
lines.append(" ".join(current[:-1]))
current = [current[-1]]
if current:
lines.append(" ".join(current))
return "\n".join(lines)
def _write_with_fallback(
self,
*,
composite: CompositeVideoClip,
output_path,
index: int,
output_dir,
) -> None:
attempts = self._encoding_attempts()
temp_audio_path = output_dir / f"temp_audio_{index:02d}.m4a"
last_error: Exception | None = None
for attempt in attempts:
codec = attempt["codec"]
bitrate = attempt["bitrate"]
preset = attempt["preset"]
ffmpeg_params = ["-pix_fmt", "yuv420p"]
if preset:
ffmpeg_params = ["-preset", preset, "-pix_fmt", "yuv420p"]
try:
logger.info(
"Renderizando clip %02d com codec %s (bitrate=%s, preset=%s)",
index,
codec,
bitrate,
preset or "default",
)
composite.write_videofile(
str(output_path),
codec=codec,
audio_codec=self.settings.rendering.audio_codec,
fps=self.settings.rendering.fps,
bitrate=bitrate,
ffmpeg_params=ffmpeg_params,
temp_audiofile=str(temp_audio_path),
remove_temp=True,
threads=4,
)
return
except Exception as exc: # noqa: BLE001 - propagate after fallbacks
last_error = exc
logger.warning(
"Falha ao renderizar com codec %s: %s", codec, exc, exc_info=True
)
if output_path.exists():
output_path.unlink(missing_ok=True)
if temp_audio_path.exists():
temp_audio_path.unlink(missing_ok=True)
raise RuntimeError("Todas as tentativas de renderizacao falharam") from last_error
def _encoding_attempts(self) -> List[Dict[str, str | None]]:
settings = self.settings.rendering
attempts: List[Dict[str, str | None]] = []
attempts.append(
{
"codec": settings.video_codec,
"bitrate": settings.bitrate,
"preset": settings.preset,
}
)
deduped: List[Dict[str, str | None]] = []
seen = set()
for attempt in attempts:
key = (attempt["codec"], attempt["bitrate"], attempt["preset"])
if key in seen:
continue
seen.add(key)
deduped.append(attempt)
return deduped
@staticmethod
def _with_audio(
composite: CompositeVideoClip,
audio_clip,
) -> CompositeVideoClip:
"""Attach audio to a composite clip across MoviePy versions."""
if hasattr(composite, "with_audio"):
return composite.with_audio(audio_clip)
if hasattr(composite, "set_audio"):
return composite.set_audio(audio_clip)
raise AttributeError("CompositeVideoClip does not support audio assignment")
@staticmethod
def _make_textclip(
*,
text: str,
font_path,
font_size: int,
color: str,
size: Tuple[int, int],
) -> TextClip:
"""Create a TextClip compatible with MoviePy 1.x and 2.x.
MoviePy 2.x removed the 'align' keyword from TextClip. We try with
'align' for older versions and fall back to a call without it when
unsupported.
"""
kwargs = dict(
text=text,
font=str(font_path),
font_size=font_size,
color=color,
method="caption",
size=size,
)
try:
return TextClip(**kwargs, align="center") # MoviePy 1.x style
except TypeError:
logger.debug("TextClip 'align' not supported; falling back without it")
return TextClip(**kwargs) # MoviePy 2.x style

File diff suppressed because it is too large

333
video_render/transcription.py Normal file
View File

@@ -0,0 +1,333 @@
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
import numpy as np
from faster_whisper import WhisperModel
from video_render.config import Settings
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class WordTiming:
start: float
end: float
word: str
@dataclass(frozen=True)
class TranscriptSegment:
id: int
start: float
end: float
text: str
words: List[WordTiming]
@dataclass(frozen=True)
class TranscriptionResult:
segments: List[TranscriptSegment]
full_text: str
class TranscriptionService:
def __init__(self, settings: Settings) -> None:
self.settings = settings
self._model: Optional[WhisperModel] = None
def _load_model(self) -> WhisperModel:
if self._model is None:
logger.info(
"Carregando modelo Faster-Whisper '%s' (device=%s, compute_type=%s)",
self.settings.whisper.model_size,
self.settings.whisper.device or "auto",
self.settings.whisper.compute_type or "default",
)
self._model = WhisperModel(
self.settings.whisper.model_size,
device=self.settings.whisper.device or "auto",
compute_type=self.settings.whisper.compute_type or "default",
download_root=str(self.settings.whisper.download_root),
)
return self._model
def unload_model(self) -> None:
"""Unload the Whisper model to free memory (reduces RAM usage by 1-3GB)."""
if self._model is not None:
logger.info("Descarregando modelo Whisper para liberar memória...")
del self._model
self._model = None
# Force garbage collection to immediately free GPU/CPU memory
import gc
gc.collect()
logger.info("Modelo Whisper descarregado com sucesso")
def transcribe(self, audio_path: Path, output_dir: Optional[Path] = None) -> TranscriptionResult:
if output_dir is not None:
existing_transcription = self.load(output_dir)
if existing_transcription is not None:
logger.info("Transcrição já existe em %s, reutilizando...", output_dir)
return existing_transcription
# Get audio duration to decide if we need chunked processing
audio_duration = self._get_audio_duration(audio_path)
chunk_duration_minutes = 30 # Process in 30-minute chunks for long videos
chunk_duration_seconds = chunk_duration_minutes * 60
# For videos longer than 30 minutes, use chunked processing to avoid OOM
if audio_duration > chunk_duration_seconds:
logger.info(
f"Áudio longo detectado ({audio_duration/60:.1f} min). "
f"Processando em chunks de {chunk_duration_minutes} min para evitar erro de memória..."
)
return self._transcribe_chunked(audio_path, chunk_duration_seconds)
else:
logger.info(f"Iniciando transcrição do áudio ({audio_duration/60:.1f} min) com FasterWhisper...")
return self._transcribe_full(audio_path)
def _get_audio_duration(self, audio_path: Path) -> float:
"""Get audio duration in seconds."""
try:
from moviepy.audio.io.AudioFileClip import AudioFileClip
with AudioFileClip(str(audio_path)) as audio:
return audio.duration or 0.0
except Exception as e:
logger.warning(f"Falha ao obter duração do áudio, assumindo curto: {e}")
return 0.0 # Assume short if we can't determine
def _transcribe_full(self, audio_path: Path) -> TranscriptionResult:
"""Transcribe entire audio at once (for shorter videos)."""
model = self._load_model()
segments, _ = model.transcribe(
str(audio_path),
beam_size=5,
word_timestamps=True,
)
parsed_segments: List[TranscriptSegment] = []
full_text_parts: List[str] = []
for idx, segment in enumerate(segments):
words = [
WordTiming(start=w.start, end=w.end, word=w.word.strip())
for w in segment.words or []
if w.word.strip()
]
text = segment.text.strip()
full_text_parts.append(text)
parsed_segments.append(
TranscriptSegment(
id=idx,
start=segment.start,
end=segment.end,
text=text,
words=words,
)
)
return TranscriptionResult(
segments=parsed_segments,
full_text=" ".join(full_text_parts).strip(),
)
def _transcribe_chunked(self, audio_path: Path, chunk_duration: float) -> TranscriptionResult:
"""Transcribe audio in chunks to avoid OOM on long videos."""
import subprocess
model = self._load_model()
all_segments: List[TranscriptSegment] = []
full_text_parts: List[str] = []
segment_id_counter = 0
# Get total duration
total_duration = self._get_audio_duration(audio_path)
num_chunks = int(np.ceil(total_duration / chunk_duration))
logger.info(f"Processando áudio em {num_chunks} chunks...")
for chunk_idx in range(num_chunks):
start_time = chunk_idx * chunk_duration
end_time = min((chunk_idx + 1) * chunk_duration, total_duration)
logger.info(
f"Processando chunk {chunk_idx + 1}/{num_chunks} "
f"({start_time/60:.1f}min - {end_time/60:.1f}min)..."
)
# Extract chunk using ffmpeg directly (more reliable than moviepy subclip)
temp_chunk_path = audio_path.parent / f"temp_chunk_{chunk_idx}.wav"
try:
# Use ffmpeg to extract the chunk
chunk_duration_actual = end_time - start_time
ffmpeg_cmd = [
'ffmpeg',
'-y', # Overwrite output file
'-ss', str(start_time), # Start time
'-i', str(audio_path), # Input file
'-t', str(chunk_duration_actual), # Duration
'-acodec', 'pcm_s16le', # Audio codec
'-ar', '44100', # Sample rate
'-ac', '2', # Stereo
'-loglevel', 'error', # Only show errors
str(temp_chunk_path)
]
subprocess.run(ffmpeg_cmd, check=True, capture_output=True)
# Transcribe chunk
segments, _ = model.transcribe(
str(temp_chunk_path),
beam_size=5,
word_timestamps=True,
)
# Process segments with time offset
for segment in segments:
words = [
WordTiming(
start=w.start + start_time,
end=w.end + start_time,
word=w.word.strip()
)
for w in segment.words or []
if w.word.strip()
]
text = segment.text.strip()
full_text_parts.append(text)
all_segments.append(
TranscriptSegment(
id=segment_id_counter,
start=segment.start + start_time,
end=segment.end + start_time,
text=text,
words=words,
)
)
segment_id_counter += 1
# Force garbage collection after each chunk
import gc
gc.collect()
except subprocess.CalledProcessError as e:
logger.error(f"Erro ao extrair chunk {chunk_idx}: {e.stderr.decode() if e.stderr else str(e)}")
raise
finally:
# Clean up temp chunk
if temp_chunk_path.exists():
temp_chunk_path.unlink()
logger.info(f"Transcrição em chunks concluída: {len(all_segments)} segmentos processados")
return TranscriptionResult(
segments=all_segments,
full_text=" ".join(full_text_parts).strip(),
)
@staticmethod
def persist(result: TranscriptionResult, destination: Path) -> None:
json_path = destination / "transcription.json"
text_path = destination / "transcription.txt"
payload = {
"segments": [
{
"id": segment.id,
"start": segment.start,
"end": segment.end,
"text": segment.text,
"words": [
{"start": word.start, "end": word.end, "text": word.word}
for word in segment.words
],
}
for segment in result.segments
],
"full_text": result.full_text,
}
with json_path.open("w", encoding="utf-8") as fp:
json.dump(payload, fp, ensure_ascii=False, indent=2)
with text_path.open("w", encoding="utf-8") as fp:
fp.write(result.full_text)
logger.info("Transcricao salva em %s", destination)
@staticmethod
def load(source: Path) -> Optional[TranscriptionResult]:
json_path = source / "transcription.json"
if not json_path.exists():
return None
try:
with json_path.open("r", encoding="utf-8") as fp:
payload = json.load(fp)
except (OSError, json.JSONDecodeError) as exc:
logger.warning(
"Falha ao carregar transcricao existente de %s: %s", json_path, exc
)
return None
segments_payload = payload.get("segments", [])
if not isinstance(segments_payload, list):
logger.warning(
"Formato inesperado ao carregar transcricao de %s: 'segments' invalido",
json_path,
)
return None
segments: List[TranscriptSegment] = []
for idx, segment_data in enumerate(segments_payload):
if not isinstance(segment_data, dict):
logger.debug("Segmento invalido ignorado ao carregar: %s", segment_data)
continue
try:
segment_id = int(segment_data.get("id", idx))
start = float(segment_data["start"])
end = float(segment_data["end"])
except (KeyError, TypeError, ValueError):
logger.debug("Segmento sem dados obrigatorios ignorado: %s", segment_data)
continue
text = str(segment_data.get("text", "")).strip()
words_payload = segment_data.get("words", [])
words: List[WordTiming] = []
if isinstance(words_payload, list):
for word_data in words_payload:
if not isinstance(word_data, dict):
continue
try:
w_start = float(word_data["start"])
w_end = float(word_data["end"])
except (KeyError, TypeError, ValueError):
logger.debug(
"Palavra sem dados obrigatorios ignorada: %s", word_data
)
continue
word_text = str(word_data.get("text", "")).strip()
if not word_text:
continue
words.append(WordTiming(start=w_start, end=w_end, word=word_text))
segments.append(
TranscriptSegment(
id=segment_id,
start=start,
end=end,
text=text,
words=words,
)
)
full_text = str(payload.get("full_text", "")).strip()
return TranscriptionResult(segments=segments, full_text=full_text)
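
A small usage sketch of the transcription helpers above (Settings.load() and the workspace path are assumptions, not part of this diff):

from pathlib import Path
from video_render.config import Settings
from video_render.transcription import TranscriptionService

settings = Settings.load()  # assumed factory; adjust to the real config entrypoint
service = TranscriptionService(settings)
workspace = Path("temp/meu_video")  # hypothetical workspace directory

# transcribe() reuses workspace/transcription.json when it already exists
result = service.transcribe(workspace / "audio.wav", output_dir=workspace)
TranscriptionService.persist(result, workspace)
service.unload_model()  # releases the 1-3GB held by Faster-Whisper

print(f"{len(result.segments)} segmentos, {len(result.full_text)} caracteres")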

80
video_render/utils.py Normal file
View File

@@ -0,0 +1,80 @@
from __future__ import annotations
import re
import unicodedata
from pathlib import Path
from typing import Iterable
def sanitize_filename(name: str) -> str:
normalized = unicodedata.normalize("NFKD", name)
ascii_text = normalized.encode("ASCII", "ignore").decode()
ascii_text = ascii_text.lower()
ascii_text = ascii_text.replace(" ", "_")
ascii_text = re.sub(r"[^a-z0-9_\-\.]", "", ascii_text)
ascii_text = re.sub(r"_+", "_", ascii_text)
return ascii_text.strip("_") or "video"
def ensure_workspace(root: Path, folder_name: str) -> Path:
workspace = root / folder_name
workspace.mkdir(parents=True, exist_ok=True)
return workspace
def remove_paths(paths: Iterable[Path]) -> None:
import logging
import time
logger = logging.getLogger(__name__)
for path in paths:
if not path.exists():
continue
# Try to remove with retries and better error handling
max_retries = 3
for attempt in range(max_retries):
try:
if path.is_file() or path.is_symlink():
path.unlink(missing_ok=True)
else:
for child in sorted(path.rglob("*"), reverse=True):
if child.is_file() or child.is_symlink():
try:
child.unlink(missing_ok=True)
except PermissionError:
logger.warning(f"Não foi possível deletar {child}: sem permissão")
# Try to change permissions and retry
try:
child.chmod(0o777)
child.unlink(missing_ok=True)
except Exception as e:
logger.warning(f"Falha ao forçar deleção de {child}: {e}")
elif child.is_dir():
try:
child.rmdir()
except (PermissionError, OSError) as e:
logger.warning(f"Não foi possível remover diretório {child}: {e}")
try:
path.rmdir()
except (PermissionError, OSError) as e:
logger.warning(f"Não foi possível remover diretório {path}: {e}")
break # Success, exit retry loop
except PermissionError as e:
if attempt < max_retries - 1:
logger.warning(f"Tentativa {attempt + 1}/{max_retries} falhou ao deletar {path}: {e}. Tentando novamente...")
time.sleep(0.5) # Wait a bit before retry
# Try to change permissions
try:
path.chmod(0o777)
except Exception:
pass
else:
logger.error(f"Não foi possível deletar {path} após {max_retries} tentativas: {e}")
except Exception as e:
logger.error(f"Erro inesperado ao deletar {path}: {e}")
break # Don't retry on unexpected errors
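
A few illustrative inputs and outputs for sanitize_filename, following the steps above (NFKD normalization, accent stripping, lowercasing, spaces to underscores, removal of anything outside a-z, 0-9, "_", "-", "."):

# sanitize_filename("Meu Vídeo Final.mp4")     -> "meu_video_final.mp4"
# sanitize_filename("Corte #3 (Ao Vivo).mp4")  -> "corte_3_ao_vivo.mp4"
# sanitize_filename("???")                     -> "video"  (fallback when nothing survives)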