Compare commits

...

36 Commits

Author SHA1 Message Date
LeoMortari
f496663b63 Ajusta presets de render 2026-01-04 03:34:48 -03:00
LeoMortari
e4c5c6adfe Ajusta heartbeat do rabbit 2026-01-03 23:13:27 -03:00
LeoMortari
21d2d19435 Ajusta rabbit config 2026-01-03 19:51:31 -03:00
LeoMortari
3f7329869d Ajusta contexto, falas e foco, tremulação do video e demais bugs 2026-01-03 19:42:23 -03:00
LeoMortari
c1914dad00 Add return de excessao 2026-01-02 11:26:26 -03:00
LeoMortari
07d301f110 Realiza varios ajustes para melhorar o tracking e o render de video 2025-12-18 02:26:25 -03:00
LeoMortari
78e35d65fd Merge branch 'feat' 2025-11-12 11:43:49 -03:00
LeoMortari
c5d3e83a5f #v2 - Inicia testes da v2
- Adiciona rastreamento de objetos
- Facial detection
- Legenda interativa
- Cortes mais precisos
- Refinamento do Prompt
2025-11-12 11:38:09 -03:00
LeoMortari
87c6a5e27c Adiciona limpeza de arquivos apos sucesso ou falha 2025-10-29 23:58:06 -03:00
LeoMortari
ae8b228ea1 Add gemini api key env 2025-10-29 08:34:57 -03:00
LeoMortari
8abb8001d7 Ajusta configs do compose 2025-10-29 08:27:02 -03:00
LeoMortari
c18884e778 Finaliza os ajustes para render de video 2025-10-28 17:34:13 -03:00
LeoMortari
b5a27fa938 Ajustes do Gemini 2025-10-27 14:08:10 -03:00
LeoMortari
2692cc4dfd Ajusta git ignore 2025-10-27 09:15:43 -03:00
LeoMortari
8caa849148 Ajustes de rendering 2025-10-27 09:15:12 -03:00
d737177eab Ajusta 3000k bitrate 2025-08-05 21:23:29 +02:00
6420a02090 revert 2be19ee02c
revert remove bitrate
2025-08-05 21:19:31 +02:00
2be19ee02c remove bitrate 2025-08-05 20:32:07 +02:00
98613a0002 Implementa desacoplamento de I/O 2025-08-05 14:58:44 +02:00
501c45cad7 Ajusta callback 2025-08-05 14:43:12 +02:00
0fd0cda460 Ajusta rabbit 2025-08-05 04:39:03 +02:00
dd4f9fc51c Ajusta rabbitmq 2025-08-05 03:59:08 +02:00
6288d77d46 Ajusta FPS e bitrate de render 2025-08-05 00:02:00 +02:00
Leonardo Mortari
8f5934d576 Add param 2025-08-04 13:17:42 -03:00
Leonardo Mortari
a941eb6b98 Adjusta vars de font e videocodec 2025-08-04 13:08:57 -03:00
Leonardo Mortari
503f2817d2 Merge branch 'master' of https://gitea.leolitas.work.gd/admin/video-render-api 2025-08-04 09:04:55 -03:00
Leonardo Mortari
85b5717595 Adiciona vars faltantes 2025-08-04 09:04:51 -03:00
Leonardo Mortari
9c626a1e4a Altera background de branco para preto, altera cor da letra para branco, cria um auto-resize para formatar os textos com quebras de linhas 2025-08-04 09:03:34 -03:00
ad84469037 Remove parametro de audio false 2025-08-03 23:29:35 +02:00
561be6a182 Adjust queue 2025-08-02 21:45:52 +02:00
Leonardo Mortari
1e15544687 Muda nome do environment 2025-08-02 14:09:28 -03:00
Leonardo Mortari
927eabb2d5 Remove webhook e adiciona push na fila 2025-08-02 14:09:06 -03:00
LeoMortari
1425f852e6 Adjust compose 2025-08-02 12:29:35 -03:00
LeoMortari
95d287bafc Ajusta projeto para consumir uma fila 2025-08-02 12:27:26 -03:00
Leonardo Mortari
5bb58c98e5 Adjusts in project 2025-08-02 01:45:36 -03:00
Leonardo Mortari
55c7ccf316 Init repo 2025-07-31 19:29:14 -03:00
20 changed files with 3425 additions and 290 deletions

47
.env.example Normal file
View File

@@ -0,0 +1,47 @@
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_USER=admin
RABBITMQ_PASS=your_password_here
RABBITMQ_QUEUE=to-render
RABBITMQ_UPLOAD_QUEUE=to-upload
RABBITMQ_PREFETCH=1
RABBITMQ_HEARTBEAT=60
RABBITMQ_BLOCKED_TIMEOUT=300
OPENROUTER_API_URL=https://openrouter.ai/api/v1/chat/completions
OPENROUTER_API_KEY=your_openrouter_api_key_here
# Model selection - Recommended options:
# - openai/gpt-oss-20b:free (Free tier, good quality)
# - qwen/qwen-2.5-72b-instruct:free (Free, excellent reasoning)
# - google/gemini-pro-1.5 (Best cost-benefit for podcasts)
# - anthropic/claude-3.5-sonnet (Premium quality, best reasoning)
OPENROUTER_MODEL=qwen/qwen-2.5-72b-instruct:free
OPENROUTER_TEMPERATURE=0.6
OPENROUTER_PROMPT_PATH=prompts/generate.txt
FASTER_WHISPER_MODEL_SIZE=medium
FASTER_WHISPER_DEVICE=auto
RENDER_WIDTH=1080
RENDER_HEIGHT=1920
RENDER_FPS=30
RENDER_CODEC=libx264
RENDER_AUDIO_CODEC=aac
RENDER_BITRATE=5000k
RENDER_PRESET=faster
SUBTITLE_HIGHLIGHT_COLOR=#00FF00
SUBTITLE_BASE_COLOR=#FFFFFF
RENDER_FONT_PATH=./Montserrat.ttf
RENDER_TITLE_FONT_SIZE=110
RENDER_SUBTITLE_FONT_SIZE=64
CAPTION_MIN_WORDS=2
CAPTION_MAX_WORDS=2
ENABLE_SMART_FRAMING=true
SMART_FRAMING_MIN_CONFIDENCE=0.5
SMART_FRAMING_SMOOTHING_WINDOW=20
SMART_FRAMING_FRAME_SKIP=2

9
.gitignore vendored
View File

@@ -2,17 +2,19 @@
*.pyc
*.pyo
*.pyd
__pycache__/
/__pycache__/
*.egg-info/
.eggs/
dist/
build/
doc/
videos/
outputs/
.DS_STORE
# Ignore virtual envs
venv/
env/
.claude
# Ignore editor files
.idea/
*.swp
@@ -29,3 +31,4 @@ env/
# Ignore mypy and pylint cache
.mypy_cache/
.pylint.d/
CLAUDE.MD

167
components/video.py Normal file
View File

@@ -0,0 +1,167 @@
import os
from moviepy.video.io.VideoFileClip import VideoFileClip
from moviepy.video.VideoClip import ColorClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy import TextClip
font = "./Montserrat.ttf"
font_size = 70
video_codec = "libx264"
def auto_wrap_text(text, max_width):
if not text:
return ""
words = text.split()
lines = []
line = ""
for word in words:
test_line = f"{line} {word}".strip()
test_clip = TextClip(text=test_line, font=font, font_size=font_size, color='white', method='label')
if test_clip.w > max_width and line != "":
lines.append(line)
line = word
else:
line = test_line
test_clip.close()
lines.append(line)
return "\n".join(lines)
def cut_video_new_clip(input_path: str, start: float, end: float, output_path: str):
with VideoFileClip(input_path) as clip:
segment = clip.subclipped(start, end)
fps = clip.fps or 30
if segment.h < 720:
segment = segment.resized(height=720)
segment.write_videofile(
output_path,
codec=video_codec,
remove_temp=True,
fps=fps,
bitrate="5000k",
ffmpeg_params=[
"-preset", "fast",
"-tune", "zerolatency",
"-pix_fmt", "yuv420p",
"-profile:v", "high",
"-level", "4.1"
]
)
def process_segment(input_path: str, top_text: str = "", bottom_text: str = "", filename="", idx=1) -> str:
os.makedirs("outputs", exist_ok=True)
os.makedirs(f"outputs/{filename}", exist_ok=True)
final_width, final_height = 1080, 1920
top_h, middle_h, bottom_h = 480, 960, 480
with VideoFileClip(input_path) as clip:
dur = clip.duration
bg = ColorClip(size=(final_width, final_height), color=(0, 0, 0), duration=dur)
video_resized = clip.resized(width=final_width)
y = top_h + (middle_h - video_resized.h) // 2
video_resized = video_resized.with_position((0, y))
wrapped_top_text = auto_wrap_text(top_text, final_width - 40)
wrapped_bottom_text = auto_wrap_text(bottom_text, final_width - 40)
txt_top = TextClip(
text=wrapped_top_text,
font_size=70,
color="white",
font=font,
method="label",
size=(final_width, top_h)
).with_duration(dur).with_position((0, 0))
txt_bot = TextClip(
text=wrapped_bottom_text,
font_size=70,
color="white",
font=font,
method="label",
size=(final_width, bottom_h),
).with_duration(dur).with_position((0, final_height - bottom_h))
final = CompositeVideoClip([bg, video_resized, txt_top, txt_bot], size=(final_width, final_height))
output_path = f"outputs/{filename}/clip_{idx}.mp4"
final.write_videofile(
output_path,
codec=video_codec,
remove_temp=True,
fps=30,
bitrate="5000k",
ffmpeg_params=[
"-preset", "fast",
"-tune", "zerolatency",
"-pix_fmt", "yuv420p",
"-profile:v", "high",
"-level", "4.1"
]
)
final.close()
return output_path
def timestamp_to_seconds(ts):
if isinstance(ts, (int, float)):
return ts
parts = ts.split(":")
parts = [float(p) for p in parts]
if len(parts) == 3:
h, m, s = parts
return h * 3600 + m * 60 + s
elif len(parts) == 2:
m, s = parts
return m * 60 + s
elif len(parts) == 1:
return parts[0]
else:
raise ValueError(f"Timestamp inválido: {ts}")
def process_full_video(filename: str, times: list = None) -> list:
os.makedirs("temp", exist_ok=True)
times = times or []
video_path = f"videos/{filename}"
processed = []
print(f"Total de trechos: {len(times)}")
print(f"Codec de render: {video_codec}")
for idx, interval in enumerate(times, start=1):
start = timestamp_to_seconds(interval.get("start", 0))
end_raw = interval.get("end", None)
end = timestamp_to_seconds(end_raw) if end_raw is not None else None
top_text = interval.get("topText", "")
bottom_text = interval.get("bottomText", "")
if end is None:
with VideoFileClip(video_path) as clip:
end = clip.duration
print(f"Cortando trecho {idx}: {start}s a {end}s")
temp_path = f"temp/{os.path.splitext(filename)[0]}_{idx}.mp4"
cut_video_new_clip(video_path, start, end, temp_path)
out = process_segment(temp_path, top_text, bottom_text, filename, idx)
processed.append(out)
return processed

View File

@@ -1,38 +1,32 @@
services:
video-render:
restart: unless-stopped
build: .
container_name: video-render
build:
context: .
no_cache: true
dockerfile: dockerfile
environment:
# - RABBITMQ_PASS=${RABBITMQ_PASS}
- RABBITMQ_PASS=L@l321321321
- RABBITMQ_HOST=154.12.229.181
- RABBITMQ_PORT=32790
# - GEMINI_API_KEY=${GEMINI_API_KEY}
- GEMINI_API_KEY=AIzaSyB5TPjSPPZG1Qb6EtblhKFAjvCOdY15rcw
- GEMINI_MODEL=${GEMINI_MODEL:-gemini-2.5-pro}
# - OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
- OPENROUTER_API_KEY=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
- OPENROUTER_MODEL=${OPENROUTER_MODEL:-openai/gpt-oss-20b:free}
- FASTER_WHISPER_MODEL_SIZE=${FASTER_WHISPER_MODEL_SIZE:-small}
- RABBITMQ_PASS=${RABBITMQ_PASS}
- OPENROUTER_API_URL=${OPENROUTER_API_URL:-https://openrouter.ai/api/v1/chat/completions}
- OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
- OPENROUTER_MODEL=${OPENROUTER_MODEL:-mistralai/mistral-small-3.1-24b-instruct:free}
- OPENROUTER_PROMPT_PATH=${OPENROUTER_PROMPT_PATH:-prompts/generate.txt}
- FASTER_WHISPER_MODEL_SIZE=${FASTER_WHISPER_MODEL_SIZE:-medium}
- SMART_FRAMING_SMOOTHING_WINDOW=${SMART_FRAMING_SMOOTHING_WINDOW:-30}
- SMART_FRAMING_MAX_VELOCITY=${SMART_FRAMING_MAX_VELOCITY:-40}
- SMART_FRAMING_FRAME_SKIP=${SMART_FRAMING_FRAME_SKIP:-2}
- SMART_FRAMING_PERSON_SWITCH_COOLDOWN=${SMART_FRAMING_PERSON_SWITCH_COOLDOWN:-60}
volumes:
# - "/root/videos:/app/videos"
# - "/root/outputs:/app/outputs"
- "./videos:/app/videos"
- "./outputs:/app/outputs"
- "/root/videos:/app/videos"
- "/root/outputs:/app/outputs"
- "/root/prompts:/app/prompts"
# - "./videos:/app/videos"
# - "./outputs:/app/outputs"
# - "./prompts:/app/prompts"
command: "python -u main.py"
# runtime: nvidia
networks:
- dokploy-network
# networks:
# - dokploy-network
# deploy:
# resources:
# reservations:
# devices:
# - driver: nvidia
# count: all
# capabilities: [gpu]
# networks:
# dokploy-network:
# external: true
networks:
dokploy-network:
external: true

View File

@@ -23,6 +23,9 @@ RUN apt-get update && \
imagemagick \
fonts-liberation \
wget \
libsm6 \
libxext6 \
libxrender-dev \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .

14
main.py
View File

@@ -1,3 +1,17 @@
import os
import warnings
# Suppress FFmpeg/AV1 warnings for cleaner logs
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'loglevel;quiet'
os.environ['OPENCV_LOG_LEVEL'] = 'ERROR'
# Suppress MoviePy verbose logging
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1'
# Filter deprecation warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
warnings.filterwarnings('ignore', category=UserWarning, module='moviepy')
from video_render.config import load_settings
from video_render.logging_utils import setup_logging
from video_render.messaging import RabbitMQWorker

View File

@@ -1,35 +1,111 @@
Voce e um estrategista de conteudo especializado em identificar cortes curtos de videos longos que performam bem em redes sociais.
# TAREFA: Extrair clips virais de uma transcrição de vídeo
FUNCAO:
- Analisar a transcricao completa de um video.
- Escolher trechos curtos (entre 20s e 90s) com maior chance de engajamento.
- Responder APENAS em JSON valido.
Você é um especialista em conteúdo viral para TikTok, Instagram Reels e YouTube Shorts.
FORMATO DA RESPOSTA:
## REGRA MAIS IMPORTANTE - DURAÇÃO DOS CLIPS
**CADA CLIP DEVE TER ENTRE 60 E 120 SEGUNDOS DE DURAÇÃO.**
- MÍNIMO ABSOLUTO: 60 segundos (end - start >= 60)
- MÁXIMO: 120 segundos (end - start <= 120)
- IDEAL: 60-90 segundos
**CLIPS COM MENOS DE 60 SEGUNDOS SERÃO REJEITADOS PELO SISTEMA.**
Antes de incluir um clip, SEMPRE calcule: end - start >= 60
## QUANTIDADE DE CLIPS
Baseado na duração total do vídeo:
- Até 10 min: 2-4 clips
- 10-20 min: 4-6 clips
- 20-30 min: 6-10 clips
- 30+ min: 8-15 clips
## CRITÉRIOS DE SELEÇÃO
Um bom clip viral possui:
1. GANCHO FORTE nos primeiros 3 segundos (pergunta, afirmação chocante, promessa)
2. EMOÇÃO (humor, surpresa, indignação, curiosidade)
3. VALOR (ensina algo, revela segredo, dá dica prática)
4. ESTRUTURA (início, meio e fim coerentes)
5. RITMO (sem pausas longas, dinâmico)
## O QUE EVITAR
- Introduções genéricas ("oi pessoal", "então", "bem")
- Trechos com pausas longas (> 3 segundos de silêncio)
- Segmentos sem contexto ou conclusão
- Explicações técnicas monótonas
## FORMATO DE RESPOSTA
Retorne APENAS um JSON válido, sem texto antes ou depois:
```json
{
"highlights": [
{
"start": <segundos_inicio_float>,
"end": <segundos_fim_float>,
"summary": "Resumo conciso do porque este trecho engaja"
"start": 0.0,
"end": 75.0,
"summary": "Descrição do que acontece neste trecho"
},
{
"start": 120.5,
"end": 195.0,
"summary": "Descrição do que acontece neste trecho"
}
]
}
```
REGRAS:
- Liste no maximo 6 destaques.
- Respeite a ordem cronologica.
- Nunca deixe listas vazias; se nada for relevante, inclua uma entrada com start = 0, end = 0 e summary explicando a ausencia de cortes.
- Utilize apenas valores numericos simples (ponto como separador decimal).
- Nao repita um mesmo trecho.
## REGRAS DO JSON
PERSPECTIVA DE ANALISE:
- Concentre-se em momentos com gatilhos emocionais, insights, storytelling ou chamadas para acao fortes.
- Prefira trechos com comeco, meio e fim claros.
- Evite partes redundantes, silenciosas ou extremamente tecnicas.
- "start" e "end" são números decimais (float) em SEGUNDOS
- Use ponto como separador decimal (60.5, não 60,5)
- "summary" é uma descrição breve do conteúdo (1-2 frases)
- Clips em ordem cronológica (start crescente)
- Clips não podem se sobrepor
TAREFA:
- Leia a transcricao recebida no campo "transcript".
- Use a lista de marcas de tempo detalhadas no campo "segments" para embasar suas escolhas.
- Produza a saida JSON descrita acima.
## CHECKLIST ANTES DE RESPONDER
Para CADA clip, verifique:
- [ ] end - start >= 60 segundos?
- [ ] end - start <= 120 segundos?
- [ ] Tem gancho forte no início?
- [ ] Faz sentido isolado do resto do vídeo?
- [ ] JSON está válido?
## EXEMPLO
Se o vídeo tem 15 minutos e você encontrou 4 momentos virais:
```json
{
"highlights": [
{
"start": 60.0,
"end": 120.0,
"summary": "Revelação sobre como economizar 50% nas compras"
},
{
"start": 180.0,
"end": 255.0,
"summary": "História engraçada sobre cliente que tentou enganar a loja"
},
{
"start": 400.0,
"end": 480.0,
"summary": "Dica prática de negociação com fornecedores"
},
{
"start": 600.0,
"end": 690.0,
"summary": "Conclusão motivacional sobre empreendedorismo"
}
]
}
```
Agora analise a transcrição fornecida e extraia os clips virais seguindo estas instruções.

View File

@@ -4,3 +4,6 @@ numpy>=1.26.0
requests
pika
faster-whisper==1.2.0
mediapipe==0.10.18
opencv-python==4.10.0.84
scipy>=1.11.0

View File

@@ -13,6 +13,8 @@ TEMP_ROOT = BASE_DIR / "temp"
@dataclass(frozen=True)
class RabbitMQSettings:
# host: str = os.environ.get("RABBITMQ_HOST", "154.12.229.181")
# port: int = int(os.environ.get("RABBITMQ_PORT", 32790))
host: str = os.environ.get("RABBITMQ_HOST", "rabbitmq")
port: int = int(os.environ.get("RABBITMQ_PORT", 5672))
user: str = os.environ.get("RABBITMQ_USER", "admin")
@@ -20,33 +22,18 @@ class RabbitMQSettings:
consume_queue: str = os.environ.get("RABBITMQ_QUEUE", "to-render")
publish_queue: str = os.environ.get("RABBITMQ_UPLOAD_QUEUE", "to-upload")
prefetch_count: int = int(os.environ.get("RABBITMQ_PREFETCH", 1))
heartbeat: int = int(os.environ.get("RABBITMQ_HEARTBEAT", 60))
blocked_timeout: int = int(os.environ.get("RABBITMQ_BLOCKED_TIMEOUT", 300))
@dataclass(frozen=True)
class GeminiSettings:
api_key: str = os.environ.get("GEMINI_API_KEY", "")
model: str = os.environ.get("GEMINI_MODEL", "gemini-2.5-pro")
safety_settings: str | None = os.environ.get("GEMINI_SAFETY_SETTINGS")
temperature: float = float(os.environ.get("GEMINI_TEMPERATURE", 0.2))
top_k: int | None = (
int(os.environ["GEMINI_TOP_K"]) if os.environ.get("GEMINI_TOP_K") else None
)
top_p: float | None = (
float(os.environ["GEMINI_TOP_P"]) if os.environ.get("GEMINI_TOP_P") else None
)
prompt_path: str = os.environ.get("GEMINI_PROMPT_PATH", "prompts/generate.txt")
heartbeat: int = int(os.environ.get("RABBITMQ_HEARTBEAT", 600))
blocked_timeout: int = int(os.environ.get("RABBITMQ_BLOCKED_TIMEOUT", 7200))
@dataclass(frozen=True)
class OpenRouterSettings:
api_key: str = os.environ.get("OPENROUTER_API_KEY", "")
api_key: str = os.environ.get("OPENROUTER_API_KEY", "https://openrouter.ai/api/v1/chat/completions")
model: str = os.environ.get(
"OPENROUTER_MODEL", "anthropic/claude-3-haiku:beta"
"OPENROUTER_MODEL", "openai/gpt-oss-20b:free"
)
temperature: float = float(os.environ.get("OPENROUTER_TEMPERATURE", 0.6))
max_output_tokens: int = int(os.environ.get("OPENROUTER_MAX_OUTPUT_TOKENS", 256))
prompt_path: str = os.environ.get("OPENROUTER_PROMPT_PATH", "prompts/generate.txt")
@dataclass(frozen=True)
@@ -68,19 +55,28 @@ class RenderingSettings:
audio_codec: str = os.environ.get("RENDER_AUDIO_CODEC", "aac")
bitrate: str = os.environ.get("RENDER_BITRATE", "5000k")
preset: str = os.environ.get("RENDER_PRESET", "faster")
highlight_color: str = os.environ.get("SUBTITLE_HIGHLIGHT_COLOR", "#FFD200")
highlight_color: str = os.environ.get("SUBTITLE_HIGHLIGHT_COLOR", "#00FF00")
base_color: str = os.environ.get("SUBTITLE_BASE_COLOR", "#FFFFFF")
font_path: Path = Path(os.environ.get("RENDER_FONT_PATH", "./Montserrat.ttf"))
title_font_size: int = int(os.environ.get("RENDER_TITLE_FONT_SIZE", 110))
subtitle_font_size: int = int(os.environ.get("RENDER_SUBTITLE_FONT_SIZE", 64))
caption_min_words: int = int(os.environ.get("CAPTION_MIN_WORDS", 3))
caption_max_words: int = int(os.environ.get("CAPTION_MAX_WORDS", 4))
caption_min_words: int = int(os.environ.get("CAPTION_MIN_WORDS", 2))
caption_max_words: int = int(os.environ.get("CAPTION_MAX_WORDS", 2))
enable_smart_framing: bool = os.environ.get("ENABLE_SMART_FRAMING", "true").lower() in ("true", "1", "yes")
smart_framing_min_confidence: float = float(os.environ.get("SMART_FRAMING_MIN_CONFIDENCE", 0.3))
smart_framing_smoothing_window: int = int(os.environ.get("SMART_FRAMING_SMOOTHING_WINDOW", 30))
smart_framing_frame_skip: int = int(os.environ.get("SMART_FRAMING_FRAME_SKIP", 1))
smart_framing_max_velocity: int = int(os.environ.get("SMART_FRAMING_MAX_VELOCITY", 25))
smart_framing_person_switch_cooldown: int = int(os.environ.get("SMART_FRAMING_PERSON_SWITCH_COOLDOWN", 30))
smart_framing_response_time: float = float(os.environ.get("SMART_FRAMING_RESPONSE_TIME", 0.6))
smart_framing_group_padding: float = float(os.environ.get("SMART_FRAMING_GROUP_PADDING", 0.15))
smart_framing_max_zoom_out: float = float(os.environ.get("SMART_FRAMING_MAX_ZOOM_OUT", 2.0))
smart_framing_dead_zone: int = int(os.environ.get("SMART_FRAMING_DEAD_ZONE", 60))
@dataclass(frozen=True)
class Settings:
rabbitmq: RabbitMQSettings = RabbitMQSettings()
gemini: GeminiSettings = GeminiSettings()
openrouter: OpenRouterSettings = OpenRouterSettings()
whisper: WhisperSettings = WhisperSettings()
rendering: RenderingSettings = RenderingSettings()

View File

@@ -0,0 +1,844 @@
"""
Context detection module for video analysis.
This module provides functionality to detect faces, track people,
and identify who is speaking in video content using MediaPipe and audio analysis.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
import cv2
import mediapipe as mp
import numpy as np
from scipy import signal
logger = logging.getLogger(__name__)
@dataclass
class FaceDetection:
"""Represents a detected face in a frame."""
x: int
y: int
width: int
height: int
confidence: float
center_x: int
center_y: int
landmarks: Optional[List[Tuple[int, int]]] = None
@dataclass
class PersonTracking:
"""Tracks a person across frames."""
person_id: int
face: FaceDetection
is_speaking: bool
speaking_confidence: float
frame_number: int
@dataclass
class GroupBoundingBox:
"""Bounding box containing all tracked faces."""
x: int
y: int
width: int
height: int
center_x: int
center_y: int
face_count: int
@dataclass
class FrameContext:
"""Context information for a video frame."""
frame_number: int
timestamp: float
detected_faces: List[FaceDetection]
active_speakers: List[int] # indices of speaking faces
primary_focus: Optional[Tuple[int, int]] # (x, y) center point
layout_mode: str # "single", "dual_split", "grid"
selected_people: List[int] = field(default_factory=list) # indices of people selected for display
group_bounds: Optional[GroupBoundingBox] = None # bounding box for all detected faces
class MediaPipeDetector:
"""Face and pose detection using MediaPipe with OpenCV Haar Cascade fallback."""
def __init__(self, min_detection_confidence: float = 0.3, min_tracking_confidence: float = 0.3):
self.min_detection_confidence = min_detection_confidence
self.min_tracking_confidence = min_tracking_confidence
self.mp_face_detection = mp.solutions.face_detection
self.mp_face_mesh = mp.solutions.face_mesh
# MediaPipe detectors with lower confidence for better cartoon detection
self.face_detection = self.mp_face_detection.FaceDetection(
min_detection_confidence=min_detection_confidence,
model_selection=0 # Changed to 0 for better detection of varied faces (including cartoons)
)
self.face_mesh = self.mp_face_mesh.FaceMesh(
max_num_faces=5,
min_detection_confidence=min_detection_confidence,
min_tracking_confidence=min_tracking_confidence,
static_image_mode=False
)
# OpenCV Haar Cascade as fallback for cartoon/anime faces
self.haar_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
# Alternative cascade for profile/side faces
self.haar_cascade_profile = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_profileface.xml')
logger.info(f"Hybrid detector initialized (MediaPipe confidence={min_detection_confidence}, OpenCV Haar Cascade enabled)")
def detect_faces(self, frame: np.ndarray) -> List[FaceDetection]:
"""
Detect faces in a frame using hybrid approach (MediaPipe + OpenCV Haar Cascade).
Args:
frame: RGB image array
Returns:
List of detected faces
"""
height, width = frame.shape[:2]
if len(frame.shape) == 2:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
elif frame.shape[2] == 4:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
else:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Try MediaPipe first
results = self.face_detection.process(frame_rgb)
faces = []
if results.detections:
for detection in results.detections:
bbox = detection.location_data.relative_bounding_box
x = int(bbox.xmin * width)
y = int(bbox.ymin * height)
w = int(bbox.width * width)
h = int(bbox.height * height)
x = max(0, min(x, width - 1))
y = max(0, min(y, height - 1))
w = min(w, width - x)
h = min(h, height - y)
center_x = x + w // 2
center_y = y + h // 2
confidence = detection.score[0] if detection.score else 0.0
faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=confidence,
center_x=center_x,
center_y=center_y
))
# Fallback to OpenCV Haar Cascade if MediaPipe found nothing
if not faces:
faces = self._detect_faces_haar_cascade(frame, width, height)
return faces
def _detect_faces_haar_cascade(self, frame: np.ndarray, width: int, height: int) -> List[FaceDetection]:
"""
Detect faces using OpenCV Haar Cascade (works better with cartoons/anime).
Args:
frame: Image frame (BGR format)
width: Frame width
height: Frame height
Returns:
List of detected faces
"""
# Convert to grayscale for Haar Cascade
if len(frame.shape) == 3:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
else:
gray = frame
# Detect frontal faces with more sensitive parameters
frontal_faces = self.haar_cascade.detectMultiScale(
gray,
scaleFactor=1.05, # More sensitive to size variations
minNeighbors=3, # Lower threshold for detection (more permissive)
minSize=(30, 30), # Smaller minimum size
flags=cv2.CASCADE_SCALE_IMAGE
)
# Also try profile faces
profile_faces = self.haar_cascade_profile.detectMultiScale(
gray,
scaleFactor=1.1,
minNeighbors=3,
minSize=(30, 30),
flags=cv2.CASCADE_SCALE_IMAGE
)
# Combine frontal and profile detections
all_faces = []
for (x, y, w, h) in frontal_faces:
x = max(0, min(x, width - 1))
y = max(0, min(y, height - 1))
w = min(w, width - x)
h = min(h, height - y)
center_x = x + w // 2
center_y = y + h // 2
all_faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=0.7, # Haar Cascade doesn't provide confidence, use fixed value
center_x=center_x,
center_y=center_y
))
for (x, y, w, h) in profile_faces:
# Check if this face overlaps significantly with any frontal face
overlap = False
for existing_face in all_faces:
# Calculate IoU (Intersection over Union)
x1_overlap = max(x, existing_face.x)
y1_overlap = max(y, existing_face.y)
x2_overlap = min(x + w, existing_face.x + existing_face.width)
y2_overlap = min(y + h, existing_face.y + existing_face.height)
if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)
face_area = w * h
if overlap_area / face_area > 0.3: # 30% overlap threshold
overlap = True
break
if not overlap:
x = max(0, min(x, width - 1))
y = max(0, min(y, height - 1))
w = min(w, width - x)
h = min(h, height - y)
center_x = x + w // 2
center_y = y + h // 2
all_faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=0.6, # Slightly lower confidence for profile
center_x=center_x,
center_y=center_y
))
if all_faces:
logger.debug(f"Haar Cascade detected {len(all_faces)} faces (MediaPipe failed)")
return all_faces
def detect_face_landmarks(self, frame: np.ndarray) -> List[FaceDetection]:
"""
Detect faces with landmarks for lip sync detection.
Args:
frame: RGB image array
Returns:
List of detected faces with landmark information
"""
height, width = frame.shape[:2]
if len(frame.shape) == 2:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
elif frame.shape[2] == 4:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
else:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(frame_rgb)
faces = []
if results.multi_face_landmarks:
for face_landmarks in results.multi_face_landmarks:
xs = [lm.x for lm in face_landmarks.landmark]
ys = [lm.y for lm in face_landmarks.landmark]
x_min, x_max = min(xs), max(xs)
y_min, y_max = min(ys), max(ys)
x = int(x_min * width)
y = int(y_min * height)
w = int((x_max - x_min) * width)
h = int((y_max - y_min) * height)
center_x = x + w // 2
center_y = y + h // 2
lip_landmarks = []
for idx in [13, 14, 78, 308]:
lm = face_landmarks.landmark[idx]
lip_landmarks.append((int(lm.x * width), int(lm.y * height)))
faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=1.0,
center_x=center_x,
center_y=center_y,
landmarks=lip_landmarks
))
return faces
def close(self):
"""Release MediaPipe resources."""
self.face_detection.close()
self.face_mesh.close()
class AudioActivityDetector:
"""Detects speech activity in audio."""
def __init__(self, sample_rate: int = 44100, frame_duration_ms: int = 30):
self.sample_rate = sample_rate
self.frame_duration_ms = frame_duration_ms
self.frame_size = int(sample_rate * frame_duration_ms / 1000)
logger.info(f"Audio activity detector initialized (sr={sample_rate}, frame={frame_duration_ms}ms)")
def detect_speaking_periods(
self,
audio_samples: np.ndarray,
threshold: float = 0.01, # Reduced from 0.02 for better speech detection
min_speech_duration: float = 0.05 # Reduced from 0.1 to catch shorter utterances
) -> List[Tuple[float, float]]:
"""
Detect periods of speech in audio.
Args:
audio_samples: Audio samples array
threshold: Energy threshold for speech detection
min_speech_duration: Minimum duration of speech in seconds
Returns:
List of (start_time, end_time) tuples in seconds
"""
if audio_samples.ndim > 1:
audio_samples = audio_samples.mean(axis=1)
energies = []
for i in range(0, len(audio_samples), self.frame_size):
frame = audio_samples[i:i + self.frame_size]
if len(frame) > 0:
energy = np.sqrt(np.mean(frame ** 2))
energies.append(energy)
speaking_frames = [e > threshold for e in energies]
periods = []
start_frame = None
for i, is_speaking in enumerate(speaking_frames):
if is_speaking and start_frame is None:
start_frame = i
elif not is_speaking and start_frame is not None:
start_time = start_frame * self.frame_duration_ms / 1000
end_time = i * self.frame_duration_ms / 1000
if end_time - start_time >= min_speech_duration:
periods.append((start_time, end_time))
start_frame = None
if start_frame is not None:
start_time = start_frame * self.frame_duration_ms / 1000
end_time = len(speaking_frames) * self.frame_duration_ms / 1000
if end_time - start_time >= min_speech_duration:
periods.append((start_time, end_time))
# Log detected speech periods for debugging
if periods:
total_speech_time = sum(end - start for start, end in periods)
logger.info(f"Audio speech detection: {len(periods)} periods found, "
f"total {total_speech_time:.1f}s of speech (threshold={threshold})")
else:
max_energy = max(energies) if energies else 0
logger.warning(f"No speech detected! Max energy={max_energy:.4f}, threshold={threshold} "
f"(try lowering threshold if speech should be present)")
return periods
def is_speaking_at_time(self, speaking_periods: List[Tuple[float, float]], time: float) -> bool:
"""Check if there is speech activity at a given time."""
for start, end in speaking_periods:
if start <= time <= end:
return True
return False
class ContextAnalyzer:
"""Analyzes video context to determine focus and layout."""
def __init__(self, person_switch_cooldown: int = 30, min_face_confidence: float = 0.3):
self.detector = MediaPipeDetector()
self.audio_detector = AudioActivityDetector()
self.previous_faces: List[FaceDetection] = []
self.min_face_confidence = min_face_confidence
# Person tracking state
self.current_selected_people: List[int] = [] # Indices of people currently on screen
self.last_switch_frame: int = -999 # Frame when we last switched people
self.person_switch_cooldown = person_switch_cooldown # Minimum frames before switching
# Stability tracking to prevent flip-flopping
self.desired_people_history: List[List[int]] = [] # Track recent desired selections
self.stability_threshold = 20 # Frames needed to confirm a switch (increased for more stability)
self.last_switched_people: List[int] = [] # People we just switched FROM
self.focus_history: List[Tuple[int, int]] = []
self.focus_history_size: int = 20
self.focus_dead_zone: int = 60
# Debug logging
self.frame_log_interval = 30 # Log every N frames
logger.info(f"Context analyzer initialized (cooldown={person_switch_cooldown} frames, focus_smoothing={self.focus_history_size})")
def analyze_frame(
self,
frame: np.ndarray,
timestamp: float,
frame_number: int,
speaking_periods: Optional[List[Tuple[float, float]]] = None
) -> FrameContext:
"""
Analyze a single frame to extract context information.
Args:
frame: Video frame (BGR format from OpenCV)
timestamp: Frame timestamp in seconds
frame_number: Frame index
speaking_periods: List of (start, end) times where speech is detected
Returns:
FrameContext with detection results
"""
faces = self.detector.detect_face_landmarks(frame)
faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else []
if not faces:
faces = self.detector.detect_faces(frame)
faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else []
# Determine who is speaking
active_speakers = []
has_audio_speech = speaking_periods and self.audio_detector.is_speaking_at_time(speaking_periods, timestamp)
for i, face in enumerate(faces):
is_speaking = False
# Prefer visual cues when multiple faces are present.
if face.landmarks and len(self.previous_faces) > i:
is_speaking = self._detect_lip_movement(face, self.previous_faces[i])
# Audio can confirm speech when there's only one face.
if has_audio_speech and len(faces) == 1:
is_speaking = True
if is_speaking:
active_speakers.append(i)
# Debug: Log speech detection
if frame_number % 30 == 0: # Every second at 30fps
logger.info(f"Speech detection - Frame {frame_number}: audio_active={has_audio_speech}, "
f"speakers={active_speakers}, total_faces={len(faces)}")
if active_speakers:
selected_people = active_speakers[:4]
if len(selected_people) == 1:
layout_mode = "single"
elif len(selected_people) == 2:
layout_mode = "dual_split"
else:
layout_mode = "grid"
else:
# Select THE person to focus on (always single person)
# Priority: 1) Who is speaking, 2) Who is most centered
selected_people = self._select_person_to_focus(
faces,
active_speakers,
frame_number,
frame.shape[1], # frame width for center calculation
frame.shape[0] # frame height for center calculation
)
layout_mode = "single"
# Calculate group bounding box for ALL detected faces (multi-person support)
group_bounds = self._calculate_group_bounding_box(faces)
# For multi-person mode, use group center as primary focus
if group_bounds and group_bounds.face_count > 1:
primary_focus = (group_bounds.center_x, group_bounds.center_y)
else:
primary_focus = self._calculate_focus_point(faces, selected_people)
# Debug logging every N frames
if frame_number % self.frame_log_interval == 0:
focus_reason = "speaker" if active_speakers else "no_speech_detected"
group_info = f", group={group_bounds.face_count} faces" if group_bounds else ""
logger.info(f"Frame {frame_number}: {len(faces)} faces, "
f"{len(active_speakers)} speakers, focus={selected_people}, reason={focus_reason}{group_info}")
self.previous_faces = faces
return FrameContext(
frame_number=frame_number,
timestamp=timestamp,
detected_faces=faces,
active_speakers=active_speakers,
primary_focus=primary_focus,
layout_mode=layout_mode,
selected_people=selected_people,
group_bounds=group_bounds
)
def _detect_lip_movement(self, current_face: FaceDetection, previous_face: FaceDetection) -> bool:
"""
Detect lip movement by comparing landmarks between frames.
Args:
current_face: Current frame face detection
previous_face: Previous frame face detection
Returns:
True if significant lip movement detected
"""
if not current_face.landmarks or not previous_face.landmarks:
return False
def lip_distance(landmarks):
if len(landmarks) < 4:
return 0
upper = np.array(landmarks[0:2])
lower = np.array(landmarks[2:4])
return np.linalg.norm(upper.mean(axis=0) - lower.mean(axis=0))
current_dist = lip_distance(current_face.landmarks)
previous_dist = lip_distance(previous_face.landmarks)
threshold = 2.0
return abs(current_dist - previous_dist) > threshold
def _select_person_to_focus(
self,
faces: List[FaceDetection],
active_speakers: List[int],
frame_number: int,
frame_width: int,
frame_height: int
) -> List[int]:
"""
Select THE single person to focus on.
Priority: 1) Who is speaking, 2) Who is most centered in frame
Args:
faces: List of detected faces
active_speakers: Indices of people currently speaking
frame_number: Current frame number
frame_width: Frame width for center calculation
frame_height: Frame height for center calculation
Returns:
List with single person index [idx], or empty list if no faces
"""
if not faces:
self.current_selected_people = []
return []
if len(faces) == 1:
self.current_selected_people = [0]
return [0]
frames_since_last_switch = frame_number - self.last_switch_frame
can_switch = frames_since_last_switch >= self.person_switch_cooldown
desired_person_idx = None
if active_speakers:
if self.current_selected_people and self.current_selected_people[0] in active_speakers:
desired_person_idx = self.current_selected_people[0]
else:
if can_switch or not self.current_selected_people:
desired_person_idx = active_speakers[0]
if self.current_selected_people and desired_person_idx != self.current_selected_people[0]:
logger.info(f"Switching focus to speaker: {desired_person_idx}")
self.last_switch_frame = frame_number
else:
desired_person_idx = self.current_selected_people[0] if self.current_selected_people else active_speakers[0]
else:
if self.current_selected_people and len(self.current_selected_people) > 0:
current_idx = self.current_selected_people[0]
if current_idx < len(faces):
desired_person_idx = current_idx
else:
if self.previous_faces and current_idx < len(self.previous_faces):
prev_face = self.previous_faces[current_idx]
best_match_idx = None
best_match_score = float('inf')
for idx, face in enumerate(faces):
dx = face.center_x - prev_face.center_x
dy = face.center_y - prev_face.center_y
dist = np.sqrt(dx**2 + dy**2)
size_diff = abs(face.width - prev_face.width) + abs(face.height - prev_face.height)
score = dist + size_diff * 0.5
if score < best_match_score:
best_match_score = score
best_match_idx = idx
if best_match_idx is not None and best_match_score < 1000:
desired_person_idx = best_match_idx
else:
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
face_confidences.sort(key=lambda x: x[1], reverse=True)
desired_person_idx = face_confidences[0][0]
else:
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
face_confidences.sort(key=lambda x: x[1], reverse=True)
desired_person_idx = face_confidences[0][0]
else:
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
face_confidences.sort(key=lambda x: x[1], reverse=True)
desired_person_idx = face_confidences[0][0]
desired_people = [desired_person_idx] if desired_person_idx is not None else []
if not self.current_selected_people:
self.current_selected_people = desired_people
self.last_switch_frame = frame_number
logger.info(f"Frame {frame_number}: Locked on person {desired_people}")
else:
self.current_selected_people = desired_people
return self.current_selected_people.copy()
def _ensure_distinct_people(
self,
faces: List[FaceDetection],
people_indices: List[int]
) -> List[int]:
"""
Ensure selected people are distinct by checking minimum distance between them.
Prevents showing the same person twice due to duplicate detection.
Args:
faces: List of detected faces
people_indices: Indices of people to validate
Returns:
List of distinct people indices (max 2)
"""
if len(people_indices) <= 1:
return people_indices
distinct_people = []
for idx in people_indices:
if idx >= len(faces):
continue
current_face = faces[idx]
is_distinct = True
# Check if this person is too close to any already selected person
for selected_idx in distinct_people:
selected_face = faces[selected_idx]
# Calculate distance between face centers
dx = current_face.center_x - selected_face.center_x
dy = current_face.center_y - selected_face.center_y
distance = np.sqrt(dx**2 + dy**2)
# Also check overlap via IoU (Intersection over Union)
x1_overlap = max(current_face.x, selected_face.x)
y1_overlap = max(current_face.y, selected_face.y)
x2_overlap = min(current_face.x + current_face.width, selected_face.x + selected_face.width)
y2_overlap = min(current_face.y + current_face.height, selected_face.y + selected_face.height)
overlap_area = 0
if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)
# Calculate areas
area1 = current_face.width * current_face.height
area2 = selected_face.width * selected_face.height
min_area = min(area1, area2)
# If faces are very close OR significantly overlapping, they're likely the same person
# Minimum distance: 1/4 of average face width
min_distance = (current_face.width + selected_face.width) / 8
overlap_threshold = 0.3 # 30% overlap
if distance < min_distance or (min_area > 0 and overlap_area / min_area > overlap_threshold):
is_distinct = False
logger.debug(f"Person {idx} too similar to person {selected_idx} (dist={distance:.1f}, overlap={overlap_area/min_area if min_area > 0 else 0:.2%})")
break
if is_distinct:
distinct_people.append(idx)
# Stop at 2 distinct people
if len(distinct_people) >= 2:
break
# If we couldn't find 2 distinct people, return at most 1
if len(distinct_people) < 2 and len(people_indices) >= 2:
logger.debug(f"Only {len(distinct_people)} distinct person(s) found from {len(people_indices)} detections")
return distinct_people
def _calculate_focus_point(
self,
faces: List[FaceDetection],
selected_people: List[int]
) -> Optional[Tuple[int, int]]:
"""
Calculate the primary focus point based on selected people with temporal smoothing.
Args:
faces: List of detected faces
selected_people: Indices of people selected for display
Returns:
(x, y) tuple of focus center, or None if no faces
"""
if not faces or not selected_people:
return None
# Calculate raw focus point
raw_focus_x = 0
raw_focus_y = 0
if len(selected_people) == 1:
# Single person - focus on them
if selected_people[0] < len(faces):
primary = faces[selected_people[0]]
raw_focus_x = primary.center_x
raw_focus_y = primary.center_y
else:
# Fallback
most_confident = max(faces, key=lambda f: f.confidence)
raw_focus_x = most_confident.center_x
raw_focus_y = most_confident.center_y
else:
# Multiple people - focus on the CENTER between them for stability
# This prevents jarring movements when switching focus between people
valid_people = [idx for idx in selected_people if idx < len(faces)]
if valid_people:
centers_x = [faces[idx].center_x for idx in valid_people]
centers_y = [faces[idx].center_y for idx in valid_people]
raw_focus_x = int(np.mean(centers_x))
raw_focus_y = int(np.mean(centers_y))
else:
# Fallback
most_confident = max(faces, key=lambda f: f.confidence)
raw_focus_x = most_confident.center_x
raw_focus_y = most_confident.center_y
if self.focus_history:
last_x, last_y = self.focus_history[-1]
dx = abs(raw_focus_x - last_x)
dy = abs(raw_focus_y - last_y)
if dx < self.focus_dead_zone and dy < self.focus_dead_zone:
return self.focus_history[-1]
self.focus_history.append((raw_focus_x, raw_focus_y))
if len(self.focus_history) > self.focus_history_size:
self.focus_history.pop(0)
if len(self.focus_history) >= 5:
xs = [x for x, y in self.focus_history]
ys = [y for x, y in self.focus_history]
median_x = int(np.median(xs))
median_y = int(np.median(ys))
return (median_x, median_y)
else:
return (raw_focus_x, raw_focus_y)
def _calculate_group_bounding_box(
self,
faces: List[FaceDetection],
padding_percent: float = 0.15,
max_faces: int = 6
) -> Optional[GroupBoundingBox]:
"""
Calculate bounding box containing all detected faces with padding.
Args:
faces: List of detected faces
padding_percent: Padding around group as percentage of bbox dimensions
max_faces: Maximum faces to include (use most confident if exceeded)
Returns:
GroupBoundingBox or None if no faces
"""
if not faces:
return None
# If too many faces, use most confident ones
if len(faces) > max_faces:
faces = sorted(faces, key=lambda f: f.confidence, reverse=True)[:max_faces]
# Calculate bounding box containing all faces
min_x = min(f.x for f in faces)
max_x = max(f.x + f.width for f in faces)
min_y = min(f.y for f in faces)
max_y = max(f.y + f.height for f in faces)
# Add padding
width = max_x - min_x
height = max_y - min_y
pad_x = int(width * padding_percent)
pad_y = int(height * padding_percent)
final_x = max(0, min_x - pad_x)
final_y = max(0, min_y - pad_y)
final_width = width + 2 * pad_x
final_height = height + 2 * pad_y
return GroupBoundingBox(
x=final_x,
y=final_y,
width=final_width,
height=final_height,
center_x=final_x + final_width // 2,
center_y=final_y + final_height // 2,
face_count=len(faces)
)
def close(self):
"""Release resources."""
self.detector.close()
# Clear tracking state to free memory
self.previous_faces.clear()
self.current_selected_people.clear()
self.focus_history.clear()

View File

@@ -2,6 +2,8 @@ from __future__ import annotations
import json
import logging
import time
import os
from pathlib import Path
from typing import Dict, List
@@ -12,27 +14,24 @@ from video_render.transcription import TranscriptionResult
logger = logging.getLogger(__name__)
GEMINI_ENDPOINT_TEMPLATE = "https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent"
OPENROUTER_ENDPOINT = "https://openrouter.ai/api/v1/chat/completions"
OPENROUTER_ENDPOINT = os.environ.get("OPENROUTER_API_URL", "https://openrouter.ai/api/v1/chat/completions")
class GeminiHighlighter:
class OpenRouterCopywriter:
def __init__(self, settings: Settings) -> None:
if not settings.gemini.api_key:
raise RuntimeError("GEMINI_API_KEY nao foi definido")
prompt_path = Path(settings.gemini.prompt_path)
if not settings.openrouter.api_key:
raise RuntimeError("OPENROUTER_API_KEY nao foi definido")
self.settings = settings
prompt_path = Path(settings.openrouter.prompt_path)
if not prompt_path.is_absolute():
prompt_path = BASE_DIR / prompt_path
if not prompt_path.exists():
raise FileNotFoundError(f"Prompt do Gemini nao encontrado: {prompt_path}")
self.prompt_template = prompt_path.read_text(encoding="utf-8")
self.settings = settings
raise FileNotFoundError(f"Prompt nao encontrado: {prompt_path}")
self.highlights_prompt_template = prompt_path.read_text(encoding="utf-8")
def generate_highlights(self, transcription: TranscriptionResult) -> List[Dict]:
"""Generate video highlights using OpenRouter GPT-OSS with retry logic."""
payload = {
"transcript": transcription.full_text,
"segments": [
@@ -46,69 +45,138 @@ class GeminiHighlighter:
}
body = {
"contents": [
"model": self.settings.openrouter.model,
"temperature": self.settings.openrouter.temperature,
"messages": [
{"role": "system", "content": self.highlights_prompt_template},
{
"role": "user",
"parts": [
{"text": self.prompt_template},
{"text": json.dumps(payload, ensure_ascii=False)},
"content": json.dumps(payload, ensure_ascii=False),
},
],
}
]
headers = {
"Authorization": f"Bearer {self.settings.openrouter.api_key}",
"Content-Type": "application/json",
"X-Title": "Video Render - Highlights Detection"
}
if self.settings.gemini.temperature is not None:
body["generationConfig"] = {
"temperature": self.settings.gemini.temperature,
}
if self.settings.gemini.top_p is not None:
body["generationConfig"]["topP"] = self.settings.gemini.top_p
if self.settings.gemini.top_k is not None:
body["generationConfig"]["topK"] = self.settings.gemini.top_k
logger.info(f"Calling OpenRouter with model: {self.settings.openrouter.model}")
logger.debug(f"Request payload keys: transcript_length={len(payload['transcript'])}, segments_count={len(payload['segments'])}")
url = GEMINI_ENDPOINT_TEMPLATE.format(model=self.settings.gemini.model)
params = {"key": self.settings.gemini.api_key}
# Retry configuration for rate limits (especially free tier)
max_retries = 5
base_delay = 5 # Start with 5s delay
response = requests.post(url, params=params, json=body, timeout=120)
for attempt in range(max_retries):
try:
response = requests.post(
url=OPENROUTER_ENDPOINT,
data=json.dumps(body),
headers=headers,
timeout=120,
)
response.raise_for_status()
data = response.json()
break
candidates = data.get("candidates") or []
if not candidates:
raise RuntimeError("Gemini nao retornou candidatos")
except requests.exceptions.HTTPError as exc:
if exc.response.status_code == 429:
if attempt < max_retries - 1:
# Exponential backoff: 5s, 10s, 20s, 40s, 80s
delay = base_delay * (2 ** attempt)
logger.warning(f"Rate limit atingido (429). Aguardando {delay}s antes de tentar novamente (tentativa {attempt + 1}/{max_retries})")
time.sleep(delay)
continue
else:
logger.error("Rate limit atingido apos todas as tentativas")
logger.error("Solucao: Use um modelo pago ou adicione creditos na OpenRouter")
raise RuntimeError("OpenRouter rate limit excedido") from exc
else:
logger.error(f"OpenRouter API request falhou com status {exc.response.status_code}: {exc}")
raise RuntimeError("OpenRouter API request falhou") from exc
text_parts = candidates[0].get("content", {}).get("parts", [])
if not text_parts:
raise RuntimeError("Resposta do Gemini sem conteudo")
except Exception as exc:
logger.error("OpenRouter API request falhou: %s", exc)
raise RuntimeError("OpenRouter API request falhou") from exc
raw_text = text_parts[0].get("text")
if not raw_text:
raise RuntimeError("Resposta do Gemini sem texto")
# Debug: log response structure
logger.info(f"OpenRouter response keys: {list(data.keys())}")
if "error" in data:
logger.error(f"OpenRouter API error: {data.get('error')}")
raise RuntimeError(f"OpenRouter API error: {data.get('error')}")
parsed = self._extract_json(raw_text)
choices = data.get("choices") or []
if not choices:
logger.error(f"OpenRouter response completa: {json.dumps(data, indent=2)}")
raise RuntimeError("OpenRouter nao retornou escolhas")
message = choices[0].get("message", {}).get("content")
if not message:
raise RuntimeError("Resposta do OpenRouter sem conteudo")
parsed = self._extract_json(message)
highlights = parsed.get("highlights")
if not isinstance(highlights, list):
raise ValueError("Resposta do Gemini invalida: campo 'highlights' ausente")
return highlights
raise ValueError("Resposta do OpenRouter invalida: campo 'highlights' ausente")
@staticmethod
def _extract_json(response_text: str) -> Dict:
valid_highlights = []
for highlight in highlights:
try:
return json.loads(response_text)
except json.JSONDecodeError:
start = response_text.find("{")
end = response_text.rfind("}")
if start == -1 or end == -1:
raise
subset = response_text[start : end + 1]
return json.loads(subset)
start = float(highlight.get("start", 0))
end = float(highlight.get("end", 0))
summary = str(highlight.get("summary", "")).strip()
if start < 0 or end < 0:
logger.warning(f"Highlight ignorado: timestamps negativos (start={start}, end={end})")
continue
class OpenRouterCopywriter:
def __init__(self, settings: Settings) -> None:
if not settings.openrouter.api_key:
raise RuntimeError("OPENROUTER_API_KEY nao foi definido")
self.settings = settings
if end <= start:
logger.warning(f"Highlight ignorado: end <= start (start={start}, end={end})")
continue
duration = end - start
if duration < 60:
logger.warning(f"Highlight ignorado: muito curto ({duration}s, minimo 45s)")
continue
if duration > 120:
logger.warning(f"Highlight ignorado: muito longo ({duration}s, maximo 90s)")
continue
if not summary:
logger.warning(f"Highlight ignorado: summary vazio")
continue
valid_highlights.append({
"start": start,
"end": end,
"summary": summary
})
except (TypeError, ValueError) as e:
logger.warning(f"Highlight invalido ignorado: {highlight} - {e}")
continue
if not valid_highlights:
logger.warning("Nenhum highlight valido retornado pelo OpenRouter")
total_duration = 75.0
if transcription.segments:
total_duration = max(seg.end for seg in transcription.segments)
fallback_end = min(75.0, total_duration)
if fallback_end < 60.0:
fallback_end = min(60.0, total_duration)
return [{
"start": 0.0,
"end": fallback_end,
"summary": "Trecho inicial do video (fallback automatico)"
}]
logger.info(f"OpenRouter retornou {len(valid_highlights)} highlights validos")
return valid_highlights
def generate_titles(self, highlights: List[Dict]) -> List[str]:
if not highlights:
@@ -137,7 +205,6 @@ class OpenRouterCopywriter:
body = {
"model": self.settings.openrouter.model,
"temperature": self.settings.openrouter.temperature,
"max_tokens": self.settings.openrouter.max_output_tokens,
"messages": [
{"role": "system", "content": prompt},
{
@@ -153,7 +220,10 @@ class OpenRouterCopywriter:
}
response = requests.post(
OPENROUTER_ENDPOINT, json=body, headers=headers, timeout=120
url=OPENROUTER_ENDPOINT,
data=json.dumps(body),
headers=headers,
timeout=120,
)
response.raise_for_status()
data = response.json()

View File

@@ -35,10 +35,31 @@ class MediaPreparer:
sanitized_name = sanitize_filename(Path(filename).stem)
workspace_dir = ensure_workspace(self.settings.videos_dir, sanitized_name)
transcription_json = workspace_dir / "transcription.json"
transcription_txt = workspace_dir / "transcription.txt"
temp_transcription_json = None
temp_transcription_txt = None
if transcription_json.exists():
temp_transcription_json = workspace_dir.parent / f".{sanitized_name}_transcription.json.tmp"
shutil.copy2(transcription_json, temp_transcription_json)
if transcription_txt.exists():
temp_transcription_txt = workspace_dir.parent / f".{sanitized_name}_transcription.txt.tmp"
shutil.copy2(transcription_txt, temp_transcription_txt)
existing_children = list(workspace_dir.iterdir())
if existing_children:
logger.info("Limpando workspace existente para %s", sanitized_name)
try:
remove_paths(existing_children)
except Exception as e:
logger.warning(f"Não foi possível limpar workspace (não crítico): {e}")
if temp_transcription_json and temp_transcription_json.exists():
shutil.move(str(temp_transcription_json), str(transcription_json))
logger.info("Transcrição preservada em %s", transcription_json)
if temp_transcription_txt and temp_transcription_txt.exists():
shutil.move(str(temp_transcription_txt), str(transcription_txt))
destination_name = f"{sanitized_name}{source_path.suffix.lower()}"
working_video_path = workspace_dir / destination_name
@@ -48,7 +69,10 @@ class MediaPreparer:
output_dir = ensure_workspace(self.settings.outputs_dir, sanitized_name)
existing_outputs = list(output_dir.iterdir())
if existing_outputs:
try:
remove_paths(existing_outputs)
except Exception as e:
logger.warning(f"Não foi possível limpar outputs antigos (não crítico): {e}")
audio_path = workspace_dir / "audio.wav"
extract_audio_to_wav(working_video_path, audio_path)

View File

@@ -13,9 +13,24 @@ logger = logging.getLogger(__name__)
MessageHandler = Callable[[Dict[str, Any]], Dict[str, Any]]
def _safe_ack(
channel: pika.adapters.blocking_connection.BlockingChannel, delivery_tag
) -> bool:
if not channel.is_open:
logger.warning(
"Canal fechado antes do ACK; mensagem sera reprocessada apos reconexao"
)
return False
try:
channel.basic_ack(delivery_tag=delivery_tag)
return True
except Exception:
logger.exception("Falha ao confirmar mensagem")
return False
class RabbitMQWorker:
def __init__(self, settings: Settings) -> None:
print(settings)
self.settings = settings
self._params = pika.ConnectionParameters(
host=settings.rabbitmq.host,
@@ -28,50 +43,59 @@ class RabbitMQWorker:
)
def consume_forever(self, handler: MessageHandler) -> None:
while True:
try:
with pika.BlockingConnection(self._params) as connection:
channel = connection.channel()
channel.queue_declare(queue=self.settings.rabbitmq.consume_queue, durable=True)
channel.queue_declare(queue=self.settings.rabbitmq.publish_queue, durable=True)
channel.basic_qos(prefetch_count=self.settings.rabbitmq.prefetch_count)
channel.queue_declare(
queue=self.settings.rabbitmq.consume_queue, durable=True
)
channel.queue_declare(
queue=self.settings.rabbitmq.publish_queue, durable=True
)
channel.basic_qos(
prefetch_count=self.settings.rabbitmq.prefetch_count
)
def _on_message(ch: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
def _on_message(
ch: pika.adapters.blocking_connection.BlockingChannel,
method,
properties,
body,
) -> None:
"""Consume message, ACK immediately, then process."""
try:
message = json.loads(body)
except json.JSONDecodeError:
logger.error("Mensagem inválida recebida: %s", body)
ch.basic_ack(delivery_tag=method.delivery_tag)
logger.error("Mensagem invalida recebida: %s", body)
_safe_ack(ch, method.delivery_tag)
return
logger.info("Mensagem recebida: %s", message.get("filename", "<sem_nome>"))
if not _safe_ack(ch, method.delivery_tag):
logger.warning(
"Nao foi possivel confirmar mensagem; abortando processamento"
)
return
logger.info(
"Mensagem recebida: %s",
message.get("filename", "<sem_nome>"),
)
try:
response = handler(message)
except Exception:
logger.exception("Erro não tratado durante o processamento")
logger.exception("Erro nao tratado durante o processamento")
response = {
"hasError": True,
"error": "Erro não tratado no pipeline",
"error": "Erro nao tratado no pipeline",
"filename": message.get("filename"),
"videoId": message.get("videoId"),
"url": message.get("url"),
"processedFiles": [],
}
try:
payload = json.dumps(response)
ch.basic_publish(
exchange="",
routing_key=self.settings.rabbitmq.publish_queue,
body=payload,
properties=pika.BasicProperties(delivery_mode=2),
)
logger.info("Resposta publicada para '%s'", self.settings.rabbitmq.publish_queue)
except Exception:
logger.exception("Falha ao publicar a resposta na fila de upload")
finally:
ch.basic_ack(delivery_tag=method.delivery_tag)
self._publish_response(response)
channel.basic_consume(
queue=self.settings.rabbitmq.consume_queue,
@@ -81,7 +105,32 @@ class RabbitMQWorker:
logger.info("Consumidor iniciado. Aguardando mensagens...")
channel.start_consuming()
except pika.exceptions.AMQPConnectionError:
logger.exception("Conexão com RabbitMQ perdida. Tentando reconectar...")
logger.exception(
"Conexao com RabbitMQ perdida. Tentando reconectar..."
)
except pika.exceptions.AMQPError:
logger.exception("Erro AMQP inesperado. Reiniciando consumo...")
except KeyboardInterrupt:
logger.info("Encerrando consumidor por interrupção do usuário.")
logger.info("Encerrando consumidor por interrupcao do usuario.")
break
def _publish_response(self, response: Dict[str, Any]) -> None:
payload = json.dumps(response)
try:
with pika.BlockingConnection(self._params) as publish_connection:
publish_channel = publish_connection.channel()
publish_channel.queue_declare(
queue=self.settings.rabbitmq.publish_queue, durable=True
)
publish_channel.basic_publish(
exchange="",
routing_key=self.settings.rabbitmq.publish_queue,
body=payload,
properties=pika.BasicProperties(delivery_mode=2),
)
logger.info(
"Resposta publicada para '%s'",
self.settings.rabbitmq.publish_queue,
)
except Exception:
logger.exception("Falha ao publicar a resposta na fila de upload apos ACK")

View File

@@ -6,7 +6,7 @@ from pathlib import Path
from typing import Any, Dict, List, Optional
from video_render.config import Settings
from video_render.llm import GeminiHighlighter, OpenRouterCopywriter
from video_render.llm import OpenRouterCopywriter
from video_render.media import MediaPreparer, VideoWorkspace
from video_render.transcription import TranscriptionResult, TranscriptionService
from video_render.utils import remove_paths, sanitize_filename
@@ -55,8 +55,7 @@ class VideoPipeline:
self.settings = settings
self.media_preparer = MediaPreparer(settings)
self.transcriber = TranscriptionService(settings)
self.highlighter = GeminiHighlighter(settings)
self.copywriter = OpenRouterCopywriter(settings)
self.llm_service = OpenRouterCopywriter(settings) # Using OpenRouter for both highlights and titles
self.renderer = VideoRenderer(settings)
def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
@@ -65,8 +64,8 @@ class VideoPipeline:
self._prepare_workspace(context)
self._generate_transcription(context)
self._determine_highlights(context)
self._generate_titles(context)
self._render_clips(context)
return self._build_success_payload(context)
except Exception as exc:
logger.exception("Falha ao processar vídeo %s", context.job.filename)
@@ -93,15 +92,38 @@ class VideoPipeline:
def _generate_transcription(self, context: PipelineContext) -> None:
if not context.workspace:
raise RuntimeError("Workspace não preparado")
transcription = self.transcriber.transcribe(context.workspace.audio_path)
existing = TranscriptionService.load(context.workspace.workspace_dir)
if existing:
logger.info(
"Transcricao existente encontrada em %s; reutilizando resultado",
context.workspace.workspace_dir,
)
context.transcription = existing
return
transcription = self.transcriber.transcribe(
context.workspace.audio_path,
output_dir=context.workspace.workspace_dir
)
TranscriptionService.persist(transcription, context.workspace.workspace_dir)
context.transcription = transcription
# Unload Whisper model immediately after transcription to free memory (1-3GB)
self.transcriber.unload_model()
def _determine_highlights(self, context: PipelineContext) -> None:
if not context.transcription:
raise RuntimeError("Transcricao nao disponivel")
highlights_raw = self.highlighter.generate_highlights(context.transcription)
try:
highlights_raw = self.llm_service.generate_highlights(context.transcription)
except Exception:
logger.exception(
"Falha ao gerar destaques com OpenRouter; aplicando fallback padrao."
)
context.highlight_windows = [self._build_fallback_highlight(context)]
return
windows: List[HighlightWindow] = []
for item in highlights_raw:
@@ -113,41 +135,42 @@ class VideoPipeline:
continue
summary = str(item.get("summary", "")).strip()
title = str(item.get("title", summary[:60])).strip()
if end <= start:
logger.debug("Highlight com intervalo invalido ignorado: %s", item)
continue
windows.append(HighlightWindow(start=start, end=end, summary=summary))
windows.append(HighlightWindow(start=start, end=end, summary=summary, title=title))
if not windows:
last_end = (
context.transcription.segments[-1].end
if context.transcription.segments
else 0
)
windows.append(
HighlightWindow(
start=0.0,
end=max(last_end, 10.0),
summary="Sem destaque identificado; fallback automatico.",
)
)
windows.append(self._build_fallback_highlight(context))
context.highlight_windows = windows
def _generate_titles(self, context: PipelineContext) -> None:
if not context.highlight_windows:
return
"""DEPRECATED: Titles are now generated together with highlights.
highlight_dicts = [
{"start": window.start, "end": window.end, "summary": window.summary}
for window in context.highlight_windows
]
titles = self.copywriter.generate_titles(highlight_dicts)
This method is kept for backwards compatibility but does nothing.
Titles are extracted from highlights in _determine_highlights().
"""
pass
for window, title in zip(context.highlight_windows, titles):
window.title = title.strip()
def _build_fallback_highlight(self, context: PipelineContext) -> HighlightWindow:
if not context.transcription:
raise RuntimeError("Transcricao nao disponivel para criar fallback")
last_end = (
context.transcription.segments[-1].end
if context.transcription.segments
else 0.0
)
return HighlightWindow(
start=0.0,
end=max(last_end, 10.0),
summary="Sem destaque identificado; fallback automatico.",
title="Confira este momento",
)
def _render_clips(self, context: PipelineContext) -> None:
if not context.workspace or not context.highlight_windows or not context.transcription:
@@ -200,7 +223,7 @@ class VideoPipeline:
}
def _handle_failure(self, context: PipelineContext, exc: Exception) -> Dict[str, Any]:
logger.error("Erro no pipeline: %s", exc)
logger.error("Erro na pipeline: %s", exc)
cleanup_targets: List[Path] = []
if context.workspace:

View File

@@ -3,9 +3,11 @@ from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from typing import Iterable, List, Sequence, Tuple
from typing import Dict, Iterable, List, Sequence, Tuple, Optional
import numpy as np
from moviepy.audio.AudioClip import AudioArrayClip, AudioClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.VideoClip import ColorClip, ImageClip, TextClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.io.VideoFileClip import VideoFileClip
@@ -13,6 +15,7 @@ from PIL import Image, ImageColor, ImageDraw, ImageFont
from video_render.config import Settings
from video_render.transcription import TranscriptionResult, WordTiming
from video_render.smart_framing import SmartFramer, extract_audio_samples
logger = logging.getLogger(__name__)
@@ -52,7 +55,41 @@ class CaptionBuilder:
self.space_width = self.font.getbbox(" ")[2] - self.font.getbbox(" ")[0]
def build(self, words: Sequence[WordTiming], clip_start: float) -> List[CaptionClipSet]:
grouped = self._group_words(words)
# Filter out empty, whitespace-only, or very short words (likely noise)
valid_words = [
w for w in words
if w.word
and w.word.strip()
and len(w.word.strip()) >= 2 # At least 2 characters
and not w.word.strip() in ['...', '..', '.', ',', '-', 'hmm', 'hm', 'ah', 'eh', 'uh'] # Not just punctuation or filler
]
# Note: We don't filter out words based on gaps here
# Gap detection is handled in _group_words_with_gaps
# This ensures captions disappear during silence naturally
filtered_words = valid_words
# Calculate speech density (words per second)
# If density is too low, it's likely just noise/silence being misinterpreted
if filtered_words:
first_word_time = filtered_words[0].start
last_word_time = filtered_words[-1].end
duration = last_word_time - first_word_time
if duration > 0:
words_per_second = len(filtered_words) / duration
# Typical speech is 2-3 words per second
# If less than 0.5 words/second, it's probably silence/noise
if words_per_second < 0.5:
logger.debug(f"Captions suprimidas: densidade muito baixa ({words_per_second:.2f} palavras/seg)")
return []
# Only show captions if we have at least 3 valid words (reduced from 5 for 2-word groups)
# This prevents showing captions for noise/mumbling
if len(filtered_words) < 3:
return []
grouped = self._group_words_with_gaps(filtered_words)
clip_sets: List[CaptionClipSet] = []
for group in grouped:
@@ -99,6 +136,92 @@ class CaptionBuilder:
if len(widths) > 1:
total_width += self.space_width * (len(widths) - 1)
# Check if text needs to wrap to multiple lines
# If total width exceeds canvas width, break into 2 lines
needs_wrap = total_width > self.canvas_width
if needs_wrap:
# Split into 2 lines - try to balance the lines
mid_point = len(texts) // 2
line1_texts = texts[:mid_point]
line2_texts = texts[mid_point:]
line1_widths = widths[:mid_point]
line2_widths = widths[mid_point:]
# Calculate widths for each line
line1_width = sum(line1_widths)
if len(line1_widths) > 1:
line1_width += self.space_width * (len(line1_widths) - 1)
line2_width = sum(line2_widths)
if len(line2_widths) > 1:
line2_width += self.space_width * (len(line2_widths) - 1)
# Double the canvas height for 2 lines
canvas_height = self.canvas_height * 2
base_image = Image.new("RGBA", (self.canvas_width, canvas_height), (0, 0, 0, 0))
base_draw = ImageDraw.Draw(base_image)
highlight_images: List[Image.Image] = []
# Stroke settings: 8px black stroke for better readability
stroke_width = 8
stroke_color = (0, 0, 0, 255) # Black
# Draw line 1
x = max(0, (self.canvas_width - line1_width) // 2)
y = self.baseline
for i, (text, width) in enumerate(zip(line1_texts, line1_widths)):
base_draw.text(
(x, y),
text,
font=self.font,
fill=self.base_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
highlight_draw = ImageDraw.Draw(highlight_image)
highlight_draw.text(
(x, y),
text,
font=self.font,
fill=self.highlight_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
highlight_images.append(highlight_image)
x += width + self.space_width
# Draw line 2
x = max(0, (self.canvas_width - line2_width) // 2)
y = self.baseline + self.text_height + 5 # 5px spacing between lines
for i, (text, width) in enumerate(zip(line2_texts, line2_widths)):
base_draw.text(
(x, y),
text,
font=self.font,
fill=self.base_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
highlight_draw = ImageDraw.Draw(highlight_image)
highlight_draw.text(
(x, y),
text,
font=self.font,
fill=self.highlight_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
highlight_images.append(highlight_image)
x += width + self.space_width
return base_image, highlight_images
# Single line rendering (original code)
start_x = max(0, (self.canvas_width - total_width) // 2)
base_image = Image.new("RGBA", (self.canvas_width, self.canvas_height), (0, 0, 0, 0))
@@ -106,13 +229,31 @@ class CaptionBuilder:
highlight_images: List[Image.Image] = []
x = start_x
for text, width in zip(texts, widths):
base_draw.text((x, self.baseline), text, font=self.font, fill=self.base_color)
# Stroke settings: 8px black stroke for better readability
stroke_width = 8
stroke_color = (0, 0, 0, 255) # Black
for text, width in zip(texts, widths):
# Draw base text with stroke
base_draw.text(
(x, self.baseline),
text,
font=self.font,
fill=self.base_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
# Draw highlight text with stroke
highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
highlight_draw = ImageDraw.Draw(highlight_image)
highlight_draw.text(
(x, self.baseline), text, font=self.font, fill=self.highlight_color
(x, self.baseline),
text,
font=self.font,
fill=self.highlight_color,
stroke_width=stroke_width,
stroke_fill=stroke_color
)
highlight_images.append(highlight_image)
@@ -151,6 +292,44 @@ class CaptionBuilder:
return grouped
def _group_words_with_gaps(self, words: Sequence[WordTiming]) -> List[List[WordTiming]]:
"""
Group words into 2-word chunks, respecting silence gaps.
Creates natural breaks where there are pauses > 1.5s
"""
if not words:
return []
grouped: List[List[WordTiming]] = []
buffer: List[WordTiming] = []
for i, word in enumerate(words):
# Check if there's a long pause before this word
if i > 0:
gap = word.start - words[i-1].end
# If gap > 1.5s, finish current buffer and start new group
if gap > 1.5:
if buffer:
grouped.append(buffer)
buffer = []
buffer.append(word)
# Group into 2 words maximum
if len(buffer) == 2:
grouped.append(buffer)
buffer = []
# Handle remaining words
if buffer:
if len(buffer) == 1 and grouped:
# Add single remaining word to last group
grouped[-1].append(buffer[0])
else:
grouped.append(buffer)
return [grp for grp in grouped if grp]
@staticmethod
def _clean_word(text: str) -> str:
text = text.strip()
@@ -162,6 +341,19 @@ class VideoRenderer:
def __init__(self, settings: Settings) -> None:
self.settings = settings
self.captions = CaptionBuilder(settings)
self.smart_framer = SmartFramer(
target_width=settings.rendering.frame_width,
target_height=settings.rendering.frame_height,
frame_skip=settings.rendering.smart_framing_frame_skip,
smoothing_window=settings.rendering.smart_framing_smoothing_window,
max_velocity=settings.rendering.smart_framing_max_velocity,
person_switch_cooldown=settings.rendering.smart_framing_person_switch_cooldown,
response_time=settings.rendering.smart_framing_response_time,
group_padding=settings.rendering.smart_framing_group_padding,
max_zoom_out=settings.rendering.smart_framing_max_zoom_out,
dead_zone=settings.rendering.smart_framing_dead_zone,
min_face_confidence=settings.rendering.smart_framing_min_confidence
)
def render(
self,
@@ -199,6 +391,7 @@ class VideoRenderer:
index=index,
transcription=transcription,
output_dir=output_dir,
source_path=workspace_path,
)
finally:
subclip.close()
@@ -226,30 +419,103 @@ class VideoRenderer:
index: int,
transcription: TranscriptionResult,
output_dir,
source_path: str,
) -> str:
duration = end - start
frame_w = self.settings.rendering.frame_width
frame_h = self.settings.rendering.frame_height
top_h = int(frame_h * 0.18)
# Removed top panel - no longer showing title
bottom_h = int(frame_h * 0.20)
video_area_h = frame_h - top_h - bottom_h
scale_factor = min(
# Use smart framing to create intelligent 9:16 video (if enabled)
if self.settings.rendering.enable_smart_framing:
logger.info(f"Creating smart framing plan for clip {index} ({start:.2f}s - {end:.2f}s)")
try:
# Extract audio for speech detection
audio_samples = extract_audio_samples(source_path, start, end)
# Create framing plan
framing_plan = self.smart_framer.create_framing_plan(
video_path=source_path,
start_time=start,
end_time=end,
audio_samples=audio_samples
)
# Apply smart framing (always single-person focus)
video_clip = self.smart_framer.apply_framing(
video_clip=subclip,
framing_plan=framing_plan
)
logger.info(f"Smart framing applied: layout={framing_plan.layout_mode}, "
f"faces_detected={len(framing_plan.frame_contexts[0].detected_faces) if framing_plan.frame_contexts else 0}")
except Exception as exc:
logger.warning(f"Smart framing failed for clip {index}, falling back to center crop: {exc}", exc_info=True)
# Fallback to center crop (maintains aspect ratio, crops to fit)
video_area_h = max(1, frame_h - bottom_h)
# Use MAX to ensure video covers entire area (will crop excess)
scale_factor = max(
frame_w / subclip.w,
video_area_h / subclip.h,
)
# Resize to cover area
resized_clip = subclip.resized(scale_factor)
video_y = top_h + (video_area_h - resized_clip.h) // 2
video_clip = resized_clip.with_position(
((frame_w - resized_clip.w) // 2, video_y)
# Calculate crop region (center crop)
crop_x1 = max(0, (resized_clip.w - frame_w) // 2)
crop_y1 = max(0, (resized_clip.h - video_area_h) // 2)
crop_x2 = crop_x1 + frame_w
crop_y2 = crop_y1 + video_area_h
# Crop to fit target dimensions using MoviePy crop(x1, y1, x2, y2)
cropped_clip = resized_clip.cropped(
x1=crop_x1,
y1=crop_y1,
x2=crop_x2,
y2=crop_y2
)
background = ColorClip(size=(frame_w, frame_h), color=(0, 0, 0)).with_duration(duration)
top_panel = (
ColorClip(size=(frame_w, top_h), color=(12, 12, 12))
.with_duration(duration)
.with_opacity(0.85)
video_clip = cropped_clip.with_position((0, 0))
resized_clip.close()
else:
# Use center crop (smart framing disabled)
logger.info(f"Using center crop for clip {index} (smart framing disabled)")
video_area_h = max(1, frame_h - bottom_h)
# Use MAX to ensure video covers entire area (will crop excess)
scale_factor = max(
frame_w / subclip.w,
video_area_h / subclip.h,
)
# Resize to cover area
resized_clip = subclip.resized(scale_factor)
# Calculate crop region (center crop)
crop_x1 = max(0, (resized_clip.w - frame_w) // 2)
crop_y1 = max(0, (resized_clip.h - video_area_h) // 2)
crop_x2 = crop_x1 + frame_w
crop_y2 = crop_y1 + video_area_h
# Crop to fit target dimensions using MoviePy crop(x1, y1, x2, y2)
cropped_clip = resized_clip.cropped(
x1=crop_x1,
y1=crop_y1,
x2=crop_x2,
y2=crop_y2
)
video_clip = cropped_clip.with_position((0, 0))
resized_clip.close()
background = ColorClip(size=(frame_w, frame_h), color=(0, 0, 0)).with_duration(duration)
# Removed top panel and title - no longer needed
bottom_panel = (
ColorClip(size=(frame_w, bottom_h), color=(12, 12, 12))
.with_position((0, frame_h - bottom_h))
@@ -257,29 +523,42 @@ class VideoRenderer:
.with_opacity(0.85)
)
title_text = title or summary
wrapped_title = self._wrap_text(title_text, max_width=frame_w - 160)
title_clip = (
TextClip(
text=wrapped_title,
font=str(self.settings.rendering.font_path),
font_size=self.settings.rendering.title_font_size,
color=self.settings.rendering.base_color,
method="caption",
size=(frame_w - 160, top_h - 40),
)
.with_duration(duration)
)
title_clip = title_clip.with_position(
((frame_w - title_clip.w) // 2, (top_h - title_clip.h) // 2)
)
words = self._collect_words(transcription, start, end)
caption_sets = self.captions.build(words, clip_start=start)
# Calculate speech coverage: how much of the clip has actual speech?
# If less than 30% of the clip has speech, don't show captions
clip_duration = end - start
if words and clip_duration > 0:
# Calculate total time with speech
total_speech_time = sum(w.end - w.start for w in words)
speech_coverage = total_speech_time / clip_duration
if speech_coverage < 0.3: # Less than 30% speech
logger.debug(f"Captions suprimidas: cobertura de fala baixa ({speech_coverage:.1%})")
words = [] # Clear words to prevent captions
# Only build captions if there are actual words to display
# This prevents empty/placeholder captions from appearing
caption_sets = self.captions.build(words, clip_start=start) if words else []
caption_clips = []
caption_resources: List[ImageClip] = []
caption_y = frame_h - bottom_h + (bottom_h - self.captions.canvas_height) // 2
# Position captions 120px below center (for 1920px height, center is 960px, so 1080px)
# This ensures they're visible, well-positioned, and don't interfere with faces
# Range: 100-150px as requested, using 120px for optimal positioning
center_y = frame_h // 2
caption_y = center_y + 120
caption_margin = 20
# Ensure captions stay within reasonable bounds (no top panel now)
min_caption_y = caption_margin
max_caption_y = frame_h - bottom_h - self.captions.canvas_height - caption_margin
if max_caption_y < min_caption_y:
caption_y = min_caption_y
else:
caption_y = min(max(caption_y, min_caption_y), max_caption_y)
for clip_set in caption_sets:
base_positioned = clip_set.base.with_position(("center", caption_y))
@@ -290,58 +569,78 @@ class VideoRenderer:
caption_clips.append(positioned)
caption_resources.append(highlight)
if not caption_clips:
fallback_text = self._wrap_text(summary or title, max_width=frame_w - 160)
caption_clips.append(
TextClip(
text=fallback_text,
font=str(self.settings.rendering.font_path),
font_size=self.settings.rendering.subtitle_font_size,
color=self.settings.rendering.base_color,
method="caption",
size=(frame_w - 160, bottom_h - 40),
)
.with_duration(duration)
.with_position(("center", caption_y))
# No fallback captions - if there are no dynamic captions, show nothing
# This matches Opus Clip behavior where captions only appear when there's actual speech
audio_clip, audio_needs_close = self._materialize_audio(
source_path=source_path,
start=start,
end=end,
duration=duration,
fallback_audio=video_clip.audio or subclip.audio,
)
# Composite with background, bottom panel, video, and captions only (no top panel or title)
composite = CompositeVideoClip(
[background, top_panel, bottom_panel, video_clip, title_clip, *caption_clips],
[background, bottom_panel, video_clip, *caption_clips],
size=(frame_w, frame_h),
)
if audio_clip is not None:
composite = self._with_audio(composite, audio_clip)
output_path = output_dir / f"clip_{index:02d}.mp4"
composite.write_videofile(
str(output_path),
codec=self.settings.rendering.video_codec,
audio_codec=self.settings.rendering.audio_codec,
fps=self.settings.rendering.fps,
bitrate=self.settings.rendering.bitrate,
ffmpeg_params=[
"-preset",
self.settings.rendering.preset,
"-pix_fmt",
"yuv420p",
],
temp_audiofile=str(output_dir / f"temp_audio_{index:02d}.m4a"),
remove_temp=True,
threads=4,
self._write_with_fallback(
composite=composite,
output_path=output_path,
index=index,
output_dir=output_dir,
)
composite.close()
resized_clip.close()
video_clip.close()
title_clip.close()
background.close()
top_panel.close()
bottom_panel.close()
for clip in caption_clips:
clip.close()
for clip in caption_resources:
clip.close()
if audio_clip is not None and audio_needs_close:
audio_clip.close()
# Force garbage collection to free memory after rendering
import gc
gc.collect()
return str(output_path)
def _materialize_audio(
self,
*,
source_path: str,
start: float,
end: float,
duration: float,
fallback_audio,
) -> Tuple[Optional[AudioClip], bool]:
try:
with AudioFileClip(source_path) as audio_file:
segment = audio_file.subclipped(start, end)
fps = (
getattr(segment, "fps", None)
or getattr(audio_file, "fps", None)
or 44100
)
samples = segment.to_soundarray(fps=fps)
except Exception:
logger.warning(
"Falha ao carregar audio independente; utilizando fluxo original",
exc_info=True,
)
return fallback_audio, False
audio_clip = AudioArrayClip(samples, fps=fps).with_duration(duration)
return audio_clip, True
def _collect_words(
self, transcription: TranscriptionResult, start: float, end: float
) -> List[WordTiming]:
@@ -408,3 +707,120 @@ class VideoRenderer:
if current:
lines.append(" ".join(current))
return "\n".join(lines)
def _write_with_fallback(
self,
*,
composite: CompositeVideoClip,
output_path,
index: int,
output_dir,
) -> None:
attempts = self._encoding_attempts()
temp_audio_path = output_dir / f"temp_audio_{index:02d}.m4a"
last_error: Exception | None = None
for attempt in attempts:
codec = attempt["codec"]
bitrate = attempt["bitrate"]
preset = attempt["preset"]
ffmpeg_params = ["-pix_fmt", "yuv420p"]
if preset:
ffmpeg_params = ["-preset", preset, "-pix_fmt", "yuv420p"]
try:
logger.info(
"Renderizando clip %02d com codec %s (bitrate=%s, preset=%s)",
index,
codec,
bitrate,
preset or "default",
)
composite.write_videofile(
str(output_path),
codec=codec,
audio_codec=self.settings.rendering.audio_codec,
fps=self.settings.rendering.fps,
bitrate=bitrate,
ffmpeg_params=ffmpeg_params,
temp_audiofile=str(temp_audio_path),
remove_temp=True,
threads=4,
)
return
except Exception as exc: # noqa: BLE001 - propagate after fallbacks
last_error = exc
logger.warning(
"Falha ao renderizar com codec %s: %s", codec, exc, exc_info=True
)
if output_path.exists():
output_path.unlink(missing_ok=True)
if temp_audio_path.exists():
temp_audio_path.unlink(missing_ok=True)
raise RuntimeError("Todas as tentativas de renderizacao falharam") from last_error
def _encoding_attempts(self) -> List[Dict[str, str | None]]:
settings = self.settings.rendering
attempts: List[Dict[str, str | None]] = []
attempts.append(
{
"codec": settings.video_codec,
"bitrate": settings.bitrate,
"preset": settings.preset,
}
)
deduped: List[Dict[str, str | None]] = []
seen = set()
for attempt in attempts:
key = (attempt["codec"], attempt["bitrate"], attempt["preset"])
if key in seen:
continue
seen.add(key)
deduped.append(attempt)
return deduped
@staticmethod
def _with_audio(
composite: CompositeVideoClip,
audio_clip,
) -> CompositeVideoClip:
"""Attach audio to a composite clip across MoviePy versions."""
if hasattr(composite, "with_audio"):
return composite.with_audio(audio_clip)
if hasattr(composite, "set_audio"):
return composite.set_audio(audio_clip)
raise AttributeError("CompositeVideoClip does not support audio assignment")
@staticmethod
def _make_textclip(
*,
text: str,
font_path,
font_size: int,
color: str,
size: Tuple[int, int],
) -> TextClip:
"""Create a TextClip compatible with MoviePy 1.x and 2.x.
MoviePy 2.x removed the 'align' keyword from TextClip. We try with
'align' for older versions and fall back to a call without it when
unsupported.
"""
kwargs = dict(
text=text,
font=str(font_path),
font_size=font_size,
color=color,
method="caption",
size=size,
)
try:
return TextClip(**kwargs, align="center") # MoviePy 1.x style
except TypeError:
logger.debug("TextClip 'align' not supported; falling back without it")
return TextClip(**kwargs) # MoviePy 2.x style

File diff suppressed because it is too large Load Diff

View File

@@ -6,6 +6,7 @@ from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
import numpy as np
from faster_whisper import WhisperModel
from video_render.config import Settings
@@ -56,7 +57,52 @@ class TranscriptionService:
)
return self._model
def transcribe(self, audio_path: Path) -> TranscriptionResult:
def unload_model(self) -> None:
"""Unload the Whisper model to free memory (reduces RAM usage by 1-3GB)."""
if self._model is not None:
logger.info("Descarregando modelo Whisper para liberar memória...")
del self._model
self._model = None
# Force garbage collection to immediately free GPU/CPU memory
import gc
gc.collect()
logger.info("Modelo Whisper descarregado com sucesso")
def transcribe(self, audio_path: Path, output_dir: Optional[Path] = None) -> TranscriptionResult:
if output_dir is not None:
existing_transcription = self.load(output_dir)
if existing_transcription is not None:
logger.info("Transcrição já existe em %s, reutilizando...", output_dir)
return existing_transcription
# Get audio duration to decide if we need chunked processing
audio_duration = self._get_audio_duration(audio_path)
chunk_duration_minutes = 30 # Process in 30-minute chunks for long videos
chunk_duration_seconds = chunk_duration_minutes * 60
# For videos longer than 30 minutes, use chunked processing to avoid OOM
if audio_duration > chunk_duration_seconds:
logger.info(
f"Áudio longo detectado ({audio_duration/60:.1f} min). "
f"Processando em chunks de {chunk_duration_minutes} min para evitar erro de memória..."
)
return self._transcribe_chunked(audio_path, chunk_duration_seconds)
else:
logger.info(f"Iniciando transcrição do áudio ({audio_duration/60:.1f} min) com FasterWhisper...")
return self._transcribe_full(audio_path)
def _get_audio_duration(self, audio_path: Path) -> float:
"""Get audio duration in seconds."""
try:
from moviepy.audio.io.AudioFileClip import AudioFileClip
with AudioFileClip(str(audio_path)) as audio:
return audio.duration or 0.0
except Exception as e:
logger.warning(f"Falha ao obter duração do áudio, assumindo curto: {e}")
return 0.0 # Assume short if we can't determine
def _transcribe_full(self, audio_path: Path) -> TranscriptionResult:
"""Transcribe entire audio at once (for shorter videos)."""
model = self._load_model()
segments, _ = model.transcribe(
str(audio_path),
@@ -90,6 +136,101 @@ class TranscriptionService:
full_text=" ".join(full_text_parts).strip(),
)
def _transcribe_chunked(self, audio_path: Path, chunk_duration: float) -> TranscriptionResult:
"""Transcribe audio in chunks to avoid OOM on long videos."""
import subprocess
from moviepy.audio.io.AudioFileClip import AudioFileClip
model = self._load_model()
all_segments: List[TranscriptSegment] = []
full_text_parts: List[str] = []
segment_id_counter = 0
# Get total duration
total_duration = self._get_audio_duration(audio_path)
num_chunks = int(np.ceil(total_duration / chunk_duration))
logger.info(f"Processando áudio em {num_chunks} chunks...")
for chunk_idx in range(num_chunks):
start_time = chunk_idx * chunk_duration
end_time = min((chunk_idx + 1) * chunk_duration, total_duration)
logger.info(
f"Processando chunk {chunk_idx + 1}/{num_chunks} "
f"({start_time/60:.1f}min - {end_time/60:.1f}min)..."
)
# Extract chunk using ffmpeg directly (more reliable than moviepy subclip)
temp_chunk_path = audio_path.parent / f"temp_chunk_{chunk_idx}.wav"
try:
# Use ffmpeg to extract the chunk
chunk_duration_actual = end_time - start_time
ffmpeg_cmd = [
'ffmpeg',
'-y', # Overwrite output file
'-ss', str(start_time), # Start time
'-i', str(audio_path), # Input file
'-t', str(chunk_duration_actual), # Duration
'-acodec', 'pcm_s16le', # Audio codec
'-ar', '44100', # Sample rate
'-ac', '2', # Stereo
'-loglevel', 'error', # Only show errors
str(temp_chunk_path)
]
subprocess.run(ffmpeg_cmd, check=True, capture_output=True)
# Transcribe chunk
segments, _ = model.transcribe(
str(temp_chunk_path),
beam_size=5,
word_timestamps=True,
)
# Process segments with time offset
for segment in segments:
words = [
WordTiming(
start=w.start + start_time,
end=w.end + start_time,
word=w.word.strip()
)
for w in segment.words or []
if w.word.strip()
]
text = segment.text.strip()
full_text_parts.append(text)
all_segments.append(
TranscriptSegment(
id=segment_id_counter,
start=segment.start + start_time,
end=segment.end + start_time,
text=text,
words=words,
)
)
segment_id_counter += 1
# Force garbage collection after each chunk
import gc
gc.collect()
except subprocess.CalledProcessError as e:
logger.error(f"Erro ao extrair chunk {chunk_idx}: {e.stderr.decode() if e.stderr else str(e)}")
raise
finally:
# Clean up temp chunk
if temp_chunk_path.exists():
temp_chunk_path.unlink()
logger.info(f"Transcrição em chunks concluída: {len(all_segments)} segmentos processados")
return TranscriptionResult(
segments=all_segments,
full_text=" ".join(full_text_parts).strip(),
)
@staticmethod
def persist(result: TranscriptionResult, destination: Path) -> None:
json_path = destination / "transcription.json"
@@ -118,5 +259,75 @@ class TranscriptionService:
with text_path.open("w", encoding="utf-8") as fp:
fp.write(result.full_text)
logger.info("Transcrição salva em %s", destination)
logger.info("Transcricao salva em %s", destination)
@staticmethod
def load(source: Path) -> Optional[TranscriptionResult]:
json_path = source / "transcription.json"
if not json_path.exists():
return None
try:
with json_path.open("r", encoding="utf-8") as fp:
payload = json.load(fp)
except (OSError, json.JSONDecodeError) as exc:
logger.warning(
"Falha ao carregar transcricao existente de %s: %s", json_path, exc
)
return None
segments_payload = payload.get("segments", [])
if not isinstance(segments_payload, list):
logger.warning(
"Formato inesperado ao carregar transcricao de %s: 'segments' invalido",
json_path,
)
return None
segments: List[TranscriptSegment] = []
for idx, segment_data in enumerate(segments_payload):
if not isinstance(segment_data, dict):
logger.debug("Segmento invalido ignorado ao carregar: %s", segment_data)
continue
try:
segment_id = int(segment_data.get("id", idx))
start = float(segment_data["start"])
end = float(segment_data["end"])
except (KeyError, TypeError, ValueError):
logger.debug("Segmento sem dados obrigatorios ignorado: %s", segment_data)
continue
text = str(segment_data.get("text", "")).strip()
words_payload = segment_data.get("words", [])
words: List[WordTiming] = []
if isinstance(words_payload, list):
for word_data in words_payload:
if not isinstance(word_data, dict):
continue
try:
w_start = float(word_data["start"])
w_end = float(word_data["end"])
except (KeyError, TypeError, ValueError):
logger.debug(
"Palavra sem dados obrigatorios ignorada: %s", word_data
)
continue
word_text = str(word_data.get("text", "")).strip()
if not word_text:
continue
words.append(WordTiming(start=w_start, end=w_end, word=word_text))
segments.append(
TranscriptSegment(
id=segment_id,
start=start,
end=end,
text=text,
words=words,
)
)
full_text = str(payload.get("full_text", "")).strip()
return TranscriptionResult(segments=segments, full_text=full_text)

View File

@@ -23,16 +23,58 @@ def ensure_workspace(root: Path, folder_name: str) -> Path:
def remove_paths(paths: Iterable[Path]) -> None:
import logging
import time
logger = logging.getLogger(__name__)
for path in paths:
if not path.exists():
continue
# Try to remove with retries and better error handling
max_retries = 3
for attempt in range(max_retries):
try:
if path.is_file() or path.is_symlink():
path.unlink(missing_ok=True)
else:
for child in sorted(path.rglob("*"), reverse=True):
if child.is_file() or child.is_symlink():
try:
child.unlink(missing_ok=True)
except PermissionError:
logger.warning(f"Não foi possível deletar {child}: sem permissão")
# Try to change permissions and retry
try:
child.chmod(0o777)
child.unlink(missing_ok=True)
except Exception as e:
logger.warning(f"Falha ao forçar deleção de {child}: {e}")
elif child.is_dir():
try:
child.rmdir()
path.rmdir()
except (PermissionError, OSError) as e:
logger.warning(f"Não foi possível remover diretório {child}: {e}")
try:
path.rmdir()
except (PermissionError, OSError) as e:
logger.warning(f"Não foi possível remover diretório {path}: {e}")
break # Success, exit retry loop
except PermissionError as e:
if attempt < max_retries - 1:
logger.warning(f"Tentativa {attempt + 1}/{max_retries} falhou ao deletar {path}: {e}. Tentando novamente...")
time.sleep(0.5) # Wait a bit before retry
# Try to change permissions
try:
path.chmod(0o777)
except Exception:
pass
else:
logger.error(f"Não foi possível deletar {path} após {max_retries} tentativas: {e}")
except Exception as e:
logger.error(f"Erro inesperado ao deletar {path}: {e}")
break # Don't retry on unexpected errors