Compare commits

22 Commits

| Author | SHA1 | Date |
|---|---|---|
| | f496663b63 | |
| | e4c5c6adfe | |
| | 21d2d19435 | |
| | 3f7329869d | |
| | c1914dad00 | |
| | 07d301f110 | |
| | 78e35d65fd | |
| | c5d3e83a5f | |
| | 87c6a5e27c | |
| | ae8b228ea1 | |
| | 8abb8001d7 | |
| | c18884e778 | |
| | b5a27fa938 | |
| | 2692cc4dfd | |
| | 8caa849148 | |
| | ba768cf093 | |
| | b9e1dcd1e2 | |
| | c641fd6331 | |
| | b090f7c2cb | |
| | 2b99d2ad78 | |
| | 7ccc745a5d | |
| | 0c0a9c3b5c | |
.env.example (new file, 47 lines)
@@ -0,0 +1,47 @@
RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_USER=admin
RABBITMQ_PASS=your_password_here
RABBITMQ_QUEUE=to-render
RABBITMQ_UPLOAD_QUEUE=to-upload
RABBITMQ_PREFETCH=1
RABBITMQ_HEARTBEAT=60
RABBITMQ_BLOCKED_TIMEOUT=300
OPENROUTER_API_URL=https://openrouter.ai/api/v1/chat/completions
OPENROUTER_API_KEY=your_openrouter_api_key_here

# Model selection - Recommended options:
# - openai/gpt-oss-20b:free (Free tier, good quality)
# - qwen/qwen-2.5-72b-instruct:free (Free, excellent reasoning)
# - google/gemini-pro-1.5 (Best cost-benefit for podcasts)
# - anthropic/claude-3.5-sonnet (Premium quality, best reasoning)
OPENROUTER_MODEL=qwen/qwen-2.5-72b-instruct:free
OPENROUTER_TEMPERATURE=0.6
OPENROUTER_PROMPT_PATH=prompts/generate.txt

FASTER_WHISPER_MODEL_SIZE=medium
FASTER_WHISPER_DEVICE=auto

RENDER_WIDTH=1080
RENDER_HEIGHT=1920

RENDER_FPS=30
RENDER_CODEC=libx264
RENDER_AUDIO_CODEC=aac
RENDER_BITRATE=5000k
RENDER_PRESET=faster

SUBTITLE_HIGHLIGHT_COLOR=#00FF00
SUBTITLE_BASE_COLOR=#FFFFFF

RENDER_FONT_PATH=./Montserrat.ttf
RENDER_TITLE_FONT_SIZE=110
RENDER_SUBTITLE_FONT_SIZE=64

CAPTION_MIN_WORDS=2
CAPTION_MAX_WORDS=2

ENABLE_SMART_FRAMING=true
SMART_FRAMING_MIN_CONFIDENCE=0.5
SMART_FRAMING_SMOOTHING_WINDOW=20
SMART_FRAMING_FRAME_SKIP=2
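Editor's note: these variables are consumed by the new video_render/config.py shown later in this diff, which resolves each one with os.environ.get and a default. A minimal sketch of that pattern, using only the variable names above (the standalone constants are illustrative, not part of the repo):

```python
import os

# Minimal sketch, assuming only the .env.example variable names above;
# the real code wraps these lookups in frozen dataclasses (see config.py below).
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "rabbitmq")
RABBITMQ_PORT = int(os.environ.get("RABBITMQ_PORT", 5672))
RENDER_BITRATE = os.environ.get("RENDER_BITRATE", "5000k")
ENABLE_SMART_FRAMING = os.environ.get("ENABLE_SMART_FRAMING", "true").lower() in ("true", "1", "yes")

print(RABBITMQ_HOST, RABBITMQ_PORT, RENDER_BITRATE, ENABLE_SMART_FRAMING)
```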
.gitignore (vendored, 38 changed lines)
@@ -1,4 +1,34 @@
/videos
/outputs
/temp
/components/__pycache__
# Ignore Python files
*.pyc
*.pyo
*.pyd
/__pycache__/
*.egg-info/
.eggs/
dist/
build/
doc/
videos/
outputs/
.DS_STORE
# Ignore virtual envs
venv/
env/
.claude
# Ignore editor files
.idea/
*.swp
*.swo

# Ignore project files
*.tmproj
*.sublime-project
*.sublime-workspace

# Ignore git itself
.git

# Ignore mypy and pylint cache
.mypy_cache/
.pylint.d/
CLAUDE.MD

@@ -40,18 +40,21 @@ def cut_video_new_clip(input_path: str, start: float, end: float, output_path: s
    segment = clip.subclipped(start, end)
    fps = clip.fps or 30

    if segment.h < 720:
        segment = segment.resized(height=720)

    segment.write_videofile(
        output_path,
        codec=video_codec,
        remove_temp=True,
        fps=fps,
        bitrate="3000k",
        bitrate="5000k",
        ffmpeg_params=[
            "-preset", "ultrafast",
            "-tune", "zerolatency",
            "-pix_fmt", "yuv420p",
            "-profile:v", "high",
            "-level", "4.1"
            "-preset", "fast",
            "-tune", "zerolatency",
            "-pix_fmt", "yuv420p",
            "-profile:v", "high",
            "-level", "4.1"
        ]
    )

@@ -98,13 +101,13 @@ def process_segment(input_path: str, top_text: str = "", bottom_text: str = "",
        codec=video_codec,
        remove_temp=True,
        fps=30,
        bitrate="3000k",
        bitrate="5000k",
        ffmpeg_params=[
            "-preset", "ultrafast",
            "-tune", "zerolatency",
            "-pix_fmt", "yuv420p",
            "-profile:v", "high",
            "-level", "4.1"
            "-preset", "fast",
            "-tune", "zerolatency",
            "-pix_fmt", "yuv420p",
            "-profile:v", "high",
            "-level", "4.1"
        ]
    )

@@ -1,32 +1,32 @@
services:
  video-render-api:
  video-render:
    restart: unless-stopped
    build: .
    container_name: video-render
    build:
      context: .
      no_cache: true
      dockerfile: dockerfile
    environment:
      - RABBITMQ_PASS=${RABBITMQ_PASS}
    ports:
      - "5000:5000"
      - OPENROUTER_API_URL=${OPENROUTER_API_URL:-https://openrouter.ai/api/v1/chat/completions}
      - OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
      - OPENROUTER_MODEL=${OPENROUTER_MODEL:-mistralai/mistral-small-3.1-24b-instruct:free}
      - OPENROUTER_PROMPT_PATH=${OPENROUTER_PROMPT_PATH:-prompts/generate.txt}
      - FASTER_WHISPER_MODEL_SIZE=${FASTER_WHISPER_MODEL_SIZE:-medium}
      - SMART_FRAMING_SMOOTHING_WINDOW=${SMART_FRAMING_SMOOTHING_WINDOW:-30}
      - SMART_FRAMING_MAX_VELOCITY=${SMART_FRAMING_MAX_VELOCITY:-40}
      - SMART_FRAMING_FRAME_SKIP=${SMART_FRAMING_FRAME_SKIP:-2}
      - SMART_FRAMING_PERSON_SWITCH_COOLDOWN=${SMART_FRAMING_PERSON_SWITCH_COOLDOWN:-60}
    volumes:
      - "/root/videos:/app/videos"
      - "/root/temp:/app/temp"
      - "/root/outputs:/app/outputs"
    # gpus: all
    # environment:
    #   - NVIDIA_VISIBLE_DEVICES=all
    #   - NVIDIA_DRIVER_CAPABILITIES=compute,video,utility
      - "/root/prompts:/app/prompts"
      # - "./videos:/app/videos"
      # - "./outputs:/app/outputs"
      # - "./prompts:/app/prompts"
    command: "python -u main.py"
    # runtime: nvidia
    networks:
      - dokploy-network

    # deploy:
    #   resources:
    #     reservations:
    #       devices:
    #         - driver: nvidia
    #           count: all
    #           capabilities: [gpu]
networks:
  dokploy-network:
    external: true

dockerfile (47 changed lines)
@@ -2,35 +2,42 @@ FROM python:3.11-slim

WORKDIR /app

EXPOSE 5000

ENV DEBIAN_FRONTEND=noninteractive

COPY requirements.txt Montserrat.ttf ./
ENV DEBIAN_FRONTEND=noninteractive \
    PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1

RUN apt-get update && \
    apt-get install -qq -y \
    build-essential \
    xvfb \
    xdg-utils \
    wget \
    unzip \
    apt-get install -y --no-install-recommends \
    ffmpeg \
    libpq-dev \
    vim \
    libavcodec-dev \
    libavdevice-dev \
    libavfilter-dev \
    libavformat-dev \
    libavutil-dev \
    libswresample-dev \
    libswscale-dev \
    libgl1 \
    libglib2.0-0 \
    libgomp1 \
    libmagick++-dev \
    imagemagick \
    fonts-liberation \
    sox \
    bc \
    gsfonts && \
    fc-cache -fv && \
    rm -rf /var/lib/apt/lists/*
    wget \
    libsm6 \
    libxext6 \
    libxrender-dev \
    && rm -rf /var/lib/apt/lists/*

RUN pip install --no-cache-dir -r requirements.txt
COPY requirements.txt .

RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir setuptools wheel && \
    pip install --no-cache-dir -r requirements.txt

COPY . .

VOLUME ["/app"]
RUN mkdir -p /app/videos /app/outputs

VOLUME ["/app/videos", "/app/outputs"]

CMD ["python", "-u", "main.py"]

main.py (110 changed lines)
@@ -1,103 +1,31 @@
import os
import pika
import json
import time
from components.video import process_full_video
import warnings

RABBITMQ_HOST = os.environ.get('RABBITMQ_HOST', 'rabbitmq')
RABBITMQ_PORT = int(os.environ.get('RABBITMQ_PORT', 5672))
RABBITMQ_USER = os.environ.get('RABBITMQ_USER', 'admin')
RABBITMQ_PASS = os.environ.get('RABBITMQ_PASS')
RABBITMQ_QUEUE = os.environ.get('RABBITMQ_QUEUE', 'to-render')
RABBITMQ_UPLOAD_QUEUE = os.environ.get('RABBITMQ_UPLOAD_QUEUE', 'to-upload')
# Suppress FFmpeg/AV1 warnings for cleaner logs
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'loglevel;quiet'
os.environ['OPENCV_LOG_LEVEL'] = 'ERROR'

if not RABBITMQ_PASS:
    raise RuntimeError("RABBITMQ_PASS não definido no ambiente")
# Suppress MoviePy verbose logging
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = '1'

def get_next_message():
    credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
    parameters = pika.ConnectionParameters(
        host=RABBITMQ_HOST,
        port=RABBITMQ_PORT,
        credentials=credentials,
        heartbeat=60,
        blocked_connection_timeout=300
    )
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    method_frame, header_frame, body = channel.basic_get(RABBITMQ_QUEUE)
    if method_frame:
        channel.basic_ack(method_frame.delivery_tag)
        connection.close()
        return body
    else:
        connection.close()
        return None
# Filter deprecation warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
warnings.filterwarnings('ignore', category=UserWarning, module='moviepy')

def publish_to_queue(payload):
    credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
    parameters = pika.ConnectionParameters(
        host=RABBITMQ_HOST,
        port=RABBITMQ_PORT,
        credentials=credentials,
        heartbeat=60,
        blocked_connection_timeout=300
    )
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=RABBITMQ_UPLOAD_QUEUE, durable=True)
    channel.basic_publish(
        exchange='',
        routing_key=RABBITMQ_UPLOAD_QUEUE,
        body=json.dumps(payload),
        properties=pika.BasicProperties(
            delivery_mode=2,  # persistente
        )
    )
    connection.close()
from video_render.config import load_settings
from video_render.logging_utils import setup_logging
from video_render.messaging import RabbitMQWorker
from video_render.pipeline import VideoPipeline

def main():
    print(' [*] Esperando mensagens. Para sair: CTRL+C')
    while True:
        body = get_next_message()
        if body is None:
            time.sleep(5)
            continue

        try:
            data = json.loads(body)
            filename = data.get("filename")
            times = data.get("times", [])
            url = data.get("url")
            video_id = data.get("videoId")
            print(f"Processando vídeo: {filename}")
def main() -> None:
    setup_logging()
    settings = load_settings()

            processed_files = process_full_video(filename, times)
    pipeline = VideoPipeline(settings)
    worker = RabbitMQWorker(settings)
    worker.consume_forever(pipeline.process_message)

            payload = {
                "videosProcessedQuantity": len(processed_files),
                "filename": filename,
                "processedFiles": processed_files,
                "url": url,
                "videoId": video_id,
                "error": False,
            }
        except Exception as e:
            payload = {
                "videosProcessedQuantity": 0,
                "filename": filename if 'filename' in locals() else None,
                "processedFiles": [],
                "url": url if 'url' in locals() else None,
                "videoId": video_id if 'video_id' in locals() else None,
                "error": str(e),
            }
            print(f"Erro no processamento: {e}")

        try:
            publish_to_queue(payload)
            print(f"Mensagem publicada na fila '{RABBITMQ_UPLOAD_QUEUE}'.")
        except Exception as publish_err:
            print(f"Erro ao publicar na fila '{RABBITMQ_UPLOAD_QUEUE}': {publish_err}")

if __name__ == "__main__":
    main()

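Editor's note: the removed main.py parsed messages with the fields filename, times, url and videoId from the to-render queue; the rewrite delegates consumption to RabbitMQWorker and VideoPipeline. A hedged sketch of publishing a test job with the same pika pattern the old publish_to_queue used follows; the payload shape is taken from the old parser, while the durable queue declaration and the sample values are assumptions, not confirmed by this diff:

```python
import json
import os
import pika

# Sketch only: enqueue one test job, mirroring the removed publish_to_queue().
# Field names come from the old parser; whether the new VideoPipeline expects
# exactly this shape is an assumption. Sample values are placeholders.
credentials = pika.PlainCredentials(
    os.environ.get("RABBITMQ_USER", "admin"), os.environ["RABBITMQ_PASS"]
)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
        port=int(os.environ.get("RABBITMQ_PORT", 5672)),
        credentials=credentials,
    )
)
channel = connection.channel()
channel.queue_declare(queue="to-render", durable=True)  # assumed durable
channel.basic_publish(
    exchange="",
    routing_key="to-render",
    body=json.dumps({
        "filename": "episode.mp4",                  # placeholder
        "times": [],                                # old code: list of cut ranges
        "url": "https://example.com/episode.mp4",   # placeholder
        "videoId": "abc123",                        # placeholder
    }),
    properties=pika.BasicProperties(delivery_mode=2),  # persistent message
)
connection.close()
```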
prompts/generate.txt (new file, 111 lines)
@@ -0,0 +1,111 @@
|
||||
# TAREFA: Extrair clips virais de uma transcrição de vídeo
|
||||
|
||||
Você é um especialista em conteúdo viral para TikTok, Instagram Reels e YouTube Shorts.
|
||||
|
||||
## REGRA MAIS IMPORTANTE - DURAÇÃO DOS CLIPS
|
||||
|
||||
**CADA CLIP DEVE TER ENTRE 60 E 120 SEGUNDOS DE DURAÇÃO.**
|
||||
|
||||
- MÍNIMO ABSOLUTO: 60 segundos (end - start >= 60)
|
||||
- MÁXIMO: 120 segundos (end - start <= 120)
|
||||
- IDEAL: 60-90 segundos
|
||||
|
||||
**CLIPS COM MENOS DE 60 SEGUNDOS SERÃO REJEITADOS PELO SISTEMA.**
|
||||
|
||||
Antes de incluir um clip, SEMPRE calcule: end - start >= 60
|
||||
|
||||
## QUANTIDADE DE CLIPS
|
||||
|
||||
Baseado na duração total do vídeo:
|
||||
- Até 10 min: 2-4 clips
|
||||
- 10-20 min: 4-6 clips
|
||||
- 20-30 min: 6-10 clips
|
||||
- 30+ min: 8-15 clips
|
||||
|
||||
## CRITÉRIOS DE SELEÇÃO
|
||||
|
||||
Um bom clip viral possui:
|
||||
|
||||
1. GANCHO FORTE nos primeiros 3 segundos (pergunta, afirmação chocante, promessa)
|
||||
2. EMOÇÃO (humor, surpresa, indignação, curiosidade)
|
||||
3. VALOR (ensina algo, revela segredo, dá dica prática)
|
||||
4. ESTRUTURA (início, meio e fim coerentes)
|
||||
5. RITMO (sem pausas longas, dinâmico)
|
||||
|
||||
## O QUE EVITAR
|
||||
|
||||
- Introduções genéricas ("oi pessoal", "então", "bem")
|
||||
- Trechos com pausas longas (> 3 segundos de silêncio)
|
||||
- Segmentos sem contexto ou conclusão
|
||||
- Explicações técnicas monótonas
|
||||
|
||||
## FORMATO DE RESPOSTA
|
||||
|
||||
Retorne APENAS um JSON válido, sem texto antes ou depois:
|
||||
|
||||
```json
|
||||
{
|
||||
"highlights": [
|
||||
{
|
||||
"start": 0.0,
|
||||
"end": 75.0,
|
||||
"summary": "Descrição do que acontece neste trecho"
|
||||
},
|
||||
{
|
||||
"start": 120.5,
|
||||
"end": 195.0,
|
||||
"summary": "Descrição do que acontece neste trecho"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
## REGRAS DO JSON
|
||||
|
||||
- "start" e "end" são números decimais (float) em SEGUNDOS
|
||||
- Use ponto como separador decimal (60.5, não 60,5)
|
||||
- "summary" é uma descrição breve do conteúdo (1-2 frases)
|
||||
- Clips em ordem cronológica (start crescente)
|
||||
- Clips não podem se sobrepor
|
||||
|
||||
## CHECKLIST ANTES DE RESPONDER
|
||||
|
||||
Para CADA clip, verifique:
|
||||
- [ ] end - start >= 60 segundos?
|
||||
- [ ] end - start <= 120 segundos?
|
||||
- [ ] Tem gancho forte no início?
|
||||
- [ ] Faz sentido isolado do resto do vídeo?
|
||||
- [ ] JSON está válido?
|
||||
|
||||
## EXEMPLO
|
||||
|
||||
Se o vídeo tem 15 minutos e você encontrou 4 momentos virais:
|
||||
|
||||
```json
|
||||
{
|
||||
"highlights": [
|
||||
{
|
||||
"start": 60.0,
|
||||
"end": 120.0,
|
||||
"summary": "Revelação sobre como economizar 50% nas compras"
|
||||
},
|
||||
{
|
||||
"start": 180.0,
|
||||
"end": 255.0,
|
||||
"summary": "História engraçada sobre cliente que tentou enganar a loja"
|
||||
},
|
||||
{
|
||||
"start": 400.0,
|
||||
"end": 480.0,
|
||||
"summary": "Dica prática de negociação com fornecedores"
|
||||
},
|
||||
{
|
||||
"start": 600.0,
|
||||
"end": 690.0,
|
||||
"summary": "Conclusão motivacional sobre empreendedorismo"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
Agora analise a transcrição fornecida e extraia os clips virais seguindo estas instruções.
|
||||
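Editor's note: the prompt above instructs the model to return a highlights JSON whose clips last 60-120 seconds, in chronological order and without overlaps; video_render/llm.py (later in this diff) re-applies the duration bounds on the response. A minimal sketch of that post-validation, assuming only the JSON shape defined in the prompt:

```python
import json

# Sketch of the checks the prompt asks for; llm.py enforces the 60-120 s
# window, the overlap/order check here mirrors the prompt rules only.
raw = '{"highlights": [{"start": 60.0, "end": 120.0, "summary": "..."}]}'
highlights = json.loads(raw)["highlights"]

valid = []
previous_end = float("-inf")
for clip in highlights:
    start, end = float(clip["start"]), float(clip["end"])
    duration = end - start
    if not (60 <= duration <= 120):
        continue  # rejected: outside the 60-120 s window
    if start < previous_end:
        continue  # rejected: overlaps the previous clip
    previous_end = end
    valid.append(clip)

print(f"{len(valid)} clip(s) accepted")
```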
@@ -1,4 +1,9 @@
moviepy==2.2.0
pillow==9.5.0
numpy>=1.26.0
requests
pika
pika
faster-whisper==1.2.0
mediapipe==0.10.18
opencv-python==4.10.0.84
scipy>=1.11.0

video_render/__init__.py (new file, 4 lines)
@@ -0,0 +1,4 @@
"""
Core package for the revamped video rendering pipeline.
"""

BIN  video_render/__pycache__/__init__.cpython-39.pyc (new file, binary file not shown)
BIN  video_render/__pycache__/config.cpython-39.pyc (new file, binary file not shown)
BIN  video_render/__pycache__/ffmpeg.cpython-39.pyc (new file, binary file not shown)
BIN  video_render/__pycache__/llm.cpython-39.pyc (new file, binary file not shown)
BIN  video_render/__pycache__/logging_utils.cpython-39.pyc (new file, binary file not shown)
BIN  video_render/__pycache__/media.cpython-39.pyc (new file, binary file not shown)
BIN  video_render/__pycache__/messaging.cpython-39.pyc (new file, binary file not shown)
BIN  video_render/__pycache__/pipeline.cpython-39.pyc (new file, binary file not shown)
BIN  video_render/__pycache__/rendering.cpython-39.pyc (new file, binary file not shown)
BIN  video_render/__pycache__/transcription.cpython-39.pyc (new file, binary file not shown)
BIN  video_render/__pycache__/utils.cpython-39.pyc (new file, binary file not shown)
video_render/config.py (new file, 99 lines)
@@ -0,0 +1,99 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
BASE_DIR = Path(__file__).resolve().parent.parent
|
||||
VIDEOS_ROOT = BASE_DIR / "videos"
|
||||
OUTPUTS_ROOT = BASE_DIR / "outputs"
|
||||
TEMP_ROOT = BASE_DIR / "temp"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RabbitMQSettings:
|
||||
# host: str = os.environ.get("RABBITMQ_HOST", "154.12.229.181")
|
||||
# port: int = int(os.environ.get("RABBITMQ_PORT", 32790))
|
||||
host: str = os.environ.get("RABBITMQ_HOST", "rabbitmq")
|
||||
port: int = int(os.environ.get("RABBITMQ_PORT", 5672))
|
||||
user: str = os.environ.get("RABBITMQ_USER", "admin")
|
||||
password: str = os.environ.get("RABBITMQ_PASS")
|
||||
consume_queue: str = os.environ.get("RABBITMQ_QUEUE", "to-render")
|
||||
publish_queue: str = os.environ.get("RABBITMQ_UPLOAD_QUEUE", "to-upload")
|
||||
prefetch_count: int = int(os.environ.get("RABBITMQ_PREFETCH", 1))
|
||||
heartbeat: int = int(os.environ.get("RABBITMQ_HEARTBEAT", 600))
|
||||
blocked_timeout: int = int(os.environ.get("RABBITMQ_BLOCKED_TIMEOUT", 7200))
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class OpenRouterSettings:
|
||||
api_key: str = os.environ.get("OPENROUTER_API_KEY", "https://openrouter.ai/api/v1/chat/completions")
|
||||
model: str = os.environ.get(
|
||||
"OPENROUTER_MODEL", "openai/gpt-oss-20b:free"
|
||||
)
|
||||
temperature: float = float(os.environ.get("OPENROUTER_TEMPERATURE", 0.6))
|
||||
prompt_path: str = os.environ.get("OPENROUTER_PROMPT_PATH", "prompts/generate.txt")
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class WhisperSettings:
|
||||
model_size: str = os.environ.get("FASTER_WHISPER_MODEL_SIZE", "medium")
|
||||
device: str | None = os.environ.get("FASTER_WHISPER_DEVICE")
|
||||
compute_type: str | None = os.environ.get("FASTER_WHISPER_COMPUTE_TYPE")
|
||||
download_root: Path = Path(
|
||||
os.environ.get("FASTER_WHISPER_DOWNLOAD_ROOT", str(BASE_DIR / ".whisper"))
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RenderingSettings:
|
||||
frame_width: int = int(os.environ.get("RENDER_WIDTH", 1080))
|
||||
frame_height: int = int(os.environ.get("RENDER_HEIGHT", 1920))
|
||||
fps: int = int(os.environ.get("RENDER_FPS", 30))
|
||||
video_codec: str = os.environ.get("RENDER_CODEC", "libx264")
|
||||
audio_codec: str = os.environ.get("RENDER_AUDIO_CODEC", "aac")
|
||||
bitrate: str = os.environ.get("RENDER_BITRATE", "5000k")
|
||||
preset: str = os.environ.get("RENDER_PRESET", "faster")
|
||||
highlight_color: str = os.environ.get("SUBTITLE_HIGHLIGHT_COLOR", "#00FF00")
|
||||
base_color: str = os.environ.get("SUBTITLE_BASE_COLOR", "#FFFFFF")
|
||||
font_path: Path = Path(os.environ.get("RENDER_FONT_PATH", "./Montserrat.ttf"))
|
||||
title_font_size: int = int(os.environ.get("RENDER_TITLE_FONT_SIZE", 110))
|
||||
subtitle_font_size: int = int(os.environ.get("RENDER_SUBTITLE_FONT_SIZE", 64))
|
||||
caption_min_words: int = int(os.environ.get("CAPTION_MIN_WORDS", 2))
|
||||
caption_max_words: int = int(os.environ.get("CAPTION_MAX_WORDS", 2))
|
||||
enable_smart_framing: bool = os.environ.get("ENABLE_SMART_FRAMING", "true").lower() in ("true", "1", "yes")
|
||||
smart_framing_min_confidence: float = float(os.environ.get("SMART_FRAMING_MIN_CONFIDENCE", 0.3))
|
||||
smart_framing_smoothing_window: int = int(os.environ.get("SMART_FRAMING_SMOOTHING_WINDOW", 30))
|
||||
smart_framing_frame_skip: int = int(os.environ.get("SMART_FRAMING_FRAME_SKIP", 1))
|
||||
smart_framing_max_velocity: int = int(os.environ.get("SMART_FRAMING_MAX_VELOCITY", 25))
|
||||
smart_framing_person_switch_cooldown: int = int(os.environ.get("SMART_FRAMING_PERSON_SWITCH_COOLDOWN", 30))
|
||||
smart_framing_response_time: float = float(os.environ.get("SMART_FRAMING_RESPONSE_TIME", 0.6))
|
||||
smart_framing_group_padding: float = float(os.environ.get("SMART_FRAMING_GROUP_PADDING", 0.15))
|
||||
smart_framing_max_zoom_out: float = float(os.environ.get("SMART_FRAMING_MAX_ZOOM_OUT", 2.0))
|
||||
smart_framing_dead_zone: int = int(os.environ.get("SMART_FRAMING_DEAD_ZONE", 60))
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Settings:
|
||||
rabbitmq: RabbitMQSettings = RabbitMQSettings()
|
||||
openrouter: OpenRouterSettings = OpenRouterSettings()
|
||||
whisper: WhisperSettings = WhisperSettings()
|
||||
rendering: RenderingSettings = RenderingSettings()
|
||||
|
||||
videos_dir: Path = VIDEOS_ROOT
|
||||
outputs_dir: Path = OUTPUTS_ROOT
|
||||
temp_dir: Path = TEMP_ROOT
|
||||
|
||||
|
||||
def load_settings() -> Settings:
|
||||
settings = Settings()
|
||||
|
||||
if not settings.rabbitmq.password:
|
||||
raise RuntimeError("RABBITMQ_PASS must be provided")
|
||||
|
||||
settings.videos_dir.mkdir(parents=True, exist_ok=True)
|
||||
settings.outputs_dir.mkdir(parents=True, exist_ok=True)
|
||||
settings.temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
return settings
|
||||
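Editor's note: the dataclass defaults above call os.environ.get at class-definition time, so environment variables must be set before video_render.config is imported; load_settings() then validates RABBITMQ_PASS and creates the videos/outputs/temp directories. A small usage sketch, mirroring what the new main.py does (the placeholder password is illustrative only):

```python
import os

# Sketch: populate the environment before importing the module, because the
# dataclass field defaults above read os.environ when the classes are created.
os.environ.setdefault("RABBITMQ_PASS", "example-password")  # placeholder value

from video_render.config import load_settings

settings = load_settings()           # raises RuntimeError if RABBITMQ_PASS is missing
print(settings.rendering.bitrate)    # "5000k" unless RENDER_BITRATE overrides it
print(settings.videos_dir)           # directory created on disk by load_settings()
```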
video_render/context_detection.py (new file, 844 lines)
@@ -0,0 +1,844 @@
|
||||
"""
|
||||
Context detection module for video analysis.
|
||||
|
||||
This module provides functionality to detect faces, track people,
|
||||
and identify who is speaking in video content using MediaPipe and audio analysis.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
import cv2
|
||||
import mediapipe as mp
|
||||
import numpy as np
|
||||
from scipy import signal
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class FaceDetection:
|
||||
"""Represents a detected face in a frame."""
|
||||
x: int
|
||||
y: int
|
||||
width: int
|
||||
height: int
|
||||
confidence: float
|
||||
center_x: int
|
||||
center_y: int
|
||||
landmarks: Optional[List[Tuple[int, int]]] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class PersonTracking:
|
||||
"""Tracks a person across frames."""
|
||||
person_id: int
|
||||
face: FaceDetection
|
||||
is_speaking: bool
|
||||
speaking_confidence: float
|
||||
frame_number: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class GroupBoundingBox:
|
||||
"""Bounding box containing all tracked faces."""
|
||||
x: int
|
||||
y: int
|
||||
width: int
|
||||
height: int
|
||||
center_x: int
|
||||
center_y: int
|
||||
face_count: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class FrameContext:
|
||||
"""Context information for a video frame."""
|
||||
frame_number: int
|
||||
timestamp: float
|
||||
detected_faces: List[FaceDetection]
|
||||
active_speakers: List[int] # indices of speaking faces
|
||||
primary_focus: Optional[Tuple[int, int]] # (x, y) center point
|
||||
layout_mode: str # "single", "dual_split", "grid"
|
||||
selected_people: List[int] = field(default_factory=list) # indices of people selected for display
|
||||
group_bounds: Optional[GroupBoundingBox] = None # bounding box for all detected faces
|
||||
|
||||
|
||||
class MediaPipeDetector:
|
||||
"""Face and pose detection using MediaPipe with OpenCV Haar Cascade fallback."""
|
||||
|
||||
def __init__(self, min_detection_confidence: float = 0.3, min_tracking_confidence: float = 0.3):
|
||||
self.min_detection_confidence = min_detection_confidence
|
||||
self.min_tracking_confidence = min_tracking_confidence
|
||||
self.mp_face_detection = mp.solutions.face_detection
|
||||
self.mp_face_mesh = mp.solutions.face_mesh
|
||||
|
||||
# MediaPipe detectors with lower confidence for better cartoon detection
|
||||
self.face_detection = self.mp_face_detection.FaceDetection(
|
||||
min_detection_confidence=min_detection_confidence,
|
||||
model_selection=0 # Changed to 0 for better detection of varied faces (including cartoons)
|
||||
)
|
||||
|
||||
self.face_mesh = self.mp_face_mesh.FaceMesh(
|
||||
max_num_faces=5,
|
||||
min_detection_confidence=min_detection_confidence,
|
||||
min_tracking_confidence=min_tracking_confidence,
|
||||
static_image_mode=False
|
||||
)
|
||||
|
||||
# OpenCV Haar Cascade as fallback for cartoon/anime faces
|
||||
self.haar_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
|
||||
|
||||
# Alternative cascade for profile/side faces
|
||||
self.haar_cascade_profile = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_profileface.xml')
|
||||
|
||||
logger.info(f"Hybrid detector initialized (MediaPipe confidence={min_detection_confidence}, OpenCV Haar Cascade enabled)")
|
||||
|
||||
def detect_faces(self, frame: np.ndarray) -> List[FaceDetection]:
|
||||
"""
|
||||
Detect faces in a frame using hybrid approach (MediaPipe + OpenCV Haar Cascade).
|
||||
|
||||
Args:
|
||||
frame: RGB image array
|
||||
|
||||
Returns:
|
||||
List of detected faces
|
||||
"""
|
||||
height, width = frame.shape[:2]
|
||||
|
||||
if len(frame.shape) == 2:
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
|
||||
elif frame.shape[2] == 4:
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
|
||||
else:
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
|
||||
# Try MediaPipe first
|
||||
results = self.face_detection.process(frame_rgb)
|
||||
|
||||
faces = []
|
||||
if results.detections:
|
||||
for detection in results.detections:
|
||||
bbox = detection.location_data.relative_bounding_box
|
||||
|
||||
x = int(bbox.xmin * width)
|
||||
y = int(bbox.ymin * height)
|
||||
w = int(bbox.width * width)
|
||||
h = int(bbox.height * height)
|
||||
|
||||
x = max(0, min(x, width - 1))
|
||||
y = max(0, min(y, height - 1))
|
||||
w = min(w, width - x)
|
||||
h = min(h, height - y)
|
||||
|
||||
center_x = x + w // 2
|
||||
center_y = y + h // 2
|
||||
|
||||
confidence = detection.score[0] if detection.score else 0.0
|
||||
|
||||
faces.append(FaceDetection(
|
||||
x=x,
|
||||
y=y,
|
||||
width=w,
|
||||
height=h,
|
||||
confidence=confidence,
|
||||
center_x=center_x,
|
||||
center_y=center_y
|
||||
))
|
||||
|
||||
# Fallback to OpenCV Haar Cascade if MediaPipe found nothing
|
||||
if not faces:
|
||||
faces = self._detect_faces_haar_cascade(frame, width, height)
|
||||
|
||||
return faces
|
||||
|
||||
def _detect_faces_haar_cascade(self, frame: np.ndarray, width: int, height: int) -> List[FaceDetection]:
|
||||
"""
|
||||
Detect faces using OpenCV Haar Cascade (works better with cartoons/anime).
|
||||
|
||||
Args:
|
||||
frame: Image frame (BGR format)
|
||||
width: Frame width
|
||||
height: Frame height
|
||||
|
||||
Returns:
|
||||
List of detected faces
|
||||
"""
|
||||
# Convert to grayscale for Haar Cascade
|
||||
if len(frame.shape) == 3:
|
||||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||||
else:
|
||||
gray = frame
|
||||
|
||||
# Detect frontal faces with more sensitive parameters
|
||||
frontal_faces = self.haar_cascade.detectMultiScale(
|
||||
gray,
|
||||
scaleFactor=1.05, # More sensitive to size variations
|
||||
minNeighbors=3, # Lower threshold for detection (more permissive)
|
||||
minSize=(30, 30), # Smaller minimum size
|
||||
flags=cv2.CASCADE_SCALE_IMAGE
|
||||
)
|
||||
|
||||
# Also try profile faces
|
||||
profile_faces = self.haar_cascade_profile.detectMultiScale(
|
||||
gray,
|
||||
scaleFactor=1.1,
|
||||
minNeighbors=3,
|
||||
minSize=(30, 30),
|
||||
flags=cv2.CASCADE_SCALE_IMAGE
|
||||
)
|
||||
|
||||
# Combine frontal and profile detections
|
||||
all_faces = []
|
||||
|
||||
for (x, y, w, h) in frontal_faces:
|
||||
x = max(0, min(x, width - 1))
|
||||
y = max(0, min(y, height - 1))
|
||||
w = min(w, width - x)
|
||||
h = min(h, height - y)
|
||||
|
||||
center_x = x + w // 2
|
||||
center_y = y + h // 2
|
||||
|
||||
all_faces.append(FaceDetection(
|
||||
x=x,
|
||||
y=y,
|
||||
width=w,
|
||||
height=h,
|
||||
confidence=0.7, # Haar Cascade doesn't provide confidence, use fixed value
|
||||
center_x=center_x,
|
||||
center_y=center_y
|
||||
))
|
||||
|
||||
for (x, y, w, h) in profile_faces:
|
||||
# Check if this face overlaps significantly with any frontal face
|
||||
overlap = False
|
||||
for existing_face in all_faces:
|
||||
# Calculate IoU (Intersection over Union)
|
||||
x1_overlap = max(x, existing_face.x)
|
||||
y1_overlap = max(y, existing_face.y)
|
||||
x2_overlap = min(x + w, existing_face.x + existing_face.width)
|
||||
y2_overlap = min(y + h, existing_face.y + existing_face.height)
|
||||
|
||||
if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
|
||||
overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)
|
||||
face_area = w * h
|
||||
if overlap_area / face_area > 0.3: # 30% overlap threshold
|
||||
overlap = True
|
||||
break
|
||||
|
||||
if not overlap:
|
||||
x = max(0, min(x, width - 1))
|
||||
y = max(0, min(y, height - 1))
|
||||
w = min(w, width - x)
|
||||
h = min(h, height - y)
|
||||
|
||||
center_x = x + w // 2
|
||||
center_y = y + h // 2
|
||||
|
||||
all_faces.append(FaceDetection(
|
||||
x=x,
|
||||
y=y,
|
||||
width=w,
|
||||
height=h,
|
||||
confidence=0.6, # Slightly lower confidence for profile
|
||||
center_x=center_x,
|
||||
center_y=center_y
|
||||
))
|
||||
|
||||
if all_faces:
|
||||
logger.debug(f"Haar Cascade detected {len(all_faces)} faces (MediaPipe failed)")
|
||||
|
||||
return all_faces
|
||||
|
||||
def detect_face_landmarks(self, frame: np.ndarray) -> List[FaceDetection]:
|
||||
"""
|
||||
Detect faces with landmarks for lip sync detection.
|
||||
|
||||
Args:
|
||||
frame: RGB image array
|
||||
|
||||
Returns:
|
||||
List of detected faces with landmark information
|
||||
"""
|
||||
height, width = frame.shape[:2]
|
||||
|
||||
if len(frame.shape) == 2:
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
|
||||
elif frame.shape[2] == 4:
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
|
||||
else:
|
||||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||||
|
||||
results = self.face_mesh.process(frame_rgb)
|
||||
|
||||
faces = []
|
||||
if results.multi_face_landmarks:
|
||||
for face_landmarks in results.multi_face_landmarks:
|
||||
xs = [lm.x for lm in face_landmarks.landmark]
|
||||
ys = [lm.y for lm in face_landmarks.landmark]
|
||||
|
||||
x_min, x_max = min(xs), max(xs)
|
||||
y_min, y_max = min(ys), max(ys)
|
||||
|
||||
x = int(x_min * width)
|
||||
y = int(y_min * height)
|
||||
w = int((x_max - x_min) * width)
|
||||
h = int((y_max - y_min) * height)
|
||||
|
||||
center_x = x + w // 2
|
||||
center_y = y + h // 2
|
||||
|
||||
lip_landmarks = []
|
||||
for idx in [13, 14, 78, 308]:
|
||||
lm = face_landmarks.landmark[idx]
|
||||
lip_landmarks.append((int(lm.x * width), int(lm.y * height)))
|
||||
|
||||
faces.append(FaceDetection(
|
||||
x=x,
|
||||
y=y,
|
||||
width=w,
|
||||
height=h,
|
||||
confidence=1.0,
|
||||
center_x=center_x,
|
||||
center_y=center_y,
|
||||
landmarks=lip_landmarks
|
||||
))
|
||||
|
||||
return faces
|
||||
|
||||
def close(self):
|
||||
"""Release MediaPipe resources."""
|
||||
self.face_detection.close()
|
||||
self.face_mesh.close()
|
||||
|
||||
|
||||
class AudioActivityDetector:
|
||||
"""Detects speech activity in audio."""
|
||||
|
||||
def __init__(self, sample_rate: int = 44100, frame_duration_ms: int = 30):
|
||||
self.sample_rate = sample_rate
|
||||
self.frame_duration_ms = frame_duration_ms
|
||||
self.frame_size = int(sample_rate * frame_duration_ms / 1000)
|
||||
|
||||
logger.info(f"Audio activity detector initialized (sr={sample_rate}, frame={frame_duration_ms}ms)")
|
||||
|
||||
def detect_speaking_periods(
|
||||
self,
|
||||
audio_samples: np.ndarray,
|
||||
threshold: float = 0.01, # Reduced from 0.02 for better speech detection
|
||||
min_speech_duration: float = 0.05 # Reduced from 0.1 to catch shorter utterances
|
||||
) -> List[Tuple[float, float]]:
|
||||
"""
|
||||
Detect periods of speech in audio.
|
||||
|
||||
Args:
|
||||
audio_samples: Audio samples array
|
||||
threshold: Energy threshold for speech detection
|
||||
min_speech_duration: Minimum duration of speech in seconds
|
||||
|
||||
Returns:
|
||||
List of (start_time, end_time) tuples in seconds
|
||||
"""
|
||||
if audio_samples.ndim > 1:
|
||||
audio_samples = audio_samples.mean(axis=1)
|
||||
|
||||
energies = []
|
||||
for i in range(0, len(audio_samples), self.frame_size):
|
||||
frame = audio_samples[i:i + self.frame_size]
|
||||
if len(frame) > 0:
|
||||
energy = np.sqrt(np.mean(frame ** 2))
|
||||
energies.append(energy)
|
||||
|
||||
speaking_frames = [e > threshold for e in energies]
|
||||
|
||||
periods = []
|
||||
start_frame = None
|
||||
|
||||
for i, is_speaking in enumerate(speaking_frames):
|
||||
if is_speaking and start_frame is None:
|
||||
start_frame = i
|
||||
elif not is_speaking and start_frame is not None:
|
||||
start_time = start_frame * self.frame_duration_ms / 1000
|
||||
end_time = i * self.frame_duration_ms / 1000
|
||||
|
||||
if end_time - start_time >= min_speech_duration:
|
||||
periods.append((start_time, end_time))
|
||||
|
||||
start_frame = None
|
||||
|
||||
if start_frame is not None:
|
||||
start_time = start_frame * self.frame_duration_ms / 1000
|
||||
end_time = len(speaking_frames) * self.frame_duration_ms / 1000
|
||||
if end_time - start_time >= min_speech_duration:
|
||||
periods.append((start_time, end_time))
|
||||
|
||||
# Log detected speech periods for debugging
|
||||
if periods:
|
||||
total_speech_time = sum(end - start for start, end in periods)
|
||||
logger.info(f"Audio speech detection: {len(periods)} periods found, "
|
||||
f"total {total_speech_time:.1f}s of speech (threshold={threshold})")
|
||||
else:
|
||||
max_energy = max(energies) if energies else 0
|
||||
logger.warning(f"No speech detected! Max energy={max_energy:.4f}, threshold={threshold} "
|
||||
f"(try lowering threshold if speech should be present)")
|
||||
|
||||
return periods
|
||||
|
||||
def is_speaking_at_time(self, speaking_periods: List[Tuple[float, float]], time: float) -> bool:
|
||||
"""Check if there is speech activity at a given time."""
|
||||
for start, end in speaking_periods:
|
||||
if start <= time <= end:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class ContextAnalyzer:
|
||||
"""Analyzes video context to determine focus and layout."""
|
||||
|
||||
def __init__(self, person_switch_cooldown: int = 30, min_face_confidence: float = 0.3):
|
||||
self.detector = MediaPipeDetector()
|
||||
self.audio_detector = AudioActivityDetector()
|
||||
self.previous_faces: List[FaceDetection] = []
|
||||
self.min_face_confidence = min_face_confidence
|
||||
|
||||
# Person tracking state
|
||||
self.current_selected_people: List[int] = [] # Indices of people currently on screen
|
||||
self.last_switch_frame: int = -999 # Frame when we last switched people
|
||||
self.person_switch_cooldown = person_switch_cooldown # Minimum frames before switching
|
||||
|
||||
# Stability tracking to prevent flip-flopping
|
||||
self.desired_people_history: List[List[int]] = [] # Track recent desired selections
|
||||
self.stability_threshold = 20 # Frames needed to confirm a switch (increased for more stability)
|
||||
self.last_switched_people: List[int] = [] # People we just switched FROM
|
||||
|
||||
self.focus_history: List[Tuple[int, int]] = []
|
||||
self.focus_history_size: int = 20
|
||||
self.focus_dead_zone: int = 60
|
||||
|
||||
# Debug logging
|
||||
self.frame_log_interval = 30 # Log every N frames
|
||||
|
||||
logger.info(f"Context analyzer initialized (cooldown={person_switch_cooldown} frames, focus_smoothing={self.focus_history_size})")
|
||||
|
||||
def analyze_frame(
|
||||
self,
|
||||
frame: np.ndarray,
|
||||
timestamp: float,
|
||||
frame_number: int,
|
||||
speaking_periods: Optional[List[Tuple[float, float]]] = None
|
||||
) -> FrameContext:
|
||||
"""
|
||||
Analyze a single frame to extract context information.
|
||||
|
||||
Args:
|
||||
frame: Video frame (BGR format from OpenCV)
|
||||
timestamp: Frame timestamp in seconds
|
||||
frame_number: Frame index
|
||||
speaking_periods: List of (start, end) times where speech is detected
|
||||
|
||||
Returns:
|
||||
FrameContext with detection results
|
||||
"""
|
||||
faces = self.detector.detect_face_landmarks(frame)
|
||||
faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else []
|
||||
|
||||
if not faces:
|
||||
faces = self.detector.detect_faces(frame)
|
||||
faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else []
|
||||
|
||||
# Determine who is speaking
|
||||
active_speakers = []
|
||||
has_audio_speech = speaking_periods and self.audio_detector.is_speaking_at_time(speaking_periods, timestamp)
|
||||
|
||||
for i, face in enumerate(faces):
|
||||
is_speaking = False
|
||||
|
||||
# Prefer visual cues when multiple faces are present.
|
||||
if face.landmarks and len(self.previous_faces) > i:
|
||||
is_speaking = self._detect_lip_movement(face, self.previous_faces[i])
|
||||
|
||||
# Audio can confirm speech when there's only one face.
|
||||
if has_audio_speech and len(faces) == 1:
|
||||
is_speaking = True
|
||||
|
||||
if is_speaking:
|
||||
active_speakers.append(i)
|
||||
|
||||
# Debug: Log speech detection
|
||||
if frame_number % 30 == 0: # Every second at 30fps
|
||||
logger.info(f"Speech detection - Frame {frame_number}: audio_active={has_audio_speech}, "
|
||||
f"speakers={active_speakers}, total_faces={len(faces)}")
|
||||
|
||||
if active_speakers:
|
||||
selected_people = active_speakers[:4]
|
||||
if len(selected_people) == 1:
|
||||
layout_mode = "single"
|
||||
elif len(selected_people) == 2:
|
||||
layout_mode = "dual_split"
|
||||
else:
|
||||
layout_mode = "grid"
|
||||
else:
|
||||
# Select THE person to focus on (always single person)
|
||||
# Priority: 1) Who is speaking, 2) Who is most centered
|
||||
selected_people = self._select_person_to_focus(
|
||||
faces,
|
||||
active_speakers,
|
||||
frame_number,
|
||||
frame.shape[1], # frame width for center calculation
|
||||
frame.shape[0] # frame height for center calculation
|
||||
)
|
||||
layout_mode = "single"
|
||||
|
||||
# Calculate group bounding box for ALL detected faces (multi-person support)
|
||||
group_bounds = self._calculate_group_bounding_box(faces)
|
||||
|
||||
# For multi-person mode, use group center as primary focus
|
||||
if group_bounds and group_bounds.face_count > 1:
|
||||
primary_focus = (group_bounds.center_x, group_bounds.center_y)
|
||||
else:
|
||||
primary_focus = self._calculate_focus_point(faces, selected_people)
|
||||
|
||||
# Debug logging every N frames
|
||||
if frame_number % self.frame_log_interval == 0:
|
||||
focus_reason = "speaker" if active_speakers else "no_speech_detected"
|
||||
group_info = f", group={group_bounds.face_count} faces" if group_bounds else ""
|
||||
logger.info(f"Frame {frame_number}: {len(faces)} faces, "
|
||||
f"{len(active_speakers)} speakers, focus={selected_people}, reason={focus_reason}{group_info}")
|
||||
|
||||
self.previous_faces = faces
|
||||
|
||||
return FrameContext(
|
||||
frame_number=frame_number,
|
||||
timestamp=timestamp,
|
||||
detected_faces=faces,
|
||||
active_speakers=active_speakers,
|
||||
primary_focus=primary_focus,
|
||||
layout_mode=layout_mode,
|
||||
selected_people=selected_people,
|
||||
group_bounds=group_bounds
|
||||
)
|
||||
|
||||
def _detect_lip_movement(self, current_face: FaceDetection, previous_face: FaceDetection) -> bool:
|
||||
"""
|
||||
Detect lip movement by comparing landmarks between frames.
|
||||
|
||||
Args:
|
||||
current_face: Current frame face detection
|
||||
previous_face: Previous frame face detection
|
||||
|
||||
Returns:
|
||||
True if significant lip movement detected
|
||||
"""
|
||||
if not current_face.landmarks or not previous_face.landmarks:
|
||||
return False
|
||||
|
||||
def lip_distance(landmarks):
|
||||
if len(landmarks) < 4:
|
||||
return 0
|
||||
|
||||
upper = np.array(landmarks[0:2])
|
||||
lower = np.array(landmarks[2:4])
|
||||
return np.linalg.norm(upper.mean(axis=0) - lower.mean(axis=0))
|
||||
|
||||
current_dist = lip_distance(current_face.landmarks)
|
||||
previous_dist = lip_distance(previous_face.landmarks)
|
||||
|
||||
threshold = 2.0
|
||||
return abs(current_dist - previous_dist) > threshold
|
||||
|
||||
def _select_person_to_focus(
|
||||
self,
|
||||
faces: List[FaceDetection],
|
||||
active_speakers: List[int],
|
||||
frame_number: int,
|
||||
frame_width: int,
|
||||
frame_height: int
|
||||
) -> List[int]:
|
||||
"""
|
||||
Select THE single person to focus on.
|
||||
Priority: 1) Who is speaking, 2) Who is most centered in frame
|
||||
|
||||
Args:
|
||||
faces: List of detected faces
|
||||
active_speakers: Indices of people currently speaking
|
||||
frame_number: Current frame number
|
||||
frame_width: Frame width for center calculation
|
||||
frame_height: Frame height for center calculation
|
||||
|
||||
Returns:
|
||||
List with single person index [idx], or empty list if no faces
|
||||
"""
|
||||
if not faces:
|
||||
self.current_selected_people = []
|
||||
return []
|
||||
|
||||
if len(faces) == 1:
|
||||
self.current_selected_people = [0]
|
||||
return [0]
|
||||
|
||||
frames_since_last_switch = frame_number - self.last_switch_frame
|
||||
can_switch = frames_since_last_switch >= self.person_switch_cooldown
|
||||
|
||||
desired_person_idx = None
|
||||
|
||||
if active_speakers:
|
||||
if self.current_selected_people and self.current_selected_people[0] in active_speakers:
|
||||
desired_person_idx = self.current_selected_people[0]
|
||||
else:
|
||||
if can_switch or not self.current_selected_people:
|
||||
desired_person_idx = active_speakers[0]
|
||||
if self.current_selected_people and desired_person_idx != self.current_selected_people[0]:
|
||||
logger.info(f"Switching focus to speaker: {desired_person_idx}")
|
||||
self.last_switch_frame = frame_number
|
||||
else:
|
||||
desired_person_idx = self.current_selected_people[0] if self.current_selected_people else active_speakers[0]
|
||||
else:
|
||||
if self.current_selected_people and len(self.current_selected_people) > 0:
|
||||
current_idx = self.current_selected_people[0]
|
||||
if current_idx < len(faces):
|
||||
desired_person_idx = current_idx
|
||||
else:
|
||||
if self.previous_faces and current_idx < len(self.previous_faces):
|
||||
prev_face = self.previous_faces[current_idx]
|
||||
best_match_idx = None
|
||||
best_match_score = float('inf')
|
||||
for idx, face in enumerate(faces):
|
||||
dx = face.center_x - prev_face.center_x
|
||||
dy = face.center_y - prev_face.center_y
|
||||
dist = np.sqrt(dx**2 + dy**2)
|
||||
size_diff = abs(face.width - prev_face.width) + abs(face.height - prev_face.height)
|
||||
score = dist + size_diff * 0.5
|
||||
if score < best_match_score:
|
||||
best_match_score = score
|
||||
best_match_idx = idx
|
||||
|
||||
if best_match_idx is not None and best_match_score < 1000:
|
||||
desired_person_idx = best_match_idx
|
||||
else:
|
||||
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
|
||||
face_confidences.sort(key=lambda x: x[1], reverse=True)
|
||||
desired_person_idx = face_confidences[0][0]
|
||||
else:
|
||||
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
|
||||
face_confidences.sort(key=lambda x: x[1], reverse=True)
|
||||
desired_person_idx = face_confidences[0][0]
|
||||
else:
|
||||
face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
|
||||
face_confidences.sort(key=lambda x: x[1], reverse=True)
|
||||
desired_person_idx = face_confidences[0][0]
|
||||
|
||||
desired_people = [desired_person_idx] if desired_person_idx is not None else []
|
||||
|
||||
if not self.current_selected_people:
|
||||
self.current_selected_people = desired_people
|
||||
self.last_switch_frame = frame_number
|
||||
logger.info(f"Frame {frame_number}: Locked on person {desired_people}")
|
||||
else:
|
||||
self.current_selected_people = desired_people
|
||||
|
||||
return self.current_selected_people.copy()
|
||||
|
||||
def _ensure_distinct_people(
|
||||
self,
|
||||
faces: List[FaceDetection],
|
||||
people_indices: List[int]
|
||||
) -> List[int]:
|
||||
"""
|
||||
Ensure selected people are distinct by checking minimum distance between them.
|
||||
Prevents showing the same person twice due to duplicate detection.
|
||||
|
||||
Args:
|
||||
faces: List of detected faces
|
||||
people_indices: Indices of people to validate
|
||||
|
||||
Returns:
|
||||
List of distinct people indices (max 2)
|
||||
"""
|
||||
if len(people_indices) <= 1:
|
||||
return people_indices
|
||||
|
||||
distinct_people = []
|
||||
|
||||
for idx in people_indices:
|
||||
if idx >= len(faces):
|
||||
continue
|
||||
|
||||
current_face = faces[idx]
|
||||
is_distinct = True
|
||||
|
||||
# Check if this person is too close to any already selected person
|
||||
for selected_idx in distinct_people:
|
||||
selected_face = faces[selected_idx]
|
||||
|
||||
# Calculate distance between face centers
|
||||
dx = current_face.center_x - selected_face.center_x
|
||||
dy = current_face.center_y - selected_face.center_y
|
||||
distance = np.sqrt(dx**2 + dy**2)
|
||||
|
||||
# Also check overlap via IoU (Intersection over Union)
|
||||
x1_overlap = max(current_face.x, selected_face.x)
|
||||
y1_overlap = max(current_face.y, selected_face.y)
|
||||
x2_overlap = min(current_face.x + current_face.width, selected_face.x + selected_face.width)
|
||||
y2_overlap = min(current_face.y + current_face.height, selected_face.y + selected_face.height)
|
||||
|
||||
overlap_area = 0
|
||||
if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
|
||||
overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)
|
||||
|
||||
# Calculate areas
|
||||
area1 = current_face.width * current_face.height
|
||||
area2 = selected_face.width * selected_face.height
|
||||
min_area = min(area1, area2)
|
||||
|
||||
# If faces are very close OR significantly overlapping, they're likely the same person
|
||||
# Minimum distance: 1/4 of average face width
|
||||
min_distance = (current_face.width + selected_face.width) / 8
|
||||
overlap_threshold = 0.3 # 30% overlap
|
||||
|
||||
if distance < min_distance or (min_area > 0 and overlap_area / min_area > overlap_threshold):
|
||||
is_distinct = False
|
||||
logger.debug(f"Person {idx} too similar to person {selected_idx} (dist={distance:.1f}, overlap={overlap_area/min_area if min_area > 0 else 0:.2%})")
|
||||
break
|
||||
|
||||
if is_distinct:
|
||||
distinct_people.append(idx)
|
||||
|
||||
# Stop at 2 distinct people
|
||||
if len(distinct_people) >= 2:
|
||||
break
|
||||
|
||||
# If we couldn't find 2 distinct people, return at most 1
|
||||
if len(distinct_people) < 2 and len(people_indices) >= 2:
|
||||
logger.debug(f"Only {len(distinct_people)} distinct person(s) found from {len(people_indices)} detections")
|
||||
|
||||
return distinct_people
|
||||
|
||||
def _calculate_focus_point(
|
||||
self,
|
||||
faces: List[FaceDetection],
|
||||
selected_people: List[int]
|
||||
) -> Optional[Tuple[int, int]]:
|
||||
"""
|
||||
Calculate the primary focus point based on selected people with temporal smoothing.
|
||||
|
||||
Args:
|
||||
faces: List of detected faces
|
||||
selected_people: Indices of people selected for display
|
||||
|
||||
Returns:
|
||||
(x, y) tuple of focus center, or None if no faces
|
||||
"""
|
||||
if not faces or not selected_people:
|
||||
return None
|
||||
|
||||
# Calculate raw focus point
|
||||
raw_focus_x = 0
|
||||
raw_focus_y = 0
|
||||
|
||||
if len(selected_people) == 1:
|
||||
# Single person - focus on them
|
||||
if selected_people[0] < len(faces):
|
||||
primary = faces[selected_people[0]]
|
||||
raw_focus_x = primary.center_x
|
||||
raw_focus_y = primary.center_y
|
||||
else:
|
||||
# Fallback
|
||||
most_confident = max(faces, key=lambda f: f.confidence)
|
||||
raw_focus_x = most_confident.center_x
|
||||
raw_focus_y = most_confident.center_y
|
||||
else:
|
||||
# Multiple people - focus on the CENTER between them for stability
|
||||
# This prevents jarring movements when switching focus between people
|
||||
valid_people = [idx for idx in selected_people if idx < len(faces)]
|
||||
if valid_people:
|
||||
centers_x = [faces[idx].center_x for idx in valid_people]
|
||||
centers_y = [faces[idx].center_y for idx in valid_people]
|
||||
raw_focus_x = int(np.mean(centers_x))
|
||||
raw_focus_y = int(np.mean(centers_y))
|
||||
else:
|
||||
# Fallback
|
||||
most_confident = max(faces, key=lambda f: f.confidence)
|
||||
raw_focus_x = most_confident.center_x
|
||||
raw_focus_y = most_confident.center_y
|
||||
|
||||
if self.focus_history:
|
||||
last_x, last_y = self.focus_history[-1]
|
||||
dx = abs(raw_focus_x - last_x)
|
||||
dy = abs(raw_focus_y - last_y)
|
||||
if dx < self.focus_dead_zone and dy < self.focus_dead_zone:
|
||||
return self.focus_history[-1]
|
||||
|
||||
self.focus_history.append((raw_focus_x, raw_focus_y))
|
||||
if len(self.focus_history) > self.focus_history_size:
|
||||
self.focus_history.pop(0)
|
||||
|
||||
if len(self.focus_history) >= 5:
|
||||
xs = [x for x, y in self.focus_history]
|
||||
ys = [y for x, y in self.focus_history]
|
||||
median_x = int(np.median(xs))
|
||||
median_y = int(np.median(ys))
|
||||
return (median_x, median_y)
|
||||
else:
|
||||
return (raw_focus_x, raw_focus_y)
|
||||
|
||||
def _calculate_group_bounding_box(
|
||||
self,
|
||||
faces: List[FaceDetection],
|
||||
padding_percent: float = 0.15,
|
||||
max_faces: int = 6
|
||||
) -> Optional[GroupBoundingBox]:
|
||||
"""
|
||||
Calculate bounding box containing all detected faces with padding.
|
||||
|
||||
Args:
|
||||
faces: List of detected faces
|
||||
padding_percent: Padding around group as percentage of bbox dimensions
|
||||
max_faces: Maximum faces to include (use most confident if exceeded)
|
||||
|
||||
Returns:
|
||||
GroupBoundingBox or None if no faces
|
||||
"""
|
||||
if not faces:
|
||||
return None
|
||||
|
||||
# If too many faces, use most confident ones
|
||||
if len(faces) > max_faces:
|
||||
faces = sorted(faces, key=lambda f: f.confidence, reverse=True)[:max_faces]
|
||||
|
||||
# Calculate bounding box containing all faces
|
||||
min_x = min(f.x for f in faces)
|
||||
max_x = max(f.x + f.width for f in faces)
|
||||
min_y = min(f.y for f in faces)
|
||||
max_y = max(f.y + f.height for f in faces)
|
||||
|
||||
# Add padding
|
||||
width = max_x - min_x
|
||||
height = max_y - min_y
|
||||
pad_x = int(width * padding_percent)
|
||||
pad_y = int(height * padding_percent)
|
||||
|
||||
final_x = max(0, min_x - pad_x)
|
||||
final_y = max(0, min_y - pad_y)
|
||||
final_width = width + 2 * pad_x
|
||||
final_height = height + 2 * pad_y
|
||||
|
||||
return GroupBoundingBox(
|
||||
x=final_x,
|
||||
y=final_y,
|
||||
width=final_width,
|
||||
height=final_height,
|
||||
center_x=final_x + final_width // 2,
|
||||
center_y=final_y + final_height // 2,
|
||||
face_count=len(faces)
|
||||
)
|
||||
|
||||
def close(self):
|
||||
"""Release resources."""
|
||||
self.detector.close()
|
||||
# Clear tracking state to free memory
|
||||
self.previous_faces.clear()
|
||||
self.current_selected_people.clear()
|
||||
self.focus_history.clear()
|
||||
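Editor's note: ContextAnalyzer.analyze_frame takes a BGR frame, a timestamp, a frame index and optional speaking periods, and returns a FrameContext with the focus point and layout mode. A hedged sketch of driving it from OpenCV follows; the video path is a placeholder, and in the real pipeline the speaking periods would come from AudioActivityDetector.detect_speaking_periods on the extracted audio:

```python
import cv2

from video_render.context_detection import ContextAnalyzer

# Sketch only: "input.mp4" is a placeholder path; speaking_periods is omitted,
# so speaker selection falls back to lip-movement and face confidence.
analyzer = ContextAnalyzer(person_switch_cooldown=30, min_face_confidence=0.3)
cap = cv2.VideoCapture("input.mp4")
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

frame_number = 0
while True:
    ok, frame = cap.read()
    if not ok:
        break
    context = analyzer.analyze_frame(frame, frame_number / fps, frame_number)
    if context.primary_focus is not None:
        print(frame_number, context.layout_mode, context.primary_focus)
    frame_number += 1

cap.release()
analyzer.close()
```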
video_render/ffmpeg.py (new file, 54 lines)
@@ -0,0 +1,54 @@
from __future__ import annotations

import logging
import shlex
import subprocess
from pathlib import Path
from typing import Sequence

logger = logging.getLogger(__name__)


def _run_ffmpeg(args: Sequence[str]) -> None:
    cmd = ["ffmpeg", "-hide_banner", "-loglevel", "error", *args]
    logger.debug("Executando ffmpeg: %s", " ".join(shlex.quote(part) for part in cmd))
    completed = subprocess.run(cmd, check=False)
    if completed.returncode != 0:
        raise RuntimeError(f"ffmpeg falhou com exit code {completed.returncode}")


def extract_audio_to_wav(input_video: Path, output_wav: Path) -> Path:
    _run_ffmpeg(
        [
            "-y",
            "-i",
            str(input_video),
            "-ac",
            "1",
            "-ar",
            "16000",
            "-vn",
            str(output_wav),
        ]
    )
    return output_wav


def create_video_segment(input_video: Path, start: float, end: float, output_path: Path) -> Path:
    duration = max(0.01, end - start)
    _run_ffmpeg(
        [
            "-y",
            "-i",
            str(input_video),
            "-ss",
            f"{start:.3f}",
            "-t",
            f"{duration:.3f}",
            "-c",
            "copy",
            str(output_path),
        ]
    )
    return output_path

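Editor's note: a short usage sketch of the two helpers above. Paths are placeholders and their parent directories are assumed to exist; extract_audio_to_wav produces a 16 kHz mono WAV suitable for faster-whisper, and create_video_segment stream-copies ("-c copy"), so cut points snap to keyframes rather than exact timestamps:

```python
from pathlib import Path

from video_render.ffmpeg import create_video_segment, extract_audio_to_wav

# Sketch only: "videos/" and "temp/" are assumed to exist (load_settings creates them).
source = Path("videos/episode.mp4")
audio = extract_audio_to_wav(source, Path("temp/episode.wav"))
clip = create_video_segment(source, start=60.0, end=135.0, output_path=Path("temp/clip_01.mp4"))
print(audio, clip)
```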
video_render/llm.py (new file, 258 lines)
@@ -0,0 +1,258 @@
from __future__ import annotations

import json
import logging
import os
import time
from pathlib import Path
from typing import Dict, List

import requests

from video_render.config import BASE_DIR, Settings
from video_render.transcription import TranscriptionResult

logger = logging.getLogger(__name__)

OPENROUTER_ENDPOINT = os.environ.get("OPENROUTER_API_URL", "https://openrouter.ai/api/v1/chat/completions")


class OpenRouterCopywriter:
    def __init__(self, settings: Settings) -> None:
        if not settings.openrouter.api_key:
            raise RuntimeError("OPENROUTER_API_KEY nao foi definido")
        self.settings = settings
        prompt_path = Path(settings.openrouter.prompt_path)

        if not prompt_path.is_absolute():
            prompt_path = BASE_DIR / prompt_path
        if not prompt_path.exists():
            raise FileNotFoundError(f"Prompt nao encontrado: {prompt_path}")
        self.highlights_prompt_template = prompt_path.read_text(encoding="utf-8")

    def generate_highlights(self, transcription: TranscriptionResult) -> List[Dict]:
        """Generate video highlights via the configured OpenRouter model, with retry logic for rate limits."""
        payload = {
            "transcript": transcription.full_text,
            "segments": [
                {
                    "start": segment.start,
                    "end": segment.end,
                    "text": segment.text,
                }
                for segment in transcription.segments
            ],
        }

        body = {
            "model": self.settings.openrouter.model,
            "temperature": self.settings.openrouter.temperature,
            "messages": [
                {"role": "system", "content": self.highlights_prompt_template},
                {
                    "role": "user",
                    "content": json.dumps(payload, ensure_ascii=False),
                },
            ],
        }

        headers = {
            "Authorization": f"Bearer {self.settings.openrouter.api_key}",
            "Content-Type": "application/json",
            "X-Title": "Video Render - Highlights Detection"
        }

        logger.info(f"Calling OpenRouter with model: {self.settings.openrouter.model}")
        logger.debug(f"Request payload keys: transcript_length={len(payload['transcript'])}, segments_count={len(payload['segments'])}")

        # Retry configuration for rate limits (especially free tier)
        max_retries = 5
        base_delay = 5  # Start with 5s delay

        for attempt in range(max_retries):
            try:
                response = requests.post(
                    url=OPENROUTER_ENDPOINT,
                    data=json.dumps(body),
                    headers=headers,
                    timeout=120,
                )
                response.raise_for_status()
                data = response.json()
                break

            except requests.exceptions.HTTPError as exc:
                if exc.response.status_code == 429:
                    if attempt < max_retries - 1:
                        # Exponential backoff: 5s, 10s, 20s, 40s, 80s
                        delay = base_delay * (2 ** attempt)
                        logger.warning(f"Rate limit atingido (429). Aguardando {delay}s antes de tentar novamente (tentativa {attempt + 1}/{max_retries})")
                        time.sleep(delay)
                        continue
                    else:
                        logger.error("Rate limit atingido apos todas as tentativas")
                        logger.error("Solucao: Use um modelo pago ou adicione creditos na OpenRouter")
                        raise RuntimeError("OpenRouter rate limit excedido") from exc
                else:
                    logger.error(f"OpenRouter API request falhou com status {exc.response.status_code}: {exc}")
                    raise RuntimeError("OpenRouter API request falhou") from exc

            except Exception as exc:
                logger.error("OpenRouter API request falhou: %s", exc)
                raise RuntimeError("OpenRouter API request falhou") from exc

        # Debug: log response structure
        logger.info(f"OpenRouter response keys: {list(data.keys())}")
        if "error" in data:
            logger.error(f"OpenRouter API error: {data.get('error')}")
            raise RuntimeError(f"OpenRouter API error: {data.get('error')}")

        choices = data.get("choices") or []
        if not choices:
            logger.error(f"OpenRouter response completa: {json.dumps(data, indent=2)}")
            raise RuntimeError("OpenRouter nao retornou escolhas")

        message = choices[0].get("message", {}).get("content")
        if not message:
            raise RuntimeError("Resposta do OpenRouter sem conteudo")

        parsed = self._extract_json(message)
        highlights = parsed.get("highlights")
        if not isinstance(highlights, list):
            raise ValueError("Resposta do OpenRouter invalida: campo 'highlights' ausente")

        valid_highlights = []
        for highlight in highlights:
            try:
                start = float(highlight.get("start", 0))
                end = float(highlight.get("end", 0))
                summary = str(highlight.get("summary", "")).strip()

                if start < 0 or end < 0:
                    logger.warning(f"Highlight ignorado: timestamps negativos (start={start}, end={end})")
                    continue

                if end <= start:
                    logger.warning(f"Highlight ignorado: end <= start (start={start}, end={end})")
                    continue

                duration = end - start
                if duration < 60:
                    logger.warning(f"Highlight ignorado: muito curto ({duration}s, minimo 60s)")
                    continue

                if duration > 120:
                    logger.warning(f"Highlight ignorado: muito longo ({duration}s, maximo 120s)")
                    continue

                if not summary:
                    logger.warning("Highlight ignorado: summary vazio")
                    continue

                valid_highlights.append({
                    "start": start,
                    "end": end,
                    "summary": summary
                })

            except (TypeError, ValueError) as e:
                logger.warning(f"Highlight invalido ignorado: {highlight} - {e}")
                continue

        if not valid_highlights:
            logger.warning("Nenhum highlight valido retornado pelo OpenRouter")
            total_duration = 75.0
            if transcription.segments:
                total_duration = max(seg.end for seg in transcription.segments)

            fallback_end = min(75.0, total_duration)
            if fallback_end < 60.0:
                fallback_end = min(60.0, total_duration)

            return [{
                "start": 0.0,
                "end": fallback_end,
                "summary": "Trecho inicial do video (fallback automatico)"
            }]

        logger.info(f"OpenRouter retornou {len(valid_highlights)} highlights validos")
        return valid_highlights

    def generate_titles(self, highlights: List[Dict]) -> List[str]:
        if not highlights:
            return []

        prompt = (
            "Voce e um copywriter especializado em titulos curtos e virais para reels.\n"
            "Recebera uma lista de trechos destacados de um video com resumo e tempo.\n"
            "Produza um titulo envolvente (ate 60 caracteres) para cada item.\n"
            "Responda apenas em JSON com a seguinte estrutura:\n"
            '{"titles": ["titulo 1", "titulo 2"]}\n'
            "Titulos devem ser em portugues, usar verbos fortes e refletir o resumo."
        )

        user_payload = {
            "highlights": [
                {
                    "start": item.get("start"),
                    "end": item.get("end"),
                    "summary": item.get("summary"),
                }
                for item in highlights
            ]
        }

        body = {
            "model": self.settings.openrouter.model,
            "temperature": self.settings.openrouter.temperature,
            "messages": [
                {"role": "system", "content": prompt},
                {
                    "role": "user",
                    "content": json.dumps(user_payload, ensure_ascii=False),
                },
            ],
        }

        headers = {
            "Authorization": f"Bearer {self.settings.openrouter.api_key}",
            "Content-Type": "application/json",
        }

        response = requests.post(
            url=OPENROUTER_ENDPOINT,
            data=json.dumps(body),
            headers=headers,
            timeout=120,
        )
        response.raise_for_status()
        data = response.json()
        choices = data.get("choices") or []

        if not choices:
            raise RuntimeError("OpenRouter nao retornou escolhas")

        message = choices[0].get("message", {}).get("content")

        if not message:
            raise RuntimeError("Resposta do OpenRouter sem conteudo")

        parsed = self._extract_json(message)
        titles = parsed.get("titles")

        if not isinstance(titles, list):
            raise ValueError("Resposta do OpenRouter invalida: campo 'titles'")

        return [str(title) for title in titles]

    @staticmethod
    def _extract_json(response_text: str) -> Dict:
        try:
            return json.loads(response_text)
        except json.JSONDecodeError:
            start = response_text.find("{")
            end = response_text.rfind("}")
            if start == -1 or end == -1:
                raise
            subset = response_text[start : end + 1]
            return json.loads(subset)
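A small illustration of what _extract_json tolerates: models often wrap the JSON in prose or code fences, and the helper recovers the outermost object. The response string below is made up.

from video_render.llm import OpenRouterCopywriter

raw = 'Claro! Aqui esta o resultado:\n{"highlights": [{"start": 12.0, "end": 84.5, "summary": "Motivo do corte"}]}'
parsed = OpenRouterCopywriter._extract_json(raw)  # static method, no instance needed
assert parsed["highlights"][0]["end"] == 84.5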
13 video_render/logging_utils.py Normal file
@@ -0,0 +1,13 @@
from __future__ import annotations

import logging
import os


def setup_logging() -> None:
    log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )
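setup_logging reads LOG_LEVEL from the environment and defaults to INFO. A quick sketch of the expected call pattern at worker startup:

import logging
import os

from video_render.logging_utils import setup_logging

os.environ.setdefault("LOG_LEVEL", "DEBUG")  # opt into verbose output before configuring
setup_logging()
logging.getLogger("video_render").info("logging configurado")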
88 video_render/media.py Normal file
@@ -0,0 +1,88 @@
from __future__ import annotations

import logging
import shutil
from dataclasses import dataclass
from pathlib import Path

from video_render.config import Settings
from video_render.ffmpeg import extract_audio_to_wav
from video_render.utils import ensure_workspace, remove_paths, sanitize_filename

logger = logging.getLogger(__name__)


@dataclass
class VideoWorkspace:
    original_filename: str
    sanitized_name: str
    workspace_dir: Path
    output_dir: Path
    source_path: Path
    working_video_path: Path
    audio_path: Path


class MediaPreparer:
    def __init__(self, settings: Settings) -> None:
        self.settings = settings

    def prepare(self, filename: str) -> VideoWorkspace:
        source_path = self.settings.videos_dir / filename
        if not source_path.exists():
            raise FileNotFoundError(f"Arquivo de vídeo não encontrado: {source_path}")

        sanitized_name = sanitize_filename(Path(filename).stem)
        workspace_dir = ensure_workspace(self.settings.videos_dir, sanitized_name)

        transcription_json = workspace_dir / "transcription.json"
        transcription_txt = workspace_dir / "transcription.txt"
        temp_transcription_json = None
        temp_transcription_txt = None

        if transcription_json.exists():
            temp_transcription_json = workspace_dir.parent / f".{sanitized_name}_transcription.json.tmp"
            shutil.copy2(transcription_json, temp_transcription_json)
        if transcription_txt.exists():
            temp_transcription_txt = workspace_dir.parent / f".{sanitized_name}_transcription.txt.tmp"
            shutil.copy2(transcription_txt, temp_transcription_txt)

        existing_children = list(workspace_dir.iterdir())
        if existing_children:
            logger.info("Limpando workspace existente para %s", sanitized_name)
            try:
                remove_paths(existing_children)
            except Exception as e:
                logger.warning(f"Não foi possível limpar workspace (não crítico): {e}")

        if temp_transcription_json and temp_transcription_json.exists():
            shutil.move(str(temp_transcription_json), str(transcription_json))
            logger.info("Transcrição preservada em %s", transcription_json)
        if temp_transcription_txt and temp_transcription_txt.exists():
            shutil.move(str(temp_transcription_txt), str(transcription_txt))

        destination_name = f"{sanitized_name}{source_path.suffix.lower()}"
        working_video_path = workspace_dir / destination_name
        shutil.copy2(source_path, working_video_path)
        logger.info("Cópia do vídeo criada em %s", working_video_path)

        output_dir = ensure_workspace(self.settings.outputs_dir, sanitized_name)
        existing_outputs = list(output_dir.iterdir())
        if existing_outputs:
            try:
                remove_paths(existing_outputs)
            except Exception as e:
                logger.warning(f"Não foi possível limpar outputs antigos (não crítico): {e}")

        audio_path = workspace_dir / "audio.wav"
        extract_audio_to_wav(working_video_path, audio_path)

        return VideoWorkspace(
            original_filename=filename,
            sanitized_name=sanitized_name,
            workspace_dir=workspace_dir,
            output_dir=output_dir,
            source_path=source_path,
            working_video_path=working_video_path,
            audio_path=audio_path,
        )
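A usage sketch for MediaPreparer. It assumes Settings() can be constructed directly from environment variables (config.py is not shown in this diff) and that videos_dir/outputs_dir point at videos/ and outputs/; exact folder names depend on sanitize_filename.

from video_render.config import Settings
from video_render.media import MediaPreparer

settings = Settings()  # assumption: loads paths and rendering options from the environment
workspace = MediaPreparer(settings).prepare("Meu Video.mp4")

# Typical result (illustrative):
#   videos/meu_video/meu_video.mp4 -> workspace.working_video_path
#   videos/meu_video/audio.wav     -> workspace.audio_path (16 kHz mono)
#   outputs/meu_video/             -> workspace.output_dir (cleared of old clips)
print(workspace.sanitized_name, workspace.audio_path)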
136 video_render/messaging.py Normal file
@@ -0,0 +1,136 @@
from __future__ import annotations

import json
import logging
from typing import Any, Callable, Dict

import pika

from video_render.config import Settings

logger = logging.getLogger(__name__)

MessageHandler = Callable[[Dict[str, Any]], Dict[str, Any]]


def _safe_ack(
    channel: pika.adapters.blocking_connection.BlockingChannel, delivery_tag
) -> bool:
    if not channel.is_open:
        logger.warning(
            "Canal fechado antes do ACK; mensagem sera reprocessada apos reconexao"
        )
        return False
    try:
        channel.basic_ack(delivery_tag=delivery_tag)
        return True
    except Exception:
        logger.exception("Falha ao confirmar mensagem")
        return False


class RabbitMQWorker:
    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self._params = pika.ConnectionParameters(
            host=settings.rabbitmq.host,
            port=settings.rabbitmq.port,
            credentials=pika.PlainCredentials(
                settings.rabbitmq.user, settings.rabbitmq.password
            ),
            heartbeat=settings.rabbitmq.heartbeat,
            blocked_connection_timeout=settings.rabbitmq.blocked_timeout,
        )

    def consume_forever(self, handler: MessageHandler) -> None:
        while True:
            try:
                with pika.BlockingConnection(self._params) as connection:
                    channel = connection.channel()
                    channel.queue_declare(
                        queue=self.settings.rabbitmq.consume_queue, durable=True
                    )
                    channel.queue_declare(
                        queue=self.settings.rabbitmq.publish_queue, durable=True
                    )
                    channel.basic_qos(
                        prefetch_count=self.settings.rabbitmq.prefetch_count
                    )

                    def _on_message(
                        ch: pika.adapters.blocking_connection.BlockingChannel,
                        method,
                        properties,
                        body,
                    ) -> None:
                        """Consume message, ACK immediately, then process."""
                        try:
                            message = json.loads(body)
                        except json.JSONDecodeError:
                            logger.error("Mensagem invalida recebida: %s", body)
                            _safe_ack(ch, method.delivery_tag)
                            return

                        if not _safe_ack(ch, method.delivery_tag):
                            logger.warning(
                                "Nao foi possivel confirmar mensagem; abortando processamento"
                            )
                            return

                        logger.info(
                            "Mensagem recebida: %s",
                            message.get("filename", "<sem_nome>"),
                        )

                        try:
                            response = handler(message)
                        except Exception:
                            logger.exception("Erro nao tratado durante o processamento")
                            response = {
                                "hasError": True,
                                "error": "Erro nao tratado no pipeline",
                                "filename": message.get("filename"),
                                "videoId": message.get("videoId"),
                                "url": message.get("url"),
                                "processedFiles": [],
                            }

                        self._publish_response(response)

                    channel.basic_consume(
                        queue=self.settings.rabbitmq.consume_queue,
                        on_message_callback=_on_message,
                        auto_ack=False,
                    )
                    logger.info("Consumidor iniciado. Aguardando mensagens...")
                    channel.start_consuming()
            except pika.exceptions.AMQPConnectionError:
                logger.exception(
                    "Conexao com RabbitMQ perdida. Tentando reconectar..."
                )
            except pika.exceptions.AMQPError:
                logger.exception("Erro AMQP inesperado. Reiniciando consumo...")
            except KeyboardInterrupt:
                logger.info("Encerrando consumidor por interrupcao do usuario.")
                break

    def _publish_response(self, response: Dict[str, Any]) -> None:
        payload = json.dumps(response)
        try:
            with pika.BlockingConnection(self._params) as publish_connection:
                publish_channel = publish_connection.channel()
                publish_channel.queue_declare(
                    queue=self.settings.rabbitmq.publish_queue, durable=True
                )
                publish_channel.basic_publish(
                    exchange="",
                    routing_key=self.settings.rabbitmq.publish_queue,
                    body=payload,
                    properties=pika.BasicProperties(delivery_mode=2),
                )
                logger.info(
                    "Resposta publicada para '%s'",
                    self.settings.rabbitmq.publish_queue,
                )
        except Exception:
            logger.exception("Falha ao publicar a resposta na fila de upload apos ACK")
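A sketch of how the worker and the pipeline are expected to be wired together. The classes are the ones introduced in this diff; only the Settings() construction is an assumption, since config.py is not shown here.

from video_render.config import Settings
from video_render.logging_utils import setup_logging
from video_render.messaging import RabbitMQWorker
from video_render.pipeline import VideoPipeline

setup_logging()
settings = Settings()  # assumption: reads RabbitMQ/OpenRouter/render config from env
pipeline = VideoPipeline(settings)

# Each consumed message is handed to the pipeline; the returned dict is
# published to the upload queue by RabbitMQWorker._publish_response.
RabbitMQWorker(settings).consume_forever(pipeline.process_message)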
260 video_render/pipeline.py Normal file
@@ -0,0 +1,260 @@
from __future__ import annotations

import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

from video_render.config import Settings
from video_render.llm import OpenRouterCopywriter
from video_render.media import MediaPreparer, VideoWorkspace
from video_render.transcription import TranscriptionResult, TranscriptionService
from video_render.utils import remove_paths, sanitize_filename
from video_render.rendering import VideoRenderer

logger = logging.getLogger(__name__)


@dataclass
class JobMessage:
    filename: str
    url: Optional[str]
    video_id: Optional[str]
    extras: Dict[str, Any] = field(default_factory=dict)


@dataclass
class HighlightWindow:
    start: float
    end: float
    summary: str
    title: Optional[str] = None


@dataclass
class RenderedClip:
    path: Path
    start: float
    end: float
    title: str
    summary: str
    index: int


@dataclass
class PipelineContext:
    job: JobMessage
    workspace: Optional[VideoWorkspace] = None
    transcription: Optional[TranscriptionResult] = None
    highlight_windows: List[HighlightWindow] = field(default_factory=list)
    rendered_clips: List[RenderedClip] = field(default_factory=list)


class VideoPipeline:
    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self.media_preparer = MediaPreparer(settings)
        self.transcriber = TranscriptionService(settings)
        self.llm_service = OpenRouterCopywriter(settings)  # Using OpenRouter for both highlights and titles
        self.renderer = VideoRenderer(settings)

    def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
        context = PipelineContext(job=self._parse_job(message))
        try:
            self._prepare_workspace(context)
            self._generate_transcription(context)
            self._determine_highlights(context)
            self._render_clips(context)

            return self._build_success_payload(context)
        except Exception as exc:
            logger.exception("Falha ao processar vídeo %s", context.job.filename)
            return self._handle_failure(context, exc)

    def _parse_job(self, message: Dict[str, Any]) -> JobMessage:
        filename = message.get("filename")

        if not filename:
            raise ValueError("Mensagem inválida: 'filename' é obrigatório")

        url = message.get("url")
        video_id = message.get("videoId") or message.get("video_id")
        extras = {
            key: value
            for key, value in message.items()
            if key not in {"filename", "url", "videoId", "video_id"}
        }
        return JobMessage(filename=filename, url=url, video_id=video_id, extras=extras)

    def _prepare_workspace(self, context: PipelineContext) -> None:
        context.workspace = self.media_preparer.prepare(context.job.filename)

    def _generate_transcription(self, context: PipelineContext) -> None:
        if not context.workspace:
            raise RuntimeError("Workspace não preparado")
        existing = TranscriptionService.load(context.workspace.workspace_dir)
        if existing:
            logger.info(
                "Transcricao existente encontrada em %s; reutilizando resultado",
                context.workspace.workspace_dir,
            )
            context.transcription = existing
            return

        transcription = self.transcriber.transcribe(
            context.workspace.audio_path,
            output_dir=context.workspace.workspace_dir
        )
        TranscriptionService.persist(transcription, context.workspace.workspace_dir)
        context.transcription = transcription

        # Unload Whisper model immediately after transcription to free memory (1-3GB)
        self.transcriber.unload_model()

    def _determine_highlights(self, context: PipelineContext) -> None:
        if not context.transcription:
            raise RuntimeError("Transcricao nao disponivel")

        try:
            highlights_raw = self.llm_service.generate_highlights(context.transcription)
        except Exception:
            logger.exception(
                "Falha ao gerar destaques com OpenRouter; aplicando fallback padrao."
            )
            context.highlight_windows = [self._build_fallback_highlight(context)]
            return

        windows: List[HighlightWindow] = []

        for item in highlights_raw:
            try:
                start = float(item.get("start", 0))  # type: ignore[arg-type]
                end = float(item.get("end", start))  # type: ignore[arg-type]
            except (TypeError, ValueError):
                logger.warning("Highlight invalido ignorado: %s", item)
                continue

            summary = str(item.get("summary", "")).strip()
            title = str(item.get("title", summary[:60])).strip()

            if end <= start:
                logger.debug("Highlight com intervalo invalido ignorado: %s", item)
                continue

            windows.append(HighlightWindow(start=start, end=end, summary=summary, title=title))

        if not windows:
            windows.append(self._build_fallback_highlight(context))

        context.highlight_windows = windows

    def _generate_titles(self, context: PipelineContext) -> None:
        """DEPRECATED: Titles are now generated together with highlights.

        This method is kept for backwards compatibility but does nothing.
        Titles are extracted from highlights in _determine_highlights().
        """
        pass

    def _build_fallback_highlight(self, context: PipelineContext) -> HighlightWindow:
        if not context.transcription:
            raise RuntimeError("Transcricao nao disponivel para criar fallback")

        last_end = (
            context.transcription.segments[-1].end
            if context.transcription.segments
            else 0.0
        )
        return HighlightWindow(
            start=0.0,
            end=max(last_end, 10.0),
            summary="Sem destaque identificado; fallback automatico.",
            title="Confira este momento",
        )

    def _render_clips(self, context: PipelineContext) -> None:
        if not context.workspace or not context.highlight_windows or not context.transcription:
            return

        titles = [
            window.title or window.summary for window in context.highlight_windows
        ]

        render_results = self.renderer.render(
            workspace_path=str(context.workspace.working_video_path),
            highlight_windows=context.highlight_windows,
            transcription=context.transcription,
            titles=titles,
            output_dir=context.workspace.output_dir,
        )

        context.rendered_clips = [
            RenderedClip(
                path=Path(path),
                start=start,
                end=end,
                title=title,
                summary=summary,
                index=index,
            )
            for path, start, end, title, summary, index in render_results
        ]

    def _build_success_payload(self, context: PipelineContext) -> Dict[str, Any]:
        return {
            "hasError": False,
            "videosProcessedQuantity": len(context.rendered_clips),
            "filename": context.job.filename,
            "videoId": context.job.video_id,
            "url": context.job.url,
            "workspaceFolder": context.workspace.sanitized_name if context.workspace else None,
            "outputDirectory": self._relative_path(context.workspace.output_dir) if context.workspace else None,
            "processedFiles": [
                {
                    "path": self._relative_path(clip.path),
                    "start": clip.start,
                    "end": clip.end,
                    "title": clip.title,
                    "summary": clip.summary,
                    "clipIndex": clip.index,
                }
                for clip in context.rendered_clips
            ],
        }

    def _handle_failure(self, context: PipelineContext, exc: Exception) -> Dict[str, Any]:
        logger.error("Erro na pipeline: %s", exc)
        cleanup_targets: List[Path] = []

        if context.workspace:
            cleanup_targets.append(context.workspace.workspace_dir)
            cleanup_targets.append(context.workspace.output_dir)
            original_path = context.workspace.source_path
            if original_path.exists():
                cleanup_targets.append(original_path)
        else:
            sanitized = sanitize_filename(Path(context.job.filename).stem)
            job_output_dir = self.settings.outputs_dir / sanitized
            if job_output_dir.exists():
                cleanup_targets.append(job_output_dir)
            original_path = self.settings.videos_dir / context.job.filename
            if original_path.exists():
                cleanup_targets.append(original_path)

        remove_paths(cleanup_targets)

        return {
            "hasError": True,
            "error": str(exc),
            "filename": context.job.filename,
            "videoId": context.job.video_id,
            "url": context.job.url,
            "processedFiles": [],
        }

    def _relative_path(self, path: Path) -> str:
        base = self.settings.videos_dir.parent
        try:
            return str(path.relative_to(base))
        except ValueError:
            return str(path)
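The message contract implied by _parse_job and _build_success_payload, shown as an illustrative request/response pair. All field values below are made up.

request = {
    "filename": "entrevista_completa.mp4",    # required
    "videoId": "abc123",                      # optional, also accepted as "video_id"
    "url": "https://example.com/entrevista",  # optional, echoed back unchanged
}

response_on_success = {
    "hasError": False,
    "videosProcessedQuantity": 1,
    "filename": "entrevista_completa.mp4",
    "videoId": "abc123",
    "url": "https://example.com/entrevista",
    "workspaceFolder": "entrevista_completa",
    "outputDirectory": "outputs/entrevista_completa",
    "processedFiles": [
        {
            "path": "outputs/entrevista_completa/clip_01.mp4",
            "start": 30.0,
            "end": 105.0,
            "title": "Confira este momento",
            "summary": "Trecho de maior destaque",
            "clipIndex": 1,
        },
    ],
}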
826 video_render/rendering.py Normal file
@@ -0,0 +1,826 @@
from __future__ import annotations

import logging
import re
from dataclasses import dataclass
from typing import Dict, Iterable, List, Sequence, Tuple, Optional

import numpy as np
from moviepy.audio.AudioClip import AudioArrayClip, AudioClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.VideoClip import ColorClip, ImageClip, TextClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from PIL import Image, ImageColor, ImageDraw, ImageFont

from video_render.config import Settings
from video_render.transcription import TranscriptionResult, WordTiming
from video_render.smart_framing import SmartFramer, extract_audio_samples

logger = logging.getLogger(__name__)


def clamp_time(value: float, minimum: float = 0.0) -> float:
    return max(minimum, float(value))


@dataclass
class CaptionClipSet:
    base: ImageClip
    highlights: List[ImageClip]


class CaptionBuilder:
    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self.font_path = settings.rendering.font_path

        if not self.font_path.exists():
            raise FileNotFoundError(f"Fonte nao encontrada: {self.font_path}")

        self.font = ImageFont.truetype(
            str(self.font_path), settings.rendering.subtitle_font_size
        )
        self.base_color = ImageColor.getrgb(settings.rendering.base_color)
        self.highlight_color = ImageColor.getrgb(settings.rendering.highlight_color)
        self.canvas_width = settings.rendering.frame_width - 160
        self.canvas_height = int(settings.rendering.subtitle_font_size * 2.2)
        self.min_words = settings.rendering.caption_min_words
        self.max_words = settings.rendering.caption_max_words

        bbox = self.font.getbbox("Ay")

        self.text_height = bbox[3] - bbox[1]
        self.baseline = (self.canvas_height - self.text_height) // 2 - bbox[1]
        self.space_width = self.font.getbbox(" ")[2] - self.font.getbbox(" ")[0]

    def build(self, words: Sequence[WordTiming], clip_start: float) -> List[CaptionClipSet]:
        # Filter out empty, whitespace-only, or very short words (likely noise)
        valid_words = [
            w for w in words
            if w.word
            and w.word.strip()
            and len(w.word.strip()) >= 2  # At least 2 characters
            and w.word.strip() not in ['...', '..', '.', ',', '-', 'hmm', 'hm', 'ah', 'eh', 'uh']  # Not just punctuation or filler
        ]

        # Note: We don't filter out words based on gaps here
        # Gap detection is handled in _group_words_with_gaps
        # This ensures captions disappear during silence naturally
        filtered_words = valid_words

        # Calculate speech density (words per second)
        # If density is too low, it's likely just noise/silence being misinterpreted
        if filtered_words:
            first_word_time = filtered_words[0].start
            last_word_time = filtered_words[-1].end
            duration = last_word_time - first_word_time

            if duration > 0:
                words_per_second = len(filtered_words) / duration
                # Typical speech is 2-3 words per second
                # If less than 0.5 words/second, it's probably silence/noise
                if words_per_second < 0.5:
                    logger.debug(f"Captions suprimidas: densidade muito baixa ({words_per_second:.2f} palavras/seg)")
                    return []

        # Only show captions if we have at least 3 valid words (reduced from 5 for 2-word groups)
        # This prevents showing captions for noise/mumbling
        if len(filtered_words) < 3:
            return []

        grouped = self._group_words_with_gaps(filtered_words)
        clip_sets: List[CaptionClipSet] = []

        for group in grouped:
            group_start = clamp_time(group[0].start, minimum=clip_start)
            group_end = clamp_time(group[-1].end, minimum=group_start + 0.05)
            duration = max(0.05, group_end - group_start)
            start_offset = group_start - clip_start

            base_image, highlight_images = self._render_group(group)

            base_clip = (
                ImageClip(np.array(base_image))
                .with_start(start_offset)
                .with_duration(duration)
            )

            highlight_clips: List[ImageClip] = []

            for word, image in zip(group, highlight_images):
                h_start = clamp_time(word.start, minimum=clip_start) - clip_start
                h_end = clamp_time(word.end, minimum=word.start + 0.02) - clip_start
                h_duration = max(0.05, h_end - h_start)
                highlight_clip = (
                    ImageClip(np.array(image))
                    .with_start(h_start)
                    .with_duration(h_duration)
                )
                highlight_clips.append(highlight_clip)

            clip_sets.append(CaptionClipSet(base=base_clip, highlights=highlight_clips))

        return clip_sets

    def _render_group(self, group: Sequence[WordTiming]) -> Tuple[Image.Image, List[Image.Image]]:
        texts = [self._clean_word(word.word) for word in group]
        widths = []

        for text in texts:
            bbox = self.font.getbbox(text)
            widths.append(bbox[2] - bbox[0])

        total_width = sum(widths)

        if len(widths) > 1:
            total_width += self.space_width * (len(widths) - 1)

        # Check if text needs to wrap to multiple lines
        # If total width exceeds canvas width, break into 2 lines
        needs_wrap = total_width > self.canvas_width

        if needs_wrap:
            # Split into 2 lines - try to balance the lines
            mid_point = len(texts) // 2
            line1_texts = texts[:mid_point]
            line2_texts = texts[mid_point:]
            line1_widths = widths[:mid_point]
            line2_widths = widths[mid_point:]

            # Calculate widths for each line
            line1_width = sum(line1_widths)
            if len(line1_widths) > 1:
                line1_width += self.space_width * (len(line1_widths) - 1)

            line2_width = sum(line2_widths)
            if len(line2_widths) > 1:
                line2_width += self.space_width * (len(line2_widths) - 1)

            # Double the canvas height for 2 lines
            canvas_height = self.canvas_height * 2
            base_image = Image.new("RGBA", (self.canvas_width, canvas_height), (0, 0, 0, 0))
            base_draw = ImageDraw.Draw(base_image)
            highlight_images: List[Image.Image] = []

            # Stroke settings: 8px black stroke for better readability
            stroke_width = 8
            stroke_color = (0, 0, 0, 255)  # Black

            # Draw line 1
            x = max(0, (self.canvas_width - line1_width) // 2)
            y = self.baseline
            for text, width in zip(line1_texts, line1_widths):
                base_draw.text(
                    (x, y),
                    text,
                    font=self.font,
                    fill=self.base_color,
                    stroke_width=stroke_width,
                    stroke_fill=stroke_color
                )

                highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
                highlight_draw = ImageDraw.Draw(highlight_image)
                highlight_draw.text(
                    (x, y),
                    text,
                    font=self.font,
                    fill=self.highlight_color,
                    stroke_width=stroke_width,
                    stroke_fill=stroke_color
                )
                highlight_images.append(highlight_image)
                x += width + self.space_width

            # Draw line 2
            x = max(0, (self.canvas_width - line2_width) // 2)
            y = self.baseline + self.text_height + 5  # 5px spacing between lines
            for text, width in zip(line2_texts, line2_widths):
                base_draw.text(
                    (x, y),
                    text,
                    font=self.font,
                    fill=self.base_color,
                    stroke_width=stroke_width,
                    stroke_fill=stroke_color
                )

                highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
                highlight_draw = ImageDraw.Draw(highlight_image)
                highlight_draw.text(
                    (x, y),
                    text,
                    font=self.font,
                    fill=self.highlight_color,
                    stroke_width=stroke_width,
                    stroke_fill=stroke_color
                )
                highlight_images.append(highlight_image)
                x += width + self.space_width

            return base_image, highlight_images

        # Single line rendering (original code)
        start_x = max(0, (self.canvas_width - total_width) // 2)

        base_image = Image.new("RGBA", (self.canvas_width, self.canvas_height), (0, 0, 0, 0))
        base_draw = ImageDraw.Draw(base_image)
        highlight_images: List[Image.Image] = []
        x = start_x

        # Stroke settings: 8px black stroke for better readability
        stroke_width = 8
        stroke_color = (0, 0, 0, 255)  # Black

        for text, width in zip(texts, widths):
            # Draw base text with stroke
            base_draw.text(
                (x, self.baseline),
                text,
                font=self.font,
                fill=self.base_color,
                stroke_width=stroke_width,
                stroke_fill=stroke_color
            )

            # Draw highlight text with stroke
            highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
            highlight_draw = ImageDraw.Draw(highlight_image)
            highlight_draw.text(
                (x, self.baseline),
                text,
                font=self.font,
                fill=self.highlight_color,
                stroke_width=stroke_width,
                stroke_fill=stroke_color
            )
            highlight_images.append(highlight_image)

            x += width + self.space_width

        return base_image, highlight_images

    def _group_words(self, words: Sequence[WordTiming]) -> List[List[WordTiming]]:
        if not words:
            return []

        grouped: List[List[WordTiming]] = []
        buffer: List[WordTiming] = []

        for word in words:
            buffer.append(word)

            if len(buffer) == self.max_words:
                grouped.append(buffer)
                buffer = []

        if buffer:
            if len(buffer) == 1 and grouped:
                grouped[-1].extend(buffer)
            else:
                grouped.append(buffer)

        for idx, group in enumerate(grouped[:-1]):
            if len(group) < self.min_words and len(grouped[idx + 1]) > self.min_words:
                deficit = self.min_words - len(group)
                transfer = grouped[idx + 1][:deficit]
                grouped[idx] = group + transfer
                grouped[idx + 1] = grouped[idx + 1][deficit:]

        grouped = [grp for grp in grouped if grp]

        return grouped

    def _group_words_with_gaps(self, words: Sequence[WordTiming]) -> List[List[WordTiming]]:
        """
        Group words into 2-word chunks, respecting silence gaps.
        Creates natural breaks where there are pauses > 1.5s
        """
        if not words:
            return []

        grouped: List[List[WordTiming]] = []
        buffer: List[WordTiming] = []

        for i, word in enumerate(words):
            # Check if there's a long pause before this word
            if i > 0:
                gap = word.start - words[i - 1].end
                # If gap > 1.5s, finish current buffer and start new group
                if gap > 1.5:
                    if buffer:
                        grouped.append(buffer)
                        buffer = []

            buffer.append(word)

            # Group into 2 words maximum
            if len(buffer) == 2:
                grouped.append(buffer)
                buffer = []

        # Handle remaining words
        if buffer:
            if len(buffer) == 1 and grouped:
                # Add single remaining word to last group
                grouped[-1].append(buffer[0])
            else:
                grouped.append(buffer)

        return [grp for grp in grouped if grp]

    @staticmethod
    def _clean_word(text: str) -> str:
        text = text.strip()
        text = re.sub(r"\s+", " ", text)
        return text or "..."


class VideoRenderer:
    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self.captions = CaptionBuilder(settings)
        self.smart_framer = SmartFramer(
            target_width=settings.rendering.frame_width,
            target_height=settings.rendering.frame_height,
            frame_skip=settings.rendering.smart_framing_frame_skip,
            smoothing_window=settings.rendering.smart_framing_smoothing_window,
            max_velocity=settings.rendering.smart_framing_max_velocity,
            person_switch_cooldown=settings.rendering.smart_framing_person_switch_cooldown,
            response_time=settings.rendering.smart_framing_response_time,
            group_padding=settings.rendering.smart_framing_group_padding,
            max_zoom_out=settings.rendering.smart_framing_max_zoom_out,
            dead_zone=settings.rendering.smart_framing_dead_zone,
            min_face_confidence=settings.rendering.smart_framing_min_confidence
        )

    def render(
        self,
        workspace_path: str,
        highlight_windows: Sequence,
        transcription: TranscriptionResult,
        titles: Sequence[str],
        output_dir,
    ) -> List[Tuple[str, float, float, str, str, int]]:
        results: List[Tuple[str, float, float, str, str, int]] = []

        with VideoFileClip(workspace_path) as base_clip:
            video_duration = base_clip.duration or 0

            for index, window in enumerate(highlight_windows, start=1):
                start = clamp_time(window.start)
                end = clamp_time(window.end)
                start = min(start, video_duration)
                end = min(end, video_duration)

                if end <= start:
                    logger.info("Janela ignorada por intervalo invalido: %s", window)
                    continue

                subclip = base_clip.subclipped(start, end)

                try:
                    rendered_path = self._render_single_clip(
                        subclip=subclip,
                        start=start,
                        end=end,
                        title=titles[index - 1] if index - 1 < len(titles) else window.summary,
                        summary=window.summary,
                        index=index,
                        transcription=transcription,
                        output_dir=output_dir,
                        source_path=workspace_path,
                    )
                finally:
                    subclip.close()

                results.append(
                    (
                        rendered_path,
                        float(start),
                        float(end),
                        titles[index - 1] if index - 1 < len(titles) else window.summary,
                        window.summary,
                        index,
                    )
                )

        return results

    def _render_single_clip(
        self,
        subclip: VideoFileClip,
        start: float,
        end: float,
        title: str,
        summary: str,
        index: int,
        transcription: TranscriptionResult,
        output_dir,
        source_path: str,
    ) -> str:
        duration = end - start
        frame_w = self.settings.rendering.frame_width
        frame_h = self.settings.rendering.frame_height
        # Removed top panel - no longer showing title
        bottom_h = int(frame_h * 0.20)

        # Use smart framing to create intelligent 9:16 video (if enabled)
        if self.settings.rendering.enable_smart_framing:
            logger.info(f"Creating smart framing plan for clip {index} ({start:.2f}s - {end:.2f}s)")

            try:
                # Extract audio for speech detection
                audio_samples = extract_audio_samples(source_path, start, end)

                # Create framing plan
                framing_plan = self.smart_framer.create_framing_plan(
                    video_path=source_path,
                    start_time=start,
                    end_time=end,
                    audio_samples=audio_samples
                )

                # Apply smart framing (always single-person focus)
                video_clip = self.smart_framer.apply_framing(
                    video_clip=subclip,
                    framing_plan=framing_plan
                )

                logger.info(f"Smart framing applied: layout={framing_plan.layout_mode}, "
                            f"faces_detected={len(framing_plan.frame_contexts[0].detected_faces) if framing_plan.frame_contexts else 0}")

            except Exception as exc:
                logger.warning(f"Smart framing failed for clip {index}, falling back to center crop: {exc}", exc_info=True)

                # Fallback to center crop (maintains aspect ratio, crops to fit)
                video_area_h = max(1, frame_h - bottom_h)

                # Use MAX to ensure video covers entire area (will crop excess)
                scale_factor = max(
                    frame_w / subclip.w,
                    video_area_h / subclip.h,
                )

                # Resize to cover area
                resized_clip = subclip.resized(scale_factor)

                # Calculate crop region (center crop)
                crop_x1 = max(0, (resized_clip.w - frame_w) // 2)
                crop_y1 = max(0, (resized_clip.h - video_area_h) // 2)
                crop_x2 = crop_x1 + frame_w
                crop_y2 = crop_y1 + video_area_h

                # Crop to fit target dimensions using MoviePy crop(x1, y1, x2, y2)
                cropped_clip = resized_clip.cropped(
                    x1=crop_x1,
                    y1=crop_y1,
                    x2=crop_x2,
                    y2=crop_y2
                )

                video_clip = cropped_clip.with_position((0, 0))
                resized_clip.close()
        else:
            # Use center crop (smart framing disabled)
            logger.info(f"Using center crop for clip {index} (smart framing disabled)")
            video_area_h = max(1, frame_h - bottom_h)

            # Use MAX to ensure video covers entire area (will crop excess)
            scale_factor = max(
                frame_w / subclip.w,
                video_area_h / subclip.h,
            )

            # Resize to cover area
            resized_clip = subclip.resized(scale_factor)

            # Calculate crop region (center crop)
            crop_x1 = max(0, (resized_clip.w - frame_w) // 2)
            crop_y1 = max(0, (resized_clip.h - video_area_h) // 2)
            crop_x2 = crop_x1 + frame_w
            crop_y2 = crop_y1 + video_area_h

            # Crop to fit target dimensions using MoviePy crop(x1, y1, x2, y2)
            cropped_clip = resized_clip.cropped(
                x1=crop_x1,
                y1=crop_y1,
                x2=crop_x2,
                y2=crop_y2
            )

            video_clip = cropped_clip.with_position((0, 0))
            resized_clip.close()

        background = ColorClip(size=(frame_w, frame_h), color=(0, 0, 0)).with_duration(duration)
        # Removed top panel and title - no longer needed
        bottom_panel = (
            ColorClip(size=(frame_w, bottom_h), color=(12, 12, 12))
            .with_position((0, frame_h - bottom_h))
            .with_duration(duration)
            .with_opacity(0.85)
        )

        words = self._collect_words(transcription, start, end)

        # Calculate speech coverage: how much of the clip has actual speech?
        # If less than 30% of the clip has speech, don't show captions
        clip_duration = end - start
        if words and clip_duration > 0:
            # Calculate total time with speech
            total_speech_time = sum(w.end - w.start for w in words)
            speech_coverage = total_speech_time / clip_duration

            if speech_coverage < 0.3:  # Less than 30% speech
                logger.debug(f"Captions suprimidas: cobertura de fala baixa ({speech_coverage:.1%})")
                words = []  # Clear words to prevent captions

        # Only build captions if there are actual words to display
        # This prevents empty/placeholder captions from appearing
        caption_sets = self.captions.build(words, clip_start=start) if words else []

        caption_clips = []
        caption_resources: List[ImageClip] = []

        # Position captions 120px below center (for 1920px height, center is 960px, so 1080px)
        # This ensures they're visible, well-positioned, and don't interfere with faces
        # Range: 100-150px as requested, using 120px for optimal positioning
        center_y = frame_h // 2
        caption_y = center_y + 120
        caption_margin = 20

        # Ensure captions stay within reasonable bounds (no top panel now)
        min_caption_y = caption_margin
        max_caption_y = frame_h - bottom_h - self.captions.canvas_height - caption_margin

        if max_caption_y < min_caption_y:
            caption_y = min_caption_y
        else:
            caption_y = min(max(caption_y, min_caption_y), max_caption_y)

        for clip_set in caption_sets:
            base_positioned = clip_set.base.with_position(("center", caption_y))
            caption_clips.append(base_positioned)
            caption_resources.append(clip_set.base)
            for highlight in clip_set.highlights:
                positioned = highlight.with_position(("center", caption_y))
                caption_clips.append(positioned)
                caption_resources.append(highlight)

        # No fallback captions - if there are no dynamic captions, show nothing
        # This matches Opus Clip behavior where captions only appear when there's actual speech

        audio_clip, audio_needs_close = self._materialize_audio(
            source_path=source_path,
            start=start,
            end=end,
            duration=duration,
            fallback_audio=video_clip.audio or subclip.audio,
        )

        # Composite with background, bottom panel, video, and captions only (no top panel or title)
        composite = CompositeVideoClip(
            [background, bottom_panel, video_clip, *caption_clips],
            size=(frame_w, frame_h),
        )
        if audio_clip is not None:
            composite = self._with_audio(composite, audio_clip)

        output_path = output_dir / f"clip_{index:02d}.mp4"
        self._write_with_fallback(
            composite=composite,
            output_path=output_path,
            index=index,
            output_dir=output_dir,
        )

        composite.close()
        video_clip.close()
        background.close()
        bottom_panel.close()
        for clip in caption_clips:
            clip.close()
        for clip in caption_resources:
            clip.close()
        if audio_clip is not None and audio_needs_close:
            audio_clip.close()

        # Force garbage collection to free memory after rendering
        import gc
        gc.collect()

        return str(output_path)

    def _materialize_audio(
        self,
        *,
        source_path: str,
        start: float,
        end: float,
        duration: float,
        fallback_audio,
    ) -> Tuple[Optional[AudioClip], bool]:
        try:
            with AudioFileClip(source_path) as audio_file:
                segment = audio_file.subclipped(start, end)
                fps = (
                    getattr(segment, "fps", None)
                    or getattr(audio_file, "fps", None)
                    or 44100
                )
                samples = segment.to_soundarray(fps=fps)
        except Exception:
            logger.warning(
                "Falha ao carregar audio independente; utilizando fluxo original",
                exc_info=True,
            )
            return fallback_audio, False

        audio_clip = AudioArrayClip(samples, fps=fps).with_duration(duration)
        return audio_clip, True

    def _collect_words(
        self, transcription: TranscriptionResult, start: float, end: float
    ) -> List[WordTiming]:
        collected: List[WordTiming] = []
        for segment in transcription.segments:
            if segment.end < start or segment.start > end:
                continue

            if segment.words:
                for word in segment.words:
                    if word.end < start or word.start > end:
                        continue
                    collected.append(
                        WordTiming(
                            start=max(start, word.start),
                            end=min(end, word.end),
                            word=word.word,
                        )
                    )
            else:
                collected.extend(self._fallback_words(segment.text, segment.start, segment.end, start, end))

        collected.sort(key=lambda w: w.start)
        return collected

    def _fallback_words(
        self,
        text: str,
        segment_start: float,
        segment_end: float,
        window_start: float,
        window_end: float,
    ) -> Iterable[WordTiming]:
        words = [w for w in re.split(r"\s+", text.strip()) if w]
        if not words:
            return []

        seg_start = max(segment_start, window_start)
        seg_end = min(segment_end, window_end)
        duration = max(0.01, seg_end - seg_start)
        step = duration / len(words)

        timings: List[WordTiming] = []
        for idx, word in enumerate(words):
            w_start = seg_start + idx * step
            w_end = min(seg_end, w_start + step)
            timings.append(WordTiming(start=w_start, end=w_end, word=word))
        return timings

    @staticmethod
    def _wrap_text(text: str, max_width: int) -> str:
        text = text.strip()
        if not text:
            return ""

        words = text.split()
        lines: List[str] = []
        current: List[str] = []
        for word in words:
            current.append(word)
            if len(" ".join(current)) > max_width // 18:
                lines.append(" ".join(current[:-1]))
                current = [current[-1]]
        if current:
            lines.append(" ".join(current))
        return "\n".join(lines)

    def _write_with_fallback(
        self,
        *,
        composite: CompositeVideoClip,
        output_path,
        index: int,
        output_dir,
    ) -> None:
        attempts = self._encoding_attempts()
        temp_audio_path = output_dir / f"temp_audio_{index:02d}.m4a"
        last_error: Exception | None = None

        for attempt in attempts:
            codec = attempt["codec"]
            bitrate = attempt["bitrate"]
            preset = attempt["preset"]

            ffmpeg_params = ["-pix_fmt", "yuv420p"]
            if preset:
                ffmpeg_params = ["-preset", preset, "-pix_fmt", "yuv420p"]

            try:
                logger.info(
                    "Renderizando clip %02d com codec %s (bitrate=%s, preset=%s)",
                    index,
                    codec,
                    bitrate,
                    preset or "default",
                )
                composite.write_videofile(
                    str(output_path),
                    codec=codec,
                    audio_codec=self.settings.rendering.audio_codec,
                    fps=self.settings.rendering.fps,
                    bitrate=bitrate,
                    ffmpeg_params=ffmpeg_params,
                    temp_audiofile=str(temp_audio_path),
                    remove_temp=True,
                    threads=4,
                )
                return
            except Exception as exc:  # noqa: BLE001 - propagate after fallbacks
                last_error = exc
                logger.warning(
                    "Falha ao renderizar com codec %s: %s", codec, exc, exc_info=True
                )
                if output_path.exists():
                    output_path.unlink(missing_ok=True)
                if temp_audio_path.exists():
                    temp_audio_path.unlink(missing_ok=True)

        raise RuntimeError("Todas as tentativas de renderizacao falharam") from last_error

    def _encoding_attempts(self) -> List[Dict[str, str | None]]:
        settings = self.settings.rendering
        attempts: List[Dict[str, str | None]] = []

        attempts.append(
            {
                "codec": settings.video_codec,
                "bitrate": settings.bitrate,
                "preset": settings.preset,
            }
        )

        deduped: List[Dict[str, str | None]] = []
        seen = set()
        for attempt in attempts:
            key = (attempt["codec"], attempt["bitrate"], attempt["preset"])
            if key in seen:
                continue
            seen.add(key)
            deduped.append(attempt)

        return deduped

    @staticmethod
    def _with_audio(
        composite: CompositeVideoClip,
        audio_clip,
    ) -> CompositeVideoClip:
        """Attach audio to a composite clip across MoviePy versions."""
        if hasattr(composite, "with_audio"):
            return composite.with_audio(audio_clip)
        if hasattr(composite, "set_audio"):
            return composite.set_audio(audio_clip)
        raise AttributeError("CompositeVideoClip does not support audio assignment")

    @staticmethod
    def _make_textclip(
        *,
        text: str,
        font_path,
        font_size: int,
        color: str,
        size: Tuple[int, int],
    ) -> TextClip:
        """Create a TextClip compatible with MoviePy 1.x and 2.x.

        MoviePy 2.x removed the 'align' keyword from TextClip. We try with
        'align' for older versions and fall back to a call without it when
        unsupported.
        """
        kwargs = dict(
            text=text,
            font=str(font_path),
            font_size=font_size,
            color=color,
            method="caption",
            size=size,
        )
        try:
            return TextClip(**kwargs, align="center")  # MoviePy 1.x style
        except TypeError:
            logger.debug("TextClip 'align' not supported; falling back without it")
            return TextClip(**kwargs)  # MoviePy 2.x style
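A small illustration of the caption grouping rule above: words are paired two at a time and a pause longer than 1.5s forces a break. Since _group_words_with_gaps never touches self, it can be exercised without building a CaptionBuilder (which would require the font file and a Settings instance); this assumes the rendering dependencies are installed so the module imports.

from video_render.rendering import CaptionBuilder
from video_render.transcription import WordTiming

words = [
    WordTiming(0.0, 0.4, "ola"),
    WordTiming(0.4, 0.8, "pessoal"),
    WordTiming(0.9, 1.2, "hoje"),
    WordTiming(3.5, 3.9, "vamos"),   # 2.3s pause before this word -> new group
    WordTiming(3.9, 4.3, "comecar"),
]

# self is unused by this method, so passing None is enough for a demo
groups = CaptionBuilder._group_words_with_gaps(None, words)
print([[w.word for w in g] for g in groups])
# [['ola', 'pessoal'], ['hoje'], ['vamos', 'comecar']]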
1153 video_render/smart_framing.py Normal file
File diff suppressed because it is too large
333 video_render/transcription.py Normal file
@@ -0,0 +1,333 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
import numpy as np
|
||||
from faster_whisper import WhisperModel
|
||||
|
||||
from video_render.config import Settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class WordTiming:
|
||||
start: float
|
||||
end: float
|
||||
word: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TranscriptSegment:
|
||||
id: int
|
||||
start: float
|
||||
end: float
|
||||
text: str
|
||||
words: List[WordTiming]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TranscriptionResult:
|
||||
segments: List[TranscriptSegment]
|
||||
full_text: str
|
||||
|
||||
|
||||
class TranscriptionService:
|
||||
def __init__(self, settings: Settings) -> None:
|
||||
self.settings = settings
|
||||
self._model: Optional[WhisperModel] = None
|
||||
|
||||
def _load_model(self) -> WhisperModel:
|
||||
if self._model is None:
|
||||
logger.info(
|
||||
"Carregando modelo Faster-Whisper '%s' (device=%s, compute_type=%s)",
|
||||
self.settings.whisper.model_size,
|
||||
self.settings.whisper.device or "auto",
|
||||
self.settings.whisper.compute_type or "default",
|
||||
)
|
||||
self._model = WhisperModel(
|
||||
self.settings.whisper.model_size,
|
||||
device=self.settings.whisper.device or "auto",
|
||||
compute_type=self.settings.whisper.compute_type or "default",
|
||||
download_root=str(self.settings.whisper.download_root),
|
||||
)
|
||||
return self._model
|
||||
|
||||
def unload_model(self) -> None:
|
||||
"""Unload the Whisper model to free memory (reduces RAM usage by 1-3GB)."""
|
||||
if self._model is not None:
|
||||
logger.info("Descarregando modelo Whisper para liberar memória...")
|
||||
del self._model
|
||||
self._model = None
|
||||
# Force garbage collection to immediately free GPU/CPU memory
|
||||
import gc
|
||||
gc.collect()
|
||||
logger.info("Modelo Whisper descarregado com sucesso")
|
||||
|
||||
def transcribe(self, audio_path: Path, output_dir: Optional[Path] = None) -> TranscriptionResult:
|
||||
if output_dir is not None:
|
||||
existing_transcription = self.load(output_dir)
|
||||
if existing_transcription is not None:
|
||||
logger.info("Transcrição já existe em %s, reutilizando...", output_dir)
|
||||
return existing_transcription
|
||||
|
||||
# Get audio duration to decide if we need chunked processing
|
||||
audio_duration = self._get_audio_duration(audio_path)
|
||||
chunk_duration_minutes = 30 # Process in 30-minute chunks for long videos
|
||||
chunk_duration_seconds = chunk_duration_minutes * 60
|
||||
|
||||
# For videos longer than 30 minutes, use chunked processing to avoid OOM
|
||||
if audio_duration > chunk_duration_seconds:
|
||||
logger.info(
|
||||
f"Áudio longo detectado ({audio_duration/60:.1f} min). "
|
||||
f"Processando em chunks de {chunk_duration_minutes} min para evitar erro de memória..."
|
||||
)
|
||||
return self._transcribe_chunked(audio_path, chunk_duration_seconds)
|
||||
else:
|
||||
logger.info(f"Iniciando transcrição do áudio ({audio_duration/60:.1f} min) com FasterWhisper...")
|
||||
return self._transcribe_full(audio_path)
|
||||
|
||||
def _get_audio_duration(self, audio_path: Path) -> float:
|
||||
"""Get audio duration in seconds."""
|
||||
try:
|
||||
from moviepy.audio.io.AudioFileClip import AudioFileClip
|
||||
with AudioFileClip(str(audio_path)) as audio:
|
||||
return audio.duration or 0.0
|
||||
except Exception as e:
|
||||
logger.warning(f"Falha ao obter duração do áudio, assumindo curto: {e}")
|
||||
return 0.0 # Assume short if we can't determine
|
||||
|
||||
    def _transcribe_full(self, audio_path: Path) -> TranscriptionResult:
        """Transcribe entire audio at once (for shorter videos)."""
        model = self._load_model()
        segments, _ = model.transcribe(
            str(audio_path),
            beam_size=5,
            word_timestamps=True,
        )
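        # With word_timestamps=True, faster-whisper attaches word-level timing
        # objects (w.start, w.end, w.word) to each segment; these populate the
        # WordTiming entries built below.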

        parsed_segments: List[TranscriptSegment] = []
        full_text_parts: List[str] = []

        for idx, segment in enumerate(segments):
            words = [
                WordTiming(start=w.start, end=w.end, word=w.word.strip())
                for w in segment.words or []
                if w.word.strip()
            ]
            text = segment.text.strip()
            full_text_parts.append(text)
            parsed_segments.append(
                TranscriptSegment(
                    id=idx,
                    start=segment.start,
                    end=segment.end,
                    text=text,
                    words=words,
                )
            )

        return TranscriptionResult(
            segments=parsed_segments,
            full_text=" ".join(full_text_parts).strip(),
        )

    def _transcribe_chunked(self, audio_path: Path, chunk_duration: float) -> TranscriptionResult:
        """Transcribe audio in chunks to avoid OOM on long videos."""
        import subprocess
        from moviepy.audio.io.AudioFileClip import AudioFileClip

        model = self._load_model()
        all_segments: List[TranscriptSegment] = []
        full_text_parts: List[str] = []
        segment_id_counter = 0

        # Get total duration
        total_duration = self._get_audio_duration(audio_path)
        num_chunks = int(np.ceil(total_duration / chunk_duration))
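        # Example: a 90-minute file (total_duration = 5400 s) with chunk_duration
        # = 1800 s gives ceil(5400 / 1800) = 3 chunks; 95 minutes would give 4.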
logger.info(f"Processando áudio em {num_chunks} chunks...")
|
||||
|
||||
for chunk_idx in range(num_chunks):
|
||||
start_time = chunk_idx * chunk_duration
|
||||
end_time = min((chunk_idx + 1) * chunk_duration, total_duration)
|
||||
|
||||
logger.info(
|
||||
f"Processando chunk {chunk_idx + 1}/{num_chunks} "
|
||||
f"({start_time/60:.1f}min - {end_time/60:.1f}min)..."
|
||||
)
|
||||
|
||||

            # Extract chunk using ffmpeg directly (more reliable than moviepy subclip)
            temp_chunk_path = audio_path.parent / f"temp_chunk_{chunk_idx}.wav"
            try:
                # Use ffmpeg to extract the chunk
                chunk_duration_actual = end_time - start_time
                ffmpeg_cmd = [
                    'ffmpeg',
                    '-y',  # Overwrite output file
                    '-ss', str(start_time),  # Start time
                    '-i', str(audio_path),  # Input file
                    '-t', str(chunk_duration_actual),  # Duration
                    '-acodec', 'pcm_s16le',  # Audio codec
                    '-ar', '44100',  # Sample rate
                    '-ac', '2',  # Stereo
                    '-loglevel', 'error',  # Only show errors
                    str(temp_chunk_path)
                ]
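                # Equivalent shell command for the second chunk of a 90-minute
                # file (file names and values illustrative):
                #   ffmpeg -y -ss 1800.0 -i audio.wav -t 1800.0 \
                #     -acodec pcm_s16le -ar 44100 -ac 2 -loglevel error temp_chunk_1.wav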
                subprocess.run(ffmpeg_cmd, check=True, capture_output=True)

                # Transcribe chunk
                segments, _ = model.transcribe(
                    str(temp_chunk_path),
                    beam_size=5,
                    word_timestamps=True,
                )

                # Process segments with time offset
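                # Example: a word spoken 12.3 s into a chunk that starts at
                # 1800 s is stored with start = 1812.3 s in the final result.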
                for segment in segments:
                    words = [
                        WordTiming(
                            start=w.start + start_time,
                            end=w.end + start_time,
                            word=w.word.strip()
                        )
                        for w in segment.words or []
                        if w.word.strip()
                    ]
                    text = segment.text.strip()
                    full_text_parts.append(text)
                    all_segments.append(
                        TranscriptSegment(
                            id=segment_id_counter,
                            start=segment.start + start_time,
                            end=segment.end + start_time,
                            text=text,
                            words=words,
                        )
                    )
                    segment_id_counter += 1

                # Force garbage collection after each chunk
                import gc
                gc.collect()

            except subprocess.CalledProcessError as e:
                logger.error(f"Error extracting chunk {chunk_idx}: {e.stderr.decode() if e.stderr else str(e)}")
                raise
            finally:
                # Clean up the temporary chunk file
                if temp_chunk_path.exists():
                    temp_chunk_path.unlink()

        logger.info(f"Chunked transcription finished: {len(all_segments)} segments processed")

        return TranscriptionResult(
            segments=all_segments,
            full_text=" ".join(full_text_parts).strip(),
        )

    @staticmethod
    def persist(result: TranscriptionResult, destination: Path) -> None:
        json_path = destination / "transcription.json"
        text_path = destination / "transcription.txt"

        payload = {
            "segments": [
                {
                    "id": segment.id,
                    "start": segment.start,
                    "end": segment.end,
                    "text": segment.text,
                    "words": [
                        {"start": word.start, "end": word.end, "text": word.word}
                        for word in segment.words
                    ],
                }
                for segment in result.segments
            ],
            "full_text": result.full_text,
        }
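        # Resulting transcription.json shape (values illustrative):
        # {
        #   "segments": [
        #     {"id": 0, "start": 0.0, "end": 3.2, "text": "...",
        #      "words": [{"start": 0.0, "end": 0.4, "text": "..."}]}
        #   ],
        #   "full_text": "..."
        # }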

        with json_path.open("w", encoding="utf-8") as fp:
            json.dump(payload, fp, ensure_ascii=False, indent=2)

        with text_path.open("w", encoding="utf-8") as fp:
            fp.write(result.full_text)

        logger.info("Transcription saved to %s", destination)

    @staticmethod
    def load(source: Path) -> Optional[TranscriptionResult]:
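        # Best-effort loader: returns None when the JSON file is missing or
        # malformed; segments or words lacking required fields are skipped
        # instead of failing the whole load.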
        json_path = source / "transcription.json"
        if not json_path.exists():
            return None

        try:
            with json_path.open("r", encoding="utf-8") as fp:
                payload = json.load(fp)
        except (OSError, json.JSONDecodeError) as exc:
            logger.warning(
                "Failed to load existing transcription from %s: %s", json_path, exc
            )
            return None

        segments_payload = payload.get("segments", [])
        if not isinstance(segments_payload, list):
            logger.warning(
                "Unexpected format when loading transcription from %s: invalid 'segments'",
                json_path,
            )
            return None

        segments: List[TranscriptSegment] = []
        for idx, segment_data in enumerate(segments_payload):
            if not isinstance(segment_data, dict):
                logger.debug("Invalid segment skipped while loading: %s", segment_data)
                continue
            try:
                segment_id = int(segment_data.get("id", idx))
                start = float(segment_data["start"])
                end = float(segment_data["end"])
            except (KeyError, TypeError, ValueError):
                logger.debug("Segment missing required fields skipped: %s", segment_data)
                continue

            text = str(segment_data.get("text", "")).strip()
            words_payload = segment_data.get("words", [])
            words: List[WordTiming] = []

            if isinstance(words_payload, list):
                for word_data in words_payload:
                    if not isinstance(word_data, dict):
                        continue
                    try:
                        w_start = float(word_data["start"])
                        w_end = float(word_data["end"])
                    except (KeyError, TypeError, ValueError):
                        logger.debug(
                            "Word missing required fields skipped: %s", word_data
                        )
                        continue
                    word_text = str(word_data.get("text", "")).strip()
                    if not word_text:
                        continue
                    words.append(WordTiming(start=w_start, end=w_end, word=word_text))

            segments.append(
                TranscriptSegment(
                    id=segment_id,
                    start=start,
                    end=end,
                    text=text,
                    words=words,
                )
            )

        full_text = str(payload.get("full_text", "")).strip()
        return TranscriptionResult(segments=segments, full_text=full_text)

80
video_render/utils.py
Normal file
80
video_render/utils.py
Normal file
@@ -0,0 +1,80 @@
from __future__ import annotations

import re
import unicodedata
from pathlib import Path
from typing import Iterable


def sanitize_filename(name: str) -> str:
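    # Example: sanitize_filename("Meu Vídeo #1.mp4") -> "meu_video_1.mp4";
    # an empty or fully stripped name falls back to "video".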
    normalized = unicodedata.normalize("NFKD", name)
    ascii_text = normalized.encode("ASCII", "ignore").decode()
    ascii_text = ascii_text.lower()
    ascii_text = ascii_text.replace(" ", "_")
    ascii_text = re.sub(r"[^a-z0-9_\-\.]", "", ascii_text)
    ascii_text = re.sub(r"_+", "_", ascii_text)
    return ascii_text.strip("_") or "video"


def ensure_workspace(root: Path, folder_name: str) -> Path:
    workspace = root / folder_name
    workspace.mkdir(parents=True, exist_ok=True)
    return workspace


def remove_paths(paths: Iterable[Path]) -> None:
    import logging
    import time

    logger = logging.getLogger(__name__)

    for path in paths:
        if not path.exists():
            continue

        # Try to remove with retries and better error handling
        max_retries = 3
        for attempt in range(max_retries):
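            # Retry strategy: up to max_retries attempts; on PermissionError the
            # permissions are loosened (chmod 0o777) and the attempt is repeated
            # after a short pause.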
            try:
                if path.is_file() or path.is_symlink():
                    path.unlink(missing_ok=True)
                else:
                    for child in sorted(path.rglob("*"), reverse=True):
                        if child.is_file() or child.is_symlink():
                            try:
                                child.unlink(missing_ok=True)
                            except PermissionError:
                                logger.warning(f"Could not delete {child}: permission denied")
                                # Try to change permissions and retry
                                try:
                                    child.chmod(0o777)
                                    child.unlink(missing_ok=True)
                                except Exception as e:
                                    logger.warning(f"Failed to force-delete {child}: {e}")
                        elif child.is_dir():
                            try:
                                child.rmdir()
                            except (PermissionError, OSError) as e:
                                logger.warning(f"Could not remove directory {child}: {e}")

                    try:
                        path.rmdir()
                    except (PermissionError, OSError) as e:
                        logger.warning(f"Could not remove directory {path}: {e}")
                break  # Success, exit retry loop

            except PermissionError as e:
                if attempt < max_retries - 1:
                    logger.warning(f"Attempt {attempt + 1}/{max_retries} to delete {path} failed: {e}. Retrying...")
                    time.sleep(0.5)  # Wait a bit before retrying
                    # Try to change permissions
                    try:
                        path.chmod(0o777)
                    except Exception:
                        pass
                else:
                    logger.error(f"Could not delete {path} after {max_retries} attempts: {e}")
            except Exception as e:
                logger.error(f"Unexpected error deleting {path}: {e}")
                break  # Don't retry on unexpected errors