diff --git a/video_render/messaging.py b/video_render/messaging.py
index b61599c..d00283c 100644
--- a/video_render/messaging.py
+++ b/video_render/messaging.py
@@ -13,6 +13,22 @@ logger = logging.getLogger(__name__)
 MessageHandler = Callable[[Dict[str, Any]], Dict[str, Any]]
 
 
+def _safe_ack(
+    channel: pika.adapters.blocking_connection.BlockingChannel, delivery_tag
+) -> bool:
+    if not channel.is_open:
+        logger.warning(
+            "Canal fechado antes do ACK; mensagem sera reprocessada apos reconexao"
+        )
+        return False
+    try:
+        channel.basic_ack(delivery_tag=delivery_tag)
+        return True
+    except Exception:
+        logger.exception("Falha ao confirmar mensagem")
+        return False
+
+
 class RabbitMQWorker:
     def __init__(self, settings: Settings) -> None:
         self.settings = settings
@@ -27,50 +43,59 @@ class RabbitMQWorker:
         )
 
     def consume_forever(self, handler: MessageHandler) -> None:
-
         while True:
             try:
                 with pika.BlockingConnection(self._params) as connection:
                     channel = connection.channel()
-                    channel.queue_declare(queue=self.settings.rabbitmq.consume_queue, durable=True)
-                    channel.queue_declare(queue=self.settings.rabbitmq.publish_queue, durable=True)
-                    channel.basic_qos(prefetch_count=self.settings.rabbitmq.prefetch_count)
+                    channel.queue_declare(
+                        queue=self.settings.rabbitmq.consume_queue, durable=True
+                    )
+                    channel.queue_declare(
+                        queue=self.settings.rabbitmq.publish_queue, durable=True
+                    )
+                    channel.basic_qos(
+                        prefetch_count=self.settings.rabbitmq.prefetch_count
+                    )
 
-                    def _on_message(ch: pika.adapters.blocking_connection.BlockingChannel, method, properties, body):
+                    def _on_message(
+                        ch: pika.adapters.blocking_connection.BlockingChannel,
+                        method,
+                        properties,
+                        body,
+                    ) -> None:
+                        """Consume message, ACK immediately, then process."""
                         try:
                             message = json.loads(body)
                         except json.JSONDecodeError:
-                            logger.error("Mensagem inválida recebida: %s", body)
-                            ch.basic_ack(delivery_tag=method.delivery_tag)
+                            logger.error("Mensagem invalida recebida: %s", body)
+                            _safe_ack(ch, method.delivery_tag)
                             return
 
-                        logger.info("Mensagem recebida: %s", message.get("filename", ""))
+                        if not _safe_ack(ch, method.delivery_tag):
+                            logger.warning(
+                                "Nao foi possivel confirmar mensagem; abortando processamento"
+                            )
+                            return
+
+                        logger.info(
+                            "Mensagem recebida: %s",
+                            message.get("filename", ""),
+                        )
+
                         try:
                             response = handler(message)
                         except Exception:
-                            logger.exception("Erro não tratado durante o processamento")
+                            logger.exception("Erro nao tratado durante o processamento")
                             response = {
                                 "hasError": True,
-                                "error": "Erro não tratado no pipeline",
+                                "error": "Erro nao tratado no pipeline",
                                 "filename": message.get("filename"),
                                 "videoId": message.get("videoId"),
                                 "url": message.get("url"),
                                 "processedFiles": [],
                             }
 
-                        try:
-                            payload = json.dumps(response)
-                            ch.basic_publish(
-                                exchange="",
-                                routing_key=self.settings.rabbitmq.publish_queue,
-                                body=payload,
-                                properties=pika.BasicProperties(delivery_mode=2),
-                            )
-                            logger.info("Resposta publicada para '%s'", self.settings.rabbitmq.publish_queue)
-                        except Exception:
-                            logger.exception("Falha ao publicar a resposta na fila de upload")
-                        finally:
-                            ch.basic_ack(delivery_tag=method.delivery_tag)
+                        self._publish_response(response)
 
                     channel.basic_consume(
                         queue=self.settings.rabbitmq.consume_queue,
@@ -80,7 +105,32 @@ class RabbitMQWorker:
                     logger.info("Consumidor iniciado. Aguardando mensagens...")
                     channel.start_consuming()
             except pika.exceptions.AMQPConnectionError:
-                logger.exception("Conexão com RabbitMQ perdida. Tentando reconectar...")
+                logger.exception(
+                    "Conexao com RabbitMQ perdida. Tentando reconectar..."
+                )
+            except pika.exceptions.AMQPError:
+                logger.exception("Erro AMQP inesperado. Reiniciando consumo...")
             except KeyboardInterrupt:
-                logger.info("Encerrando consumidor por interrupção do usuário.")
+                logger.info("Encerrando consumidor por interrupcao do usuario.")
                 break
+
+    def _publish_response(self, response: Dict[str, Any]) -> None:
+        payload = json.dumps(response)
+        try:
+            with pika.BlockingConnection(self._params) as publish_connection:
+                publish_channel = publish_connection.channel()
+                publish_channel.queue_declare(
+                    queue=self.settings.rabbitmq.publish_queue, durable=True
+                )
+                publish_channel.basic_publish(
+                    exchange="",
+                    routing_key=self.settings.rabbitmq.publish_queue,
+                    body=payload,
+                    properties=pika.BasicProperties(delivery_mode=2),
+                )
+                logger.info(
+                    "Resposta publicada para '%s'",
+                    self.settings.rabbitmq.publish_queue,
+                )
+        except Exception:
+            logger.exception("Falha ao publicar a resposta na fila de upload apos ACK")
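Review note on messaging.py: because `_on_message` now ACKs before invoking the handler, delivery shifts from at-least-once to at-most-once, and `_safe_ack`'s boolean result becomes the only gate on further processing. A minimal sketch of its three outcomes; `FakeChannel` is a hypothetical stub for illustration, not part of the codebase:

```python
# Sketch of the three _safe_ack outcomes, using a stub channel.
# FakeChannel is illustrative only; it does not exist in video_render.
from video_render.messaging import _safe_ack


class FakeChannel:
    def __init__(self, is_open: bool, ack_raises: bool = False) -> None:
        self.is_open = is_open
        self._ack_raises = ack_raises

    def basic_ack(self, delivery_tag) -> None:
        if self._ack_raises:
            raise RuntimeError("broker went away")


assert _safe_ack(FakeChannel(is_open=True), delivery_tag=1) is True    # normal ACK
assert _safe_ack(FakeChannel(is_open=False), delivery_tag=1) is False  # channel already closed
assert _safe_ack(FakeChannel(is_open=True, ack_raises=True), delivery_tag=1) is False  # ACK failed
```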
diff --git a/video_render/pipeline.py b/video_render/pipeline.py
index c8e309e..4401771 100644
--- a/video_render/pipeline.py
+++ b/video_render/pipeline.py
@@ -93,6 +93,15 @@ class VideoPipeline:
     def _generate_transcription(self, context: PipelineContext) -> None:
         if not context.workspace:
             raise RuntimeError("Workspace não preparado")
+        existing = TranscriptionService.load(context.workspace.workspace_dir)
+        if existing:
+            logger.info(
+                "Transcricao existente encontrada em %s; reutilizando resultado",
+                context.workspace.workspace_dir,
+            )
+            context.transcription = existing
+            return
+
         transcription = self.transcriber.transcribe(context.workspace.audio_path)
         TranscriptionService.persist(transcription, context.workspace.workspace_dir)
         context.transcription = transcription
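Review note on pipeline.py: the early return makes `_generate_transcription` idempotent for a workspace that already holds a persisted transcription. The same reuse pattern outside the pipeline might look like this sketch (the audio filename and the shape of `transcriber` are assumptions for illustration):

```python
from pathlib import Path

from video_render.transcription import TranscriptionResult, TranscriptionService


def transcription_for(workspace_dir: Path, transcriber) -> TranscriptionResult:
    """Reuse a cached transcription if present, mirroring _generate_transcription."""
    existing = TranscriptionService.load(workspace_dir)
    if existing:
        return existing  # persisted by a previous run over the same workspace
    # transcriber is assumed to expose transcribe(audio_path), as VideoPipeline's
    # does; "audio.wav" is an illustrative filename, not the project's layout.
    transcription = transcriber.transcribe(workspace_dir / "audio.wav")
    TranscriptionService.persist(transcription, workspace_dir)
    return transcription
```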
method="caption", - size=(frame_w - 160, top_h - 40), - align="center", - ) - .with_duration(duration) + title_clip = self._build_title_clip( + title=title, + summary=summary, + duration=duration, + frame_width=frame_w, + top_panel_height=top_h, ) title_clip = title_clip.with_position( ((frame_w - title_clip.w) // 2, (top_h - title_clip.h) // 2) @@ -305,43 +302,38 @@ class VideoRenderer: if not caption_clips: fallback_text = self._wrap_text(summary or title, max_width=frame_w - 160) caption_clips.append( - TextClip( + self._make_textclip( text=fallback_text, - font=str(self.settings.rendering.font_path), + font_path=self.settings.rendering.font_path, font_size=self.settings.rendering.subtitle_font_size, color=self.settings.rendering.base_color, - method="caption", - align="center", size=(frame_w - 160, max(40, self.captions.canvas_height)), ) .with_duration(duration) .with_position(("center", caption_y)) ) + audio_clip, audio_needs_close = self._materialize_audio( + source_path=source_path, + start=start, + end=end, + duration=duration, + fallback_audio=video_clip.audio or resized_clip.audio or subclip.audio, + ) + composite = CompositeVideoClip( [background, top_panel, bottom_panel, video_clip, title_clip, *caption_clips], size=(frame_w, frame_h), ) - video_audio = video_clip.audio or resized_clip.audio or subclip.audio - if video_audio is not None: - composite = composite.set_audio(video_audio) + if audio_clip is not None: + composite = self._with_audio(composite, audio_clip) output_path = output_dir / f"clip_{index:02d}.mp4" - composite.write_videofile( - str(output_path), - codec=self.settings.rendering.video_codec, - audio_codec=self.settings.rendering.audio_codec, - fps=self.settings.rendering.fps, - bitrate=self.settings.rendering.bitrate, - ffmpeg_params=[ - "-preset", - self.settings.rendering.preset, - "-pix_fmt", - "yuv420p", - ], - temp_audiofile=str(output_dir / f"temp_audio_{index:02d}.m4a"), - remove_temp=True, - threads=4, + self._write_with_fallback( + composite=composite, + output_path=output_path, + index=index, + output_dir=output_dir, ) composite.close() @@ -355,9 +347,128 @@ class VideoRenderer: clip.close() for clip in caption_resources: clip.close() + if audio_clip is not None and audio_needs_close: + audio_clip.close() return str(output_path) + def _build_title_clip( + self, + *, + title: str, + summary: str, + duration: float, + frame_width: int, + top_panel_height: int, + ) -> ImageClip: + text = (title or summary or "").strip() + if not text: + text = summary or "" + + max_width = max(200, frame_width - 160) + font_size = self.settings.rendering.title_font_size + min_font_size = max(28, int(font_size * 0.6)) + target_height = max(80, top_panel_height - 40) + title_color = ImageColor.getrgb(self.settings.rendering.base_color) + font_path = self.settings.rendering.font_path + + while True: + font = ImageFont.truetype(str(font_path), font_size) + lines = self._split_title_lines(text, font, max_width) + line_height = font.getbbox("Ay")[3] - font.getbbox("Ay")[1] + spacing = max(4, int(line_height * 0.25)) + text_height = self._measure_text_height(len(lines), line_height, spacing) + + if text_height <= target_height or font_size <= min_font_size: + break + + font_size = max(min_font_size, font_size - 6) + + # Recompute dimensions with final font size to ensure consistency + font = ImageFont.truetype(str(font_path), font_size) + lines = self._split_title_lines(text, font, max_width) + line_height = font.getbbox("Ay")[3] - font.getbbox("Ay")[1] + spacing = max(4, 
+        # Recompute dimensions with final font size to ensure consistency
+        font = ImageFont.truetype(str(font_path), font_size)
+        lines = self._split_title_lines(text, font, max_width)
+        line_height = font.getbbox("Ay")[3] - font.getbbox("Ay")[1]
+        spacing = max(4, int(line_height * 0.25))
+        text_height = self._measure_text_height(len(lines), line_height, spacing)
+        canvas_height = max(1, text_height)
+
+        image = Image.new("RGBA", (max_width, canvas_height), (0, 0, 0, 0))
+        draw = ImageDraw.Draw(image)
+        y = 0
+        for idx, line in enumerate(lines):
+            bbox = font.getbbox(line)
+            line_width = bbox[2] - bbox[0]
+            x = max(0, (max_width - line_width) // 2)
+            draw.text((x, y - bbox[1]), line, font=font, fill=title_color)
+            y += line_height
+            if idx < len(lines) - 1:
+                y += spacing
+
+        return ImageClip(np.array(image)).with_duration(duration)
+
+    @staticmethod
+    def _measure_text_height(line_count: int, line_height: int, spacing: int) -> int:
+        if line_count <= 0:
+            return line_height
+        return line_count * line_height + max(0, line_count - 1) * spacing
+
+    @staticmethod
+    def _split_title_lines(
+        text: str, font: ImageFont.FreeTypeFont, max_width: int
+    ) -> List[str]:
+        words = text.split()
+        if not words:
+            return [""]
+
+        lines: List[str] = []
+        current: List[str] = []
+        for word in words:
+            test_line = " ".join(current + [word]) if current else word
+            bbox = font.getbbox(test_line)
+            line_width = bbox[2] - bbox[0]
+            if line_width <= max_width or not current:
+                current.append(word)
+                if line_width > max_width and not current[:-1]:
+                    lines.append(" ".join(current))
+                    current = []
+                continue
+
+            lines.append(" ".join(current))
+            current = [word]
+
+        if current:
+            lines.append(" ".join(current))
+
+        return lines
+
+    def _materialize_audio(
+        self,
+        *,
+        source_path: str,
+        start: float,
+        end: float,
+        duration: float,
+        fallback_audio,
+    ) -> Tuple[Optional[AudioClip], bool]:
+        try:
+            with AudioFileClip(source_path) as audio_file:
+                segment = audio_file.subclipped(start, end)
+                fps = (
+                    getattr(segment, "fps", None)
+                    or getattr(audio_file, "fps", None)
+                    or 44100
+                )
+                samples = segment.to_soundarray(fps=fps)
+        except Exception:
+            logger.warning(
+                "Falha ao carregar audio independente; utilizando fluxo original",
+                exc_info=True,
+            )
+            return fallback_audio, False
+
+        audio_clip = AudioArrayClip(samples, fps=fps).with_duration(duration)
+        return audio_clip, True
+
     def _collect_words(
         self, transcription: TranscriptionResult, start: float, end: float
     ) -> List[WordTiming]:
@@ -424,3 +535,120 @@
         if current:
             lines.append(" ".join(current))
         return "\n".join(lines)
+
+    def _write_with_fallback(
+        self,
+        *,
+        composite: CompositeVideoClip,
+        output_path,
+        index: int,
+        output_dir,
+    ) -> None:
+        attempts = self._encoding_attempts()
+        temp_audio_path = output_dir / f"temp_audio_{index:02d}.m4a"
+        last_error: Exception | None = None
+
+        for attempt in attempts:
+            codec = attempt["codec"]
+            bitrate = attempt["bitrate"]
+            preset = attempt["preset"]
+
+            ffmpeg_params = ["-pix_fmt", "yuv420p"]
+            if preset:
+                ffmpeg_params = ["-preset", preset, "-pix_fmt", "yuv420p"]
+
+            try:
+                logger.info(
+                    "Renderizando clip %02d com codec %s (bitrate=%s, preset=%s)",
+                    index,
+                    codec,
+                    bitrate,
+                    preset or "default",
+                )
+                composite.write_videofile(
+                    str(output_path),
+                    codec=codec,
+                    audio_codec=self.settings.rendering.audio_codec,
+                    fps=self.settings.rendering.fps,
+                    bitrate=bitrate,
+                    ffmpeg_params=ffmpeg_params,
+                    temp_audiofile=str(temp_audio_path),
+                    remove_temp=True,
+                    threads=4,
+                )
+                return
+            except Exception as exc:  # noqa: BLE001 - propagate after fallbacks
+                last_error = exc
+                logger.warning(
+                    "Falha ao renderizar com codec %s: %s", codec, exc, exc_info=True
+                )
+                if output_path.exists():
+                    output_path.unlink(missing_ok=True)
+                if temp_audio_path.exists():
+                    temp_audio_path.unlink(missing_ok=True)
+
+        raise RuntimeError("Todas as tentativas de renderizacao falharam") from last_error
+
+    def _encoding_attempts(self) -> List[Dict[str, str | None]]:
+        settings = self.settings.rendering
+        attempts: List[Dict[str, str | None]] = []
+
+        attempts.append(
+            {
+                "codec": settings.video_codec,
+                "bitrate": settings.bitrate,
+                "preset": settings.preset,
+            }
+        )
+
+        deduped: List[Dict[str, str | None]] = []
+        seen = set()
+        for attempt in attempts:
+            key = (attempt["codec"], attempt["bitrate"], attempt["preset"])
+            if key in seen:
+                continue
+            seen.add(key)
+            deduped.append(attempt)
+
+        return deduped
+
+    @staticmethod
+    def _with_audio(
+        composite: CompositeVideoClip,
+        audio_clip,
+    ) -> CompositeVideoClip:
+        """Attach audio to a composite clip across MoviePy versions."""
+        if hasattr(composite, "with_audio"):
+            return composite.with_audio(audio_clip)
+        if hasattr(composite, "set_audio"):
+            return composite.set_audio(audio_clip)
+        raise AttributeError("CompositeVideoClip does not support audio assignment")
+
+    @staticmethod
+    def _make_textclip(
+        *,
+        text: str,
+        font_path,
+        font_size: int,
+        color: str,
+        size: Tuple[int, int],
+    ) -> TextClip:
+        """Create a TextClip across MoviePy builds that differ on 'align'.
+
+        Some MoviePy releases accept an 'align' keyword on TextClip while
+        newer ones reject it, so try with 'align' first and retry without
+        it when the call raises TypeError.
+        """
+        kwargs = dict(
+            text=text,
+            font=str(font_path),
+            font_size=font_size,
+            color=color,
+            method="caption",
+            size=size,
+        )
+        try:
+            return TextClip(**kwargs, align="center")  # older API with 'align'
+        except TypeError:
+            logger.debug("TextClip 'align' not supported; falling back without it")
+            return TextClip(**kwargs)  # newer API without 'align'
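Review note on rendering.py: `_write_with_fallback` is written to walk a list of attempts, but `_encoding_attempts` currently emits a single entry, so the loop and the dedup pass only pay off once more entries exist. A hedged sketch of one possible extension, appending a `libx264` retry when the configured codec is a hardware encoder (the codec set and the standalone-function shape are assumptions, not part of this diff):

```python
from typing import Dict, List, Optional

# Illustrative set of hardware encoders; adjust to the deployment's ffmpeg build.
HARDWARE_CODECS = {"h264_nvenc", "h264_qsv", "h264_videotoolbox"}


def encoding_attempts(
    video_codec: str, bitrate: Optional[str], preset: Optional[str]
) -> List[Dict[str, Optional[str]]]:
    """Build the ordered attempt list: configured codec first, libx264 last resort."""
    attempts = [{"codec": video_codec, "bitrate": bitrate, "preset": preset}]
    if video_codec in HARDWARE_CODECS:
        attempts.append({"codec": "libx264", "bitrate": bitrate, "preset": preset or "medium"})
    # Deduplicate while preserving order, mirroring _encoding_attempts in the diff.
    seen = set()
    deduped: List[Dict[str, Optional[str]]] = []
    for attempt in attempts:
        key = (attempt["codec"], attempt["bitrate"], attempt["preset"])
        if key not in seen:
            seen.add(key)
            deduped.append(attempt)
    return deduped
```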
diff --git a/video_render/transcription.py b/video_render/transcription.py
index b5d86db..a175659 100644
--- a/video_render/transcription.py
+++ b/video_render/transcription.py
@@ -118,5 +118,75 @@
         with text_path.open("w", encoding="utf-8") as fp:
             fp.write(result.full_text)
 
-        logger.info("Transcrição salva em %s", destination)
+        logger.info("Transcricao salva em %s", destination)
+
+    @staticmethod
+    def load(source: Path) -> Optional[TranscriptionResult]:
+        json_path = source / "transcription.json"
+        if not json_path.exists():
+            return None
+
+        try:
+            with json_path.open("r", encoding="utf-8") as fp:
+                payload = json.load(fp)
+        except (OSError, json.JSONDecodeError) as exc:
+            logger.warning(
+                "Falha ao carregar transcricao existente de %s: %s", json_path, exc
+            )
+            return None
+
+        segments_payload = payload.get("segments", [])
+        if not isinstance(segments_payload, list):
+            logger.warning(
+                "Formato inesperado ao carregar transcricao de %s: 'segments' invalido",
+                json_path,
+            )
+            return None
+
+        segments: List[TranscriptSegment] = []
+        for idx, segment_data in enumerate(segments_payload):
+            if not isinstance(segment_data, dict):
+                logger.debug("Segmento invalido ignorado ao carregar: %s", segment_data)
+                continue
+            try:
+                segment_id = int(segment_data.get("id", idx))
+                start = float(segment_data["start"])
+                end = float(segment_data["end"])
+            except (KeyError, TypeError, ValueError):
+                logger.debug("Segmento sem dados obrigatorios ignorado: %s", segment_data)
+                continue
+
+            text = str(segment_data.get("text", "")).strip()
+            words_payload = segment_data.get("words", [])
+            words: List[WordTiming] = []
+
+            if isinstance(words_payload, list):
+                for word_data in words_payload:
+                    if not isinstance(word_data, dict):
+                        continue
+                    try:
+                        w_start = float(word_data["start"])
+                        w_end = float(word_data["end"])
+                    except (KeyError, TypeError, ValueError):
+                        logger.debug(
+                            "Palavra sem dados obrigatorios ignorada: %s", word_data
+                        )
+                        continue
+                    word_text = str(word_data.get("text", "")).strip()
+                    if not word_text:
+                        continue
+                    words.append(WordTiming(start=w_start, end=w_end, word=word_text))
+
+            segments.append(
+                TranscriptSegment(
+                    id=segment_id,
+                    start=start,
+                    end=end,
+                    text=text,
+                    words=words,
+                )
+            )
+
+        full_text = str(payload.get("full_text", "")).strip()
+        return TranscriptionResult(segments=segments, full_text=full_text)
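Review note on transcription.py: a round-trip of the new `load` against the existing `persist`, assuming `persist` writes `transcription.json` with segment dicts keyed `id`/`start`/`end`/`text`/`words` and word dicts keyed `start`/`end`/`text`. Note that `load` reads word text from the `"text"` key while the dataclass field is `word`, so this only holds if `persist` serializes `WordTiming.word` under `"text"`:

```python
import tempfile
from pathlib import Path

# Assumes these dataclasses are exposed by video_render.transcription;
# adjust the import if they live in another module.
from video_render.transcription import (
    TranscriptionResult,
    TranscriptionService,
    TranscriptSegment,
    WordTiming,
)

segment = TranscriptSegment(
    id=0,
    start=0.0,
    end=1.2,
    text="ola mundo",
    words=[
        WordTiming(start=0.0, end=0.5, word="ola"),
        WordTiming(start=0.5, end=1.2, word="mundo"),
    ],
)
result = TranscriptionResult(segments=[segment], full_text="ola mundo")

with tempfile.TemporaryDirectory() as tmp:
    workspace = Path(tmp)
    TranscriptionService.persist(result, workspace)
    loaded = TranscriptionService.load(workspace)

assert loaded is not None
assert loaded.full_text == "ola mundo"
assert loaded.segments[0].words[0].word == "ola"  # fails if persist uses a "word" key
```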