261 lines
9.2 KiB
Python
261 lines
9.2 KiB
Python
from __future__ import annotations

import logging
import math
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

from video_render.config import Settings
from video_render.llm import OpenRouterCopywriter
from video_render.media import MediaPreparer, VideoWorkspace
from video_render.rendering import VideoRenderer
from video_render.transcription import TranscriptionResult, TranscriptionService
from video_render.utils import remove_paths, sanitize_filename
|
|
|
|
# Module-level logger named after this module's dotted import path.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class JobMessage:
    """Normalized view of an incoming job message.

    ``filename`` is mandatory; ``url`` and ``video_id`` may be absent. Any
    other keys of the raw message are kept verbatim in ``extras``.
    """

    filename: str
    url: Optional[str]
    video_id: Optional[str]
    # Fresh dict per instance; never shared between messages.
    extras: Dict[str, Any] = field(default_factory=lambda: {})
|
|
|
|
|
|
@dataclass
class HighlightWindow:
    """A time span of the source video selected to become a clip.

    ``start`` and ``end`` are offsets on the same timeline as the
    transcription segments. ``title`` is optional and left unset by default.
    """

    start: float
    end: float
    summary: str
    title: Optional[str] = field(default=None)
|
|
|
|
|
|
@dataclass
class RenderedClip:
    """Metadata for one clip written by the renderer."""

    # Location of the rendered file.
    path: Path
    # Window bounds the clip was cut from.
    start: float
    end: float
    # Copy attached to the clip.
    title: str
    summary: str
    # Position reported by the renderer for this clip.
    index: int
|
|
|
|
|
|
@dataclass
class PipelineContext:
    """Mutable state threaded through the pipeline stages for one job.

    Only ``job`` is known up front; every other attribute starts empty and
    is filled in by the stage that produces it.
    """

    job: JobMessage
    workspace: Optional[VideoWorkspace] = field(default=None)
    transcription: Optional[TranscriptionResult] = field(default=None)
    # Per-instance lists so separate jobs never share state.
    highlight_windows: List[HighlightWindow] = field(default_factory=lambda: [])
    rendered_clips: List[RenderedClip] = field(default_factory=lambda: [])
|
|
|
|
|
|
class VideoPipeline:
    """Orchestrate the end-to-end processing of a single video job.

    Stages, in order:

    1. Prepare a workspace.
    2. Transcribe the audio, reusing a persisted transcription when present.
    3. Ask the LLM service for highlight windows.
    4. Render one clip per window.
    5. Build the response payload.

    An exception from any stage is turned into an error payload after
    best-effort cleanup of the job's files.
    """

    def __init__(self, settings: Settings) -> None:
        """Create the stage services from ``settings``."""
        self.settings = settings
        self.media_preparer = MediaPreparer(settings)
        self.transcriber = TranscriptionService(settings)
        # Using OpenRouter for both highlights and titles
        self.llm_service = OpenRouterCopywriter(settings)
        self.renderer = VideoRenderer(settings)

    def process_message(self, message: Dict[str, Any]) -> Dict[str, Any]:
        """Run every pipeline stage for ``message`` and return the result payload.

        Args:
            message: Raw job message; must contain a non-empty string ``filename``.

        Returns:
            The success payload, or the error payload from ``_handle_failure``
            when any stage raises.

        Raises:
            ValueError: If ``message`` has no usable ``filename``. Parsing runs
                before the guarded section, so this propagates to the caller.
        """
        context = PipelineContext(job=self._parse_job(message))
        try:
            self._prepare_workspace(context)
            self._generate_transcription(context)
            self._determine_highlights(context)
            self._render_clips(context)
            return self._build_success_payload(context)
        except Exception as exc:
            logger.exception("Falha ao processar vídeo %s", context.job.filename)
            return self._handle_failure(context, exc)

    def _parse_job(self, message: Dict[str, Any]) -> JobMessage:
        """Convert a raw message into a ``JobMessage``.

        ``videoId`` and ``video_id`` are both accepted. Keys other than
        ``filename``, ``url``, ``videoId`` and ``video_id`` are kept in ``extras``.

        Raises:
            ValueError: If ``filename`` is missing, empty or not a string.
        """
        filename = message.get("filename")
        # A non-string filename would otherwise crash later inside
        # ``_handle_failure`` (``Path(...)``) instead of yielding a clear error.
        if not filename or not isinstance(filename, str):
            raise ValueError("Mensagem inválida: 'filename' é obrigatório")

        url = message.get("url")
        video_id = message.get("videoId") or message.get("video_id")
        extras = {
            key: value
            for key, value in message.items()
            if key not in {"filename", "url", "videoId", "video_id"}
        }
        return JobMessage(filename=filename, url=url, video_id=video_id, extras=extras)

    def _prepare_workspace(self, context: PipelineContext) -> None:
        """Store the workspace prepared for the job's file in ``context``."""
        context.workspace = self.media_preparer.prepare(context.job.filename)

    def _generate_transcription(self, context: PipelineContext) -> None:
        """Fill ``context.transcription``, reusing a persisted result if one exists.

        A fresh transcription is persisted to the workspace directory.

        Raises:
            RuntimeError: If the workspace has not been prepared.
        """
        if not context.workspace:
            raise RuntimeError("Workspace não preparado")

        existing = TranscriptionService.load(context.workspace.workspace_dir)
        if existing:
            logger.info(
                "Transcricao existente encontrada em %s; reutilizando resultado",
                context.workspace.workspace_dir,
            )
            context.transcription = existing
            return

        try:
            transcription = self.transcriber.transcribe(
                context.workspace.audio_path,
                output_dir=context.workspace.workspace_dir,
            )
            TranscriptionService.persist(transcription, context.workspace.workspace_dir)
        finally:
            # Unload the Whisper model (1-3GB) as soon as we are done with it,
            # including when transcription or persistence fails; otherwise
            # a failed job would keep the model resident.
            self.transcriber.unload_model()
        context.transcription = transcription

    @staticmethod
    def _clean_text(value: Any) -> str:
        """Return ``value`` as a stripped string, treating ``None`` as empty.

        Prevents ``str(None)`` from leaking the literal ``"None"`` into
        titles and summaries.
        """
        if value is None:
            return ""
        return str(value).strip()

    def _determine_highlights(self, context: PipelineContext) -> None:
        """Fill ``context.highlight_windows`` from the LLM's suggestions.

        The LLM output is treated as untrusted. Skipped items:

        - items that are not dict-like;
        - items with non-numeric or non-finite bounds;
        - items whose interval is empty or inverted (``end <= start``).

        A single fallback window is used when the LLM call fails or yields
        nothing usable.

        Raises:
            RuntimeError: If no transcription is available.
        """
        if not context.transcription:
            raise RuntimeError("Transcricao nao disponivel")

        try:
            highlights_raw = self.llm_service.generate_highlights(context.transcription)
        except Exception:
            logger.exception(
                "Falha ao gerar destaques com OpenRouter; aplicando fallback padrao."
            )
            context.highlight_windows = [self._build_fallback_highlight(context)]
            return

        windows: List[HighlightWindow] = []

        # ``or []`` guards against the service returning ``None``.
        for item in highlights_raw or []:
            try:
                start = float(item.get("start", 0))
                end = float(item.get("end", start))
            except (AttributeError, TypeError, ValueError):
                # AttributeError: item is not dict-like (no ``.get``).
                logger.warning("Highlight invalido ignorado: %s", item)
                continue

            # NaN compares False against everything, so it would slip past
            # ``end <= start``; reject non-finite bounds explicitly.
            if not (math.isfinite(start) and math.isfinite(end)) or end <= start:
                logger.debug("Highlight com intervalo invalido ignorado: %s", item)
                continue

            summary = self._clean_text(item.get("summary"))
            # A missing, None or blank title falls back to the summary prefix.
            title = self._clean_text(item.get("title")) or summary[:60].strip()

            windows.append(HighlightWindow(start=start, end=end, summary=summary, title=title))

        if not windows:
            windows.append(self._build_fallback_highlight(context))

        context.highlight_windows = windows

    def _generate_titles(self, context: PipelineContext) -> None:
        """DEPRECATED: Titles are now generated together with highlights.

        This method is kept for backwards compatibility but does nothing.
        Titles are extracted from highlights in _determine_highlights().
        """
        pass

    def _build_fallback_highlight(self, context: PipelineContext) -> HighlightWindow:
        """Return a window from 0 to the end of the last transcription segment.

        The end is never shorter than 10.0, including when there are no
        segments.

        Raises:
            RuntimeError: If no transcription is available.
        """
        if not context.transcription:
            raise RuntimeError("Transcricao nao disponivel para criar fallback")

        last_end = (
            context.transcription.segments[-1].end
            if context.transcription.segments
            else 0.0
        )
        return HighlightWindow(
            start=0.0,
            end=max(last_end, 10.0),
            summary="Sem destaque identificado; fallback automatico.",
            title="Confira este momento",
        )

    def _render_clips(self, context: PipelineContext) -> None:
        """Render every highlight window and record the produced clips.

        Does nothing when the workspace, windows or transcription are missing.
        Each window's title (or its summary when the title is empty) is
        passed to the renderer.
        """
        if not context.workspace or not context.highlight_windows or not context.transcription:
            return

        titles = [window.title or window.summary for window in context.highlight_windows]

        render_results = self.renderer.render(
            workspace_path=str(context.workspace.working_video_path),
            highlight_windows=context.highlight_windows,
            transcription=context.transcription,
            titles=titles,
            output_dir=context.workspace.output_dir,
        )

        # The renderer yields (path, start, end, title, summary, index) tuples.
        context.rendered_clips = [
            RenderedClip(
                path=Path(path),
                start=start,
                end=end,
                title=title,
                summary=summary,
                index=index,
            )
            for path, start, end, title, summary, index in render_results
        ]

    def _build_success_payload(self, context: PipelineContext) -> Dict[str, Any]:
        """Build the response dict describing the rendered clips of ``context``."""
        return {
            "hasError": False,
            "videosProcessedQuantity": len(context.rendered_clips),
            "filename": context.job.filename,
            "videoId": context.job.video_id,
            "url": context.job.url,
            "workspaceFolder": context.workspace.sanitized_name if context.workspace else None,
            "outputDirectory": self._relative_path(context.workspace.output_dir) if context.workspace else None,
            "processedFiles": [
                {
                    "path": self._relative_path(clip.path),
                    "start": clip.start,
                    "end": clip.end,
                    "title": clip.title,
                    "summary": clip.summary,
                    "clipIndex": clip.index,
                }
                for clip in context.rendered_clips
            ],
        }

    @staticmethod
    def _is_within(path: Path, base: Path) -> bool:
        """Return True when ``path`` resolves to a location inside ``base``."""
        try:
            path.resolve().relative_to(base.resolve())
        except ValueError:
            return False
        return True

    def _handle_failure(self, context: PipelineContext, exc: Exception) -> Dict[str, Any]:
        """Remove the job's files (best effort) and return an error payload.

        Removed when a workspace exists:

        - the workspace directory;
        - the output directory;
        - the source video.

        Removed otherwise:

        - the output directory derived from the sanitized filename;
        - the original video under ``videos_dir``.

        Cleanup errors are logged and never replace the error payload.
        """
        logger.error("Erro na pipeline: %s", exc)
        cleanup_targets: List[Path] = []

        if context.workspace:
            cleanup_targets.append(context.workspace.workspace_dir)
            cleanup_targets.append(context.workspace.output_dir)
            original_path = context.workspace.source_path
            if original_path.exists():
                cleanup_targets.append(original_path)
        else:
            sanitized = sanitize_filename(Path(context.job.filename).stem)
            job_output_dir = self.settings.outputs_dir / sanitized
            if job_output_dir.exists():
                cleanup_targets.append(job_output_dir)
            original_path = self.settings.videos_dir / context.job.filename
            if original_path.exists():
                # ``filename`` comes from the incoming message: an absolute path
                # or ``..`` components would point outside videos_dir. Never
                # delete anything that does not live under it.
                if self._is_within(original_path, self.settings.videos_dir):
                    cleanup_targets.append(original_path)
                else:
                    logger.warning(
                        "Caminho fora de videos_dir ignorado na limpeza: %s",
                        original_path,
                    )

        try:
            remove_paths(cleanup_targets)
        except Exception:
            # Cleanup is best effort; the caller must still get the error payload.
            logger.exception("Falha ao limpar arquivos apos erro: %s", cleanup_targets)

        return {
            "hasError": True,
            "error": str(exc),
            "filename": context.job.filename,
            "videoId": context.job.video_id,
            "url": context.job.url,
            "processedFiles": [],
        }

    def _relative_path(self, path: Path) -> str:
        """Return ``path`` relative to the parent of ``videos_dir``.

        Falls back to the full string form when ``path`` is not under that
        directory.
        """
        base = self.settings.videos_dir.parent
        try:
            return str(path.relative_to(base))
        except ValueError:
            return str(path)
|