From c5d3e83a5f8f37da0f633ce17567e6dc7a987e6c Mon Sep 17 00:00:00 2001 From: LeoMortari Date: Wed, 12 Nov 2025 11:38:09 -0300 Subject: [PATCH] #v2 - Inicia testes da v2 - Adiciona rastreamento de objetos - Facial detection - Legenda interativa - Cortes mais precisos - Refinamento do Prompt --- .env.example | 47 ++ .gitignore | 3 +- docker-compose.yml | 13 +- dockerfile | 3 + main.py | 14 + prompts/generate.txt | 107 +++-- requirements.txt | 4 +- video_render/config.py | 34 +- video_render/context_detection.py | 398 +++++++++++++++++ video_render/llm.py | 221 ++++++---- video_render/media.py | 18 + video_render/pipeline.py | 37 +- video_render/rendering.py | 457 +++++++++++++------- video_render/smart_framing.py | 687 ++++++++++++++++++++++++++++++ video_render/transcription.py | 9 +- 15 files changed, 1739 insertions(+), 313 deletions(-) create mode 100644 .env.example create mode 100644 video_render/context_detection.py create mode 100644 video_render/smart_framing.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..45d20c8 --- /dev/null +++ b/.env.example @@ -0,0 +1,47 @@ +RABBITMQ_HOST=rabbitmq +RABBITMQ_PORT=5672 +RABBITMQ_USER=admin +RABBITMQ_PASS=your_password_here +RABBITMQ_QUEUE=to-render +RABBITMQ_UPLOAD_QUEUE=to-upload +RABBITMQ_PREFETCH=1 +RABBITMQ_HEARTBEAT=60 +RABBITMQ_BLOCKED_TIMEOUT=300 +OPENROUTER_API_URL=https://openrouter.ai/api/v1/chat/completions +OPENROUTER_API_KEY=your_openrouter_api_key_here + +# Model selection - Recommended options: +# - openai/gpt-oss-20b:free (Free tier, good quality) +# - qwen/qwen-2.5-72b-instruct:free (Free, excellent reasoning) +# - google/gemini-pro-1.5 (Best cost-benefit for podcasts) +# - anthropic/claude-3.5-sonnet (Premium quality, best reasoning) +OPENROUTER_MODEL=qwen/qwen-2.5-72b-instruct:free +OPENROUTER_TEMPERATURE=0.6 +OPENROUTER_PROMPT_PATH=prompts/generate.txt + +FASTER_WHISPER_MODEL_SIZE=medium +FASTER_WHISPER_DEVICE=auto + +RENDER_WIDTH=1080 +RENDER_HEIGHT=1920 + +RENDER_FPS=30 +RENDER_CODEC=libx264 +RENDER_AUDIO_CODEC=aac +RENDER_BITRATE=5000k +RENDER_PRESET=faster + +SUBTITLE_HIGHLIGHT_COLOR=#00FF00 +SUBTITLE_BASE_COLOR=#FFFFFF + +RENDER_FONT_PATH=./Montserrat.ttf +RENDER_TITLE_FONT_SIZE=110 +RENDER_SUBTITLE_FONT_SIZE=64 + +CAPTION_MIN_WORDS=2 +CAPTION_MAX_WORDS=2 + +ENABLE_SMART_FRAMING=true +SMART_FRAMING_MIN_CONFIDENCE=0.5 +SMART_FRAMING_SMOOTHING_WINDOW=20 +SMART_FRAMING_FRAME_SKIP=2 diff --git a/.gitignore b/.gitignore index 7a2b6cf..133b8c8 100644 --- a/.gitignore +++ b/.gitignore @@ -14,7 +14,7 @@ outputs/ # Ignore virtual envs venv/ env/ - +.claude # Ignore editor files .idea/ *.swp @@ -31,3 +31,4 @@ env/ # Ignore mypy and pylint cache .mypy_cache/ .pylint.d/ +CLAUDE.MD diff --git a/docker-compose.yml b/docker-compose.yml index b9264d9..628ee37 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,15 +3,18 @@ services: restart: unless-stopped build: . 
environment: - - FASTER_WHISPER_MODEL_SIZE=medium - - GEMINI_API_KEY=${GEMINI_API_KEY} - - GEMINI_MODEL=gemini-2.5-flash - - OPENROUTER_API_KEY=${OPENROUTER_API_KEY} - - OPENROUTER_MODEL=openai/gpt-oss-20b:free - RABBITMQ_PASS=${RABBITMQ_PASS} + - OPENROUTER_API_URL=${OPENROUTER_API_URL:-https://openrouter.ai/api/v1/chat/completions} + - OPENROUTER_API_KEY=${OPENROUTER_API_KEY} + - OPENROUTER_MODEL=${OPENROUTER_MODEL:-openai/gpt-oss-20b:free} + - OPENROUTER_PROMPT_PATH=${OPENROUTER_PROMPT_PATH:-prompts/generate.txt} + - FASTER_WHISPER_MODEL_SIZE=${FASTER_WHISPER_MODEL_SIZE:-medium} volumes: - "/root/videos:/app/videos" - "/root/outputs:/app/outputs" + - "/root/prompts:/app/prompts" + # - "./videos:/app/videos" + # - "./outputs:/app/outputs" command: "python -u main.py" networks: - dokploy-network diff --git a/dockerfile b/dockerfile index d146341..15bb4b8 100644 --- a/dockerfile +++ b/dockerfile @@ -23,6 +23,9 @@ RUN apt-get update && \ imagemagick \ fonts-liberation \ wget \ + libsm6 \ + libxext6 \ + libxrender-dev \ && rm -rf /var/lib/apt/lists/* COPY requirements.txt . diff --git a/main.py b/main.py index 1ef531b..1b5cb5a 100644 --- a/main.py +++ b/main.py @@ -1,3 +1,17 @@ +import os +import warnings + +# Suppress FFmpeg/AV1 warnings for cleaner logs +os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'loglevel;quiet' +os.environ['OPENCV_LOG_LEVEL'] = 'ERROR' + +# Suppress MoviePy verbose logging +os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1' + +# Filter deprecation warnings +warnings.filterwarnings('ignore', category=DeprecationWarning) +warnings.filterwarnings('ignore', category=UserWarning, module='moviepy') + from video_render.config import load_settings from video_render.logging_utils import setup_logging from video_render.messaging import RabbitMQWorker diff --git a/prompts/generate.txt b/prompts/generate.txt index 2ab45f9..8638af2 100644 --- a/prompts/generate.txt +++ b/prompts/generate.txt @@ -1,36 +1,85 @@ -Voce e um estrategista de conteudo especializado em identificar cortes curtos de videos longos que performam bem em redes sociais. +Voce e especialista em viralidade de redes sociais (TikTok, Instagram Reels, YouTube Shorts). Analise a transcricao e selecione trechos com MAXIMO potencial viral, priorizando qualidade sobre quantidade. -FUNCAO: -- Analisar a transcricao completa de um video. -- Escolher trechos curtos (entre 60s e 90s) com maior chance de engajamento. -- O inicio do trecho deve ter um hook para engajar e prender a atenção do espectador. -- Responder APENAS em JSON valido. +PROCESSO DE ANALISE: +1. Mapear potenciais trechos na transcricao +2. Avaliar cada trecho usando sistema de pontuacao abaixo +3. Rankear do maior para menor score viral +4. Selecionar apenas os top-ranked baseado na duracao do video -FORMATO DA RESPOSTA: -{ - "highlights": [ - { - "start": , - "end": , - "summary": "Resumo conciso do porque este trecho engaja" - } - ] -} +SISTEMA DE PONTUACAO VIRAL (0-100 pontos): -REGRAS: -- Liste no maximo 6 destaques. -- Respeite a ordem cronologica. -- Nunca deixe listas vazias; se nada for relevante, inclua uma entrada com start = 0, end = 0 e summary explicando a ausencia de cortes. -- Utilize apenas valores numericos simples (ponto como separador decimal). -- Nao repita um mesmo trecho. 
+HOOK/ABERTURA (0-25 pontos): +[25] Frase choqueante, pergunta polemica ou promessa ousada +[20] Historia intrigante ou situacao inusitada +[15] Afirmacao interessante mas previsivel +[10] Introducao generica mas aceitavel +[0] "Oi", "entao", silencio ou conteudo fraco -PERSPECTIVA DE ANALISE: -- Concentre-se em momentos com gatilhos emocionais, insights, storytelling ou chamadas para acao fortes. -- Prefira trechos com comeco, meio e fim claros. -- Evite partes redundantes, silenciosas ou extremamente tecnicas. +GATILHO EMOCIONAL (0-25 pontos): +[25] Emocao extrema: raiva, choque, riso intenso, inspiracao profunda +[20] Emocao forte: surpresa, indignacao, humor, curiosidade intensa +[15] Emocao moderada: interesse, leve humor, curiosidade +[10] Emocao fraca: informativo sem impacto emocional +[0] Monotono, tecnico, sem apelo emocional + +VALOR/UTILIDADE (0-20 pontos): +[20] Segredo valioso, insight transformador ou informacao exclusiva +[15] Ensina algo pratico e imediatamente aplicavel +[10] Opiniao interessante ou perspectiva util +[5] Informacao generica ou conhecimento comum +[0] Nenhum valor pratico, puro enrolation + +ESTRUTURA NARRATIVA (0-15 pontos): +[15] Historia completa com inicio, conflito/climax e resolucao +[10] Segmento com comeco e fim coerentes +[5] Trecho com sentido mas cortado abruptamente +[0] Fragmento sem contexto ou conclusao + +RITMO E ENERGIA (0-15 pontos): +[15] Dinamico, sem pausas, alta energia, palavras impactantes +[10] Bom ritmo com pausas naturais curtas +[5] Ritmo lento mas aceitavel +[0] Muitas pausas, hesitacoes, monotonia, silencio + +REGRAS DE QUANTIDADE: +5-10 min: 3 clipes (minimo 1 se score alto) +10-20 min: 4 clipes +20-30 min: 5 clipes +30+ min: 6 clipes (maximo absoluto) + +IMPORTANTE: Priorize qualidade. Melhor 3 clipes score 80+ que 6 clipes score 50. Se poucos momentos virais, retorne apenas os melhores (minimo 1). + +CRITERIOS DE SELECAO: +- Score viral maior ou igual 60 pontos (idealmente maior ou igual 70) +- Duracao ideal: 60-90s +- Duracao minima: 60s | Duracao maxima: 120s +- Sem sobreposicao (end de um menor que start do proximo) +- Inicio e fim coerentes + +EVITE: +- Introducoes genericas +- Trechos com silencio/pausas maiores que 3s +- Explicacoes tecnicas sem gancho emocional +- Segmentos sem conclusao +- Momentos de transicao + +FORMATO JSON (retorne APENAS isto): +{"highlights":[{"start":,"end":,"summary":"Score estimado e gatilhos principais"}]} + +REGRAS TECNICAS: +- Float com ponto decimal (45.5 NAO 45,5) +- Timestamps exatos dos segments fornecidos +- Ordem cronologica (start crescente) +- Minimo 1, maximo 6 highlights +- Summary conciso (1-2 frases) TAREFA: -- Leia a transcricao recebida no campo "transcript". -- Use a lista de marcas de tempo detalhadas no campo "segments" para embasar suas escolhas. -- Produza a saida JSON descrita acima. +1. Leia transcricao e timestamps +2. Avalie e pontue trechos mentalmente +3. Rankear por score viral +4. Selecione top-ranked baseado na duracao +5. Retorne JSON +6. Se video fraco, retorne pelo menos 1 highlight + +Objetivo: MAXIMIZAR chance de viralizar. Seja criterioso, apenas melhores trechos. 
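Illustrative sketch of the response contract the prompt above demands (timestamps, summary text, and the helper name are hypothetical, not part of the patch). It applies the prompt's own selection rules — chronological, non-overlapping, 60-120 s per clip — which are slightly stricter than the validation later in video_render/llm.py (that code accepts clips down to 45 s):

    import json
    from typing import Dict, List

    # Hypothetical model reply that follows the prompt's JSON contract.
    raw_reply = '{"highlights": [{"start": 12.5, "end": 84.0, "summary": "Score ~80: hook forte, historia completa"}]}'

    def filter_highlights(reply_text: str) -> List[Dict]:
        """Keep only well-formed highlights: chronological, non-overlapping, 60-120s long."""
        data = json.loads(reply_text)
        valid: List[Dict] = []
        last_end = 0.0
        for item in data.get("highlights", []):
            start = float(item.get("start", 0))
            end = float(item.get("end", 0))
            duration = end - start
            if start >= last_end and 60.0 <= duration <= 120.0 and str(item.get("summary", "")).strip():
                valid.append({"start": start, "end": end, "summary": item["summary"]})
                last_end = end
        return valid

    print(filter_highlights(raw_reply))  # -> one highlight of 71.5s
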
diff --git a/requirements.txt b/requirements.txt
index f38966b..758aa59 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,4 +4,6 @@ numpy>=1.26.0
 requests
 pika
 faster-whisper==1.2.0
-google-genai
+mediapipe==0.10.18
+opencv-python==4.10.0.84
+scipy>=1.11.0
diff --git a/video_render/config.py b/video_render/config.py
index 8f346ad..547d10b 100644
--- a/video_render/config.py
+++ b/video_render/config.py
@@ -13,6 +13,8 @@ TEMP_ROOT = BASE_DIR / "temp"
 
 @dataclass(frozen=True)
 class RabbitMQSettings:
+    # host: str = os.environ.get("RABBITMQ_HOST", "154.12.229.181")
+    # port: int = int(os.environ.get("RABBITMQ_PORT", 32790))
     host: str = os.environ.get("RABBITMQ_HOST", "rabbitmq")
     port: int = int(os.environ.get("RABBITMQ_PORT", 5672))
     user: str = os.environ.get("RABBITMQ_USER", "admin")
@@ -24,33 +26,19 @@ class RabbitMQSettings:
     blocked_timeout: int = int(os.environ.get("RABBITMQ_BLOCKED_TIMEOUT", 300))
 
 
-@dataclass(frozen=True)
-class GeminiSettings:
-    api_key: str = os.environ.get("GEMINI_API_KEY", "")
-    model: str = os.environ.get("GEMINI_MODEL", "gemini-2.5-flash")
-    safety_settings: str | None = os.environ.get("GEMINI_SAFETY_SETTINGS")
-    temperature: float = float(os.environ.get("GEMINI_TEMPERATURE", 0.2))
-    top_k: int | None = (
-        int(os.environ["GEMINI_TOP_K"]) if os.environ.get("GEMINI_TOP_K") else None
-    )
-    top_p: float | None = (
-        float(os.environ["GEMINI_TOP_P"]) if os.environ.get("GEMINI_TOP_P") else None
-    )
-    prompt_path: str = os.environ.get("GEMINI_PROMPT_PATH", "prompts/generate.txt")
-
-
 @dataclass(frozen=True)
 class OpenRouterSettings:
     api_key: str = os.environ.get("OPENROUTER_API_KEY", "")
     model: str = os.environ.get(
         "OPENROUTER_MODEL", "openai/gpt-oss-20b:free"
     )
     temperature: float = float(os.environ.get("OPENROUTER_TEMPERATURE", 0.6))
+    prompt_path: str = os.environ.get("OPENROUTER_PROMPT_PATH", "prompts/generate.txt")
 
 
 @dataclass(frozen=True)
 class WhisperSettings:
-    model_size: str = os.environ.get("FASTER_WHISPER_MODEL_SIZE", "small")
+    model_size: str = os.environ.get("FASTER_WHISPER_MODEL_SIZE", "medium")
     device: str | None = os.environ.get("FASTER_WHISPER_DEVICE")
     compute_type: str | None = os.environ.get("FASTER_WHISPER_COMPUTE_TYPE")
     download_root: Path = Path(
@@ -67,19 +55,23 @@ class RenderingSettings:
     audio_codec: str = os.environ.get("RENDER_AUDIO_CODEC", "aac")
     bitrate: str = os.environ.get("RENDER_BITRATE", "5000k")
     preset: str = os.environ.get("RENDER_PRESET", "faster")
-    highlight_color: str = os.environ.get("SUBTITLE_HIGHLIGHT_COLOR", "#FFD200")
+    highlight_color: str = os.environ.get("SUBTITLE_HIGHLIGHT_COLOR", "#00FF00")
     base_color: str = os.environ.get("SUBTITLE_BASE_COLOR", "#FFFFFF")
     font_path: Path = Path(os.environ.get("RENDER_FONT_PATH", "./Montserrat.ttf"))
     title_font_size: int = int(os.environ.get("RENDER_TITLE_FONT_SIZE", 110))
     subtitle_font_size: int = int(os.environ.get("RENDER_SUBTITLE_FONT_SIZE", 64))
-    caption_min_words: int = int(os.environ.get("CAPTION_MIN_WORDS", 3))
-    caption_max_words: int = int(os.environ.get("CAPTION_MAX_WORDS", 4))
+    caption_min_words: int = int(os.environ.get("CAPTION_MIN_WORDS", 2))
+    caption_max_words: int = int(os.environ.get("CAPTION_MAX_WORDS", 2))
+    # Smart framing settings
+    enable_smart_framing: bool = os.environ.get("ENABLE_SMART_FRAMING", "true").lower() in ("true", "1", "yes")
+    smart_framing_min_confidence: float = float(os.environ.get("SMART_FRAMING_MIN_CONFIDENCE", 0.5))
+    
smart_framing_smoothing_window: int = int(os.environ.get("SMART_FRAMING_SMOOTHING_WINDOW", 20)) + smart_framing_frame_skip: int = int(os.environ.get("SMART_FRAMING_FRAME_SKIP", 2)) # Process every Nth frame (CPU optimization) @dataclass(frozen=True) class Settings: rabbitmq: RabbitMQSettings = RabbitMQSettings() - gemini: GeminiSettings = GeminiSettings() openrouter: OpenRouterSettings = OpenRouterSettings() whisper: WhisperSettings = WhisperSettings() rendering: RenderingSettings = RenderingSettings() diff --git a/video_render/context_detection.py b/video_render/context_detection.py new file mode 100644 index 0000000..e342b4c --- /dev/null +++ b/video_render/context_detection.py @@ -0,0 +1,398 @@ +""" +Context detection module for video analysis. + +This module provides functionality to detect faces, track people, +and identify who is speaking in video content using MediaPipe and audio analysis. +""" +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import List, Optional, Tuple + +import cv2 +import mediapipe as mp +import numpy as np +from scipy import signal + +logger = logging.getLogger(__name__) + + +@dataclass +class FaceDetection: + """Represents a detected face in a frame.""" + x: int + y: int + width: int + height: int + confidence: float + center_x: int + center_y: int + landmarks: Optional[List[Tuple[int, int]]] = None + + +@dataclass +class PersonTracking: + """Tracks a person across frames.""" + person_id: int + face: FaceDetection + is_speaking: bool + speaking_confidence: float + frame_number: int + + +@dataclass +class FrameContext: + """Context information for a video frame.""" + frame_number: int + timestamp: float + detected_faces: List[FaceDetection] + active_speakers: List[int] # indices of speaking faces + primary_focus: Optional[Tuple[int, int]] # (x, y) center point + layout_mode: str # "single", "dual_split", "grid" + + +class MediaPipeDetector: + """Face and pose detection using MediaPipe.""" + + def __init__(self, min_detection_confidence: float = 0.5, min_tracking_confidence: float = 0.5): + self.min_detection_confidence = min_detection_confidence + self.min_tracking_confidence = min_tracking_confidence + self.mp_face_detection = mp.solutions.face_detection + self.mp_face_mesh = mp.solutions.face_mesh + + self.face_detection = self.mp_face_detection.FaceDetection( + min_detection_confidence=min_detection_confidence, + model_selection=1 + ) + + self.face_mesh = self.mp_face_mesh.FaceMesh( + max_num_faces=5, + min_detection_confidence=min_detection_confidence, + min_tracking_confidence=min_tracking_confidence, + static_image_mode=False + ) + + logger.info("MediaPipe detector initialized") + + def detect_faces(self, frame: np.ndarray) -> List[FaceDetection]: + """ + Detect faces in a frame. 
+ + Args: + frame: RGB image array + + Returns: + List of detected faces + """ + height, width = frame.shape[:2] + + if len(frame.shape) == 2: + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB) + elif frame.shape[2] == 4: + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB) + else: + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + + results = self.face_detection.process(frame_rgb) + + faces = [] + if results.detections: + for detection in results.detections: + bbox = detection.location_data.relative_bounding_box + + x = int(bbox.xmin * width) + y = int(bbox.ymin * height) + w = int(bbox.width * width) + h = int(bbox.height * height) + + x = max(0, min(x, width - 1)) + y = max(0, min(y, height - 1)) + w = min(w, width - x) + h = min(h, height - y) + + center_x = x + w // 2 + center_y = y + h // 2 + + confidence = detection.score[0] if detection.score else 0.0 + + faces.append(FaceDetection( + x=x, + y=y, + width=w, + height=h, + confidence=confidence, + center_x=center_x, + center_y=center_y + )) + + return faces + + def detect_face_landmarks(self, frame: np.ndarray) -> List[FaceDetection]: + """ + Detect faces with landmarks for lip sync detection. + + Args: + frame: RGB image array + + Returns: + List of detected faces with landmark information + """ + height, width = frame.shape[:2] + + if len(frame.shape) == 2: + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB) + elif frame.shape[2] == 4: + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB) + else: + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + + results = self.face_mesh.process(frame_rgb) + + faces = [] + if results.multi_face_landmarks: + for face_landmarks in results.multi_face_landmarks: + xs = [lm.x for lm in face_landmarks.landmark] + ys = [lm.y for lm in face_landmarks.landmark] + + x_min, x_max = min(xs), max(xs) + y_min, y_max = min(ys), max(ys) + + x = int(x_min * width) + y = int(y_min * height) + w = int((x_max - x_min) * width) + h = int((y_max - y_min) * height) + + center_x = x + w // 2 + center_y = y + h // 2 + + lip_landmarks = [] + for idx in [13, 14, 78, 308]: + lm = face_landmarks.landmark[idx] + lip_landmarks.append((int(lm.x * width), int(lm.y * height))) + + faces.append(FaceDetection( + x=x, + y=y, + width=w, + height=h, + confidence=1.0, + center_x=center_x, + center_y=center_y, + landmarks=lip_landmarks + )) + + return faces + + def close(self): + """Release MediaPipe resources.""" + self.face_detection.close() + self.face_mesh.close() + + +class AudioActivityDetector: + """Detects speech activity in audio.""" + + def __init__(self, sample_rate: int = 44100, frame_duration_ms: int = 30): + self.sample_rate = sample_rate + self.frame_duration_ms = frame_duration_ms + self.frame_size = int(sample_rate * frame_duration_ms / 1000) + + logger.info(f"Audio activity detector initialized (sr={sample_rate}, frame={frame_duration_ms}ms)") + + def detect_speaking_periods( + self, + audio_samples: np.ndarray, + threshold: float = 0.02, + min_speech_duration: float = 0.1 + ) -> List[Tuple[float, float]]: + """ + Detect periods of speech in audio. 
+ + Args: + audio_samples: Audio samples array + threshold: Energy threshold for speech detection + min_speech_duration: Minimum duration of speech in seconds + + Returns: + List of (start_time, end_time) tuples in seconds + """ + if audio_samples.ndim > 1: + audio_samples = audio_samples.mean(axis=1) + + energies = [] + for i in range(0, len(audio_samples), self.frame_size): + frame = audio_samples[i:i + self.frame_size] + if len(frame) > 0: + energy = np.sqrt(np.mean(frame ** 2)) + energies.append(energy) + + speaking_frames = [e > threshold for e in energies] + + periods = [] + start_frame = None + + for i, is_speaking in enumerate(speaking_frames): + if is_speaking and start_frame is None: + start_frame = i + elif not is_speaking and start_frame is not None: + start_time = start_frame * self.frame_duration_ms / 1000 + end_time = i * self.frame_duration_ms / 1000 + + if end_time - start_time >= min_speech_duration: + periods.append((start_time, end_time)) + + start_frame = None + + if start_frame is not None: + start_time = start_frame * self.frame_duration_ms / 1000 + end_time = len(speaking_frames) * self.frame_duration_ms / 1000 + if end_time - start_time >= min_speech_duration: + periods.append((start_time, end_time)) + + return periods + + def is_speaking_at_time(self, speaking_periods: List[Tuple[float, float]], time: float) -> bool: + """Check if there is speech activity at a given time.""" + for start, end in speaking_periods: + if start <= time <= end: + return True + return False + + +class ContextAnalyzer: + """Analyzes video context to determine focus and layout.""" + + def __init__(self): + self.detector = MediaPipeDetector() + self.audio_detector = AudioActivityDetector() + self.previous_faces: List[FaceDetection] = [] + + logger.info("Context analyzer initialized") + + def analyze_frame( + self, + frame: np.ndarray, + timestamp: float, + frame_number: int, + speaking_periods: Optional[List[Tuple[float, float]]] = None + ) -> FrameContext: + """ + Analyze a single frame to extract context information. 
+ + Args: + frame: Video frame (BGR format from OpenCV) + timestamp: Frame timestamp in seconds + frame_number: Frame index + speaking_periods: List of (start, end) times where speech is detected + + Returns: + FrameContext with detection results + """ + faces = self.detector.detect_face_landmarks(frame) + + if not faces: + faces = self.detector.detect_faces(frame) + + # Determine who is speaking + active_speakers = [] + for i, face in enumerate(faces): + is_speaking = False + + if speaking_periods and self.audio_detector.is_speaking_at_time(speaking_periods, timestamp): + is_speaking = True + + if face.landmarks and len(self.previous_faces) > i: + is_speaking = is_speaking or self._detect_lip_movement(face, self.previous_faces[i]) + + if is_speaking: + active_speakers.append(i) + + num_faces = len(faces) + num_speakers = len(active_speakers) + + if num_faces == 0: + layout_mode = "single" + elif num_faces == 1: + layout_mode = "single" + elif num_faces == 2: + layout_mode = "dual_split" + elif num_faces >= 3: + layout_mode = "dual_split" + else: + layout_mode = "single" + + primary_focus = self._calculate_focus_point(faces, active_speakers) + + self.previous_faces = faces + + return FrameContext( + frame_number=frame_number, + timestamp=timestamp, + detected_faces=faces, + active_speakers=active_speakers, + primary_focus=primary_focus, + layout_mode=layout_mode + ) + + def _detect_lip_movement(self, current_face: FaceDetection, previous_face: FaceDetection) -> bool: + """ + Detect lip movement by comparing landmarks between frames. + + Args: + current_face: Current frame face detection + previous_face: Previous frame face detection + + Returns: + True if significant lip movement detected + """ + if not current_face.landmarks or not previous_face.landmarks: + return False + + def lip_distance(landmarks): + if len(landmarks) < 4: + return 0 + + upper = np.array(landmarks[0:2]) + lower = np.array(landmarks[2:4]) + return np.linalg.norm(upper.mean(axis=0) - lower.mean(axis=0)) + + current_dist = lip_distance(current_face.landmarks) + previous_dist = lip_distance(previous_face.landmarks) + + threshold = 2.0 + return abs(current_dist - previous_dist) > threshold + + def _calculate_focus_point( + self, + faces: List[FaceDetection], + active_speakers: List[int] + ) -> Optional[Tuple[int, int]]: + """ + Calculate the primary focus point based on detected faces and speakers. + + IMPORTANT: This focuses on ONE person to avoid focusing on empty space (table). + When multiple people are present, we pick the most relevant person, not average positions. 
+ + Args: + faces: List of detected faces + active_speakers: Indices of faces that are speaking + + Returns: + (x, y) tuple of focus center, or None if no faces + """ + if not faces: + return None + + if active_speakers: + speaker_faces = [faces[i] for i in active_speakers if i < len(faces)] + if speaker_faces: + primary_speaker = max(speaker_faces, key=lambda f: f.confidence) + return (primary_speaker.center_x, primary_speaker.center_y) + + most_confident = max(faces, key=lambda f: f.confidence) + return (most_confident.center_x, most_confident.center_y) + + def close(self): + """Release resources.""" + self.detector.close() diff --git a/video_render/llm.py b/video_render/llm.py index 84d2d4f..1f2d798 100644 --- a/video_render/llm.py +++ b/video_render/llm.py @@ -2,11 +2,11 @@ from __future__ import annotations import json import logging +import time +import os from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Dict, List -from google import genai -from google.genai import types as genai_types import requests from video_render.config import BASE_DIR, Settings @@ -14,27 +14,24 @@ from video_render.transcription import TranscriptionResult logger = logging.getLogger(__name__) -OPENROUTER_ENDPOINT = "https://openrouter.ai/api/v1/chat/completions" +OPENROUTER_ENDPOINT = os.environ.get("OPENROUTER_API_URL", "https://openrouter.ai/api/v1/chat/completions") -class GeminiHighlighter: +class OpenRouterCopywriter: def __init__(self, settings: Settings) -> None: - if not settings.gemini.api_key: - raise RuntimeError("GEMINI_API_KEY nao foi definido") - - prompt_path = Path(settings.gemini.prompt_path) + if not settings.openrouter.api_key: + raise RuntimeError("OPENROUTER_API_KEY nao foi definido") + self.settings = settings + prompt_path = Path(settings.openrouter.prompt_path) if not prompt_path.is_absolute(): prompt_path = BASE_DIR / prompt_path - if not prompt_path.exists(): - raise FileNotFoundError(f"Prompt do Gemini nao encontrado: {prompt_path}") - - self.prompt_template = prompt_path.read_text(encoding="utf-8") - self.settings = settings - self.client = genai.Client() + raise FileNotFoundError(f"Prompt nao encontrado: {prompt_path}") + self.highlights_prompt_template = prompt_path.read_text(encoding="utf-8") def generate_highlights(self, transcription: TranscriptionResult) -> List[Dict]: + """Generate video highlights using OpenRouter GPT-OSS with retry logic.""" payload = { "transcript": transcription.full_text, "segments": [ @@ -47,93 +44,139 @@ class GeminiHighlighter: ], } - try: - response = self._call_gemini(payload) - except Exception as exc: - logger.error("Gemini API request falhou: %s", exc) - raise RuntimeError("Gemini API request falhou") from exc - - raw_text = self._extract_response_text(response) - - parsed = self._extract_json(raw_text) - highlights = parsed.get("highlights") - if not isinstance(highlights, list): - raise ValueError("Resposta do Gemini invalida: campo 'highlights' ausente") - return highlights - - def _call_gemini(self, payload: Dict[str, Any]) -> Any: - contents = [ - { - "role": "user", - "parts": [ - {"text": self.prompt_template}, - {"text": json.dumps(payload, ensure_ascii=False)}, - ], - } - ] - - request_kwargs: Dict[str, Any] = { - "model": self.settings.gemini.model, - "contents": contents, + body = { + "model": self.settings.openrouter.model, + "temperature": self.settings.openrouter.temperature, + "messages": [ + {"role": "system", "content": self.highlights_prompt_template}, + { + "role": "user", + "content": 
json.dumps(payload, ensure_ascii=False), + }, + ], } - config = self._build_generation_config() - if config is not None: - request_kwargs["config"] = config + headers = { + "Authorization": f"Bearer {self.settings.openrouter.api_key}", + "Content-Type": "application/json", + "X-Title": "Video Render - Highlights Detection" + } - return self.client.models.generate_content(**request_kwargs) + logger.info(f"Calling OpenRouter with model: {self.settings.openrouter.model}") + logger.debug(f"Request payload keys: transcript_length={len(payload['transcript'])}, segments_count={len(payload['segments'])}") - def _build_generation_config(self) -> Optional[genai_types.GenerateContentConfig]: - config_kwargs: Dict[str, Any] = {} - if self.settings.gemini.temperature is not None: - config_kwargs["temperature"] = self.settings.gemini.temperature - if self.settings.gemini.top_p is not None: - config_kwargs["top_p"] = self.settings.gemini.top_p - if self.settings.gemini.top_k is not None: - config_kwargs["top_k"] = self.settings.gemini.top_k + # Retry configuration for rate limits (especially free tier) + max_retries = 5 + base_delay = 5 # Start with 5s delay - if not config_kwargs: - return None + for attempt in range(max_retries): + try: + response = requests.post( + url=OPENROUTER_ENDPOINT, + data=json.dumps(body), + headers=headers, + timeout=120, + ) + response.raise_for_status() + data = response.json() + break - return genai_types.GenerateContentConfig(**config_kwargs) + except requests.exceptions.HTTPError as exc: + if exc.response.status_code == 429: + if attempt < max_retries - 1: + # Exponential backoff: 5s, 10s, 20s, 40s, 80s + delay = base_delay * (2 ** attempt) + logger.warning(f"Rate limit atingido (429). Aguardando {delay}s antes de tentar novamente (tentativa {attempt + 1}/{max_retries})") + time.sleep(delay) + continue + else: + logger.error("Rate limit atingido apos todas as tentativas") + logger.error("Solucao: Use um modelo pago ou adicione creditos na OpenRouter") + raise RuntimeError("OpenRouter rate limit excedido") from exc + else: + logger.error(f"OpenRouter API request falhou com status {exc.response.status_code}: {exc}") + raise RuntimeError("OpenRouter API request falhou") from exc - @staticmethod - def _extract_response_text(response: Any) -> str: - text = getattr(response, "text", None) - if text: - return str(text).strip() + except Exception as exc: + logger.error("OpenRouter API request falhou: %s", exc) + raise RuntimeError("OpenRouter API request falhou") from exc - candidates = getattr(response, "candidates", None) or [] - for candidate in candidates: - content = getattr(candidate, "content", None) - if not content: + # Debug: log response structure + logger.info(f"OpenRouter response keys: {list(data.keys())}") + if "error" in data: + logger.error(f"OpenRouter API error: {data.get('error')}") + raise RuntimeError(f"OpenRouter API error: {data.get('error')}") + + choices = data.get("choices") or [] + if not choices: + logger.error(f"OpenRouter response completa: {json.dumps(data, indent=2)}") + raise RuntimeError("OpenRouter nao retornou escolhas") + + message = choices[0].get("message", {}).get("content") + if not message: + raise RuntimeError("Resposta do OpenRouter sem conteudo") + + parsed = self._extract_json(message) + highlights = parsed.get("highlights") + if not isinstance(highlights, list): + raise ValueError("Resposta do OpenRouter invalida: campo 'highlights' ausente") + + valid_highlights = [] + for highlight in highlights: + try: + start = 
float(highlight.get("start", 0)) + end = float(highlight.get("end", 0)) + summary = str(highlight.get("summary", "")).strip() + + if start < 0 or end < 0: + logger.warning(f"Highlight ignorado: timestamps negativos (start={start}, end={end})") + continue + + if end <= start: + logger.warning(f"Highlight ignorado: end <= start (start={start}, end={end})") + continue + + duration = end - start + if duration < 45: + logger.warning(f"Highlight ignorado: muito curto ({duration}s, minimo 45s)") + continue + + if duration > 120: + logger.warning(f"Highlight ignorado: muito longo ({duration}s, maximo 120s)") + continue + + if not summary: + logger.warning(f"Highlight ignorado: summary vazio") + continue + + valid_highlights.append({ + "start": start, + "end": end, + "summary": summary + }) + + except (TypeError, ValueError) as e: + logger.warning(f"Highlight invalido ignorado: {highlight} - {e}") continue - parts = getattr(content, "parts", None) or [] - for part in parts: - part_text = getattr(part, "text", None) - if part_text: - return str(part_text).strip() - raise RuntimeError("Resposta do Gemini sem texto") + if not valid_highlights: + logger.warning("Nenhum highlight valido retornado pelo OpenRouter") + total_duration = 75.0 + if transcription.segments: + total_duration = max(seg.end for seg in transcription.segments) - @staticmethod - def _extract_json(response_text: str) -> Dict: - try: - return json.loads(response_text) - except json.JSONDecodeError: - start = response_text.find("{") - end = response_text.rfind("}") - if start == -1 or end == -1: - raise - subset = response_text[start : end + 1] - return json.loads(subset) + fallback_end = min(75.0, total_duration) + if fallback_end < 60.0: + fallback_end = min(60.0, total_duration) + return [{ + "start": 0.0, + "end": fallback_end, + "summary": "Trecho inicial do video (fallback automatico)" + }] -class OpenRouterCopywriter: - def __init__(self, settings: Settings) -> None: - if not settings.openrouter.api_key: - raise RuntimeError("OPENROUTER_API_KEY nao foi definido") - self.settings = settings + logger.info(f"OpenRouter retornou {len(valid_highlights)} highlights validos") + return valid_highlights def generate_titles(self, highlights: List[Dict]) -> List[str]: if not highlights: diff --git a/video_render/media.py b/video_render/media.py index 7fb878e..d99a71d 100644 --- a/video_render/media.py +++ b/video_render/media.py @@ -35,11 +35,29 @@ class MediaPreparer: sanitized_name = sanitize_filename(Path(filename).stem) workspace_dir = ensure_workspace(self.settings.videos_dir, sanitized_name) + transcription_json = workspace_dir / "transcription.json" + transcription_txt = workspace_dir / "transcription.txt" + temp_transcription_json = None + temp_transcription_txt = None + + if transcription_json.exists(): + temp_transcription_json = workspace_dir.parent / f".{sanitized_name}_transcription.json.tmp" + shutil.copy2(transcription_json, temp_transcription_json) + if transcription_txt.exists(): + temp_transcription_txt = workspace_dir.parent / f".{sanitized_name}_transcription.txt.tmp" + shutil.copy2(transcription_txt, temp_transcription_txt) + existing_children = list(workspace_dir.iterdir()) if existing_children: logger.info("Limpando workspace existente para %s", sanitized_name) remove_paths(existing_children) + if temp_transcription_json and temp_transcription_json.exists(): + shutil.move(str(temp_transcription_json), str(transcription_json)) + logger.info("Transcrição preservada em %s", transcription_json) + if temp_transcription_txt 
and temp_transcription_txt.exists(): + shutil.move(str(temp_transcription_txt), str(transcription_txt)) + destination_name = f"{sanitized_name}{source_path.suffix.lower()}" working_video_path = workspace_dir / destination_name shutil.copy2(source_path, working_video_path) diff --git a/video_render/pipeline.py b/video_render/pipeline.py index 3c4f348..0357788 100644 --- a/video_render/pipeline.py +++ b/video_render/pipeline.py @@ -6,7 +6,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional from video_render.config import Settings -from video_render.llm import GeminiHighlighter, OpenRouterCopywriter +from video_render.llm import OpenRouterCopywriter from video_render.media import MediaPreparer, VideoWorkspace from video_render.transcription import TranscriptionResult, TranscriptionService from video_render.utils import remove_paths, sanitize_filename @@ -55,8 +55,7 @@ class VideoPipeline: self.settings = settings self.media_preparer = MediaPreparer(settings) self.transcriber = TranscriptionService(settings) - self.highlighter = GeminiHighlighter(settings) - self.copywriter = OpenRouterCopywriter(settings) + self.llm_service = OpenRouterCopywriter(settings) # Using OpenRouter for both highlights and titles self.renderer = VideoRenderer(settings) def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]: @@ -65,12 +64,11 @@ class VideoPipeline: self._prepare_workspace(context) self._generate_transcription(context) self._determine_highlights(context) - self._generate_titles(context) self._render_clips(context) + return self._build_success_payload(context) except Exception as exc: logger.exception("Falha ao processar vídeo %s", context.job.filename) - # return self._handle_failure(context, exc) def _parse_job(self, message: Dict[str, Any]) -> JobMessage: filename = message.get("filename") @@ -102,7 +100,10 @@ class VideoPipeline: context.transcription = existing return - transcription = self.transcriber.transcribe(context.workspace.audio_path) + transcription = self.transcriber.transcribe( + context.workspace.audio_path, + output_dir=context.workspace.workspace_dir + ) TranscriptionService.persist(transcription, context.workspace.workspace_dir) context.transcription = transcription @@ -111,10 +112,10 @@ class VideoPipeline: raise RuntimeError("Transcricao nao disponivel") try: - highlights_raw = self.highlighter.generate_highlights(context.transcription) + highlights_raw = self.llm_service.generate_highlights(context.transcription) except Exception: logger.exception( - "Falha ao gerar destaques com Gemini; aplicando fallback padrao." + "Falha ao gerar destaques com OpenRouter; aplicando fallback padrao." ) context.highlight_windows = [self._build_fallback_highlight(context)] return @@ -130,11 +131,13 @@ class VideoPipeline: continue summary = str(item.get("summary", "")).strip() + title = str(item.get("title", summary[:60])).strip() + if end <= start: logger.debug("Highlight com intervalo invalido ignorado: %s", item) continue - windows.append(HighlightWindow(start=start, end=end, summary=summary)) + windows.append(HighlightWindow(start=start, end=end, summary=summary, title=title)) if not windows: windows.append(self._build_fallback_highlight(context)) @@ -142,17 +145,12 @@ class VideoPipeline: context.highlight_windows = windows def _generate_titles(self, context: PipelineContext) -> None: - if not context.highlight_windows: - return + """DEPRECATED: Titles are now generated together with highlights. 
- highlight_dicts = [ - {"start": window.start, "end": window.end, "summary": window.summary} - for window in context.highlight_windows - ] - titles = self.copywriter.generate_titles(highlight_dicts) - - for window, title in zip(context.highlight_windows, titles): - window.title = title.strip() + This method is kept for backwards compatibility but does nothing. + Titles are extracted from highlights in _determine_highlights(). + """ + pass def _build_fallback_highlight(self, context: PipelineContext) -> HighlightWindow: if not context.transcription: @@ -167,6 +165,7 @@ class VideoPipeline: start=0.0, end=max(last_end, 10.0), summary="Sem destaque identificado; fallback automatico.", + title="Confira este momento", ) def _render_clips(self, context: PipelineContext) -> None: diff --git a/video_render/rendering.py b/video_render/rendering.py index 1a80b9a..ae69813 100644 --- a/video_render/rendering.py +++ b/video_render/rendering.py @@ -15,6 +15,7 @@ from PIL import Image, ImageColor, ImageDraw, ImageFont from video_render.config import Settings from video_render.transcription import TranscriptionResult, WordTiming +from video_render.smart_framing import SmartFramer, extract_audio_samples logger = logging.getLogger(__name__) @@ -54,7 +55,41 @@ class CaptionBuilder: self.space_width = self.font.getbbox(" ")[2] - self.font.getbbox(" ")[0] def build(self, words: Sequence[WordTiming], clip_start: float) -> List[CaptionClipSet]: - grouped = self._group_words(words) + # Filter out empty, whitespace-only, or very short words (likely noise) + valid_words = [ + w for w in words + if w.word + and w.word.strip() + and len(w.word.strip()) >= 2 # At least 2 characters + and not w.word.strip() in ['...', '..', '.', ',', '-', 'hmm', 'hm', 'ah', 'eh', 'uh'] # Not just punctuation or filler + ] + + # Note: We don't filter out words based on gaps here + # Gap detection is handled in _group_words_with_gaps + # This ensures captions disappear during silence naturally + filtered_words = valid_words + + # Calculate speech density (words per second) + # If density is too low, it's likely just noise/silence being misinterpreted + if filtered_words: + first_word_time = filtered_words[0].start + last_word_time = filtered_words[-1].end + duration = last_word_time - first_word_time + + if duration > 0: + words_per_second = len(filtered_words) / duration + # Typical speech is 2-3 words per second + # If less than 0.5 words/second, it's probably silence/noise + if words_per_second < 0.5: + logger.debug(f"Captions suprimidas: densidade muito baixa ({words_per_second:.2f} palavras/seg)") + return [] + + # Only show captions if we have at least 3 valid words (reduced from 5 for 2-word groups) + # This prevents showing captions for noise/mumbling + if len(filtered_words) < 3: + return [] + + grouped = self._group_words_with_gaps(filtered_words) clip_sets: List[CaptionClipSet] = [] for group in grouped: @@ -101,6 +136,92 @@ class CaptionBuilder: if len(widths) > 1: total_width += self.space_width * (len(widths) - 1) + # Check if text needs to wrap to multiple lines + # If total width exceeds canvas width, break into 2 lines + needs_wrap = total_width > self.canvas_width + + if needs_wrap: + # Split into 2 lines - try to balance the lines + mid_point = len(texts) // 2 + line1_texts = texts[:mid_point] + line2_texts = texts[mid_point:] + line1_widths = widths[:mid_point] + line2_widths = widths[mid_point:] + + # Calculate widths for each line + line1_width = sum(line1_widths) + if len(line1_widths) > 1: + line1_width += 
self.space_width * (len(line1_widths) - 1) + + line2_width = sum(line2_widths) + if len(line2_widths) > 1: + line2_width += self.space_width * (len(line2_widths) - 1) + + # Double the canvas height for 2 lines + canvas_height = self.canvas_height * 2 + base_image = Image.new("RGBA", (self.canvas_width, canvas_height), (0, 0, 0, 0)) + base_draw = ImageDraw.Draw(base_image) + highlight_images: List[Image.Image] = [] + + # Stroke settings: 8px black stroke for better readability + stroke_width = 8 + stroke_color = (0, 0, 0, 255) # Black + + # Draw line 1 + x = max(0, (self.canvas_width - line1_width) // 2) + y = self.baseline + for i, (text, width) in enumerate(zip(line1_texts, line1_widths)): + base_draw.text( + (x, y), + text, + font=self.font, + fill=self.base_color, + stroke_width=stroke_width, + stroke_fill=stroke_color + ) + + highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0)) + highlight_draw = ImageDraw.Draw(highlight_image) + highlight_draw.text( + (x, y), + text, + font=self.font, + fill=self.highlight_color, + stroke_width=stroke_width, + stroke_fill=stroke_color + ) + highlight_images.append(highlight_image) + x += width + self.space_width + + # Draw line 2 + x = max(0, (self.canvas_width - line2_width) // 2) + y = self.baseline + self.text_height + 5 # 5px spacing between lines + for i, (text, width) in enumerate(zip(line2_texts, line2_widths)): + base_draw.text( + (x, y), + text, + font=self.font, + fill=self.base_color, + stroke_width=stroke_width, + stroke_fill=stroke_color + ) + + highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0)) + highlight_draw = ImageDraw.Draw(highlight_image) + highlight_draw.text( + (x, y), + text, + font=self.font, + fill=self.highlight_color, + stroke_width=stroke_width, + stroke_fill=stroke_color + ) + highlight_images.append(highlight_image) + x += width + self.space_width + + return base_image, highlight_images + + # Single line rendering (original code) start_x = max(0, (self.canvas_width - total_width) // 2) base_image = Image.new("RGBA", (self.canvas_width, self.canvas_height), (0, 0, 0, 0)) @@ -108,13 +229,31 @@ class CaptionBuilder: highlight_images: List[Image.Image] = [] x = start_x - for text, width in zip(texts, widths): - base_draw.text((x, self.baseline), text, font=self.font, fill=self.base_color) + # Stroke settings: 8px black stroke for better readability + stroke_width = 8 + stroke_color = (0, 0, 0, 255) # Black + for text, width in zip(texts, widths): + # Draw base text with stroke + base_draw.text( + (x, self.baseline), + text, + font=self.font, + fill=self.base_color, + stroke_width=stroke_width, + stroke_fill=stroke_color + ) + + # Draw highlight text with stroke highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0)) highlight_draw = ImageDraw.Draw(highlight_image) highlight_draw.text( - (x, self.baseline), text, font=self.font, fill=self.highlight_color + (x, self.baseline), + text, + font=self.font, + fill=self.highlight_color, + stroke_width=stroke_width, + stroke_fill=stroke_color ) highlight_images.append(highlight_image) @@ -153,6 +292,44 @@ class CaptionBuilder: return grouped + def _group_words_with_gaps(self, words: Sequence[WordTiming]) -> List[List[WordTiming]]: + """ + Group words into 2-word chunks, respecting silence gaps. 
+ Creates natural breaks where there are pauses > 1.5s + """ + if not words: + return [] + + grouped: List[List[WordTiming]] = [] + buffer: List[WordTiming] = [] + + for i, word in enumerate(words): + # Check if there's a long pause before this word + if i > 0: + gap = word.start - words[i-1].end + # If gap > 1.5s, finish current buffer and start new group + if gap > 1.5: + if buffer: + grouped.append(buffer) + buffer = [] + + buffer.append(word) + + # Group into 2 words maximum + if len(buffer) == 2: + grouped.append(buffer) + buffer = [] + + # Handle remaining words + if buffer: + if len(buffer) == 1 and grouped: + # Add single remaining word to last group + grouped[-1].append(buffer[0]) + else: + grouped.append(buffer) + + return [grp for grp in grouped if grp] + @staticmethod def _clean_word(text: str) -> str: text = text.strip() @@ -164,6 +341,12 @@ class VideoRenderer: def __init__(self, settings: Settings) -> None: self.settings = settings self.captions = CaptionBuilder(settings) + self.smart_framer = SmartFramer( + target_width=settings.rendering.frame_width, + target_height=settings.rendering.frame_height, + frame_skip=settings.rendering.smart_framing_frame_skip, + smoothing_window=settings.rendering.smart_framing_smoothing_window + ) def render( self, @@ -234,26 +417,100 @@ class VideoRenderer: duration = end - start frame_w = self.settings.rendering.frame_width frame_h = self.settings.rendering.frame_height - top_h = int(frame_h * 0.18) + # Removed top panel - no longer showing title bottom_h = int(frame_h * 0.20) - video_area_h = max(1, frame_h - top_h - bottom_h) - scale_factor = min( - frame_w / subclip.w, - video_area_h / subclip.h, - ) - resized_clip = subclip.resized(scale_factor) - video_y = top_h + (video_area_h - resized_clip.h) // 2 - video_clip = resized_clip.with_position( - ((frame_w - resized_clip.w) // 2, video_y) - ) + # Use smart framing to create intelligent 9:16 video (if enabled) + if self.settings.rendering.enable_smart_framing: + logger.info(f"Creating smart framing plan for clip {index} ({start:.2f}s - {end:.2f}s)") + + try: + # Extract audio for speech detection + audio_samples = extract_audio_samples(source_path, start, end) + + # Create framing plan + framing_plan = self.smart_framer.create_framing_plan( + video_path=source_path, + start_time=start, + end_time=end, + audio_samples=audio_samples + ) + + # Apply smart framing based on detected layout + use_split_screen = framing_plan.layout_mode in ["dual_split", "grid"] + video_clip = self.smart_framer.apply_framing( + video_clip=subclip, + framing_plan=framing_plan, + use_split_screen=use_split_screen + ) + + logger.info(f"Smart framing applied: layout={framing_plan.layout_mode}, " + f"faces_detected={len(framing_plan.frame_contexts[0].detected_faces) if framing_plan.frame_contexts else 0}") + + except Exception as exc: + logger.warning(f"Smart framing failed for clip {index}, falling back to center crop: {exc}", exc_info=True) + + # Fallback to center crop (maintains aspect ratio, crops to fit) + video_area_h = max(1, frame_h - bottom_h) + + # Use MAX to ensure video covers entire area (will crop excess) + scale_factor = max( + frame_w / subclip.w, + video_area_h / subclip.h, + ) + + # Resize to cover area + resized_clip = subclip.resized(scale_factor) + + # Calculate crop region (center crop) + crop_x1 = max(0, (resized_clip.w - frame_w) // 2) + crop_y1 = max(0, (resized_clip.h - video_area_h) // 2) + crop_x2 = crop_x1 + frame_w + crop_y2 = crop_y1 + video_area_h + + # Crop to fit target dimensions 
using MoviePy crop(x1, y1, x2, y2) + cropped_clip = resized_clip.cropped( + x1=crop_x1, + y1=crop_y1, + x2=crop_x2, + y2=crop_y2 + ) + + video_clip = cropped_clip.with_position((0, 0)) + resized_clip.close() + else: + # Use center crop (smart framing disabled) + logger.info(f"Using center crop for clip {index} (smart framing disabled)") + video_area_h = max(1, frame_h - bottom_h) + + # Use MAX to ensure video covers entire area (will crop excess) + scale_factor = max( + frame_w / subclip.w, + video_area_h / subclip.h, + ) + + # Resize to cover area + resized_clip = subclip.resized(scale_factor) + + # Calculate crop region (center crop) + crop_x1 = max(0, (resized_clip.w - frame_w) // 2) + crop_y1 = max(0, (resized_clip.h - video_area_h) // 2) + crop_x2 = crop_x1 + frame_w + crop_y2 = crop_y1 + video_area_h + + # Crop to fit target dimensions using MoviePy crop(x1, y1, x2, y2) + cropped_clip = resized_clip.cropped( + x1=crop_x1, + y1=crop_y1, + x2=crop_x2, + y2=crop_y2 + ) + + video_clip = cropped_clip.with_position((0, 0)) + resized_clip.close() background = ColorClip(size=(frame_w, frame_h), color=(0, 0, 0)).with_duration(duration) - top_panel = ( - ColorClip(size=(frame_w, top_h), color=(12, 12, 12)) - .with_duration(duration) - .with_opacity(0.85) - ) + # Removed top panel and title - no longer needed bottom_panel = ( ColorClip(size=(frame_w, bottom_h), color=(12, 12, 12)) .with_position((0, frame_h - bottom_h)) @@ -261,34 +518,42 @@ class VideoRenderer: .with_opacity(0.85) ) - title_clip = self._build_title_clip( - title=title, - summary=summary, - duration=duration, - frame_width=frame_w, - top_panel_height=top_h, - ) - title_clip = title_clip.with_position( - ((frame_w - title_clip.w) // 2, (top_h - title_clip.h) // 2) - ) - words = self._collect_words(transcription, start, end) - caption_sets = self.captions.build(words, clip_start=start) + + # Calculate speech coverage: how much of the clip has actual speech? 
+ # If less than 30% of the clip has speech, don't show captions + clip_duration = end - start + if words and clip_duration > 0: + # Calculate total time with speech + total_speech_time = sum(w.end - w.start for w in words) + speech_coverage = total_speech_time / clip_duration + + if speech_coverage < 0.3: # Less than 30% speech + logger.debug(f"Captions suprimidas: cobertura de fala baixa ({speech_coverage:.1%})") + words = [] # Clear words to prevent captions + + # Only build captions if there are actual words to display + # This prevents empty/placeholder captions from appearing + caption_sets = self.captions.build(words, clip_start=start) if words else [] caption_clips = [] caption_resources: List[ImageClip] = [] - caption_area_top = frame_h - bottom_h - caption_area_height = bottom_h + + # Position captions 120px below center (for 1920px height, center is 960px, so 1080px) + # This ensures they're visible, well-positioned, and don't interfere with faces + # Range: 100-150px as requested, using 120px for optimal positioning + center_y = frame_h // 2 + caption_y = center_y + 120 caption_margin = 20 - raw_caption_y = caption_area_top + (caption_area_height - self.captions.canvas_height) // 2 - min_caption_y = caption_area_top + caption_margin - max_caption_y = ( - caption_area_top + caption_area_height - self.captions.canvas_height - caption_margin - ) + + # Ensure captions stay within reasonable bounds (no top panel now) + min_caption_y = caption_margin + max_caption_y = frame_h - bottom_h - self.captions.canvas_height - caption_margin + if max_caption_y < min_caption_y: caption_y = min_caption_y else: - caption_y = min(max(raw_caption_y, min_caption_y), max_caption_y) + caption_y = min(max(caption_y, min_caption_y), max_caption_y) for clip_set in caption_sets: base_positioned = clip_set.base.with_position(("center", caption_y)) @@ -299,30 +564,20 @@ class VideoRenderer: caption_clips.append(positioned) caption_resources.append(highlight) - if not caption_clips: - fallback_text = self._wrap_text(summary or title, max_width=frame_w - 160) - caption_clips.append( - self._make_textclip( - text=fallback_text, - font_path=self.settings.rendering.font_path, - font_size=self.settings.rendering.subtitle_font_size, - color=self.settings.rendering.base_color, - size=(frame_w - 160, max(40, self.captions.canvas_height)), - ) - .with_duration(duration) - .with_position(("center", caption_y)) - ) + # No fallback captions - if there are no dynamic captions, show nothing + # This matches Opus Clip behavior where captions only appear when there's actual speech audio_clip, audio_needs_close = self._materialize_audio( source_path=source_path, start=start, end=end, duration=duration, - fallback_audio=video_clip.audio or resized_clip.audio or subclip.audio, + fallback_audio=video_clip.audio or subclip.audio, ) + # Composite with background, bottom panel, video, and captions only (no top panel or title) composite = CompositeVideoClip( - [background, top_panel, bottom_panel, video_clip, title_clip, *caption_clips], + [background, bottom_panel, video_clip, *caption_clips], size=(frame_w, frame_h), ) if audio_clip is not None: @@ -337,11 +592,8 @@ class VideoRenderer: ) composite.close() - resized_clip.close() video_clip.close() - title_clip.close() background.close() - top_panel.close() bottom_panel.close() for clip in caption_clips: clip.close() @@ -352,95 +604,6 @@ class VideoRenderer: return str(output_path) - def _build_title_clip( - self, - *, - title: str, - summary: str, - duration: float, - 
frame_width: int, - top_panel_height: int, - ) -> ImageClip: - text = (title or summary or "").strip() - if not text: - text = summary or "" - - max_width = max(200, frame_width - 160) - font_size = self.settings.rendering.title_font_size - min_font_size = max(28, int(font_size * 0.6)) - target_height = max(80, top_panel_height - 40) - title_color = ImageColor.getrgb(self.settings.rendering.base_color) - font_path = self.settings.rendering.font_path - - while True: - font = ImageFont.truetype(str(font_path), font_size) - lines = self._split_title_lines(text, font, max_width) - line_height = font.getbbox("Ay")[3] - font.getbbox("Ay")[1] - spacing = max(4, int(line_height * 0.25)) - text_height = self._measure_text_height(len(lines), line_height, spacing) - - if text_height <= target_height or font_size <= min_font_size: - break - - font_size = max(min_font_size, font_size - 6) - - # Recompute dimensions with final font size to ensure consistency - font = ImageFont.truetype(str(font_path), font_size) - lines = self._split_title_lines(text, font, max_width) - line_height = font.getbbox("Ay")[3] - font.getbbox("Ay")[1] - spacing = max(4, int(line_height * 0.25)) - text_height = self._measure_text_height(len(lines), line_height, spacing) - canvas_height = max(1, text_height) - - image = Image.new("RGBA", (max_width, canvas_height), (0, 0, 0, 0)) - draw = ImageDraw.Draw(image) - y = 0 - for idx, line in enumerate(lines): - bbox = font.getbbox(line) - line_width = bbox[2] - bbox[0] - x = max(0, (max_width - line_width) // 2) - draw.text((x, y - bbox[1]), line, font=font, fill=title_color) - y += line_height - if idx < len(lines) - 1: - y += spacing - - return ImageClip(np.array(image)).with_duration(duration) - - @staticmethod - def _measure_text_height(line_count: int, line_height: int, spacing: int) -> int: - if line_count <= 0: - return line_height - return line_count * line_height + max(0, line_count - 1) * spacing - - @staticmethod - def _split_title_lines( - text: str, font: ImageFont.FreeTypeFont, max_width: int - ) -> List[str]: - words = text.split() - if not words: - return [""] - - lines: List[str] = [] - current: List[str] = [] - for word in words: - test_line = " ".join(current + [word]) if current else word - bbox = font.getbbox(test_line) - line_width = bbox[2] - bbox[0] - if line_width <= max_width or not current: - current.append(word) - if line_width > max_width and not current[:-1]: - lines.append(" ".join(current)) - current = [] - continue - - lines.append(" ".join(current)) - current = [word] - - if current: - lines.append(" ".join(current)) - - return lines - def _materialize_audio( self, *, diff --git a/video_render/smart_framing.py b/video_render/smart_framing.py new file mode 100644 index 0000000..76087ba --- /dev/null +++ b/video_render/smart_framing.py @@ -0,0 +1,687 @@ +""" +Smart framing module for intelligent video cropping and composition. + +This module provides functionality to create 9:16 vertical videos with +intelligent framing that follows the action and speakers. 
+""" +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import List, Optional, Tuple + +import cv2 +import numpy as np +from moviepy.video.VideoClip import VideoClip +from moviepy.video.io.VideoFileClip import VideoFileClip +from scipy import signal + +from video_render.context_detection import ContextAnalyzer, FrameContext, FaceDetection + +logger = logging.getLogger(__name__) + + +@dataclass +class CropRegion: + """Defines a crop region for a frame.""" + x: int + y: int + width: int + height: int + + +@dataclass +class FramingPlan: + """Complete framing plan for a video segment.""" + frame_contexts: List[FrameContext] + crop_regions: List[CropRegion] + layout_mode: str + fps: float + + +class SmartFramer: + """Creates intelligent 9:16 framing for horizontal videos.""" + + def __init__( + self, + target_width: int = 1080, + target_height: int = 1920, + frame_skip: int = 2, + smoothing_window: int = 15 + ): + self.target_width = target_width + self.target_height = target_height + self.target_aspect = target_height / target_width + + # Performance parameters + self.frame_skip = frame_skip # Process every Nth frame (CPU optimization) + + # Smoothing parameters + self.smoothing_window = smoothing_window + self.max_velocity = 30 # pixels per frame (reduced for smoother transitions) + + logger.info(f"Smart framer initialized (target: {target_width}x{target_height}, frame_skip={frame_skip})") + + def create_framing_plan( + self, + video_path: str, + start_time: float, + end_time: float, + audio_samples: Optional[np.ndarray] = None + ) -> FramingPlan: + """ + Analyze video and create a complete framing plan. + + Args: + video_path: Path to video file + start_time: Start time in seconds + end_time: End time in seconds + audio_samples: Optional audio samples for speech detection + + Returns: + FramingPlan with all frame contexts and crop regions + """ + analyzer = ContextAnalyzer() + + # Detect speaking periods from audio if available + speaking_periods = None + if audio_samples is not None: + speaking_periods = analyzer.audio_detector.detect_speaking_periods(audio_samples) + + # Open video with error suppression for AV1 codec warnings + import os + os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'loglevel;quiet' + + cap = cv2.VideoCapture(video_path) + fps = cap.get(cv2.CAP_PROP_FPS) + + # Calculate frame range + start_frame = int(start_time * fps) + end_frame = int(end_time * fps) + + # Set to start frame + cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame) + + frame_contexts = [] + frame_number = start_frame + processed_count = 0 + + logger.info(f"Analyzing frames {start_frame} to {end_frame} (fps={fps}, skip={self.frame_skip})") + + while frame_number < end_frame: + ret, frame = cap.read() + if not ret: + break + + # Only process every Nth frame for performance (CPU optimization) + if processed_count % self.frame_skip == 0: + timestamp = frame_number / fps + context = analyzer.analyze_frame(frame, timestamp, frame_number, speaking_periods) + frame_contexts.append(context) + + frame_number += 1 + processed_count += 1 + + # Get video dimensions before releasing capture + source_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + source_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + + cap.release() + analyzer.close() + + # Determine overall layout mode (most common) + layout_modes = [ctx.layout_mode for ctx in frame_contexts] + if layout_modes: + overall_layout = max(set(layout_modes), key=layout_modes.count) + else: + overall_layout = "single" + + # 
Calculate crop regions based on contexts + + crop_regions = self._calculate_crop_regions( + frame_contexts, + source_width, + source_height + ) + + return FramingPlan( + frame_contexts=frame_contexts, + crop_regions=crop_regions, + layout_mode=overall_layout, + fps=fps + ) + + def _calculate_crop_regions( + self, + contexts: List[FrameContext], + source_width: int, + source_height: int + ) -> List[CropRegion]: + """ + Calculate smooth crop regions for each frame. + + Args: + contexts: List of frame contexts + source_width: Source video width + source_height: Source video height + + Returns: + List of crop regions + """ + if not contexts: + return [] + + # Calculate ideal crop dimensions maintaining EXACT 9:16 aspect ratio + source_aspect = source_width / source_height + + if source_aspect > self.target_aspect: + # Source is wider - crop horizontally (use full height) + crop_height = source_height + crop_width = int(crop_height / self.target_aspect) + + # Ensure crop width fits within source + if crop_width > source_width: + crop_width = source_width + crop_height = int(crop_width * self.target_aspect) + else: + # Source is taller - crop vertically (use full width) + crop_width = source_width + crop_height = int(crop_width * self.target_aspect) + + # Ensure crop height fits within source + if crop_height > source_height: + crop_height = source_height + crop_width = int(crop_height / self.target_aspect) + + # Calculate center points for each frame + # Since we now always focus on ONE person directly (not averaging), + # we can use the focus point directly without complex validation + center_xs = [] + center_ys = [] + + for ctx in contexts: + if ctx.primary_focus: + # Primary focus is now always a single person's center, never averaged + # This means it will never be on the table/empty space + center_xs.append(ctx.primary_focus[0]) + center_ys.append(ctx.primary_focus[1]) + else: + # Default to center only if no faces detected at all + center_xs.append(source_width // 2) + center_ys.append(source_height // 2) + + # Smooth the center points + if len(center_xs) > self.smoothing_window: + kernel_size = min(self.smoothing_window, len(center_xs)) + if kernel_size % 2 == 0: + kernel_size -= 1 + + center_xs = signal.medfilt(center_xs, kernel_size=kernel_size).tolist() + center_ys = signal.medfilt(center_ys, kernel_size=kernel_size).tolist() + + # Limit velocity (prevent jarring movements) + center_xs = self._limit_velocity(center_xs, self.max_velocity) + center_ys = self._limit_velocity(center_ys, self.max_velocity) + + # Convert to crop regions + crop_regions = [] + for center_x, center_y in zip(center_xs, center_ys): + # Calculate top-left corner + x = int(center_x - crop_width // 2) + y = int(center_y - crop_height // 2) + + # Clamp to valid bounds + x = max(0, min(x, source_width - crop_width)) + y = max(0, min(y, source_height - crop_height)) + + crop_regions.append(CropRegion( + x=x, + y=y, + width=crop_width, + height=crop_height + )) + + return crop_regions + + def _limit_velocity(self, positions: List[float], max_velocity: float) -> List[float]: + """ + Limit the velocity of position changes. 
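+
+        Worked example (derived from the clamping below): with max_velocity=30,
+        raw centers [0, 100, 100] become [0, 30, 60], so the virtual camera pans
+        over several frames instead of snapping to the new position.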
+ + Args: + positions: List of positions + max_velocity: Maximum allowed change per frame + + Returns: + Smoothed positions + """ + if len(positions) <= 1: + return positions + + limited = [positions[0]] + + for i in range(1, len(positions)): + delta = positions[i] - limited[i - 1] + if abs(delta) > max_velocity: + delta = max_velocity if delta > 0 else -max_velocity + + limited.append(limited[i - 1] + delta) + + return limited + + def apply_framing( + self, + video_clip: VideoFileClip, + framing_plan: FramingPlan, + use_split_screen: bool = False + ) -> VideoClip: + """ + Apply smart framing to a video clip. + + Args: + video_clip: Source video clip + framing_plan: Framing plan to apply + use_split_screen: Whether to use split screen for multiple people + + Returns: + Reframed video clip + """ + # Handle different layout modes + if framing_plan.layout_mode in ["single", "single_speaker"]: + # Single person or single speaker - use focused single framing + return self._apply_single_framing(video_clip, framing_plan) + elif framing_plan.layout_mode == "dual_split" and use_split_screen: + # Two people in conversation - use split screen + return self._apply_split_screen(video_clip, framing_plan) + elif framing_plan.layout_mode == "grid" and use_split_screen: + # 3+ people - use grid layout + return self._apply_grid_layout(video_clip, framing_plan) + else: + # Fallback to single framing + return self._apply_single_framing(video_clip, framing_plan) + + def _apply_single_framing( + self, + video_clip: VideoFileClip, + framing_plan: FramingPlan + ) -> VideoClip: + """ + Apply single-focus framing (following one person or action). + + Args: + video_clip: Source video clip + framing_plan: Framing plan + + Returns: + Reframed video clip + """ + def make_frame(t): + # Get the original frame + frame = video_clip.get_frame(t) + + # Ensure we have valid crop regions + if not framing_plan.crop_regions: + # Fallback: return center crop + h, w = frame.shape[:2] + crop_h = int(w * self.target_aspect) + crop_w = w + if crop_h > h: + crop_h = h + crop_w = int(h / self.target_aspect) + y = (h - crop_h) // 2 + x = (w - crop_w) // 2 + cropped = frame[y:y + crop_h, x:x + crop_w] + else: + # Calculate exact frame index with decimal precision for interpolation + exact_frame_idx = (t * framing_plan.fps) / self.frame_skip + + # Get the two adjacent analyzed frames + idx_floor = int(exact_frame_idx) + idx_ceil = idx_floor + 1 + + # Interpolation factor (0.0 to 1.0) + alpha = exact_frame_idx - idx_floor + + # Clamp indices to valid range + idx_floor = max(0, min(idx_floor, len(framing_plan.crop_regions) - 1)) + idx_ceil = max(0, min(idx_ceil, len(framing_plan.crop_regions) - 1)) + + # Get crop regions + crop1 = framing_plan.crop_regions[idx_floor] + crop2 = framing_plan.crop_regions[idx_ceil] + + # Linear interpolation between crop regions + x = int(crop1.x * (1 - alpha) + crop2.x * alpha) + y = int(crop1.y * (1 - alpha) + crop2.y * alpha) + width = int(crop1.width * (1 - alpha) + crop2.width * alpha) + height = int(crop1.height * (1 - alpha) + crop2.height * alpha) + + # Ensure crop stays within frame bounds + h, w = frame.shape[:2] + x = max(0, min(x, w - width)) + y = max(0, min(y, h - height)) + width = min(width, w - x) + height = min(height, h - y) + + # Crop the frame + cropped = frame[y:y + height, x:x + width] + + # Resize to target dimensions + resized = cv2.resize( + cropped, + (self.target_width, self.target_height), + interpolation=cv2.INTER_LINEAR + ) + + return resized + + # MoviePy 2.x compatible way to 
create VideoClip + new_clip = VideoClip(duration=video_clip.duration) + new_clip.size = (self.target_width, self.target_height) + new_clip.frame_function = make_frame + return new_clip + + def _apply_split_screen( + self, + video_clip: VideoFileClip, + framing_plan: FramingPlan + ) -> VideoClip: + """ + Apply split screen for two people. + + Args: + video_clip: Source video clip + framing_plan: Framing plan + + Returns: + Split screen video clip + """ + def make_frame(t): + frame = video_clip.get_frame(t) + # Calculate exact frame index with decimal precision for smooth interpolation + exact_frame_idx = (t * framing_plan.fps) / self.frame_skip + frame_idx = int(exact_frame_idx) + + # Ensure we have valid contexts + if not framing_plan.frame_contexts: + # Fallback to simple center crop + h, w = frame.shape[:2] + crop_h = int(w * self.target_aspect) + crop_w = w + if crop_h > h: + crop_h = h + crop_w = int(h / self.target_aspect) + y = (h - crop_h) // 2 + x = (w - crop_w) // 2 + cropped = frame[y:y + crop_h, x:x + crop_w] + return cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR) + + # Clamp index to valid range + frame_idx = max(0, min(frame_idx, len(framing_plan.frame_contexts) - 1)) + context = framing_plan.frame_contexts[frame_idx] + + # Create output frame + output = np.zeros((self.target_height, self.target_width, 3), dtype=np.uint8) + + if len(context.detected_faces) >= 2: + # Split vertically 50/50 (two columns) + half_width = self.target_width // 2 + + # Select the 2 most relevant faces + # Priority: ALWAYS show active speaker first + most confident other person + if context.active_speakers and len(context.active_speakers) >= 1: + # Get the PRIMARY speaker (most confident among active speakers) + speaker_faces = [context.detected_faces[i] for i in context.active_speakers + if i < len(context.detected_faces)] + + primary_speaker = max(speaker_faces, key=lambda f: f.confidence) + + # Get OTHER faces (not the primary speaker) + other_faces = [f for f in context.detected_faces if f != primary_speaker] + + if len(speaker_faces) >= 2: + # Multiple speakers: show primary + second most confident speaker + other_speakers = [f for f in speaker_faces if f != primary_speaker] + secondary_person = max(other_speakers, key=lambda f: f.confidence) + elif other_faces: + # One speaker: show speaker + most confident other person + secondary_person = max(other_faces, key=lambda f: f.confidence) + else: + # Fallback: only one person detected + secondary_person = primary_speaker + + selected_faces = [primary_speaker, secondary_person] + else: + # No speakers: take 2 most confident faces + selected_faces = sorted(context.detected_faces, key=lambda f: f.confidence, reverse=True)[:2] + + # Sort selected faces by horizontal position for consistent left/right placement + faces = sorted(selected_faces, key=lambda f: f.center_x) + left_face = faces[0] + right_face = faces[1] + + # Process each person's frame + for idx, face in enumerate([left_face, right_face]): + # Calculate crop region focused on this person + # Each person gets half the width, full target aspect ratio (9:16) + # This ensures NO distortion when resizing + + # For split screen: each side is half_width x full_height + # We need to maintain 9:16 aspect for each half + half_width = self.target_width // 2 + half_aspect = self.target_height / half_width # Aspect ratio for half + + # Determine crop size based on face with padding + face_width = max(face.width, frame.shape[1] // 4) # At least 1/4 of frame 
width + crop_width = int(face_width * 2.5) # Add padding around face + crop_height = int(crop_width * half_aspect) # Maintain correct aspect + + # Ensure crop fits in frame, maintaining aspect ratio + max_crop_width = frame.shape[1] // 2 # Half the source width + max_crop_height = frame.shape[0] # Full source height + + # If crop is too wide, scale down proportionally + if crop_width > max_crop_width: + crop_width = max_crop_width + crop_height = int(crop_width * half_aspect) + + # If crop is too tall, scale down proportionally + if crop_height > max_crop_height: + crop_height = max_crop_height + crop_width = int(crop_height / half_aspect) + + # Center crop on face + x = max(0, face.center_x - crop_width // 2) + y = max(0, face.center_y - crop_height // 2) + + # Clamp to frame boundaries + x = min(x, frame.shape[1] - crop_width) + y = min(y, frame.shape[0] - crop_height) + + # Extract and resize crop + cropped = frame[y:y + crop_height, x:x + crop_width] + resized = cv2.resize( + cropped, + (half_width, self.target_height), + interpolation=cv2.INTER_LINEAR + ) + + # Place in output at appropriate horizontal position + x_offset = idx * half_width + output[:, x_offset:x_offset + half_width] = resized + else: + # Fall back to single framing + if framing_plan.crop_regions: + crop_idx = max(0, min(frame_idx, len(framing_plan.crop_regions) - 1)) + crop = framing_plan.crop_regions[crop_idx] + cropped = frame[crop.y:crop.y + crop.height, crop.x:crop.x + crop.width] + else: + # Fallback to center crop if no crop regions available + h, w = frame.shape[:2] + crop_h = int(w * self.target_aspect) + crop_w = w + if crop_h > h: + crop_h = h + crop_w = int(h / self.target_aspect) + y = (h - crop_h) // 2 + x = (w - crop_w) // 2 + cropped = frame[y:y + crop_h, x:x + crop_w] + output = cv2.resize( + cropped, + (self.target_width, self.target_height), + interpolation=cv2.INTER_LINEAR + ) + + return output + + # MoviePy 2.x compatible way to create VideoClip + new_clip = VideoClip(duration=video_clip.duration) + new_clip.size = (self.target_width, self.target_height) + new_clip.frame_function = make_frame + return new_clip + + def _apply_grid_layout( + self, + video_clip: VideoFileClip, + framing_plan: FramingPlan + ) -> VideoClip: + """ + Apply grid layout for 3+ people. 
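+
+        With the default 1080x1920 target this composes a 2x2 grid of 540x960
+        cells; detected face i (up to four faces) is placed at row i // 2,
+        column i % 2, as implemented below.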
+ + Args: + video_clip: Source video clip + framing_plan: Framing plan + + Returns: + Grid layout video clip + """ + def make_frame(t): + frame = video_clip.get_frame(t) + # Calculate exact frame index with decimal precision for smooth interpolation + exact_frame_idx = (t * framing_plan.fps) / self.frame_skip + frame_idx = int(exact_frame_idx) + + # Ensure we have valid contexts + if not framing_plan.frame_contexts: + # Fallback to simple center crop + h, w = frame.shape[:2] + crop_h = int(w * self.target_aspect) + crop_w = w + if crop_h > h: + crop_h = h + crop_w = int(h / self.target_aspect) + y = (h - crop_h) // 2 + x = (w - crop_w) // 2 + cropped = frame[y:y + crop_h, x:x + crop_w] + return cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR) + + # Clamp index to valid range + frame_idx = max(0, min(frame_idx, len(framing_plan.frame_contexts) - 1)) + context = framing_plan.frame_contexts[frame_idx] + + output = np.zeros((self.target_height, self.target_width, 3), dtype=np.uint8) + + num_faces = len(context.detected_faces) + + if num_faces >= 3: + # Create 2x2 grid + cell_width = self.target_width // 2 + cell_height = self.target_height // 2 + + for idx, face in enumerate(context.detected_faces[:4]): + # Calculate grid position + row = idx // 2 + col = idx % 2 + + # Each grid cell maintains aspect ratio (square in this case: cell_width = cell_height) + cell_aspect = cell_height / cell_width + + # Crop around face with correct aspect ratio + crop_width = frame.shape[1] // 2 + crop_height = int(crop_width * cell_aspect) + + # Ensure crop fits in frame, maintaining aspect + max_crop_width = frame.shape[1] // 2 + max_crop_height = frame.shape[0] // 2 + + if crop_width > max_crop_width: + crop_width = max_crop_width + crop_height = int(crop_width * cell_aspect) + + if crop_height > max_crop_height: + crop_height = max_crop_height + crop_width = int(crop_height / cell_aspect) + + # Center crop on face + x = max(0, face.center_x - crop_width // 2) + y = max(0, face.center_y - crop_height // 2) + + # Clamp to frame boundaries + x = min(x, frame.shape[1] - crop_width) + y = min(y, frame.shape[0] - crop_height) + + cropped = frame[y:y + crop_height, x:x + crop_width] + resized = cv2.resize( + cropped, + (cell_width, cell_height), + interpolation=cv2.INTER_LINEAR + ) + + # Place in grid + y_offset = row * cell_height + x_offset = col * cell_width + output[y_offset:y_offset + cell_height, x_offset:x_offset + cell_width] = resized + else: + # Fall back to single framing + if framing_plan.crop_regions: + crop_idx = max(0, min(frame_idx, len(framing_plan.crop_regions) - 1)) + crop = framing_plan.crop_regions[crop_idx] + cropped = frame[crop.y:crop.y + crop.height, crop.x:crop.x + crop.width] + else: + # Fallback to center crop if no crop regions available + h, w = frame.shape[:2] + crop_h = int(w * self.target_aspect) + crop_w = w + if crop_h > h: + crop_h = h + crop_w = int(h / self.target_aspect) + y = (h - crop_h) // 2 + x = (w - crop_w) // 2 + cropped = frame[y:y + crop_h, x:x + crop_w] + output = cv2.resize( + cropped, + (self.target_width, self.target_height), + interpolation=cv2.INTER_LINEAR + ) + + return output + + # MoviePy 2.x compatible way to create VideoClip + new_clip = VideoClip(duration=video_clip.duration) + new_clip.size = (self.target_width, self.target_height) + new_clip.frame_function = make_frame + return new_clip + + +def extract_audio_samples(video_path: str, start_time: float, end_time: float) -> Optional[np.ndarray]: + """ + Extract 
audio samples from video for speech detection.
+
+    Args:
+        video_path: Path to video file
+        start_time: Start time in seconds
+        end_time: End time in seconds
+
+    Returns:
+        Audio samples array or None if no audio
+    """
+    try:
+        from moviepy.audio.io.AudioFileClip import AudioFileClip
+
+        with AudioFileClip(video_path) as audio:
+            segment = audio.subclipped(start_time, end_time)
+            fps = getattr(segment, 'fps', 44100)
+            samples = segment.to_soundarray(fps=fps)
+            return samples
+    except Exception as exc:
+        logger.warning(f"Failed to extract audio: {exc}")
+        return None
diff --git a/video_render/transcription.py b/video_render/transcription.py
index a175659..5e748bf 100644
--- a/video_render/transcription.py
+++ b/video_render/transcription.py
@@ -56,7 +56,14 @@ class TranscriptionService:
         )
         return self._model
 
-    def transcribe(self, audio_path: Path) -> TranscriptionResult:
+    def transcribe(self, audio_path: Path, output_dir: Optional[Path] = None) -> TranscriptionResult:
+        if output_dir is not None:
+            existing_transcription = self.load(output_dir)
+            if existing_transcription is not None:
+                logger.info("Transcription already exists in %s, reusing it...", output_dir)
+                return existing_transcription
+
+        logger.info("Starting audio transcription with FasterWhisper...")
         model = self._load_model()
         segments, _ = model.transcribe(
             str(audio_path),