# Listing metadata: 407 lines, 14 KiB, Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import math
|
|
import re
|
|
from dataclasses import dataclass
|
|
from typing import Iterable, List, Sequence, Tuple
|
|
|
|
import numpy as np
|
|
from moviepy.editor import (
|
|
ColorClip,
|
|
CompositeVideoClip,
|
|
ImageClip,
|
|
TextClip,
|
|
VideoFileClip,
|
|
)
|
|
from PIL import Image, ImageColor, ImageDraw, ImageFont
|
|
|
|
from video_render.config import Settings
|
|
from video_render.transcription import TranscriptionResult, WordTiming
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def clamp_time(value: float, minimum: float = 0.0) -> float:
    """Coerce ``value`` to ``float`` and keep it from dropping below ``minimum``.

    Args:
        value: Timestamp to clamp; anything accepted by ``float()``.
        minimum: Lower bound returned whenever ``value`` is not above it.

    Returns:
        ``float(value)`` when it is strictly greater than ``minimum``,
        otherwise ``minimum`` itself.
    """
    seconds = float(value)
    # Same tie-breaking as max(minimum, seconds): the bound wins unless
    # the converted value is strictly larger.
    return seconds if seconds > minimum else minimum
|
|
|
|
|
|
@dataclass
class CaptionClipSet:
    """Clips produced for one caption group.

    Pairs the clip showing the whole group in the base colour with the
    per-word clips drawn in the highlight colour.
    """

    # Clip rendering every word of the group in the base colour.
    base: ImageClip
    # One clip per word of the group, in word order, drawn in the highlight colour.
    highlights: List[ImageClip]
|
|
|
|
|
|
class CaptionBuilder:
    """Turn timed words into caption image clips with per-word highlights.

    Words are split into groups. Each group is drawn once in the base colour
    and once per word in the highlight colour, on transparent RGBA canvases
    of identical size so the highlight clips can be overlaid on the base clip.
    """

    def __init__(self, settings: Settings) -> None:
        """Load the subtitle font and cache layout metrics.

        Args:
            settings: Project settings; only ``settings.rendering`` is read
                (font path, font size, colours, frame width, word limits).

        Raises:
            FileNotFoundError: If the configured font file does not exist.
        """
        rendering = settings.rendering
        self.settings = settings
        self.font_path = rendering.font_path
        if not self.font_path.exists():
            raise FileNotFoundError(f"Fonte nao encontrada: {self.font_path}")

        self.font = ImageFont.truetype(
            str(self.font_path), rendering.subtitle_font_size
        )
        self.base_color = ImageColor.getrgb(rendering.base_color)
        self.highlight_color = ImageColor.getrgb(rendering.highlight_color)
        # 160 px narrower than the frame; canvas height is 2.2x the font size.
        self.canvas_width = rendering.frame_width - 160
        self.canvas_height = int(rendering.subtitle_font_size * 2.2)
        self.min_words = rendering.caption_min_words
        self.max_words = rendering.caption_max_words

        # "Ay" is used as the reference string for the text height.
        bbox = self.font.getbbox("Ay")
        self.text_height = bbox[3] - bbox[1]
        # y coordinate handed to ImageDraw.text for every word.
        self.baseline = (self.canvas_height - self.text_height) // 2 - bbox[1]
        space_box = self.font.getbbox(" ")
        self.space_width = space_box[2] - space_box[0]

    def build(self, words: Sequence[WordTiming], clip_start: float) -> List[CaptionClipSet]:
        """Create timed caption clips for ``words``.

        Args:
            words: Timed words; each item exposes ``start``, ``end`` and
                ``word``.
            clip_start: Timestamp subtracted from every word time so the
                resulting clip start times are relative to it.

        Returns:
            One ``CaptionClipSet`` per word group, in input order.
        """
        clip_sets: List[CaptionClipSet] = []

        for group in self._group_words(words):
            group_start = clamp_time(group[0].start, minimum=clip_start)
            # A group is shown for at least 0.05 s.
            group_end = clamp_time(group[-1].end, minimum=group_start + 0.05)
            duration = max(0.05, group_end - group_start)

            base_image, highlight_images = self._render_group(group)
            base_clip = self._timed_clip(base_image, group_start - clip_start, duration)

            highlight_clips: List[ImageClip] = []
            for word, image in zip(group, highlight_images):
                h_start = clamp_time(word.start, minimum=clip_start) - clip_start
                h_end = clamp_time(word.end, minimum=word.start + 0.02) - clip_start
                h_duration = max(0.05, h_end - h_start)
                highlight_clips.append(self._timed_clip(image, h_start, h_duration))

            clip_sets.append(CaptionClipSet(base=base_clip, highlights=highlight_clips))

        return clip_sets

    @staticmethod
    def _timed_clip(image: Image.Image, start: float, duration: float) -> ImageClip:
        """Wrap a PIL image in an ``ImageClip`` with the given start and duration."""
        return ImageClip(np.array(image)).with_start(start).with_duration(duration)

    def _render_group(self, group: Sequence[WordTiming]) -> Tuple[Image.Image, List[Image.Image]]:
        """Draw one word group onto transparent canvases.

        Args:
            group: Words that share one caption line.

        Returns:
            ``(base_image, highlight_images)``: the full line in the base
            colour, plus one same-sized image per word containing only that
            word in the highlight colour at the same position.
        """
        texts = [self._clean_word(word.word) for word in group]

        widths = []
        for text in texts:
            left, _, right, _ = self.font.getbbox(text)
            widths.append(right - left)

        # Line width is the word widths plus one space between neighbours.
        total_width = sum(widths) + self.space_width * max(0, len(widths) - 1)
        # Centre the line; a line wider than the canvas starts at x = 0.
        start_x = max(0, (self.canvas_width - total_width) // 2)

        canvas_size = (self.canvas_width, self.canvas_height)
        transparent = (0, 0, 0, 0)
        base_image = Image.new("RGBA", canvas_size, transparent)
        base_draw = ImageDraw.Draw(base_image)
        highlight_images: List[Image.Image] = []

        x = start_x
        for text, width in zip(texts, widths):
            origin = (x, self.baseline)
            base_draw.text(origin, text, font=self.font, fill=self.base_color)

            highlight_image = Image.new("RGBA", base_image.size, transparent)
            ImageDraw.Draw(highlight_image).text(
                origin, text, font=self.font, fill=self.highlight_color
            )
            highlight_images.append(highlight_image)

            x += width + self.space_width

        return base_image, highlight_images

    def _group_words(self, words: Sequence[WordTiming]) -> List[List[WordTiming]]:
        """Split ``words`` into consecutive caption groups.

        Groups hold ``max_words`` items each. A single leftover word is
        appended to the previous group instead of standing alone, so that
        group may hold ``max_words + 1`` items. If the trailing group is
        still shorter than ``min_words``, words are moved to it from the end
        of the preceding group, provided that group keeps at least
        ``min_words`` items.

        Args:
            words: Items to group; order is preserved.

        Returns:
            A list of non-empty groups, or ``[]`` when ``words`` is empty.
        """
        if not words:
            return []

        pending = list(words)
        size = self.max_words
        if size >= 1:
            grouped = [pending[i:i + size] for i in range(0, len(pending), size)]
        else:
            # A non-positive limit never closes a group: keep one group.
            grouped = [pending]

        # Avoid a one-word trailing caption by folding it into its neighbour.
        # With size == 1 every group is a single word by design, so skip it.
        if size > 1 and len(grouped) > 1 and len(grouped[-1]) == 1:
            orphan = grouped.pop()
            grouped[-1].extend(orphan)

        # Only the trailing group can be short. Top it up from the group
        # before it when that group can spare the words.
        if len(grouped) > 1:
            previous, tail = grouped[-2], grouped[-1]
            deficit = self.min_words - len(tail)
            if deficit > 0 and len(previous) - deficit >= self.min_words:
                grouped[-2] = previous[:-deficit]
                grouped[-1] = previous[-deficit:] + tail

        return grouped

    @staticmethod
    def _clean_word(text: str) -> str:
        """Trim ``text`` and collapse whitespace runs to single spaces.

        Returns:
            The normalised text, or ``"..."`` when nothing is left.
        """
        collapsed = re.sub(r"\s+", " ", text.strip())
        return collapsed or "..."
|
|
|
|
|
|
class VideoRenderer:
    """Render highlight windows of a source video as captioned clips.

    Each output frame is a black background with a top panel holding the
    title, the scaled source video in the middle, and a bottom panel holding
    word-level captions.
    """

    def __init__(self, settings: Settings) -> None:
        """Store ``settings`` and create the caption builder from them."""
        self.settings = settings
        self.captions = CaptionBuilder(settings)

    def render(
        self,
        workspace_path: str,
        highlight_windows: Sequence,
        transcription: TranscriptionResult,
        titles: Sequence[str],
        output_dir,
    ) -> List[Tuple[str, float, float, str, str, int]]:
        """Render one output clip per valid highlight window.

        Args:
            workspace_path: Path of the source video opened with
                ``VideoFileClip``.
            highlight_windows: Items exposing ``start``, ``end`` and
                ``summary``.
            transcription: Transcription whose segments supply the caption
                words.
            titles: Titles matched to windows by position; a window without
                a matching title uses its ``summary``.
            output_dir: Directory for the rendered files; joined with ``/``,
                so presumably a ``pathlib.Path`` (confirm against callers).

        Returns:
            One ``(path, start, end, title, summary, index)`` tuple per
            rendered window. ``index`` is the 1-based window position.
            Windows whose clamped interval is empty are skipped.
        """
        results: List[Tuple[str, float, float, str, str, int]] = []

        with VideoFileClip(workspace_path) as base_clip:
            video_duration = base_clip.duration or 0
            for index, window in enumerate(highlight_windows, start=1):
                # Keep the window inside [0, video_duration].
                start = min(clamp_time(window.start), video_duration)
                end = min(clamp_time(window.end), video_duration)
                if end <= start:
                    logger.info("Janela ignorada por intervalo invalido: %s", window)
                    continue

                title = titles[index - 1] if index - 1 < len(titles) else window.summary

                subclip = base_clip.subclipped(start, end)
                try:
                    rendered_path = self._render_single_clip(
                        subclip=subclip,
                        start=start,
                        end=end,
                        title=title,
                        summary=window.summary,
                        index=index,
                        transcription=transcription,
                        output_dir=output_dir,
                    )
                finally:
                    subclip.close()

                results.append(
                    (rendered_path, float(start), float(end), title, window.summary, index)
                )

        return results

    def _render_single_clip(
        self,
        subclip: VideoFileClip,
        start: float,
        end: float,
        title: str,
        summary: str,
        index: int,
        transcription: TranscriptionResult,
        output_dir,
    ) -> str:
        """Compose and encode one highlight clip.

        Args:
            subclip: Source video already cut to ``[start, end]``.
            start: Window start in source-video time.
            end: Window end in source-video time.
            title: Text for the top panel; ``summary`` is used when empty.
            summary: Fallback text for the title and for the caption area
                when no words fall inside the window.
            index: Number used in the output and temp-audio file names.
            transcription: Source of caption words.
            output_dir: Output directory; joined with ``/``.

        Returns:
            The path of the written ``clip_NN.mp4`` file as a string.
        """
        rendering = self.settings.rendering
        duration = end - start
        frame_w = rendering.frame_width
        frame_h = rendering.frame_height
        # Top panel takes 18% of the frame height, bottom panel 20%.
        top_h = int(frame_h * 0.18)
        bottom_h = int(frame_h * 0.20)
        video_area_h = frame_h - top_h - bottom_h
        text_w = frame_w - 160

        # Fit the video inside the middle area, preserving aspect ratio.
        scale_factor = min(frame_w / subclip.w, video_area_h / subclip.h)

        # Every clip created here is recorded so it is closed even when
        # composing or encoding fails part-way through.
        opened: List = []

        def track(clip):
            opened.append(clip)
            return clip

        try:
            resized_clip = track(subclip.resized(scale_factor))
            video_y = top_h + (video_area_h - resized_clip.h) // 2
            video_clip = track(
                resized_clip.with_position(((frame_w - resized_clip.w) // 2, video_y))
            )

            background = track(
                ColorClip(size=(frame_w, frame_h), color=(0, 0, 0)).with_duration(duration)
            )
            top_panel = track(
                ColorClip(size=(frame_w, top_h), color=(12, 12, 12))
                .with_duration(duration)
                .with_opacity(0.85)
            )
            bottom_panel = track(
                ColorClip(size=(frame_w, bottom_h), color=(12, 12, 12))
                .with_position((0, frame_h - bottom_h))
                .with_duration(duration)
                .with_opacity(0.85)
            )

            title_text = title or summary
            wrapped_title = self._wrap_text(title_text, max_width=text_w)
            title_source = track(
                TextClip(
                    text=wrapped_title,
                    font=str(rendering.font_path),
                    font_size=rendering.title_font_size,
                    color=rendering.base_color,
                    method="caption",
                    size=(text_w, top_h - 40),
                ).with_duration(duration)
            )
            title_clip = track(
                title_source.with_position(
                    ((frame_w - title_source.w) // 2, (top_h - title_source.h) // 2)
                )
            )

            words = self._collect_words(transcription, start, end)
            caption_sets = self.captions.build(words, clip_start=start)
            for clip_set in caption_sets:
                track(clip_set.base)
                for highlight in clip_set.highlights:
                    track(highlight)

            # Centre the caption canvas vertically inside the bottom panel.
            caption_y = frame_h - bottom_h + (bottom_h - self.captions.canvas_height) // 2
            caption_clips = []
            for clip_set in caption_sets:
                # Base first, then its highlights, so highlights draw on top.
                for source in (clip_set.base, *clip_set.highlights):
                    caption_clips.append(
                        track(source.with_position(("center", caption_y)))
                    )

            if not caption_clips:
                # No words in this window: show the summary (or title) instead.
                # NOTE(review): caption_y is computed for the caption canvas
                # height, but this box is bottom_h - 40 tall, so it may extend
                # past the bottom panel. Confirm the intended layout.
                fallback_text = self._wrap_text(summary or title, max_width=text_w)
                caption_clips.append(
                    track(
                        TextClip(
                            text=fallback_text,
                            font=str(rendering.font_path),
                            font_size=rendering.subtitle_font_size,
                            color=rendering.base_color,
                            method="caption",
                            size=(text_w, bottom_h - 40),
                        )
                        .with_duration(duration)
                        .with_position(("center", caption_y))
                    )
                )

            # Caption clips have minimum durations and can end slightly after
            # the window; pin the composite to the window length so the output
            # does not gain trailing frames without video.
            composite = track(
                CompositeVideoClip(
                    [background, top_panel, bottom_panel, video_clip, title_clip, *caption_clips],
                    size=(frame_w, frame_h),
                ).with_duration(duration)
            )

            output_path = output_dir / f"clip_{index:02d}.mp4"
            composite.write_videofile(
                str(output_path),
                codec=rendering.video_codec,
                audio_codec=rendering.audio_codec,
                fps=rendering.fps,
                bitrate=rendering.bitrate,
                ffmpeg_params=[
                    "-preset",
                    rendering.preset,
                    "-pix_fmt",
                    "yuv420p",
                ],
                temp_audiofile=str(output_dir / f"temp_audio_{index:02d}.m4a"),
                remove_temp=True,
                threads=4,
            )
        finally:
            # Close newest first, so the composite goes before its layers.
            for clip in reversed(opened):
                clip.close()

        return str(output_path)

    def _collect_words(
        self, transcription: TranscriptionResult, start: float, end: float
    ) -> List[WordTiming]:
        """Gather the words overlapping ``[start, end]``, clipped to it.

        Segments with word timings contribute their overlapping words;
        segments without them contribute evenly spaced words from
        ``_fallback_words``.

        Returns:
            New ``WordTiming`` items sorted by start time.
        """
        collected: List[WordTiming] = []
        for segment in transcription.segments:
            if segment.end < start or segment.start > end:
                continue

            if not segment.words:
                collected.extend(
                    self._fallback_words(segment.text, segment.start, segment.end, start, end)
                )
                continue

            collected.extend(
                WordTiming(
                    start=max(start, word.start),
                    end=min(end, word.end),
                    word=word.word,
                )
                for word in segment.words
                if not (word.end < start or word.start > end)
            )

        collected.sort(key=lambda w: w.start)
        return collected

    def _fallback_words(
        self,
        text: str,
        segment_start: float,
        segment_end: float,
        window_start: float,
        window_end: float,
    ) -> Iterable[WordTiming]:
        """Spread the words of ``text`` evenly over the segment/window overlap.

        Args:
            text: Segment text, split on whitespace.
            segment_start: Segment start time.
            segment_end: Segment end time.
            window_start: Window start time.
            window_end: Window end time.

        Returns:
            One ``WordTiming`` per word, each given an equal share of the
            overlap (treated as at least 0.01 s long). Empty when ``text``
            has no words.
        """
        tokens = [token for token in re.split(r"\s+", text.strip()) if token]
        if not tokens:
            return []

        seg_start = max(segment_start, window_start)
        seg_end = min(segment_end, window_end)
        step = max(0.01, seg_end - seg_start) / len(tokens)

        timings: List[WordTiming] = []
        for position, token in enumerate(tokens):
            token_start = seg_start + position * step
            token_end = min(seg_end, token_start + step)
            timings.append(WordTiming(start=token_start, end=token_end, word=token))
        return timings

    @staticmethod
    def _wrap_text(text: str, max_width: int, char_width: int = 18) -> str:
        """Greedily wrap ``text`` into lines that fit ``max_width``.

        The width is converted to a character budget of
        ``max_width // char_width``. Words are never split: a word longer
        than the budget gets a line of its own.

        Args:
            text: Text to wrap; ``None`` is treated as empty.
            max_width: Available width in pixels.
            char_width: Assumed pixel width of one character.

        Returns:
            The lines joined with ``"\\n"``, or ``""`` for blank input.
        """
        text = (text or "").strip()
        if not text:
            return ""

        limit = max(1, max_width // max(1, char_width))
        lines: List[str] = []
        current: List[str] = []
        for word in text.split():
            candidate = current + [word]
            # Only break when there is something to flush; otherwise an
            # over-long first word would produce an empty leading line.
            if current and len(" ".join(candidate)) > limit:
                lines.append(" ".join(current))
                current = [word]
            else:
                current = candidate
        if current:
            lines.append(" ".join(current))
        return "\n".join(lines)
|