# video-render/video_render/smart_framing.py
"""
Smart framing module for intelligent video cropping and composition.
This module provides functionality to create 9:16 vertical videos with
intelligent framing that follows the action and speakers.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import List, Optional, Tuple
import cv2
import numpy as np
from moviepy.video.VideoClip import VideoClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from scipy import signal
from video_render.context_detection import ContextAnalyzer, FrameContext, FaceDetection
logger = logging.getLogger(__name__)
@dataclass
class CropRegion:
"""Defines a crop region for a frame."""
x: int
y: int
width: int
height: int
@dataclass
class FramingPlan:
"""Complete framing plan for a video segment."""
frame_contexts: List[FrameContext]
crop_regions: List[CropRegion]
layout_mode: str
fps: float
class SmartFramer:
"""Creates intelligent 9:16 framing for horizontal videos."""
def __init__(
self,
target_width: int = 1080,
target_height: int = 1920,
frame_skip: int = 1,
smoothing_window: int = 30,
max_velocity: int = 20,
person_switch_cooldown: int = 999999
):
self.target_width = target_width
self.target_height = target_height
self.target_aspect = target_height / target_width
self.frame_skip = frame_skip
self.smoothing_window = smoothing_window
self.max_velocity = max_velocity
self.person_switch_cooldown = person_switch_cooldown
logger.info(f"Smart framer initialized (target: {target_width}x{target_height}, frame_skip={frame_skip}, smoothing={smoothing_window}, velocity={max_velocity}, cooldown={person_switch_cooldown})")
def create_framing_plan(
self,
video_path: str,
start_time: float,
end_time: float,
audio_samples: Optional[np.ndarray] = None
) -> FramingPlan:
"""
Analyze video and create a complete framing plan.
Args:
video_path: Path to video file
start_time: Start time in seconds
end_time: End time in seconds
audio_samples: Optional audio samples for speech detection
Returns:
FramingPlan with all frame contexts and crop regions
"""
analyzer = ContextAnalyzer(person_switch_cooldown=self.person_switch_cooldown)
speaking_periods = None
if audio_samples is not None:
speaking_periods = analyzer.audio_detector.detect_speaking_periods(audio_samples)
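        # Hint OpenCV's FFmpeg backend to stay quiet before the capture is opened,
        # otherwise decoder warnings flood the logs on long clips.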
import os
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'loglevel;quiet'
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
start_frame = int(start_time * fps)
end_frame = int(end_time * fps)
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
frame_contexts = []
frame_number = start_frame
processed_count = 0
logger.info(f"Analyzing frames {start_frame} to {end_frame} (fps={fps}, skip={self.frame_skip})")
while frame_number < end_frame:
ret, frame = cap.read()
if not ret:
break
if processed_count % self.frame_skip == 0:
timestamp = frame_number / fps
context = analyzer.analyze_frame(frame, timestamp, frame_number, speaking_periods)
frame_contexts.append(context)
frame_number += 1
processed_count += 1
source_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
source_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
analyzer.close()
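        # Pick the clip-wide layout by simple majority vote over the per-frame layout modes.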
layout_modes = [ctx.layout_mode for ctx in frame_contexts]
if layout_modes:
overall_layout = max(set(layout_modes), key=layout_modes.count)
else:
overall_layout = "single"
crop_regions = self._calculate_crop_regions(
frame_contexts,
source_width,
source_height
)
framing_plan = FramingPlan(
frame_contexts=frame_contexts,
crop_regions=crop_regions,
layout_mode=overall_layout,
fps=fps
)
import gc
gc.collect()
return framing_plan
def _calculate_crop_regions(
self,
contexts: List[FrameContext],
source_width: int,
source_height: int
) -> List[CropRegion]:
"""
Calculate smooth crop regions for each frame.
Args:
contexts: List of frame contexts
source_width: Source video width
source_height: Source video height
Returns:
List of crop regions
"""
if not contexts:
return []
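        # Fit the largest window with the target aspect ratio inside the source frame.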
source_aspect = source_width / source_height
if source_aspect > self.target_aspect:
crop_height = source_height
crop_width = int(crop_height / self.target_aspect)
if crop_width > source_width:
crop_width = source_width
crop_height = int(crop_width * self.target_aspect)
else:
crop_width = source_width
crop_height = int(crop_width * self.target_aspect)
if crop_height > source_height:
crop_height = source_height
crop_width = int(crop_height / self.target_aspect)
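        # Safe-zone tracking: the crop centre only moves when the primary face leaves a
        # central region (40% margins on each axis), and only by shifts larger than the
        # dead-zone threshold, so small head movements never nudge the frame.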
safe_zone_margin_x = crop_width * 0.40
safe_zone_margin_y = crop_height * 0.40
dead_zone_threshold = 100
if contexts and contexts[0].primary_focus:
current_crop_center_x = contexts[0].primary_focus[0]
current_crop_center_y = contexts[0].primary_focus[1]
else:
current_crop_center_x = source_width // 2
current_crop_center_y = source_height // 2
center_xs = [current_crop_center_x]
center_ys = [current_crop_center_y]
for ctx in contexts[1:]:
if ctx.primary_focus and ctx.selected_people and len(ctx.detected_faces) > 0:
primary_person_idx = ctx.selected_people[0] if ctx.selected_people else 0
if primary_person_idx < len(ctx.detected_faces):
face = ctx.detected_faces[primary_person_idx]
face_left = face.x
face_right = face.x + face.width
face_top = face.y
face_bottom = face.y + face.height
crop_left = current_crop_center_x - crop_width // 2
crop_right = current_crop_center_x + crop_width // 2
crop_top = current_crop_center_y - crop_height // 2
crop_bottom = current_crop_center_y + crop_height // 2
face_rel_left = face_left - crop_left
face_rel_right = face_right - crop_left
face_rel_top = face_top - crop_top
face_rel_bottom = face_bottom - crop_top
face_left_safe = face_rel_left >= safe_zone_margin_x
face_right_safe = face_rel_right <= (crop_width - safe_zone_margin_x)
face_top_safe = face_rel_top >= safe_zone_margin_y
face_bottom_safe = face_rel_bottom <= (crop_height - safe_zone_margin_y)
face_fully_visible = face_left_safe and face_right_safe and face_top_safe and face_bottom_safe
if face_fully_visible:
center_xs.append(current_crop_center_x)
center_ys.append(current_crop_center_y)
else:
shift_x = 0
shift_y = 0
if not face_left_safe:
shift_x = face_rel_left - safe_zone_margin_x
elif not face_right_safe:
shift_x = face_rel_right - (crop_width - safe_zone_margin_x)
if not face_top_safe:
shift_y = face_rel_top - safe_zone_margin_y
elif not face_bottom_safe:
shift_y = face_rel_bottom - (crop_height - safe_zone_margin_y)
if abs(shift_x) > dead_zone_threshold:
current_crop_center_x += shift_x
if abs(shift_y) > dead_zone_threshold:
current_crop_center_y += shift_y
center_xs.append(current_crop_center_x)
center_ys.append(current_crop_center_y)
else:
center_xs.append(current_crop_center_x)
center_ys.append(current_crop_center_y)
else:
center_xs.append(current_crop_center_x)
center_ys.append(current_crop_center_y)
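        # Smooth the raw centre track with a heavy exponential moving average (alpha=0.002)
        # so remaining jumps become slow drifts; frames where the target did not move keep
        # the previous smoothed value exactly.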
if len(center_xs) > 1:
alpha = 0.002
smoothed_xs = [center_xs[0]]
smoothed_ys = [center_ys[0]]
for i in range(1, len(center_xs)):
if center_xs[i] != center_xs[i-1] or center_ys[i] != center_ys[i-1]:
smoothed_xs.append(alpha * center_xs[i] + (1 - alpha) * smoothed_xs[i-1])
smoothed_ys.append(alpha * center_ys[i] + (1 - alpha) * smoothed_ys[i-1])
else:
smoothed_xs.append(smoothed_xs[i-1])
smoothed_ys.append(smoothed_ys[i-1])
center_xs = smoothed_xs
center_ys = smoothed_ys
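        # Final clean-up passes with tight, hardcoded limits: at most 2 px of movement per
        # analysed frame, and moves under 5 px are ignored outright.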
center_xs = self._limit_velocity(center_xs, 2)
center_ys = self._limit_velocity(center_ys, 2)
center_xs = self._apply_dead_zone(center_xs, 5)
center_ys = self._apply_dead_zone(center_ys, 5)
crop_regions = []
for center_x, center_y in zip(center_xs, center_ys):
x = int(center_x - crop_width // 2)
y = int(center_y - crop_height // 2)
x = max(0, min(x, source_width - crop_width))
y = max(0, min(y, source_height - crop_height))
crop_regions.append(CropRegion(
x=x,
y=y,
width=crop_width,
height=crop_height
))
center_xs.clear()
center_ys.clear()
return crop_regions
def _apply_dead_zone(self, positions: List[float], threshold: float) -> List[float]:
"""
Apply dead zone to eliminate micro-movements.
If change is smaller than threshold, keep previous position.
Args:
positions: List of positions
threshold: Minimum change needed to move (pixels)
Returns:
Positions with dead zone applied
"""
if len(positions) <= 1:
return positions
filtered = [positions[0]]
for i in range(1, len(positions)):
delta = abs(positions[i] - filtered[i - 1])
if delta < threshold:
filtered.append(filtered[i - 1])
else:
filtered.append(positions[i])
return filtered
def _limit_velocity(self, positions: List[float], max_velocity: float) -> List[float]:
"""
Limit the velocity of position changes.
Args:
positions: List of positions
max_velocity: Maximum allowed change per frame
Returns:
Smoothed positions
"""
if len(positions) <= 1:
return positions
limited = [positions[0]]
for i in range(1, len(positions)):
delta = positions[i] - limited[i - 1]
if abs(delta) > max_velocity:
delta = max_velocity if delta > 0 else -max_velocity
limited.append(limited[i - 1] + delta)
return limited
def apply_framing(
self,
video_clip: VideoFileClip,
framing_plan: FramingPlan
) -> VideoClip:
"""
Apply smart framing to a video clip.
Always uses single-person focus (no split screen).
Args:
video_clip: Source video clip
framing_plan: Framing plan to apply
Returns:
Reframed video clip
"""
return self._apply_single_framing(video_clip, framing_plan)
def _apply_single_framing(
self,
video_clip: VideoFileClip,
framing_plan: FramingPlan
) -> VideoClip:
"""
Apply single-focus framing (following one person or action).
Args:
video_clip: Source video clip
framing_plan: Framing plan
Returns:
Reframed video clip
"""
def make_frame(t):
frame = video_clip.get_frame(t)
if not framing_plan.crop_regions:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
else:
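                # Crop regions were sampled every `frame_skip` frames, so rescale the
                # timestamp to that grid and linearly interpolate between the two nearest
                # regions for sub-sample smoothness.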
exact_frame_idx = (t * framing_plan.fps) / self.frame_skip
idx_floor = int(exact_frame_idx)
idx_ceil = idx_floor + 1
alpha = exact_frame_idx - idx_floor
idx_floor = max(0, min(idx_floor, len(framing_plan.crop_regions) - 1))
idx_ceil = max(0, min(idx_ceil, len(framing_plan.crop_regions) - 1))
crop1 = framing_plan.crop_regions[idx_floor]
crop2 = framing_plan.crop_regions[idx_ceil]
x = int(crop1.x * (1 - alpha) + crop2.x * alpha)
y = int(crop1.y * (1 - alpha) + crop2.y * alpha)
width = int(crop1.width * (1 - alpha) + crop2.width * alpha)
height = int(crop1.height * (1 - alpha) + crop2.height * alpha)
h, w = frame.shape[:2]
x = max(0, min(x, w - width))
y = max(0, min(y, h - height))
width = min(width, w - x)
height = min(height, h - y)
cropped = frame[y:y + height, x:x + width]
resized = cv2.resize(
cropped,
(self.target_width, self.target_height),
interpolation=cv2.INTER_LINEAR
)
return resized
new_clip = VideoClip(duration=video_clip.duration)
new_clip.size = (self.target_width, self.target_height)
new_clip.frame_function = make_frame
return new_clip
def _apply_split_screen(
self,
video_clip: VideoFileClip,
framing_plan: FramingPlan
) -> VideoClip:
"""
Apply split screen for two people.
Args:
video_clip: Source video clip
framing_plan: Framing plan
Returns:
Split screen video clip
"""
def make_frame(t):
frame = video_clip.get_frame(t)
exact_frame_idx = (t * framing_plan.fps) / self.frame_skip
frame_idx = int(exact_frame_idx)
if not framing_plan.frame_contexts:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
return cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)
frame_idx = max(0, min(frame_idx, len(framing_plan.frame_contexts) - 1))
context = framing_plan.frame_contexts[frame_idx]
output = np.zeros((self.target_height, self.target_width, 3), dtype=np.uint8)
if context.selected_people and len(context.selected_people) >= 2:
selected_faces = [context.detected_faces[i] for i in context.selected_people[:2]
if i < len(context.detected_faces)]
if len(selected_faces) >= 2:
faces = sorted(selected_faces, key=lambda f: f.center_x)
left_face = faces[0]
right_face = faces[1]
for idx, face in enumerate([left_face, right_face]):
half_width = self.target_width // 2
half_aspect = self.target_height / half_width # Aspect ratio for half
face_width = max(face.width, frame.shape[1] // 4) # At least 1/4 of frame width
crop_width = int(face_width * 2.5) # Add padding around face
crop_height = int(crop_width * half_aspect) # Maintain correct aspect
max_crop_width = frame.shape[1] // 2 # Half the source width
max_crop_height = frame.shape[0] # Full source height
if crop_width > max_crop_width:
crop_width = max_crop_width
crop_height = int(crop_width * half_aspect)
if crop_height > max_crop_height:
crop_height = max_crop_height
crop_width = int(crop_height / half_aspect)
x = max(0, face.center_x - crop_width // 2)
y = max(0, face.center_y - crop_height // 2)
x = min(x, frame.shape[1] - crop_width)
y = min(y, frame.shape[0] - crop_height)
cropped = frame[y:y + crop_height, x:x + crop_width]
resized = cv2.resize(
cropped,
(half_width, self.target_height),
interpolation=cv2.INTER_LINEAR
)
x_offset = idx * half_width
output[:, x_offset:x_offset + half_width] = resized
else:
if framing_plan.crop_regions:
crop_idx = max(0, min(frame_idx, len(framing_plan.crop_regions) - 1))
crop = framing_plan.crop_regions[crop_idx]
cropped = frame[crop.y:crop.y + crop.height, crop.x:crop.x + crop.width]
else:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
output = cv2.resize(
cropped,
(self.target_width, self.target_height),
interpolation=cv2.INTER_LINEAR
)
else:
if framing_plan.crop_regions:
crop_idx = max(0, min(frame_idx, len(framing_plan.crop_regions) - 1))
crop = framing_plan.crop_regions[crop_idx]
cropped = frame[crop.y:crop.y + crop.height, crop.x:crop.x + crop.width]
else:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
output = cv2.resize(
cropped,
(self.target_width, self.target_height),
interpolation=cv2.INTER_LINEAR
)
return output
new_clip = VideoClip(duration=video_clip.duration)
new_clip.size = (self.target_width, self.target_height)
new_clip.frame_function = make_frame
return new_clip
def _apply_grid_layout(
self,
video_clip: VideoFileClip,
framing_plan: FramingPlan
) -> VideoClip:
"""
Apply grid layout for 3+ people.
Args:
video_clip: Source video clip
framing_plan: Framing plan
Returns:
Grid layout video clip
"""
def make_frame(t):
frame = video_clip.get_frame(t)
exact_frame_idx = (t * framing_plan.fps) / self.frame_skip
frame_idx = int(exact_frame_idx)
if not framing_plan.frame_contexts:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
return cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)
frame_idx = max(0, min(frame_idx, len(framing_plan.frame_contexts) - 1))
context = framing_plan.frame_contexts[frame_idx]
output = np.zeros((self.target_height, self.target_width, 3), dtype=np.uint8)
num_faces = len(context.detected_faces)
if num_faces >= 3:
cell_width = self.target_width // 2
cell_height = self.target_height // 2
for idx, face in enumerate(context.detected_faces[:4]):
row = idx // 2
col = idx % 2
cell_aspect = cell_height / cell_width
crop_width = frame.shape[1] // 2
crop_height = int(crop_width * cell_aspect)
max_crop_width = frame.shape[1] // 2
max_crop_height = frame.shape[0] // 2
if crop_width > max_crop_width:
crop_width = max_crop_width
crop_height = int(crop_width * cell_aspect)
if crop_height > max_crop_height:
crop_height = max_crop_height
crop_width = int(crop_height / cell_aspect)
x = max(0, face.center_x - crop_width // 2)
y = max(0, face.center_y - crop_height // 2)
x = min(x, frame.shape[1] - crop_width)
y = min(y, frame.shape[0] - crop_height)
cropped = frame[y:y + crop_height, x:x + crop_width]
resized = cv2.resize(
cropped,
(cell_width, cell_height),
interpolation=cv2.INTER_LINEAR
)
y_offset = row * cell_height
x_offset = col * cell_width
output[y_offset:y_offset + cell_height, x_offset:x_offset + cell_width] = resized
else:
if framing_plan.crop_regions:
crop_idx = max(0, min(frame_idx, len(framing_plan.crop_regions) - 1))
crop = framing_plan.crop_regions[crop_idx]
cropped = frame[crop.y:crop.y + crop.height, crop.x:crop.x + crop.width]
else:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
output = cv2.resize(
cropped,
(self.target_width, self.target_height),
interpolation=cv2.INTER_LINEAR
)
return output
new_clip = VideoClip(duration=video_clip.duration)
new_clip.size = (self.target_width, self.target_height)
new_clip.frame_function = make_frame
return new_clip
def extract_audio_samples(video_path: str, start_time: float, end_time: float) -> Optional[np.ndarray]:
"""
Extract audio samples from video for speech detection.
Args:
video_path: Path to video file
start_time: Start time in seconds
end_time: End time in seconds
Returns:
Audio samples array or None if no audio
"""
try:
from moviepy.audio.io.AudioFileClip import AudioFileClip
with AudioFileClip(video_path) as audio:
segment = audio.subclipped(start_time, end_time)
            fps = getattr(segment, 'fps', None) or 44100
samples = segment.to_soundarray(fps=fps)
return samples
except Exception as exc:
logger.warning(f"Failed to extract audio: {exc}")
return None
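# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only). The input path, clip window and
# output settings below are placeholder assumptions, not project defaults; the
# MoviePy calls follow the 2.x API already used above (subclipped,
# write_videofile).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    source_path = "input.mp4"          # placeholder path
    clip_start, clip_end = 10.0, 25.0  # placeholder clip window (seconds)

    # Optional speech-detection input; framing still works without audio.
    audio = extract_audio_samples(source_path, clip_start, clip_end)

    framer = SmartFramer(target_width=1080, target_height=1920)
    plan = framer.create_framing_plan(source_path, clip_start, clip_end, audio)

    with VideoFileClip(source_path) as clip:
        segment = clip.subclipped(clip_start, clip_end)
        framed = framer.apply_framing(segment, plan)
        framed.write_videofile("framed_output.mp4", fps=plan.fps)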