28 Commits
feat ... master

Author SHA1 Message Date
LeoMortari
f496663b63 Adjust render presets 2026-01-04 03:34:48 -03:00
LeoMortari
e4c5c6adfe Adjust rabbit heartbeat 2026-01-03 23:13:27 -03:00
LeoMortari
21d2d19435 Adjust rabbit config 2026-01-03 19:51:31 -03:00
LeoMortari
3f7329869d Adjust context, speech and focus, video jitter, and other bugs 2026-01-03 19:42:23 -03:00
LeoMortari
c1914dad00 Add exception return 2026-01-02 11:26:26 -03:00
LeoMortari
07d301f110 Make several adjustments to improve tracking and video rendering 2025-12-18 02:26:25 -03:00
LeoMortari
78e35d65fd Merge branch 'feat' 2025-11-12 11:43:49 -03:00
d737177eab Adjust 3000k bitrate 2025-08-05 21:23:29 +02:00
6420a02090 revert 2be19ee02c (revert "remove bitrate") 2025-08-05 21:19:31 +02:00
2be19ee02c remove bitrate 2025-08-05 20:32:07 +02:00
98613a0002 Implement I/O decoupling 2025-08-05 14:58:44 +02:00
501c45cad7 Adjust callback 2025-08-05 14:43:12 +02:00
0fd0cda460 Adjust rabbit 2025-08-05 04:39:03 +02:00
dd4f9fc51c Adjust rabbitmq 2025-08-05 03:59:08 +02:00
6288d77d46 Adjust render FPS and bitrate 2025-08-05 00:02:00 +02:00
Leonardo Mortari
8f5934d576 Add param 2025-08-04 13:17:42 -03:00
Leonardo Mortari
a941eb6b98 Adjust font and video codec vars 2025-08-04 13:08:57 -03:00
Leonardo Mortari
503f2817d2 Merge branch 'master' of https://gitea.leolitas.work.gd/admin/video-render-api 2025-08-04 09:04:55 -03:00
Leonardo Mortari
85b5717595 Add missing vars 2025-08-04 09:04:51 -03:00
Leonardo Mortari
9c626a1e4a Change background from white to black, change text color to white, add auto-resize to wrap text across lines 2025-08-04 09:03:34 -03:00
ad84469037 Remove audio false parameter 2025-08-03 23:29:35 +02:00
561be6a182 Adjust queue 2025-08-02 21:45:52 +02:00
Leonardo Mortari
1e15544687 Change environment name 2025-08-02 14:09:28 -03:00
Leonardo Mortari
927eabb2d5 Remove webhook and push to queue instead 2025-08-02 14:09:06 -03:00
LeoMortari
1425f852e6 Adjust compose 2025-08-02 12:29:35 -03:00
LeoMortari
95d287bafc Adjust project to consume a queue 2025-08-02 12:27:26 -03:00
Leonardo Mortari
5bb58c98e5 Adjusts in project 2025-08-02 01:45:36 -03:00
Leonardo Mortari
55c7ccf316 Init repo 2025-07-31 19:29:14 -03:00
14 changed files with 1683 additions and 370 deletions

components/video.py (new file, 167 lines)
View File

@@ -0,0 +1,167 @@
import os

from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.video.VideoClip import ColorClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy import TextClip

font = "./Montserrat.ttf"
font_size = 70
video_codec = "libx264"


def auto_wrap_text(text, max_width):
    if not text:
        return ""

    words = text.split()
    lines = []
    line = ""

    for word in words:
        test_line = f"{line} {word}".strip()
        test_clip = TextClip(text=test_line, font=font, font_size=font_size, color='white', method='label')

        if test_clip.w > max_width and line != "":
            lines.append(line)
            line = word
        else:
            line = test_line

        test_clip.close()

    lines.append(line)
    return "\n".join(lines)


def cut_video_new_clip(input_path: str, start: float, end: float, output_path: str):
    with VideoFileClip(input_path) as clip:
        segment = clip.subclipped(start, end)
        fps = clip.fps or 30

        if segment.h < 720:
            segment = segment.resized(height=720)

        segment.write_videofile(
            output_path,
            codec=video_codec,
            remove_temp=True,
            fps=fps,
            bitrate="5000k",
            ffmpeg_params=[
                "-preset", "fast",
                "-tune", "zerolatency",
                "-pix_fmt", "yuv420p",
                "-profile:v", "high",
                "-level", "4.1"
            ]
        )


def process_segment(input_path: str, top_text: str = "", bottom_text: str = "", filename="", idx=1) -> str:
    os.makedirs("outputs", exist_ok=True)
    os.makedirs(f"outputs/{filename}", exist_ok=True)

    final_width, final_height = 1080, 1920
    top_h, middle_h, bottom_h = 480, 960, 480

    with VideoFileClip(input_path) as clip:
        dur = clip.duration

        bg = ColorClip(size=(final_width, final_height), color=(0, 0, 0), duration=dur)

        video_resized = clip.resized(width=final_width)
        y = top_h + (middle_h - video_resized.h) // 2
        video_resized = video_resized.with_position((0, y))

        wrapped_top_text = auto_wrap_text(top_text, final_width - 40)
        wrapped_bottom_text = auto_wrap_text(bottom_text, final_width - 40)

        txt_top = TextClip(
            text=wrapped_top_text,
            font_size=70,
            color="white",
            font=font,
            method="label",
            size=(final_width, top_h)
        ).with_duration(dur).with_position((0, 0))

        txt_bot = TextClip(
            text=wrapped_bottom_text,
            font_size=70,
            color="white",
            font=font,
            method="label",
            size=(final_width, bottom_h),
        ).with_duration(dur).with_position((0, final_height - bottom_h))

        final = CompositeVideoClip([bg, video_resized, txt_top, txt_bot], size=(final_width, final_height))

        output_path = f"outputs/{filename}/clip_{idx}.mp4"
        final.write_videofile(
            output_path,
            codec=video_codec,
            remove_temp=True,
            fps=30,
            bitrate="5000k",
            ffmpeg_params=[
                "-preset", "fast",
                "-tune", "zerolatency",
                "-pix_fmt", "yuv420p",
                "-profile:v", "high",
                "-level", "4.1"
            ]
        )
        final.close()

    return output_path


def timestamp_to_seconds(ts):
    if isinstance(ts, (int, float)):
        return ts

    parts = ts.split(":")
    parts = [float(p) for p in parts]

    if len(parts) == 3:
        h, m, s = parts
        return h * 3600 + m * 60 + s
    elif len(parts) == 2:
        m, s = parts
        return m * 60 + s
    elif len(parts) == 1:
        return parts[0]
    else:
        raise ValueError(f"Timestamp inválido: {ts}")


def process_full_video(filename: str, times: list = None) -> list:
    os.makedirs("temp", exist_ok=True)

    times = times or []
    video_path = f"videos/{filename}"
    processed = []

    print(f"Total de trechos: {len(times)}")
    print(f"Codec de render: {video_codec}")

    for idx, interval in enumerate(times, start=1):
        start = timestamp_to_seconds(interval.get("start", 0))
        end_raw = interval.get("end", None)
        end = timestamp_to_seconds(end_raw) if end_raw is not None else None
        top_text = interval.get("topText", "")
        bottom_text = interval.get("bottomText", "")

        if end is None:
            with VideoFileClip(video_path) as clip:
                end = clip.duration

        print(f"Cortando trecho {idx}: {start}s a {end}s")

        temp_path = f"temp/{os.path.splitext(filename)[0]}_{idx}.mp4"
        cut_video_new_clip(video_path, start, end, temp_path)

        out = process_segment(temp_path, top_text, bottom_text, filename, idx)
        processed.append(out)

    return processed
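
For reference, a minimal usage sketch of `process_full_video`. The import path, filename and intervals are assumptions for illustration only; the `start`/`end`/`topText`/`bottomText` keys mirror what the function reads above, and the input is expected under `./videos/`.

```python
# Hypothetical usage sketch (assumes components/ is importable as a package).
from components.video import process_full_video

clips = process_full_video(
    "podcast_episode.mp4",  # assumed to exist under ./videos/
    times=[
        # timestamps may be "HH:MM:SS" strings or plain seconds
        {"start": "00:01:30", "end": "00:02:45", "topText": "Hook do clipe", "bottomText": "Parte 1"},
        {"start": 300.0, "end": 380.5, "topText": "Outro trecho", "bottomText": "Parte 2"},
    ],
)
print(clips)  # e.g. ["outputs/podcast_episode.mp4/clip_1.mp4", ...]
```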

View File

@@ -1,20 +1,28 @@
 services:
   video-render:
     restart: unless-stopped
-    build: .
+    build:
+      context: .
+      no_cache: true
+      dockerfile: dockerfile
     environment:
       - RABBITMQ_PASS=${RABBITMQ_PASS}
       - OPENROUTER_API_URL=${OPENROUTER_API_URL:-https://openrouter.ai/api/v1/chat/completions}
       - OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
-      - OPENROUTER_MODEL=${OPENROUTER_MODEL:-openai/gpt-oss-20b:free}
+      - OPENROUTER_MODEL=${OPENROUTER_MODEL:-mistralai/mistral-small-3.1-24b-instruct:free}
       - OPENROUTER_PROMPT_PATH=${OPENROUTER_PROMPT_PATH:-prompts/generate.txt}
       - FASTER_WHISPER_MODEL_SIZE=${FASTER_WHISPER_MODEL_SIZE:-medium}
+      - SMART_FRAMING_SMOOTHING_WINDOW=${SMART_FRAMING_SMOOTHING_WINDOW:-30}
+      - SMART_FRAMING_MAX_VELOCITY=${SMART_FRAMING_MAX_VELOCITY:-40}
+      - SMART_FRAMING_FRAME_SKIP=${SMART_FRAMING_FRAME_SKIP:-2}
+      - SMART_FRAMING_PERSON_SWITCH_COOLDOWN=${SMART_FRAMING_PERSON_SWITCH_COOLDOWN:-60}
     volumes:
       - "/root/videos:/app/videos"
       - "/root/outputs:/app/outputs"
       - "/root/prompts:/app/prompts"
       # - "./videos:/app/videos"
       # - "./outputs:/app/outputs"
+      # - "./prompts:/app/prompts"
     command: "python -u main.py"
     networks:
       - dokploy-network

View File

@@ -40,4 +40,4 @@ RUN mkdir -p /app/videos /app/outputs
VOLUME ["/app/videos", "/app/outputs"]

CMD ["python", "-u", "main.py"]

View File

@@ -28,4 +28,4 @@ def main() -> None:
if __name__ == "__main__":
    main()

View File

@@ -1,85 +1,111 @@
-Voce e especialista em viralidade de redes sociais (TikTok, Instagram Reels, YouTube Shorts). Analise a transcricao e selecione trechos com MAXIMO potencial viral, priorizando qualidade sobre quantidade.
-
-PROCESSO DE ANALISE:
-1. Mapear potenciais trechos na transcricao
-2. Avaliar cada trecho usando sistema de pontuacao abaixo
-3. Rankear do maior para menor score viral
-4. Selecionar apenas os top-ranked baseado na duracao do video
-
-SISTEMA DE PONTUACAO VIRAL (0-100 pontos):
-
-HOOK/ABERTURA (0-25 pontos):
-[25] Frase choqueante, pergunta polemica ou promessa ousada
-[20] Historia intrigante ou situacao inusitada
-[15] Afirmacao interessante mas previsivel
-[10] Introducao generica mas aceitavel
-[0] "Oi", "entao", silencio ou conteudo fraco
-
-GATILHO EMOCIONAL (0-25 pontos):
-[25] Emocao extrema: raiva, choque, riso intenso, inspiracao profunda
-[20] Emocao forte: surpresa, indignacao, humor, curiosidade intensa
-[15] Emocao moderada: interesse, leve humor, curiosidade
-[10] Emocao fraca: informativo sem impacto emocional
-[0] Monotono, tecnico, sem apelo emocional
-
-VALOR/UTILIDADE (0-20 pontos):
-[20] Segredo valioso, insight transformador ou informacao exclusiva
-[15] Ensina algo pratico e imediatamente aplicavel
-[10] Opiniao interessante ou perspectiva util
-[5] Informacao generica ou conhecimento comum
-[0] Nenhum valor pratico, puro enrolation
-
-ESTRUTURA NARRATIVA (0-15 pontos):
-[15] Historia completa com inicio, conflito/climax e resolucao
-[10] Segmento com comeco e fim coerentes
-[5] Trecho com sentido mas cortado abruptamente
-[0] Fragmento sem contexto ou conclusao
-
-RITMO E ENERGIA (0-15 pontos):
-[15] Dinamico, sem pausas, alta energia, palavras impactantes
-[10] Bom ritmo com pausas naturais curtas
-[5] Ritmo lento mas aceitavel
-[0] Muitas pausas, hesitacoes, monotonia, silencio
-
-REGRAS DE QUANTIDADE:
-5-10 min: 3 clipes (minimo 1 se score alto)
-10-20 min: 4 clipes
-20-30 min: 5 clipes
-30+ min: 6 clipes (maximo absoluto)
-
-IMPORTANTE: Priorize qualidade. Melhor 3 clipes score 80+ que 6 clipes score 50. Se poucos momentos virais, retorne apenas os melhores (minimo 1).
-
-CRITERIOS DE SELECAO:
-- Score viral maior ou igual 60 pontos (idealmente maior ou igual 70)
-- Duracao ideal: 60-90s
-- Duracao minima: 60s | Duracao maxima: 120s
-- Sem sobreposicao (end de um menor que start do proximo)
-- Inicio e fim coerentes
-
-EVITE:
-- Introducoes genericas
-- Trechos com silencio/pausas maiores que 3s
-- Explicacoes tecnicas sem gancho emocional
-- Segmentos sem conclusao
-- Momentos de transicao
-
-FORMATO JSON (retorne APENAS isto):
-{"highlights":[{"start":<float>,"end":<float>,"summary":"Score estimado e gatilhos principais"}]}
-
-REGRAS TECNICAS:
-- Float com ponto decimal (45.5 NAO 45,5)
-- Timestamps exatos dos segments fornecidos
-- Ordem cronologica (start crescente)
-- Minimo 1, maximo 6 highlights
-- Summary conciso (1-2 frases)
-
-TAREFA:
-1. Leia transcricao e timestamps
-2. Avalie e pontue trechos mentalmente
-3. Rankear por score viral
-4. Selecione top-ranked baseado na duracao
-5. Retorne JSON
-6. Se video fraco, retorne pelo menos 1 highlight
-
-Objetivo: MAXIMIZAR chance de viralizar. Seja criterioso, apenas melhores trechos.
+# TAREFA: Extrair clips virais de uma transcrição de vídeo
+
+Você é um especialista em conteúdo viral para TikTok, Instagram Reels e YouTube Shorts.
+
+## REGRA MAIS IMPORTANTE - DURAÇÃO DOS CLIPS
+
+**CADA CLIP DEVE TER ENTRE 60 E 120 SEGUNDOS DE DURAÇÃO.**
+
+- MÍNIMO ABSOLUTO: 60 segundos (end - start >= 60)
+- MÁXIMO: 120 segundos (end - start <= 120)
+- IDEAL: 60-90 segundos
+
+**CLIPS COM MENOS DE 60 SEGUNDOS SERÃO REJEITADOS PELO SISTEMA.**
+
+Antes de incluir um clip, SEMPRE calcule: end - start >= 60
+
+## QUANTIDADE DE CLIPS
+
+Baseado na duração total do vídeo:
+
+- Até 10 min: 2-4 clips
+- 10-20 min: 4-6 clips
+- 20-30 min: 6-10 clips
+- 30+ min: 8-15 clips
+
+## CRITÉRIOS DE SELEÇÃO
+
+Um bom clip viral possui:
+
+1. GANCHO FORTE nos primeiros 3 segundos (pergunta, afirmação chocante, promessa)
+2. EMOÇÃO (humor, surpresa, indignação, curiosidade)
+3. VALOR (ensina algo, revela segredo, dá dica prática)
+4. ESTRUTURA (início, meio e fim coerentes)
+5. RITMO (sem pausas longas, dinâmico)
+
+## O QUE EVITAR
+
+- Introduções genéricas ("oi pessoal", "então", "bem")
+- Trechos com pausas longas (> 3 segundos de silêncio)
+- Segmentos sem contexto ou conclusão
+- Explicações técnicas monótonas
+
+## FORMATO DE RESPOSTA
+
+Retorne APENAS um JSON válido, sem texto antes ou depois:
+
+```json
+{
+  "highlights": [
+    {
+      "start": 0.0,
+      "end": 75.0,
+      "summary": "Descrição do que acontece neste trecho"
+    },
+    {
+      "start": 120.5,
+      "end": 195.0,
+      "summary": "Descrição do que acontece neste trecho"
+    }
+  ]
+}
+```
+
+## REGRAS DO JSON
+
+- "start" e "end" são números decimais (float) em SEGUNDOS
+- Use ponto como separador decimal (60.5, não 60,5)
+- "summary" é uma descrição breve do conteúdo (1-2 frases)
+- Clips em ordem cronológica (start crescente)
+- Clips não podem se sobrepor
+
+## CHECKLIST ANTES DE RESPONDER
+
+Para CADA clip, verifique:
+
+- [ ] end - start >= 60 segundos?
+- [ ] end - start <= 120 segundos?
+- [ ] Tem gancho forte no início?
+- [ ] Faz sentido isolado do resto do vídeo?
+- [ ] JSON está válido?
+
+## EXEMPLO
+
+Se o vídeo tem 15 minutos e você encontrou 4 momentos virais:
+
+```json
+{
+  "highlights": [
+    {
+      "start": 60.0,
+      "end": 120.0,
+      "summary": "Revelação sobre como economizar 50% nas compras"
+    },
+    {
+      "start": 180.0,
+      "end": 255.0,
+      "summary": "História engraçada sobre cliente que tentou enganar a loja"
+    },
+    {
+      "start": 400.0,
+      "end": 480.0,
+      "summary": "Dica prática de negociação com fornecedores"
+    },
+    {
+      "start": 600.0,
+      "end": 690.0,
+      "summary": "Conclusão motivacional sobre empreendedorismo"
+    }
+  ]
+}
+```
+
+Agora analise a transcrição fornecida e extraia os clips virais seguindo estas instruções.
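
As a companion to the prompt's checklist, a small sketch (hypothetical, not part of this repository) of how a consumer could validate the returned JSON against the 60-120 s duration and no-overlap rules:

```python
import json

def validate_highlights(raw: str, min_len: float = 60.0, max_len: float = 120.0) -> list[dict]:
    """Keep only highlights that satisfy the prompt's duration and ordering rules."""
    data = json.loads(raw)
    valid = []
    previous_end = float("-inf")
    for clip in data.get("highlights", []):
        start, end = float(clip["start"]), float(clip["end"])
        duration = end - start
        if not (min_len <= duration <= max_len):
            continue  # reject clips outside the 60-120 s window
        if start < previous_end:
            continue  # reject overlapping or out-of-order clips
        previous_end = end
        valid.append(clip)
    return valid
```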

View File

@@ -22,8 +22,8 @@ class RabbitMQSettings:
    consume_queue: str = os.environ.get("RABBITMQ_QUEUE", "to-render")
    publish_queue: str = os.environ.get("RABBITMQ_UPLOAD_QUEUE", "to-upload")
    prefetch_count: int = int(os.environ.get("RABBITMQ_PREFETCH", 1))
-    heartbeat: int = int(os.environ.get("RABBITMQ_HEARTBEAT", 60))
-    blocked_timeout: int = int(os.environ.get("RABBITMQ_BLOCKED_TIMEOUT", 300))
+    heartbeat: int = int(os.environ.get("RABBITMQ_HEARTBEAT", 600))
+    blocked_timeout: int = int(os.environ.get("RABBITMQ_BLOCKED_TIMEOUT", 7200))


@dataclass(frozen=True)
@@ -62,11 +62,16 @@ class RenderingSettings:
    subtitle_font_size: int = int(os.environ.get("RENDER_SUBTITLE_FONT_SIZE", 64))
    caption_min_words: int = int(os.environ.get("CAPTION_MIN_WORDS", 2))
    caption_max_words: int = int(os.environ.get("CAPTION_MAX_WORDS", 2))
+    # Smart framing settings
    enable_smart_framing: bool = os.environ.get("ENABLE_SMART_FRAMING", "true").lower() in ("true", "1", "yes")
-    smart_framing_min_confidence: float = float(os.environ.get("SMART_FRAMING_MIN_CONFIDENCE", 0.5))
-    smart_framing_smoothing_window: int = int(os.environ.get("SMART_FRAMING_SMOOTHING_WINDOW", 20))
-    smart_framing_frame_skip: int = int(os.environ.get("SMART_FRAMING_FRAME_SKIP", 2))  # Process every Nth frame (CPU optimization)
+    smart_framing_min_confidence: float = float(os.environ.get("SMART_FRAMING_MIN_CONFIDENCE", 0.3))
+    smart_framing_smoothing_window: int = int(os.environ.get("SMART_FRAMING_SMOOTHING_WINDOW", 30))
+    smart_framing_frame_skip: int = int(os.environ.get("SMART_FRAMING_FRAME_SKIP", 1))
+    smart_framing_max_velocity: int = int(os.environ.get("SMART_FRAMING_MAX_VELOCITY", 25))
+    smart_framing_person_switch_cooldown: int = int(os.environ.get("SMART_FRAMING_PERSON_SWITCH_COOLDOWN", 30))
+    smart_framing_response_time: float = float(os.environ.get("SMART_FRAMING_RESPONSE_TIME", 0.6))
+    smart_framing_group_padding: float = float(os.environ.get("SMART_FRAMING_GROUP_PADDING", 0.15))
+    smart_framing_max_zoom_out: float = float(os.environ.get("SMART_FRAMING_MAX_ZOOM_OUT", 2.0))
+    smart_framing_dead_zone: int = int(os.environ.get("SMART_FRAMING_DEAD_ZONE", 60))


@dataclass(frozen=True)

View File

@@ -7,7 +7,7 @@ and identify who is speaking in video content using MediaPipe and audio analysis
from __future__ import annotations

import logging
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from typing import List, Optional, Tuple

import cv2
@@ -41,6 +41,18 @@ class PersonTracking:
    frame_number: int


+@dataclass
+class GroupBoundingBox:
+    """Bounding box containing all tracked faces."""
+    x: int
+    y: int
+    width: int
+    height: int
+    center_x: int
+    center_y: int
+    face_count: int


@dataclass
class FrameContext:
    """Context information for a video frame."""
@@ -50,20 +62,23 @@ class FrameContext:
    active_speakers: List[int]  # indices of speaking faces
    primary_focus: Optional[Tuple[int, int]]  # (x, y) center point
    layout_mode: str  # "single", "dual_split", "grid"
+    selected_people: List[int] = field(default_factory=list)  # indices of people selected for display
+    group_bounds: Optional[GroupBoundingBox] = None  # bounding box for all detected faces
class MediaPipeDetector:
-    """Face and pose detection using MediaPipe."""
+    """Face and pose detection using MediaPipe with OpenCV Haar Cascade fallback."""

-    def __init__(self, min_detection_confidence: float = 0.5, min_tracking_confidence: float = 0.5):
+    def __init__(self, min_detection_confidence: float = 0.3, min_tracking_confidence: float = 0.3):
        self.min_detection_confidence = min_detection_confidence
        self.min_tracking_confidence = min_tracking_confidence
        self.mp_face_detection = mp.solutions.face_detection
        self.mp_face_mesh = mp.solutions.face_mesh

+        # MediaPipe detectors with lower confidence for better cartoon detection
        self.face_detection = self.mp_face_detection.FaceDetection(
            min_detection_confidence=min_detection_confidence,
-            model_selection=1
+            model_selection=0  # Changed to 0 for better detection of varied faces (including cartoons)
        )

        self.face_mesh = self.mp_face_mesh.FaceMesh(
@@ -73,11 +88,17 @@ class MediaPipeDetector:
            static_image_mode=False
        )

-        logger.info("MediaPipe detector initialized")
+        # OpenCV Haar Cascade as fallback for cartoon/anime faces
+        self.haar_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
+        # Alternative cascade for profile/side faces
+        self.haar_cascade_profile = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_profileface.xml')
+
+        logger.info(f"Hybrid detector initialized (MediaPipe confidence={min_detection_confidence}, OpenCV Haar Cascade enabled)")

    def detect_faces(self, frame: np.ndarray) -> List[FaceDetection]:
        """
-        Detect faces in a frame.
+        Detect faces in a frame using hybrid approach (MediaPipe + OpenCV Haar Cascade).

        Args:
            frame: RGB image array
@@ -94,6 +115,7 @@
        else:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

+        # Try MediaPipe first
        results = self.face_detection.process(frame_rgb)

        faces = []
@@ -126,8 +148,111 @@
                    center_y=center_y
                ))

+        # Fallback to OpenCV Haar Cascade if MediaPipe found nothing
+        if not faces:
+            faces = self._detect_faces_haar_cascade(frame, width, height)
+
        return faces
def _detect_faces_haar_cascade(self, frame: np.ndarray, width: int, height: int) -> List[FaceDetection]:
"""
Detect faces using OpenCV Haar Cascade (works better with cartoons/anime).
Args:
frame: Image frame (BGR format)
width: Frame width
height: Frame height
Returns:
List of detected faces
"""
# Convert to grayscale for Haar Cascade
if len(frame.shape) == 3:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
else:
gray = frame
# Detect frontal faces with more sensitive parameters
frontal_faces = self.haar_cascade.detectMultiScale(
gray,
scaleFactor=1.05, # More sensitive to size variations
minNeighbors=3, # Lower threshold for detection (more permissive)
minSize=(30, 30), # Smaller minimum size
flags=cv2.CASCADE_SCALE_IMAGE
)
# Also try profile faces
profile_faces = self.haar_cascade_profile.detectMultiScale(
gray,
scaleFactor=1.1,
minNeighbors=3,
minSize=(30, 30),
flags=cv2.CASCADE_SCALE_IMAGE
)
# Combine frontal and profile detections
all_faces = []
for (x, y, w, h) in frontal_faces:
x = max(0, min(x, width - 1))
y = max(0, min(y, height - 1))
w = min(w, width - x)
h = min(h, height - y)
center_x = x + w // 2
center_y = y + h // 2
all_faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=0.7, # Haar Cascade doesn't provide confidence, use fixed value
center_x=center_x,
center_y=center_y
))
for (x, y, w, h) in profile_faces:
# Check if this face overlaps significantly with any frontal face
overlap = False
for existing_face in all_faces:
# Calculate IoU (Intersection over Union)
x1_overlap = max(x, existing_face.x)
y1_overlap = max(y, existing_face.y)
x2_overlap = min(x + w, existing_face.x + existing_face.width)
y2_overlap = min(y + h, existing_face.y + existing_face.height)
if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)
face_area = w * h
if overlap_area / face_area > 0.3: # 30% overlap threshold
overlap = True
break
if not overlap:
x = max(0, min(x, width - 1))
y = max(0, min(y, height - 1))
w = min(w, width - x)
h = min(h, height - y)
center_x = x + w // 2
center_y = y + h // 2
all_faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=0.6, # Slightly lower confidence for profile
center_x=center_x,
center_y=center_y
))
if all_faces:
logger.debug(f"Haar Cascade detected {len(all_faces)} faces (MediaPipe failed)")
return all_faces
    def detect_face_landmarks(self, frame: np.ndarray) -> List[FaceDetection]:
        """
        Detect faces with landmarks for lip sync detection.
@@ -203,8 +328,8 @@ class AudioActivityDetector:
    def detect_speaking_periods(
        self,
        audio_samples: np.ndarray,
-        threshold: float = 0.02,
-        min_speech_duration: float = 0.1
+        threshold: float = 0.01,  # Reduced from 0.02 for better speech detection
+        min_speech_duration: float = 0.05  # Reduced from 0.1 to catch shorter utterances
    ) -> List[Tuple[float, float]]:
        """
        Detect periods of speech in audio.
@@ -250,6 +375,16 @@
                if end_time - start_time >= min_speech_duration:
                    periods.append((start_time, end_time))

+        # Log detected speech periods for debugging
+        if periods:
+            total_speech_time = sum(end - start for start, end in periods)
+            logger.info(f"Audio speech detection: {len(periods)} periods found, "
+                        f"total {total_speech_time:.1f}s of speech (threshold={threshold})")
+        else:
+            max_energy = max(energies) if energies else 0
+            logger.warning(f"No speech detected! Max energy={max_energy:.4f}, threshold={threshold} "
+                           f"(try lowering threshold if speech should be present)")
+
        return periods

    def is_speaking_at_time(self, speaking_periods: List[Tuple[float, float]], time: float) -> bool:
@@ -263,12 +398,30 @@
class ContextAnalyzer:
    """Analyzes video context to determine focus and layout."""

-    def __init__(self):
+    def __init__(self, person_switch_cooldown: int = 30, min_face_confidence: float = 0.3):
        self.detector = MediaPipeDetector()
        self.audio_detector = AudioActivityDetector()
        self.previous_faces: List[FaceDetection] = []
+        self.min_face_confidence = min_face_confidence

-        logger.info("Context analyzer initialized")
+        # Person tracking state
+        self.current_selected_people: List[int] = []  # Indices of people currently on screen
+        self.last_switch_frame: int = -999  # Frame when we last switched people
+        self.person_switch_cooldown = person_switch_cooldown  # Minimum frames before switching
+
+        # Stability tracking to prevent flip-flopping
+        self.desired_people_history: List[List[int]] = []  # Track recent desired selections
+        self.stability_threshold = 20  # Frames needed to confirm a switch (increased for more stability)
+        self.last_switched_people: List[int] = []  # People we just switched FROM
+
+        self.focus_history: List[Tuple[int, int]] = []
+        self.focus_history_size: int = 20
+        self.focus_dead_zone: int = 60
+
+        # Debug logging
+        self.frame_log_interval = 30  # Log every N frames
+
+        logger.info(f"Context analyzer initialized (cooldown={person_switch_cooldown} frames, focus_smoothing={self.focus_history_size})")

    def analyze_frame(
        self,
@@ -290,39 +443,70 @@ class ContextAnalyzer:
            FrameContext with detection results
        """
        faces = self.detector.detect_face_landmarks(frame)
+        faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else []

        if not faces:
            faces = self.detector.detect_faces(frame)
+            faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else []

        # Determine who is speaking
        active_speakers = []
+        has_audio_speech = speaking_periods and self.audio_detector.is_speaking_at_time(speaking_periods, timestamp)

        for i, face in enumerate(faces):
            is_speaking = False

-            if speaking_periods and self.audio_detector.is_speaking_at_time(speaking_periods, timestamp):
-                is_speaking = True
+            # Prefer visual cues when multiple faces are present.
            if face.landmarks and len(self.previous_faces) > i:
-                is_speaking = is_speaking or self._detect_lip_movement(face, self.previous_faces[i])
+                is_speaking = self._detect_lip_movement(face, self.previous_faces[i])

+            # Audio can confirm speech when there's only one face.
+            if has_audio_speech and len(faces) == 1:
+                is_speaking = True

            if is_speaking:
                active_speakers.append(i)

-        num_faces = len(faces)
-        num_speakers = len(active_speakers)
+        # Debug: Log speech detection
+        if frame_number % 30 == 0:  # Every second at 30fps
+            logger.info(f"Speech detection - Frame {frame_number}: audio_active={has_audio_speech}, "
+                        f"speakers={active_speakers}, total_faces={len(faces)}")

-        if num_faces == 0:
-            layout_mode = "single"
-        elif num_faces == 1:
-            layout_mode = "single"
-        elif num_faces == 2:
-            layout_mode = "dual_split"
-        elif num_faces >= 3:
-            layout_mode = "dual_split"
-        else:
-            layout_mode = "single"
+        if active_speakers:
+            selected_people = active_speakers[:4]
+            if len(selected_people) == 1:
+                layout_mode = "single"
+            elif len(selected_people) == 2:
+                layout_mode = "dual_split"
+            else:
+                layout_mode = "grid"
+        else:
+            # Select THE person to focus on (always single person)
+            # Priority: 1) Who is speaking, 2) Who is most centered
+            selected_people = self._select_person_to_focus(
+                faces,
+                active_speakers,
+                frame_number,
+                frame.shape[1],  # frame width for center calculation
+                frame.shape[0]   # frame height for center calculation
+            )
+            layout_mode = "single"

-        primary_focus = self._calculate_focus_point(faces, active_speakers)
+        # Calculate group bounding box for ALL detected faces (multi-person support)
+        group_bounds = self._calculate_group_bounding_box(faces)
+
+        # For multi-person mode, use group center as primary focus
+        if group_bounds and group_bounds.face_count > 1:
+            primary_focus = (group_bounds.center_x, group_bounds.center_y)
+        else:
+            primary_focus = self._calculate_focus_point(faces, selected_people)
+
+        # Debug logging every N frames
+        if frame_number % self.frame_log_interval == 0:
+            focus_reason = "speaker" if active_speakers else "no_speech_detected"
+            group_info = f", group={group_bounds.face_count} faces" if group_bounds else ""
+            logger.info(f"Frame {frame_number}: {len(faces)} faces, "
+                        f"{len(active_speakers)} speakers, focus={selected_people}, reason={focus_reason}{group_info}")

        self.previous_faces = faces
@@ -332,7 +516,9 @@
            detected_faces=faces,
            active_speakers=active_speakers,
            primary_focus=primary_focus,
-            layout_mode=layout_mode
+            layout_mode=layout_mode,
+            selected_people=selected_people,
+            group_bounds=group_bounds
        )

    def _detect_lip_movement(self, current_face: FaceDetection, previous_face: FaceDetection) -> bool:
@@ -363,36 +549,296 @@
        threshold = 2.0
        return abs(current_dist - previous_dist) > threshold
-    def _calculate_focus_point(
+    def _select_person_to_focus(
        self,
        faces: List[FaceDetection],
-        active_speakers: List[int]
-    ) -> Optional[Tuple[int, int]]:
+        active_speakers: List[int],
+        frame_number: int,
+        frame_width: int,
+        frame_height: int
+    ) -> List[int]:
        """
-        Calculate the primary focus point based on detected faces and speakers.
+        Select THE single person to focus on.
+        Priority: 1) Who is speaking, 2) Who is most centered in frame
+
+        IMPORTANT: This focuses on ONE person to avoid focusing on empty space (table).
+        When multiple people are present, we pick the most relevant person, not average positions.

        Args:
            faces: List of detected faces
-            active_speakers: Indices of faces that are speaking
+            active_speakers: Indices of people currently speaking
+            frame_number: Current frame number
+            frame_width: Frame width for center calculation
+            frame_height: Frame height for center calculation
+
+        Returns:
+            List with single person index [idx], or empty list if no faces
+        """
if not faces:
self.current_selected_people = []
return []
if len(faces) == 1:
self.current_selected_people = [0]
return [0]
frames_since_last_switch = frame_number - self.last_switch_frame
can_switch = frames_since_last_switch >= self.person_switch_cooldown
desired_person_idx = None
if active_speakers:
if self.current_selected_people and self.current_selected_people[0] in active_speakers:
desired_person_idx = self.current_selected_people[0]
else:
if can_switch or not self.current_selected_people:
desired_person_idx = active_speakers[0]
if self.current_selected_people and desired_person_idx != self.current_selected_people[0]:
logger.info(f"Switching focus to speaker: {desired_person_idx}")
self.last_switch_frame = frame_number
else:
desired_person_idx = self.current_selected_people[0] if self.current_selected_people else active_speakers[0]
else:
if self.current_selected_people and len(self.current_selected_people) > 0:
current_idx = self.current_selected_people[0]
if current_idx < len(faces):
desired_person_idx = current_idx
else:
if self.previous_faces and current_idx < len(self.previous_faces):
prev_face = self.previous_faces[current_idx]
best_match_idx = None
best_match_score = float('inf')
for idx, face in enumerate(faces):
dx = face.center_x - prev_face.center_x
dy = face.center_y - prev_face.center_y
dist = np.sqrt(dx**2 + dy**2)
size_diff = abs(face.width - prev_face.width) + abs(face.height - prev_face.height)
score = dist + size_diff * 0.5
if score < best_match_score:
best_match_score = score
best_match_idx = idx
if best_match_idx is not None and best_match_score < 1000:
desired_person_idx = best_match_idx
else:
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
face_confidences.sort(key=lambda x: x[1], reverse=True)
desired_person_idx = face_confidences[0][0]
else:
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
face_confidences.sort(key=lambda x: x[1], reverse=True)
desired_person_idx = face_confidences[0][0]
else:
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
face_confidences.sort(key=lambda x: x[1], reverse=True)
desired_person_idx = face_confidences[0][0]
desired_people = [desired_person_idx] if desired_person_idx is not None else []
if not self.current_selected_people:
self.current_selected_people = desired_people
self.last_switch_frame = frame_number
logger.info(f"Frame {frame_number}: Locked on person {desired_people}")
else:
self.current_selected_people = desired_people
return self.current_selected_people.copy()
def _ensure_distinct_people(
self,
faces: List[FaceDetection],
people_indices: List[int]
) -> List[int]:
"""
Ensure selected people are distinct by checking minimum distance between them.
Prevents showing the same person twice due to duplicate detection.
Args:
faces: List of detected faces
people_indices: Indices of people to validate
Returns:
List of distinct people indices (max 2)
"""
if len(people_indices) <= 1:
return people_indices
distinct_people = []
for idx in people_indices:
if idx >= len(faces):
continue
current_face = faces[idx]
is_distinct = True
# Check if this person is too close to any already selected person
for selected_idx in distinct_people:
selected_face = faces[selected_idx]
# Calculate distance between face centers
dx = current_face.center_x - selected_face.center_x
dy = current_face.center_y - selected_face.center_y
distance = np.sqrt(dx**2 + dy**2)
# Also check overlap via IoU (Intersection over Union)
x1_overlap = max(current_face.x, selected_face.x)
y1_overlap = max(current_face.y, selected_face.y)
x2_overlap = min(current_face.x + current_face.width, selected_face.x + selected_face.width)
y2_overlap = min(current_face.y + current_face.height, selected_face.y + selected_face.height)
overlap_area = 0
if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)
# Calculate areas
area1 = current_face.width * current_face.height
area2 = selected_face.width * selected_face.height
min_area = min(area1, area2)
# If faces are very close OR significantly overlapping, they're likely the same person
# Minimum distance: 1/4 of average face width
min_distance = (current_face.width + selected_face.width) / 8
overlap_threshold = 0.3 # 30% overlap
if distance < min_distance or (min_area > 0 and overlap_area / min_area > overlap_threshold):
is_distinct = False
logger.debug(f"Person {idx} too similar to person {selected_idx} (dist={distance:.1f}, overlap={overlap_area/min_area if min_area > 0 else 0:.2%})")
break
if is_distinct:
distinct_people.append(idx)
# Stop at 2 distinct people
if len(distinct_people) >= 2:
break
# If we couldn't find 2 distinct people, return at most 1
if len(distinct_people) < 2 and len(people_indices) >= 2:
logger.debug(f"Only {len(distinct_people)} distinct person(s) found from {len(people_indices)} detections")
return distinct_people
    def _calculate_focus_point(
        self,
        faces: List[FaceDetection],
-        active_speakers: List[int]
+        selected_people: List[int]
    ) -> Optional[Tuple[int, int]]:
        """
-        Calculate the primary focus point based on detected faces and speakers.
+        Calculate the primary focus point based on selected people with temporal smoothing.

        Args:
            faces: List of detected faces
-            active_speakers: Indices of faces that are speaking
+            selected_people: Indices of people selected for display

        Returns:
            (x, y) tuple of focus center, or None if no faces
        """
+        if not faces or not selected_people:
+            return None
# Calculate raw focus point
raw_focus_x = 0
raw_focus_y = 0
if len(selected_people) == 1:
# Single person - focus on them
if selected_people[0] < len(faces):
primary = faces[selected_people[0]]
raw_focus_x = primary.center_x
raw_focus_y = primary.center_y
else:
# Fallback
most_confident = max(faces, key=lambda f: f.confidence)
raw_focus_x = most_confident.center_x
raw_focus_y = most_confident.center_y
else:
# Multiple people - focus on the CENTER between them for stability
# This prevents jarring movements when switching focus between people
valid_people = [idx for idx in selected_people if idx < len(faces)]
if valid_people:
centers_x = [faces[idx].center_x for idx in valid_people]
centers_y = [faces[idx].center_y for idx in valid_people]
raw_focus_x = int(np.mean(centers_x))
raw_focus_y = int(np.mean(centers_y))
else:
# Fallback
most_confident = max(faces, key=lambda f: f.confidence)
raw_focus_x = most_confident.center_x
raw_focus_y = most_confident.center_y
if self.focus_history:
last_x, last_y = self.focus_history[-1]
dx = abs(raw_focus_x - last_x)
dy = abs(raw_focus_y - last_y)
if dx < self.focus_dead_zone and dy < self.focus_dead_zone:
return self.focus_history[-1]
self.focus_history.append((raw_focus_x, raw_focus_y))
if len(self.focus_history) > self.focus_history_size:
self.focus_history.pop(0)
if len(self.focus_history) >= 5:
xs = [x for x, y in self.focus_history]
ys = [y for x, y in self.focus_history]
median_x = int(np.median(xs))
median_y = int(np.median(ys))
return (median_x, median_y)
else:
return (raw_focus_x, raw_focus_y)
def _calculate_group_bounding_box(
self,
faces: List[FaceDetection],
padding_percent: float = 0.15,
max_faces: int = 6
) -> Optional[GroupBoundingBox]:
"""
Calculate bounding box containing all detected faces with padding.
Args:
faces: List of detected faces
padding_percent: Padding around group as percentage of bbox dimensions
max_faces: Maximum faces to include (use most confident if exceeded)
Returns:
GroupBoundingBox or None if no faces
"""
        if not faces:
            return None

-        if active_speakers:
-            speaker_faces = [faces[i] for i in active_speakers if i < len(faces)]
-            if speaker_faces:
-                primary_speaker = max(speaker_faces, key=lambda f: f.confidence)
-                return (primary_speaker.center_x, primary_speaker.center_y)
-
-        most_confident = max(faces, key=lambda f: f.confidence)
-        return (most_confident.center_x, most_confident.center_y)
+        # If too many faces, use most confident ones
+        if len(faces) > max_faces:
+            faces = sorted(faces, key=lambda f: f.confidence, reverse=True)[:max_faces]
+
+        # Calculate bounding box containing all faces
+        min_x = min(f.x for f in faces)
max_x = max(f.x + f.width for f in faces)
min_y = min(f.y for f in faces)
max_y = max(f.y + f.height for f in faces)
# Add padding
width = max_x - min_x
height = max_y - min_y
pad_x = int(width * padding_percent)
pad_y = int(height * padding_percent)
final_x = max(0, min_x - pad_x)
final_y = max(0, min_y - pad_y)
final_width = width + 2 * pad_x
final_height = height + 2 * pad_y
return GroupBoundingBox(
x=final_x,
y=final_y,
width=final_width,
height=final_height,
center_x=final_x + final_width // 2,
center_y=final_y + final_height // 2,
face_count=len(faces)
)
    def close(self):
        """Release resources."""
        self.detector.close()
+        # Clear tracking state to free memory
+        self.previous_faces.clear()
+        self.current_selected_people.clear()
+        self.focus_history.clear()

View File

@@ -137,12 +137,12 @@ class OpenRouterCopywriter:
                continue

            duration = end - start
-            if duration < 45:
+            if duration < 60:
                logger.warning(f"Highlight ignorado: muito curto ({duration}s, minimo 45s)")
                continue

            if duration > 120:
-                logger.warning(f"Highlight ignorado: muito longo ({duration}s, maximo 120s)")
+                logger.warning(f"Highlight ignorado: muito longo ({duration}s, maximo 90s)")
                continue

            if not summary:

View File

@@ -50,7 +50,10 @@ class MediaPreparer:
        existing_children = list(workspace_dir.iterdir())
        if existing_children:
            logger.info("Limpando workspace existente para %s", sanitized_name)
-            remove_paths(existing_children)
+            try:
+                remove_paths(existing_children)
+            except Exception as e:
+                logger.warning(f"Não foi possível limpar workspace (não crítico): {e}")

        if temp_transcription_json and temp_transcription_json.exists():
            shutil.move(str(temp_transcription_json), str(transcription_json))
@@ -66,7 +69,10 @@
        output_dir = ensure_workspace(self.settings.outputs_dir, sanitized_name)
        existing_outputs = list(output_dir.iterdir())
        if existing_outputs:
-            remove_paths(existing_outputs)
+            try:
+                remove_paths(existing_outputs)
+            except Exception as e:
+                logger.warning(f"Não foi possível limpar outputs antigos (não crítico): {e}")

        audio_path = workspace_dir / "audio.wav"
        extract_audio_to_wav(working_video_path, audio_path)

View File

@@ -69,6 +69,7 @@ class VideoPipeline:
            return self._build_success_payload(context)
        except Exception as exc:
            logger.exception("Falha ao processar vídeo %s", context.job.filename)
+            return self._handle_failure(context, exc)

    def _parse_job(self, message: Dict[str, Any]) -> JobMessage:
        filename = message.get("filename")
@@ -107,6 +108,9 @@
        TranscriptionService.persist(transcription, context.workspace.workspace_dir)
        context.transcription = transcription

+        # Unload Whisper model immediately after transcription to free memory (1-3GB)
+        self.transcriber.unload_model()
+
    def _determine_highlights(self, context: PipelineContext) -> None:
        if not context.transcription:
            raise RuntimeError("Transcricao nao disponivel")

View File

@@ -345,7 +345,14 @@ class VideoRenderer:
            target_width=settings.rendering.frame_width,
            target_height=settings.rendering.frame_height,
            frame_skip=settings.rendering.smart_framing_frame_skip,
-            smoothing_window=settings.rendering.smart_framing_smoothing_window
+            smoothing_window=settings.rendering.smart_framing_smoothing_window,
+            max_velocity=settings.rendering.smart_framing_max_velocity,
+            person_switch_cooldown=settings.rendering.smart_framing_person_switch_cooldown,
+            response_time=settings.rendering.smart_framing_response_time,
+            group_padding=settings.rendering.smart_framing_group_padding,
+            max_zoom_out=settings.rendering.smart_framing_max_zoom_out,
+            dead_zone=settings.rendering.smart_framing_dead_zone,
+            min_face_confidence=settings.rendering.smart_framing_min_confidence
        )

    def render(
@@ -436,12 +443,10 @@
                    audio_samples=audio_samples
                )

-                # Apply smart framing based on detected layout
-                use_split_screen = framing_plan.layout_mode in ["dual_split", "grid"]
+                # Apply smart framing (always single-person focus)
                video_clip = self.smart_framer.apply_framing(
                    video_clip=subclip,
-                    framing_plan=framing_plan,
-                    use_split_screen=use_split_screen
+                    framing_plan=framing_plan
                )

                logger.info(f"Smart framing applied: layout={framing_plan.layout_mode}, "
@@ -602,6 +607,10 @@
        if audio_clip is not None and audio_needs_close:
            audio_clip.close()

+        # Force garbage collection to free memory after rendering
+        import gc
+        gc.collect()
+
        return str(output_path)

    def _materialize_audio(

File diff suppressed because it is too large

View File

@@ -6,6 +6,7 @@ from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional

+import numpy as np
from faster_whisper import WhisperModel

from video_render.config import Settings
@@ -56,6 +57,17 @@ class TranscriptionService:
        )
        return self._model
def unload_model(self) -> None:
"""Unload the Whisper model to free memory (reduces RAM usage by 1-3GB)."""
if self._model is not None:
logger.info("Descarregando modelo Whisper para liberar memória...")
del self._model
self._model = None
# Force garbage collection to immediately free GPU/CPU memory
import gc
gc.collect()
logger.info("Modelo Whisper descarregado com sucesso")
    def transcribe(self, audio_path: Path, output_dir: Optional[Path] = None) -> TranscriptionResult:
        if output_dir is not None:
            existing_transcription = self.load(output_dir)
@@ -63,7 +75,34 @@
                logger.info("Transcrição já existe em %s, reutilizando...", output_dir)
                return existing_transcription

-        logger.info("Iniciando transcrição do áudio com FasterWhisper...")
+        # Get audio duration to decide if we need chunked processing
audio_duration = self._get_audio_duration(audio_path)
chunk_duration_minutes = 30 # Process in 30-minute chunks for long videos
chunk_duration_seconds = chunk_duration_minutes * 60
# For videos longer than 30 minutes, use chunked processing to avoid OOM
if audio_duration > chunk_duration_seconds:
logger.info(
f"Áudio longo detectado ({audio_duration/60:.1f} min). "
f"Processando em chunks de {chunk_duration_minutes} min para evitar erro de memória..."
)
return self._transcribe_chunked(audio_path, chunk_duration_seconds)
else:
logger.info(f"Iniciando transcrição do áudio ({audio_duration/60:.1f} min) com FasterWhisper...")
return self._transcribe_full(audio_path)
def _get_audio_duration(self, audio_path: Path) -> float:
"""Get audio duration in seconds."""
try:
from moviepy.audio.io.AudioFileClip import AudioFileClip
with AudioFileClip(str(audio_path)) as audio:
return audio.duration or 0.0
except Exception as e:
logger.warning(f"Falha ao obter duração do áudio, assumindo curto: {e}")
return 0.0 # Assume short if we can't determine
def _transcribe_full(self, audio_path: Path) -> TranscriptionResult:
"""Transcribe entire audio at once (for shorter videos)."""
        model = self._load_model()
        segments, _ = model.transcribe(
            str(audio_path),
@@ -97,6 +136,101 @@
            full_text=" ".join(full_text_parts).strip(),
        )
def _transcribe_chunked(self, audio_path: Path, chunk_duration: float) -> TranscriptionResult:
"""Transcribe audio in chunks to avoid OOM on long videos."""
import subprocess
from moviepy.audio.io.AudioFileClip import AudioFileClip
model = self._load_model()
all_segments: List[TranscriptSegment] = []
full_text_parts: List[str] = []
segment_id_counter = 0
# Get total duration
total_duration = self._get_audio_duration(audio_path)
num_chunks = int(np.ceil(total_duration / chunk_duration))
logger.info(f"Processando áudio em {num_chunks} chunks...")
for chunk_idx in range(num_chunks):
start_time = chunk_idx * chunk_duration
end_time = min((chunk_idx + 1) * chunk_duration, total_duration)
logger.info(
f"Processando chunk {chunk_idx + 1}/{num_chunks} "
f"({start_time/60:.1f}min - {end_time/60:.1f}min)..."
)
# Extract chunk using ffmpeg directly (more reliable than moviepy subclip)
temp_chunk_path = audio_path.parent / f"temp_chunk_{chunk_idx}.wav"
try:
# Use ffmpeg to extract the chunk
chunk_duration_actual = end_time - start_time
ffmpeg_cmd = [
'ffmpeg',
'-y', # Overwrite output file
'-ss', str(start_time), # Start time
'-i', str(audio_path), # Input file
'-t', str(chunk_duration_actual), # Duration
'-acodec', 'pcm_s16le', # Audio codec
'-ar', '44100', # Sample rate
'-ac', '2', # Stereo
'-loglevel', 'error', # Only show errors
str(temp_chunk_path)
]
subprocess.run(ffmpeg_cmd, check=True, capture_output=True)
# Transcribe chunk
segments, _ = model.transcribe(
str(temp_chunk_path),
beam_size=5,
word_timestamps=True,
)
# Process segments with time offset
for segment in segments:
words = [
WordTiming(
start=w.start + start_time,
end=w.end + start_time,
word=w.word.strip()
)
for w in segment.words or []
if w.word.strip()
]
text = segment.text.strip()
full_text_parts.append(text)
all_segments.append(
TranscriptSegment(
id=segment_id_counter,
start=segment.start + start_time,
end=segment.end + start_time,
text=text,
words=words,
)
)
segment_id_counter += 1
# Force garbage collection after each chunk
import gc
gc.collect()
except subprocess.CalledProcessError as e:
logger.error(f"Erro ao extrair chunk {chunk_idx}: {e.stderr.decode() if e.stderr else str(e)}")
raise
finally:
# Clean up temp chunk
if temp_chunk_path.exists():
temp_chunk_path.unlink()
logger.info(f"Transcrição em chunks concluída: {len(all_segments)} segmentos processados")
return TranscriptionResult(
segments=all_segments,
full_text=" ".join(full_text_parts).strip(),
)
    @staticmethod
    def persist(result: TranscriptionResult, destination: Path) -> None:
        json_path = destination / "transcription.json"

View File

@@ -23,16 +23,58 @@ def ensure_workspace(root: Path, folder_name: str) -> Path:
def remove_paths(paths: Iterable[Path]) -> None:
+    import logging
+    import time
+
+    logger = logging.getLogger(__name__)
+
    for path in paths:
        if not path.exists():
            continue
-        if path.is_file() or path.is_symlink():
-            path.unlink(missing_ok=True)
-        else:
-            for child in sorted(path.rglob("*"), reverse=True):
-                if child.is_file() or child.is_symlink():
-                    child.unlink(missing_ok=True)
-                elif child.is_dir():
-                    child.rmdir()
-            path.rmdir()
+
+        # Try to remove with retries and better error handling
+        max_retries = 3
+        for attempt in range(max_retries):
+            try:
+                if path.is_file() or path.is_symlink():
+                    path.unlink(missing_ok=True)
+                else:
+                    for child in sorted(path.rglob("*"), reverse=True):
+                        if child.is_file() or child.is_symlink():
+                            try:
+                                child.unlink(missing_ok=True)
+                            except PermissionError:
+                                logger.warning(f"Não foi possível deletar {child}: sem permissão")
+                                # Try to change permissions and retry
+                                try:
+                                    child.chmod(0o777)
+                                    child.unlink(missing_ok=True)
+                                except Exception as e:
+                                    logger.warning(f"Falha ao forçar deleção de {child}: {e}")
+                        elif child.is_dir():
+                            try:
+                                child.rmdir()
+                            except (PermissionError, OSError) as e:
+                                logger.warning(f"Não foi possível remover diretório {child}: {e}")
+                    try:
+                        path.rmdir()
+                    except (PermissionError, OSError) as e:
+                        logger.warning(f"Não foi possível remover diretório {path}: {e}")
+                break  # Success, exit retry loop
+            except PermissionError as e:
+                if attempt < max_retries - 1:
+                    logger.warning(f"Tentativa {attempt + 1}/{max_retries} falhou ao deletar {path}: {e}. Tentando novamente...")
+                    time.sleep(0.5)  # Wait a bit before retry
+                    # Try to change permissions
+                    try:
+                        path.chmod(0o777)
+                    except Exception:
+                        pass
+                else:
+                    logger.error(f"Não foi possível deletar {path} após {max_retries} tentativas: {e}")
+            except Exception as e:
+                logger.error(f"Erro inesperado ao deletar {path}: {e}")
+                break  # Don't retry on unexpected errors