# video-render/video_render/rendering.py
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from typing import Dict, Iterable, List, Sequence, Tuple, Optional
import numpy as np
from moviepy.audio.AudioClip import AudioArrayClip, AudioClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.VideoClip import ColorClip, ImageClip, TextClip
from moviepy.video.compositing.CompositeVideoClip import CompositeVideoClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from PIL import Image, ImageColor, ImageDraw, ImageFont
from video_render.config import Settings
from video_render.transcription import TranscriptionResult, WordTiming
logger = logging.getLogger(__name__)
def clamp_time(value: float, minimum: float = 0.0) -> float:
return max(minimum, float(value))
@dataclass
class CaptionClipSet:
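    """A caption group's clips: the dim base line plus per-word highlight overlays."""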
base: ImageClip
highlights: List[ImageClip]
class CaptionBuilder:
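    """Builds karaoke-style caption images from word timings using Pillow."""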
def __init__(self, settings: Settings) -> None:
self.settings = settings
self.font_path = settings.rendering.font_path
if not self.font_path.exists():
            raise FileNotFoundError(f"Font not found: {self.font_path}")
self.font = ImageFont.truetype(
str(self.font_path), settings.rendering.subtitle_font_size
)
self.base_color = ImageColor.getrgb(settings.rendering.base_color)
self.highlight_color = ImageColor.getrgb(settings.rendering.highlight_color)
self.canvas_width = settings.rendering.frame_width - 160
self.canvas_height = int(settings.rendering.subtitle_font_size * 2.2)
self.min_words = settings.rendering.caption_min_words
self.max_words = settings.rendering.caption_max_words
bbox = self.font.getbbox("Ay")
self.text_height = bbox[3] - bbox[1]
self.baseline = (self.canvas_height - self.text_height) // 2 - bbox[1]
self.space_width = self.font.getbbox(" ")[2] - self.font.getbbox(" ")[0]
def build(self, words: Sequence[WordTiming], clip_start: float) -> List[CaptionClipSet]:
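        """Build one CaptionClipSet per word group: a base line in the dim
        color plus one overlay per word, timed so each word lights up as it
        is spoken. All offsets are relative to clip_start."""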
grouped = self._group_words(words)
clip_sets: List[CaptionClipSet] = []
for group in grouped:
group_start = clamp_time(group[0].start, minimum=clip_start)
group_end = clamp_time(group[-1].end, minimum=group_start + 0.05)
duration = max(0.05, group_end - group_start)
start_offset = group_start - clip_start
base_image, highlight_images = self._render_group(group)
base_clip = (
ImageClip(np.array(base_image))
.with_start(start_offset)
.with_duration(duration)
)
highlight_clips: List[ImageClip] = []
for word, image in zip(group, highlight_images):
h_start = clamp_time(word.start, minimum=clip_start) - clip_start
h_end = clamp_time(word.end, minimum=word.start + 0.02) - clip_start
h_duration = max(0.05, h_end - h_start)
highlight_clip = (
ImageClip(np.array(image))
.with_start(h_start)
.with_duration(h_duration)
)
highlight_clips.append(highlight_clip)
clip_sets.append(CaptionClipSet(base=base_clip, highlights=highlight_clips))
return clip_sets
def _render_group(self, group: Sequence[WordTiming]) -> Tuple[Image.Image, List[Image.Image]]:
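        """Draw the whole group once in the base color and, for each word, a
        same-size transparent overlay with only that word in the highlight
        color, so overlays can be stacked over the base image."""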
texts = [self._clean_word(word.word) for word in group]
widths = []
for text in texts:
bbox = self.font.getbbox(text)
widths.append(bbox[2] - bbox[0])
total_width = sum(widths)
if len(widths) > 1:
total_width += self.space_width * (len(widths) - 1)
start_x = max(0, (self.canvas_width - total_width) // 2)
base_image = Image.new("RGBA", (self.canvas_width, self.canvas_height), (0, 0, 0, 0))
base_draw = ImageDraw.Draw(base_image)
highlight_images: List[Image.Image] = []
x = start_x
for text, width in zip(texts, widths):
base_draw.text((x, self.baseline), text, font=self.font, fill=self.base_color)
highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
highlight_draw = ImageDraw.Draw(highlight_image)
highlight_draw.text(
(x, self.baseline), text, font=self.font, fill=self.highlight_color
)
highlight_images.append(highlight_image)
x += width + self.space_width
return base_image, highlight_images
def _group_words(self, words: Sequence[WordTiming]) -> List[List[WordTiming]]:
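        """Chunk words into groups of at most max_words, then rebalance so
        no group before the last falls below min_words; a trailing single
        word is merged into the previous group."""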
if not words:
return []
grouped: List[List[WordTiming]] = []
buffer: List[WordTiming] = []
for word in words:
buffer.append(word)
if len(buffer) == self.max_words:
grouped.append(buffer)
buffer = []
if buffer:
if len(buffer) == 1 and grouped:
grouped[-1].extend(buffer)
else:
grouped.append(buffer)
        for idx in range(len(grouped) - 1):
            # Read groups by index on each pass: earlier iterations may
            # replace list entries, so a slice taken up front would go stale.
            group = grouped[idx]
            if len(group) < self.min_words and len(grouped[idx + 1]) > self.min_words:
                deficit = self.min_words - len(group)
                transfer = grouped[idx + 1][:deficit]
                grouped[idx] = group + transfer
                grouped[idx + 1] = grouped[idx + 1][deficit:]
        grouped = [grp for grp in grouped if grp]
return grouped
@staticmethod
def _clean_word(text: str) -> str:
text = text.strip()
text = re.sub(r"\s+", " ", text)
return text or "..."
class VideoRenderer:
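    """Renders vertical highlight clips with a title panel and animated captions."""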
def __init__(self, settings: Settings) -> None:
self.settings = settings
self.captions = CaptionBuilder(settings)
def render(
self,
workspace_path: str,
highlight_windows: Sequence,
transcription: TranscriptionResult,
titles: Sequence[str],
output_dir,
) -> List[Tuple[str, float, float, str, str, int]]:
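        """Cut each highlight window from the source video, render it, and
        return (path, start, end, title, summary, index) tuples. Windows
        with an empty or inverted interval are skipped."""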
results: List[Tuple[str, float, float, str, str, int]] = []
with VideoFileClip(workspace_path) as base_clip:
video_duration = base_clip.duration or 0
for index, window in enumerate(highlight_windows, start=1):
start = clamp_time(window.start)
end = clamp_time(window.end)
start = min(start, video_duration)
end = min(end, video_duration)
if end <= start:
                    logger.info("Window skipped due to invalid interval: %s", window)
continue
                subclip = base_clip.subclipped(start, end)
                clip_title = (
                    titles[index - 1] if index - 1 < len(titles) else window.summary
                )
                try:
                    rendered_path = self._render_single_clip(
                        subclip=subclip,
                        start=start,
                        end=end,
                        title=clip_title,
                        summary=window.summary,
                        index=index,
                        transcription=transcription,
                        output_dir=output_dir,
                        source_path=workspace_path,
                    )
                finally:
                    subclip.close()
                results.append(
                    (
                        rendered_path,
                        float(start),
                        float(end),
                        clip_title,
                        window.summary,
                        index,
                    )
                )
return results
def _render_single_clip(
self,
subclip: VideoFileClip,
start: float,
end: float,
title: str,
summary: str,
index: int,
transcription: TranscriptionResult,
output_dir,
source_path: str,
) -> str:
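        """Compose one vertical clip: black background, dimmed top/bottom
        panels, the source video letterboxed in between, the title in the
        top panel, and karaoke captions in the bottom panel."""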
duration = end - start
frame_w = self.settings.rendering.frame_width
frame_h = self.settings.rendering.frame_height
top_h = int(frame_h * 0.18)
bottom_h = int(frame_h * 0.20)
video_area_h = max(1, frame_h - top_h - bottom_h)
scale_factor = min(
frame_w / subclip.w,
video_area_h / subclip.h,
)
resized_clip = subclip.resized(scale_factor)
video_y = top_h + (video_area_h - resized_clip.h) // 2
video_clip = resized_clip.with_position(
((frame_w - resized_clip.w) // 2, video_y)
)
background = ColorClip(size=(frame_w, frame_h), color=(0, 0, 0)).with_duration(duration)
top_panel = (
ColorClip(size=(frame_w, top_h), color=(12, 12, 12))
.with_duration(duration)
.with_opacity(0.85)
)
bottom_panel = (
ColorClip(size=(frame_w, bottom_h), color=(12, 12, 12))
.with_position((0, frame_h - bottom_h))
.with_duration(duration)
.with_opacity(0.85)
)
title_clip = self._build_title_clip(
title=title,
summary=summary,
duration=duration,
frame_width=frame_w,
top_panel_height=top_h,
)
title_clip = title_clip.with_position(
((frame_w - title_clip.w) // 2, (top_h - title_clip.h) // 2)
)
words = self._collect_words(transcription, start, end)
caption_sets = self.captions.build(words, clip_start=start)
caption_clips = []
caption_resources: List[ImageClip] = []
caption_area_top = frame_h - bottom_h
caption_area_height = bottom_h
caption_margin = 20
raw_caption_y = caption_area_top + (caption_area_height - self.captions.canvas_height) // 2
min_caption_y = caption_area_top + caption_margin
max_caption_y = (
caption_area_top + caption_area_height - self.captions.canvas_height - caption_margin
)
if max_caption_y < min_caption_y:
caption_y = min_caption_y
else:
caption_y = min(max(raw_caption_y, min_caption_y), max_caption_y)
for clip_set in caption_sets:
base_positioned = clip_set.base.with_position(("center", caption_y))
caption_clips.append(base_positioned)
caption_resources.append(clip_set.base)
for highlight in clip_set.highlights:
positioned = highlight.with_position(("center", caption_y))
caption_clips.append(positioned)
caption_resources.append(highlight)
if not caption_clips:
fallback_text = self._wrap_text(summary or title, max_width=frame_w - 160)
caption_clips.append(
self._make_textclip(
text=fallback_text,
font_path=self.settings.rendering.font_path,
font_size=self.settings.rendering.subtitle_font_size,
color=self.settings.rendering.base_color,
size=(frame_w - 160, max(40, self.captions.canvas_height)),
)
.with_duration(duration)
.with_position(("center", caption_y))
)
audio_clip, audio_needs_close = self._materialize_audio(
source_path=source_path,
start=start,
end=end,
duration=duration,
fallback_audio=video_clip.audio or resized_clip.audio or subclip.audio,
)
composite = CompositeVideoClip(
[background, top_panel, bottom_panel, video_clip, title_clip, *caption_clips],
size=(frame_w, frame_h),
)
if audio_clip is not None:
composite = self._with_audio(composite, audio_clip)
output_path = output_dir / f"clip_{index:02d}.mp4"
self._write_with_fallback(
composite=composite,
output_path=output_path,
index=index,
output_dir=output_dir,
)
composite.close()
resized_clip.close()
video_clip.close()
title_clip.close()
background.close()
top_panel.close()
bottom_panel.close()
for clip in caption_clips:
clip.close()
for clip in caption_resources:
clip.close()
if audio_clip is not None and audio_needs_close:
audio_clip.close()
return str(output_path)
def _build_title_clip(
self,
*,
title: str,
summary: str,
duration: float,
frame_width: int,
top_panel_height: int,
) -> ImageClip:
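        """Render the title as a centered, word-wrapped RGBA image, shrinking
        the font in steps (down to a floor size) until the text fits inside
        the top panel."""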
text = (title or summary or "").strip()
if not text:
text = summary or ""
max_width = max(200, frame_width - 160)
font_size = self.settings.rendering.title_font_size
min_font_size = max(28, int(font_size * 0.6))
target_height = max(80, top_panel_height - 40)
title_color = ImageColor.getrgb(self.settings.rendering.base_color)
font_path = self.settings.rendering.font_path
while True:
font = ImageFont.truetype(str(font_path), font_size)
lines = self._split_title_lines(text, font, max_width)
line_height = font.getbbox("Ay")[3] - font.getbbox("Ay")[1]
spacing = max(4, int(line_height * 0.25))
text_height = self._measure_text_height(len(lines), line_height, spacing)
if text_height <= target_height or font_size <= min_font_size:
break
font_size = max(min_font_size, font_size - 6)
# Recompute dimensions with final font size to ensure consistency
font = ImageFont.truetype(str(font_path), font_size)
lines = self._split_title_lines(text, font, max_width)
line_height = font.getbbox("Ay")[3] - font.getbbox("Ay")[1]
spacing = max(4, int(line_height * 0.25))
text_height = self._measure_text_height(len(lines), line_height, spacing)
canvas_height = max(1, text_height)
image = Image.new("RGBA", (max_width, canvas_height), (0, 0, 0, 0))
draw = ImageDraw.Draw(image)
y = 0
for idx, line in enumerate(lines):
bbox = font.getbbox(line)
line_width = bbox[2] - bbox[0]
x = max(0, (max_width - line_width) // 2)
draw.text((x, y - bbox[1]), line, font=font, fill=title_color)
y += line_height
if idx < len(lines) - 1:
y += spacing
return ImageClip(np.array(image)).with_duration(duration)
@staticmethod
def _measure_text_height(line_count: int, line_height: int, spacing: int) -> int:
if line_count <= 0:
return line_height
return line_count * line_height + max(0, line_count - 1) * spacing
@staticmethod
def _split_title_lines(
text: str, font: ImageFont.FreeTypeFont, max_width: int
) -> List[str]:
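        """Greedy pixel-width word wrap; a single word wider than max_width
        is placed on its own line rather than split."""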
words = text.split()
if not words:
return [""]
lines: List[str] = []
current: List[str] = []
for word in words:
test_line = " ".join(current + [word]) if current else word
bbox = font.getbbox(test_line)
line_width = bbox[2] - bbox[0]
if line_width <= max_width or not current:
current.append(word)
if line_width > max_width and not current[:-1]:
lines.append(" ".join(current))
current = []
continue
lines.append(" ".join(current))
current = [word]
if current:
lines.append(" ".join(current))
return lines
def _materialize_audio(
self,
*,
source_path: str,
start: float,
end: float,
duration: float,
fallback_audio,
) -> Tuple[Optional[AudioClip], bool]:
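        """Decode the [start, end] audio into an in-memory AudioArrayClip so
        the rendered clip does not depend on the source reader staying open.
        Returns (clip, needs_close); on failure, the fallback audio is
        returned with needs_close=False."""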
try:
with AudioFileClip(source_path) as audio_file:
segment = audio_file.subclipped(start, end)
fps = (
getattr(segment, "fps", None)
or getattr(audio_file, "fps", None)
or 44100
)
samples = segment.to_soundarray(fps=fps)
except Exception:
logger.warning(
"Falha ao carregar audio independente; utilizando fluxo original",
exc_info=True,
)
return fallback_audio, False
audio_clip = AudioArrayClip(samples, fps=fps).with_duration(duration)
return audio_clip, True
def _collect_words(
self, transcription: TranscriptionResult, start: float, end: float
) -> List[WordTiming]:
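        """Gather word timings overlapping [start, end], clamped to the
        window. Segments lacking word-level timing get synthetic, evenly
        spaced timings from _fallback_words."""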
collected: List[WordTiming] = []
for segment in transcription.segments:
if segment.end < start or segment.start > end:
continue
if segment.words:
for word in segment.words:
if word.end < start or word.start > end:
continue
collected.append(
WordTiming(
start=max(start, word.start),
end=min(end, word.end),
word=word.word,
)
)
else:
collected.extend(self._fallback_words(segment.text, segment.start, segment.end, start, end))
collected.sort(key=lambda w: w.start)
return collected
def _fallback_words(
self,
text: str,
segment_start: float,
segment_end: float,
window_start: float,
window_end: float,
) -> Iterable[WordTiming]:
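        """Spread a segment's words evenly across its (window-clipped) time
        span when no word-level timestamps are available."""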
words = [w for w in re.split(r"\s+", text.strip()) if w]
if not words:
return []
seg_start = max(segment_start, window_start)
seg_end = min(segment_end, window_end)
duration = max(0.01, seg_end - seg_start)
step = duration / len(words)
timings: List[WordTiming] = []
for idx, word in enumerate(words):
w_start = seg_start + idx * step
w_end = min(seg_end, w_start + step)
timings.append(WordTiming(start=w_start, end=w_end, word=word))
return timings
@staticmethod
def _wrap_text(text: str, max_width: int) -> str:
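        """Greedy line wrap by character count; max_width is in pixels and
        // 18 serves as a rough average glyph width at the subtitle size."""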
text = text.strip()
if not text:
return ""
words = text.split()
lines: List[str] = []
current: List[str] = []
for word in words:
current.append(word)
if len(" ".join(current)) > max_width // 18:
lines.append(" ".join(current[:-1]))
current = [current[-1]]
if current:
lines.append(" ".join(current))
return "\n".join(lines)
def _write_with_fallback(
self,
*,
composite: CompositeVideoClip,
output_path,
index: int,
output_dir,
) -> None:
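        """Try each encoding attempt in order, removing partial output and
        the temp audio file on failure; raise only after all attempts fail."""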
attempts = self._encoding_attempts()
temp_audio_path = output_dir / f"temp_audio_{index:02d}.m4a"
last_error: Exception | None = None
for attempt in attempts:
codec = attempt["codec"]
bitrate = attempt["bitrate"]
preset = attempt["preset"]
ffmpeg_params = ["-pix_fmt", "yuv420p"]
if preset:
ffmpeg_params = ["-preset", preset, "-pix_fmt", "yuv420p"]
try:
logger.info(
"Renderizando clip %02d com codec %s (bitrate=%s, preset=%s)",
index,
codec,
bitrate,
preset or "default",
)
composite.write_videofile(
str(output_path),
codec=codec,
audio_codec=self.settings.rendering.audio_codec,
fps=self.settings.rendering.fps,
bitrate=bitrate,
ffmpeg_params=ffmpeg_params,
temp_audiofile=str(temp_audio_path),
remove_temp=True,
threads=4,
)
return
except Exception as exc: # noqa: BLE001 - propagate after fallbacks
last_error = exc
logger.warning(
"Falha ao renderizar com codec %s: %s", codec, exc, exc_info=True
)
if output_path.exists():
output_path.unlink(missing_ok=True)
if temp_audio_path.exists():
temp_audio_path.unlink(missing_ok=True)
        raise RuntimeError("All rendering attempts failed") from last_error
def _encoding_attempts(self) -> List[Dict[str, str | None]]:
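        """Build the ordered list of encoding attempts (currently only the
        configured codec/bitrate/preset); the dedup pass keeps the list
        well-formed if fallback attempts are appended later."""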
settings = self.settings.rendering
attempts: List[Dict[str, str | None]] = []
attempts.append(
{
"codec": settings.video_codec,
"bitrate": settings.bitrate,
"preset": settings.preset,
}
)
deduped: List[Dict[str, str | None]] = []
seen = set()
for attempt in attempts:
key = (attempt["codec"], attempt["bitrate"], attempt["preset"])
if key in seen:
continue
seen.add(key)
deduped.append(attempt)
return deduped
@staticmethod
def _with_audio(
composite: CompositeVideoClip,
audio_clip,
) -> CompositeVideoClip:
"""Attach audio to a composite clip across MoviePy versions."""
if hasattr(composite, "with_audio"):
return composite.with_audio(audio_clip)
if hasattr(composite, "set_audio"):
return composite.set_audio(audio_clip)
raise AttributeError("CompositeVideoClip does not support audio assignment")
@staticmethod
def _make_textclip(
*,
text: str,
font_path,
font_size: int,
color: str,
size: Tuple[int, int],
) -> TextClip:
"""Create a TextClip compatible with MoviePy 1.x and 2.x.
MoviePy 2.x removed the 'align' keyword from TextClip. We try with
'align' for older versions and fall back to a call without it when
unsupported.
"""
kwargs = dict(
text=text,
font=str(font_path),
font_size=font_size,
color=color,
method="caption",
size=size,
)
try:
return TextClip(**kwargs, align="center") # MoviePy 1.x style
except TypeError:
logger.debug("TextClip 'align' not supported; falling back without it")
return TextClip(**kwargs) # MoviePy 2.x style
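

# Example usage (a minimal sketch; `windows`, `transcription_result`, and the
# Settings construction are assumptions about the surrounding pipeline, not
# part of this module):
#
#     from pathlib import Path
#     from video_render.config import Settings
#
#     settings = Settings()  # hypothetical: actual construction may differ
#     renderer = VideoRenderer(settings)
#     rendered = renderer.render(
#         workspace_path="input.mp4",
#         highlight_windows=windows,           # objects with .start, .end, .summary
#         transcription=transcription_result,  # TranscriptionResult with word timings
#         titles=["First clip title"],
#         output_dir=Path("output"),
#     )
#     for path, start, end, title, summary, index in rendered:
#         print(f"{index:02d}: {path} [{start:.1f}-{end:.1f}] {title}")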