Create new components
video_render/rendering.py (new file, 406 lines)
@@ -0,0 +1,406 @@
from __future__ import annotations

import logging
import re
from dataclasses import dataclass
from typing import Iterable, List, Sequence, Tuple

import numpy as np
from moviepy import (
    ColorClip,
    CompositeVideoClip,
    ImageClip,
    TextClip,
    VideoFileClip,
)
from PIL import Image, ImageColor, ImageDraw, ImageFont

from .config import Settings
from .transcription import TranscriptionResult, WordTiming

logger = logging.getLogger(__name__)


def clamp_time(value: float, minimum: float = 0.0) -> float:
    return max(minimum, float(value))


@dataclass
class CaptionClipSet:
    base: ImageClip
    highlights: List[ImageClip]


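# Renders one PIL image per caption group (the full line in the base color)
# plus one overlay image per word in the highlight color, then turns both
# into timed ImageClips for karaoke-style captions.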
class CaptionBuilder:
    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self.font_path = settings.rendering.font_path
        if not self.font_path.exists():
            raise FileNotFoundError(f"Fonte nao encontrada: {self.font_path}")

        self.font = ImageFont.truetype(
            str(self.font_path), settings.rendering.subtitle_font_size
        )
        self.base_color = ImageColor.getrgb(settings.rendering.base_color)
        self.highlight_color = ImageColor.getrgb(settings.rendering.highlight_color)
        self.canvas_width = settings.rendering.frame_width - 160
        self.canvas_height = int(settings.rendering.subtitle_font_size * 2.2)
        self.min_words = settings.rendering.caption_min_words
        self.max_words = settings.rendering.caption_max_words

        bbox = self.font.getbbox("Ay")
        self.text_height = bbox[3] - bbox[1]
        self.baseline = (self.canvas_height - self.text_height) // 2 - bbox[1]
        self.space_width = self.font.getbbox(" ")[2] - self.font.getbbox(" ")[0]

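    # Word timings are absolute to the source video; the resulting clips are
    # composited onto a subclip, so every start/end is shifted by `clip_start`.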
    def build(self, words: Sequence[WordTiming], clip_start: float) -> List[CaptionClipSet]:
        grouped = self._group_words(words)
        clip_sets: List[CaptionClipSet] = []

        for group in grouped:
            group_start = clamp_time(group[0].start, minimum=clip_start)
            group_end = clamp_time(group[-1].end, minimum=group_start + 0.05)
            duration = max(0.05, group_end - group_start)
            start_offset = group_start - clip_start

            base_image, highlight_images = self._render_group(group)

            base_clip = (
                ImageClip(np.array(base_image))
                .with_start(start_offset)
                .with_duration(duration)
            )

            highlight_clips: List[ImageClip] = []
            for word, image in zip(group, highlight_images):
                h_start = clamp_time(word.start, minimum=clip_start) - clip_start
                h_end = clamp_time(word.end, minimum=word.start + 0.02) - clip_start
                h_duration = max(0.05, h_end - h_start)
                highlight_clip = (
                    ImageClip(np.array(image))
                    .with_start(h_start)
                    .with_duration(h_duration)
                )
                highlight_clips.append(highlight_clip)

            clip_sets.append(CaptionClipSet(base=base_clip, highlights=highlight_clips))

        return clip_sets

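    # Single centered line: draw all words once in the base color, and emit a
    # same-sized transparent image per word drawn in the highlight color.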
    def _render_group(self, group: Sequence[WordTiming]) -> Tuple[Image.Image, List[Image.Image]]:
        texts = [self._clean_word(word.word) for word in group]

        widths = []
        for text in texts:
            bbox = self.font.getbbox(text)
            widths.append(bbox[2] - bbox[0])

        total_width = sum(widths)
        if len(widths) > 1:
            total_width += self.space_width * (len(widths) - 1)

        start_x = max(0, (self.canvas_width - total_width) // 2)

        base_image = Image.new("RGBA", (self.canvas_width, self.canvas_height), (0, 0, 0, 0))
        base_draw = ImageDraw.Draw(base_image)
        highlight_images: List[Image.Image] = []

        x = start_x
        for text, width in zip(texts, widths):
            base_draw.text((x, self.baseline), text, font=self.font, fill=self.base_color)

            highlight_image = Image.new("RGBA", base_image.size, (0, 0, 0, 0))
            highlight_draw = ImageDraw.Draw(highlight_image)
            highlight_draw.text(
                (x, self.baseline), text, font=self.font, fill=self.highlight_color
            )
            highlight_images.append(highlight_image)

            x += width + self.space_width

        return base_image, highlight_images

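    # Chunk words into runs of `max_words`; a trailing single word is merged
    # into the previous group, and undersized groups borrow from their
    # successor so groups stay at or above `min_words` when possible.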
    def _group_words(self, words: Sequence[WordTiming]) -> List[List[WordTiming]]:
        if not words:
            return []

        grouped: List[List[WordTiming]] = []
        buffer: List[WordTiming] = []

        for word in words:
            buffer.append(word)
            if len(buffer) == self.max_words:
                grouped.append(buffer)
                buffer = []

        if buffer:
            if len(buffer) == 1 and grouped:
                grouped[-1].extend(buffer)
            else:
                grouped.append(buffer)

        # Rebalance groups to respect minimum size when possible
        for idx, group in enumerate(grouped[:-1]):
            if len(group) < self.min_words and len(grouped[idx + 1]) > self.min_words:
                deficit = self.min_words - len(group)
                transfer = grouped[idx + 1][:deficit]
                grouped[idx] = group + transfer
                grouped[idx + 1] = grouped[idx + 1][deficit:]

        grouped = [grp for grp in grouped if grp]
        return grouped

    @staticmethod
    def _clean_word(text: str) -> str:
        text = text.strip()
        text = re.sub(r"\s+", " ", text)
        return text or "..."


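# Composites each highlight window into a framed clip: title panel on top,
# the source video centered in the middle band, and word-synced captions at
# the bottom.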
class VideoRenderer:
    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self.captions = CaptionBuilder(settings)

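    # Returns one tuple per rendered window:
    # (output_path, start, end, title, summary, index).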
    def render(
        self,
        workspace_path: str,
        highlight_windows: Sequence,
        transcription: TranscriptionResult,
        titles: Sequence[str],
        output_dir,
    ) -> List[Tuple[str, float, float, str, str, int]]:
        results: List[Tuple[str, float, float, str, str, int]] = []

        with VideoFileClip(workspace_path) as base_clip:
            video_duration = base_clip.duration or 0
            for index, window in enumerate(highlight_windows, start=1):
                start = clamp_time(window.start)
                end = clamp_time(window.end)
                start = min(start, video_duration)
                end = min(end, video_duration)
                if end <= start:
                    logger.info("Janela ignorada por intervalo invalido: %s", window)
                    continue

                subclip = base_clip.subclipped(start, end)
                try:
                    rendered_path = self._render_single_clip(
                        subclip=subclip,
                        start=start,
                        end=end,
                        title=titles[index - 1] if index - 1 < len(titles) else window.summary,
                        summary=window.summary,
                        index=index,
                        transcription=transcription,
                        output_dir=output_dir,
                    )
                finally:
                    subclip.close()

                results.append(
                    (
                        rendered_path,
                        float(start),
                        float(end),
                        titles[index - 1] if index - 1 < len(titles) else window.summary,
                        window.summary,
                        index,
                    )
                )

        return results

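    # Layout: top 18% of the frame for the title panel, bottom 20% for
    # captions, and the subclip scaled to fit the band in between.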
    def _render_single_clip(
        self,
        subclip: VideoFileClip,
        start: float,
        end: float,
        title: str,
        summary: str,
        index: int,
        transcription: TranscriptionResult,
        output_dir,
    ) -> str:
        duration = end - start
        frame_w = self.settings.rendering.frame_width
        frame_h = self.settings.rendering.frame_height
        top_h = int(frame_h * 0.18)
        bottom_h = int(frame_h * 0.20)
        video_area_h = frame_h - top_h - bottom_h

        scale_factor = min(
            frame_w / subclip.w,
            video_area_h / subclip.h,
        )
        resized_clip = subclip.resized(scale_factor)
        video_y = top_h + (video_area_h - resized_clip.h) // 2

        video_clip = resized_clip.with_position(
            ((frame_w - resized_clip.w) // 2, video_y)
        )

        background = ColorClip(size=(frame_w, frame_h), color=(0, 0, 0)).with_duration(duration)
        top_panel = (
            ColorClip(size=(frame_w, top_h), color=(12, 12, 12))
            .with_duration(duration)
            .with_opacity(0.85)
        )
        bottom_panel = (
            ColorClip(size=(frame_w, bottom_h), color=(12, 12, 12))
            .with_position((0, frame_h - bottom_h))
            .with_duration(duration)
            .with_opacity(0.85)
        )

        title_text = title or summary
        wrapped_title = self._wrap_text(title_text, max_width=frame_w - 160)
        title_clip = (
            TextClip(
                text=wrapped_title,
                font=str(self.settings.rendering.font_path),
                font_size=self.settings.rendering.title_font_size,
                color=self.settings.rendering.base_color,
                method="caption",
                size=(frame_w - 160, top_h - 40),
            )
            .with_duration(duration)
        )
        title_clip = title_clip.with_position(
            ((frame_w - title_clip.w) // 2, (top_h - title_clip.h) // 2)
        )

        words = self._collect_words(transcription, start, end)
        caption_sets = self.captions.build(words, clip_start=start)

        caption_clips = []
        caption_resources: List[ImageClip] = []
        caption_y = frame_h - bottom_h + (bottom_h - self.captions.canvas_height) // 2
        for clip_set in caption_sets:
            base_positioned = clip_set.base.with_position(("center", caption_y))
            caption_clips.append(base_positioned)
            caption_resources.append(clip_set.base)
            for highlight in clip_set.highlights:
                positioned = highlight.with_position(("center", caption_y))
                caption_clips.append(positioned)
                caption_resources.append(highlight)

        if not caption_clips:
            fallback_text = self._wrap_text(summary or title, max_width=frame_w - 160)
            caption_clips.append(
                TextClip(
                    text=fallback_text,
                    font=str(self.settings.rendering.font_path),
                    font_size=self.settings.rendering.subtitle_font_size,
                    color=self.settings.rendering.base_color,
                    method="caption",
                    size=(frame_w - 160, bottom_h - 40),
                )
                .with_duration(duration)
                .with_position(("center", caption_y))
            )

        composite = CompositeVideoClip(
            [background, top_panel, bottom_panel, video_clip, title_clip, *caption_clips],
            size=(frame_w, frame_h),
        )

        output_path = output_dir / f"clip_{index:02d}.mp4"
        composite.write_videofile(
            str(output_path),
            codec=self.settings.rendering.video_codec,
            audio_codec=self.settings.rendering.audio_codec,
            fps=self.settings.rendering.fps,
            bitrate=self.settings.rendering.bitrate,
            ffmpeg_params=[
                "-preset",
                self.settings.rendering.preset,
                "-pix_fmt",
                "yuv420p",
            ],
            temp_audiofile=str(output_dir / f"temp_audio_{index:02d}.m4a"),
            remove_temp=True,
            threads=4,
        )

        composite.close()
        resized_clip.close()
        video_clip.close()
        title_clip.close()
        background.close()
        top_panel.close()
        bottom_panel.close()
        for clip in caption_clips:
            clip.close()
        for clip in caption_resources:
            clip.close()

        return str(output_path)

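    # Collects word timings that overlap [start, end], clamping each word to
    # the window; segments without word-level timings fall back to evenly
    # spaced estimates.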
    def _collect_words(
        self, transcription: TranscriptionResult, start: float, end: float
    ) -> List[WordTiming]:
        collected: List[WordTiming] = []
        for segment in transcription.segments:
            if segment.end < start or segment.start > end:
                continue

            if segment.words:
                for word in segment.words:
                    if word.end < start or word.start > end:
                        continue
                    collected.append(
                        WordTiming(
                            start=max(start, word.start),
                            end=min(end, word.end),
                            word=word.word,
                        )
                    )
            else:
                collected.extend(self._fallback_words(segment.text, segment.start, segment.end, start, end))

        collected.sort(key=lambda w: w.start)
        return collected

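    # No word-level timings: split the segment text on whitespace and spread
    # the words uniformly over the segment's overlap with the window.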
    def _fallback_words(
        self,
        text: str,
        segment_start: float,
        segment_end: float,
        window_start: float,
        window_end: float,
    ) -> Iterable[WordTiming]:
        words = [w for w in re.split(r"\s+", text.strip()) if w]
        if not words:
            return []

        seg_start = max(segment_start, window_start)
        seg_end = min(segment_end, window_end)
        duration = max(0.01, seg_end - seg_start)
        step = duration / len(words)

        timings: List[WordTiming] = []
        for idx, word in enumerate(words):
            w_start = seg_start + idx * step
            w_end = min(seg_end, w_start + step)
            timings.append(WordTiming(start=w_start, end=w_end, word=word))
        return timings

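    # Greedy line wrap on an estimated character budget: `max_width` is in
    # pixels, and `max_width // 18` approximates the characters that fit per
    # line.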
    @staticmethod
    def _wrap_text(text: str, max_width: int) -> str:
        text = text.strip()
        if not text:
            return ""

        words = text.split()
        lines: List[str] = []
        current: List[str] = []
        for word in words:
            current.append(word)
            if len(current) > 1 and len(" ".join(current)) > max_width // 18:
                lines.append(" ".join(current[:-1]))
                current = [current[-1]]
        if current:
            lines.append(" ".join(current))
        return "\n".join(lines)