"""Render vertical highlight clips with word-synchronised captions.

CaptionBuilder pre-renders caption images with Pillow; VideoRenderer cuts the
highlight windows out of the source video, composites panels, title and captions
with MoviePy, and writes the final clips to disk.
"""

from __future__ import annotations

import logging
import re
from dataclasses import dataclass
from typing import Iterable, List, Sequence, Tuple

import numpy as np
from moviepy.video.VideoClip import ColorClip, ImageClip, TextClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from PIL import Image, ImageColor, ImageDraw, ImageFont

from video_render.config import Settings
from video_render.transcription import TranscriptionResult, WordTiming

logger = logging.getLogger(__name__)


def clamp_time(value: float, minimum: float = 0.0) -> float:
    """Clamp a timestamp to a lower bound (zero by default)."""
    return max(minimum, float(value))


@dataclass
class CaptionClipSet:
    """A base caption clip plus one highlight overlay per word."""

    base: ImageClip
    highlights: List[ImageClip]


class CaptionBuilder:
    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self.font_path = settings.rendering.font_path
        if not self.font_path.exists():
            raise FileNotFoundError(f"Font not found: {self.font_path}")
        self.font = ImageFont.truetype(
            str(self.font_path), settings.rendering.subtitle_font_size
        )
        self.base_color = ImageColor.getrgb(settings.rendering.base_color)
        self.highlight_color = ImageColor.getrgb(settings.rendering.highlight_color)
        self.canvas_width = settings.rendering.frame_width - 160
        self.canvas_height = int(settings.rendering.subtitle_font_size * 2.2)
        self.min_words = settings.rendering.caption_min_words
        self.max_words = settings.rendering.caption_max_words
        # Vertically centre the text on the caption canvas and cache the space width
        # used when laying words out side by side.
        bbox = self.font.getbbox("Ay")
        self.text_height = bbox[3] - bbox[1]
        self.baseline = (self.canvas_height - self.text_height) // 2 - bbox[1]
        self.space_width = self.font.getbbox(" ")[2] - self.font.getbbox(" ")[0]

    def build(self, words: Sequence[WordTiming], clip_start: float) -> List[CaptionClipSet]:
        grouped = self._group_words(words)
        clip_sets: List[CaptionClipSet] = []
        for group in grouped:
            # All times are shifted so they are relative to the start of the subclip.
            group_start = clamp_time(group[0].start, minimum=clip_start)
            group_end = clamp_time(group[-1].end, minimum=group_start + 0.05)
            duration = max(0.05, group_end - group_start)
            start_offset = group_start - clip_start
            base_image, highlight_images = self._render_group(group)
            base_clip = (
                ImageClip(np.array(base_image))
                .with_start(start_offset)
                .with_duration(duration)
            )
            # One highlight overlay per word, shown only during that word's interval.
            highlight_clips: List[ImageClip] = []
            for word, image in zip(group, highlight_images):
                h_start = clamp_time(word.start, minimum=clip_start) - clip_start
                h_end = clamp_time(word.end, minimum=word.start + 0.02) - clip_start
                h_duration = max(0.05, h_end - h_start)
                highlight_clip = (
                    ImageClip(np.array(image))
                    .with_start(h_start)
                    .with_duration(h_duration)
                )
                highlight_clips.append(highlight_clip)
            clip_sets.append(CaptionClipSet(base=base_clip, highlights=highlight_clips))
        return clip_sets

    def _render_group(
        self, group: Sequence[WordTiming]
    ) -> Tuple[Image.Image, List[Image.Image]]:
        # Lay the group out as a single centred line; each word also gets its own
        # highlight-coloured overlay image drawn at the same position.
        texts = [self._clean_word(word.word) for word in group]
        widths = []
        for text in texts:
            bbox = self.font.getbbox(text)
            widths.append(bbox[2] - bbox[0])
        total_width = sum(widths)
        if len(widths) > 1:
            total_width += self.space_width * (len(widths) - 1)
        start_x = max(0, (self.canvas_width - total_width) // 2)
        base_image = Image.new("RGBA", (self.canvas_width, self.canvas_height), (0, 0, 0, 0))
        base_draw = ImageDraw.Draw(base_image)
        highlight_images: List[Image.Image] = []
        x = start_x
        for text, width in zip(texts, widths):
            base_draw.text((x, self.baseline), text, font=self.font, fill=self.base_color)
            highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
            highlight_draw = ImageDraw.Draw(highlight_image)
            highlight_draw.text(
                (x, self.baseline),
                text,
                font=self.font,
                fill=self.highlight_color,
            )
            highlight_images.append(highlight_image)
            x += width + self.space_width
        return base_image, highlight_images

    def _group_words(self, words: Sequence[WordTiming]) -> List[List[WordTiming]]:
        if not words:
            return []
        grouped: List[List[WordTiming]] = []
        buffer: List[WordTiming] = []
        # Fill groups up to max_words; a trailing single word is merged into the
        # previous group instead of standing alone.
        for word in words:
            buffer.append(word)
            if len(buffer) == self.max_words:
                grouped.append(buffer)
                buffer = []
        if buffer:
            if len(buffer) == 1 and grouped:
                grouped[-1].extend(buffer)
            else:
                grouped.append(buffer)
        # Borrow leading words from the next group so no group falls below min_words.
        for idx, group in enumerate(grouped[:-1]):
            if len(group) < self.min_words and len(grouped[idx + 1]) > self.min_words:
                deficit = self.min_words - len(group)
                transfer = grouped[idx + 1][:deficit]
                grouped[idx] = group + transfer
                grouped[idx + 1] = grouped[idx + 1][deficit:]
        grouped = [grp for grp in grouped if grp]
        return grouped

    @staticmethod
    def _clean_word(text: str) -> str:
        text = text.strip()
        text = re.sub(r"\s+", " ", text)
        return text or "..."


class VideoRenderer:
    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self.captions = CaptionBuilder(settings)

    def render(
        self,
        workspace_path: str,
        highlight_windows: Sequence,
        transcription: TranscriptionResult,
        titles: Sequence[str],
        output_dir,
    ) -> List[Tuple[str, float, float, str, str, int]]:
        results: List[Tuple[str, float, float, str, str, int]] = []
        with VideoFileClip(workspace_path) as base_clip:
            video_duration = base_clip.duration or 0
            for index, window in enumerate(highlight_windows, start=1):
                # Clamp each highlight window to the source duration and skip empty ranges.
                start = clamp_time(window.start)
                end = clamp_time(window.end)
                start = min(start, video_duration)
                end = min(end, video_duration)
                if end <= start:
                    logger.info("Skipping window with an invalid interval: %s", window)
                    continue
                clip_title = titles[index - 1] if index - 1 < len(titles) else window.summary
                subclip = base_clip.subclipped(start, end)
                try:
                    rendered_path = self._render_single_clip(
                        subclip=subclip,
                        start=start,
                        end=end,
                        title=clip_title,
                        summary=window.summary,
                        index=index,
                        transcription=transcription,
                        output_dir=output_dir,
                    )
                finally:
                    subclip.close()
                results.append(
                    (
                        rendered_path,
                        float(start),
                        float(end),
                        clip_title,
                        window.summary,
                        index,
                    )
                )
        return results

    def _render_single_clip(
        self,
        subclip: VideoFileClip,
        start: float,
        end: float,
        title: str,
        summary: str,
        index: int,
        transcription: TranscriptionResult,
        output_dir,
    ) -> str:
        duration = end - start
        frame_w = self.settings.rendering.frame_width
        frame_h = self.settings.rendering.frame_height
        # Layout: dark top panel for the title, the video centred in the middle band,
        # and a bottom panel that hosts the captions.
        top_h = int(frame_h * 0.18)
        bottom_h = int(frame_h * 0.20)
        video_area_h = frame_h - top_h - bottom_h
        scale_factor = min(
            frame_w / subclip.w,
            video_area_h / subclip.h,
        )
        resized_clip = subclip.resized(scale_factor)
        video_y = top_h + (video_area_h - resized_clip.h) // 2
        video_clip = resized_clip.with_position(
            ((frame_w - resized_clip.w) // 2, video_y)
        )
        background = ColorClip(size=(frame_w, frame_h), color=(0, 0, 0)).with_duration(duration)
        top_panel = (
            ColorClip(size=(frame_w, top_h), color=(12, 12, 12))
            .with_duration(duration)
            .with_opacity(0.85)
        )
        bottom_panel = (
            ColorClip(size=(frame_w, bottom_h), color=(12, 12, 12))
            .with_position((0, frame_h - bottom_h))
            .with_duration(duration)
            .with_opacity(0.85)
        )
        title_text = title or summary
        wrapped_title = self._wrap_text(title_text, max_width=frame_w - 160)
        title_clip = (
            TextClip(
                text=wrapped_title,
                font=str(self.settings.rendering.font_path),
                font_size=self.settings.rendering.title_font_size,
                color=self.settings.rendering.base_color,
                method="caption",
                size=(frame_w - 160, top_h - 40),
            )
            .with_duration(duration)
        )
        title_clip = title_clip.with_position(
            ((frame_w - title_clip.w) // 2, (top_h - title_clip.h) // 2)
        )
        words = self._collect_words(transcription, start, end)
        caption_sets = self.captions.build(words, clip_start=start)
        caption_clips = []
        caption_resources: List[ImageClip] = []
        caption_y = frame_h - bottom_h + (bottom_h - self.captions.canvas_height) // 2
        for clip_set in caption_sets:
            base_positioned = clip_set.base.with_position(("center", caption_y))
            caption_clips.append(base_positioned)
            caption_resources.append(clip_set.base)
            for highlight in clip_set.highlights:
                positioned = highlight.with_position(("center", caption_y))
                caption_clips.append(positioned)
                caption_resources.append(highlight)
        if not caption_clips:
            # Fall back to a static caption when no word timings are available.
            fallback_text = self._wrap_text(summary or title, max_width=frame_w - 160)
            caption_clips.append(
                TextClip(
                    text=fallback_text,
                    font=str(self.settings.rendering.font_path),
                    font_size=self.settings.rendering.subtitle_font_size,
                    color=self.settings.rendering.base_color,
                    method="caption",
                    size=(frame_w - 160, bottom_h - 40),
                )
                .with_duration(duration)
                .with_position(("center", caption_y))
            )
        composite = CompositeVideoClip(
            [background, top_panel, bottom_panel, video_clip, title_clip, *caption_clips],
            size=(frame_w, frame_h),
        )
        output_path = output_dir / f"clip_{index:02d}.mp4"
        composite.write_videofile(
            str(output_path),
            codec=self.settings.rendering.video_codec,
            audio_codec=self.settings.rendering.audio_codec,
            fps=self.settings.rendering.fps,
            bitrate=self.settings.rendering.bitrate,
            ffmpeg_params=[
                "-preset",
                self.settings.rendering.preset,
                "-pix_fmt",
                "yuv420p",
            ],
            temp_audiofile=str(output_dir / f"temp_audio_{index:02d}.m4a"),
            remove_temp=True,
            threads=4,
        )
        # Release every intermediate clip explicitly to free ffmpeg readers and buffers.
        composite.close()
        resized_clip.close()
        video_clip.close()
        title_clip.close()
        background.close()
        top_panel.close()
        bottom_panel.close()
        for clip in caption_clips:
            clip.close()
        for clip in caption_resources:
            clip.close()
        return str(output_path)

    def _collect_words(
        self, transcription: TranscriptionResult, start: float, end: float
    ) -> List[WordTiming]:
        # Keep only words overlapping the window, clamped to its boundaries; segments
        # without word-level timings get evenly spaced estimates instead.
        collected: List[WordTiming] = []
        for segment in transcription.segments:
            if segment.end < start or segment.start > end:
                continue
            if segment.words:
                for word in segment.words:
                    if word.end < start or word.start > end:
                        continue
                    collected.append(
                        WordTiming(
                            start=max(start, word.start),
                            end=min(end, word.end),
                            word=word.word,
                        )
                    )
            else:
                collected.extend(
                    self._fallback_words(segment.text, segment.start, segment.end, start, end)
                )
        collected.sort(key=lambda w: w.start)
        return collected

    def _fallback_words(
        self,
        text: str,
        segment_start: float,
        segment_end: float,
        window_start: float,
        window_end: float,
    ) -> Iterable[WordTiming]:
        # Distribute the segment's words evenly across its overlap with the window.
        words = [w for w in re.split(r"\s+", text.strip()) if w]
        if not words:
            return []
        seg_start = max(segment_start, window_start)
        seg_end = min(segment_end, window_end)
        duration = max(0.01, seg_end - seg_start)
        step = duration / len(words)
        timings: List[WordTiming] = []
        for idx, word in enumerate(words):
            w_start = seg_start + idx * step
            w_end = min(seg_end, w_start + step)
            timings.append(WordTiming(start=w_start, end=w_end, word=word))
        return timings

    @staticmethod
    def _wrap_text(text: str, max_width: int) -> str:
        text = text.strip()
        if not text:
            return ""
        words = text.split()
        lines: List[str] = []
        current: List[str] = []
        # Greedy wrap using a rough character budget (about max_width / 18 characters
        # per line); a line keeps at least one word so no empty lines are emitted.
        for word in words:
            current.append(word)
            if len(current) > 1 and len(" ".join(current)) > max_width // 18:
                lines.append(" ".join(current[:-1]))
                current = [current[-1]]
        if current:
            lines.append(" ".join(current))
return "\n".join(lines)