"""
|
|
Smart framing module for intelligent video cropping and composition.
|
|
|
|
This module provides functionality to create 9:16 vertical videos with
|
|
intelligent framing that follows the action and speakers.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from typing import List, Optional, Tuple
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from moviepy.video.VideoClip import VideoClip
|
|
from moviepy.video.io.VideoFileClip import VideoFileClip
|
|
from scipy import signal
|
|
|
|
from video_render.context_detection import ContextAnalyzer, FrameContext, FaceDetection, GroupBoundingBox
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class CropRegion:
    """Defines a crop region for a frame."""
    x: int
    y: int
    width: int
    height: int


@dataclass
class FramingPlan:
    """Complete framing plan for a video segment."""
    frame_contexts: List[FrameContext]
    crop_regions: List[CropRegion]
    layout_mode: str
    fps: float


class SmartFramer:
    """Creates intelligent 9:16 framing for horizontal videos with multi-person support."""

    def __init__(
        self,
        target_width: int = 1080,
        target_height: int = 1920,
        frame_skip: int = 1,
        smoothing_window: int = 30,
        max_velocity: int = 25,
        person_switch_cooldown: int = 30,
        response_time: float = 0.6,
        group_padding: float = 0.15,
        max_zoom_out: float = 2.0,
        dead_zone: int = 100,
        min_face_confidence: float = 0.3
    ):
        self.target_width = target_width
        self.target_height = target_height
        self.target_aspect = target_height / target_width
        self.frame_skip = frame_skip
        self.smoothing_window = smoothing_window
        self.max_velocity = max_velocity
        self.person_switch_cooldown = person_switch_cooldown
        self.response_time = response_time
        self.group_padding = group_padding
        self.max_zoom_out = max_zoom_out
        self.dead_zone = dead_zone
        self.min_face_confidence = min_face_confidence
        self.position_history_size = 45
        self.hysteresis_frames = 8

        logger.info(f"Smart framer initialized (target: {target_width}x{target_height}, response_time={response_time}s, max_velocity={max_velocity}, dead_zone={dead_zone})")

    def create_framing_plan(
        self,
        video_path: str,
        start_time: float,
        end_time: float,
        audio_samples: Optional[np.ndarray] = None
    ) -> FramingPlan:
        """
        Analyze video and create a complete framing plan.

        Args:
            video_path: Path to video file
            start_time: Start time in seconds
            end_time: End time in seconds
            audio_samples: Optional audio samples for speech detection

        Returns:
            FramingPlan with all frame contexts and crop regions
        """
        analyzer = ContextAnalyzer(
            person_switch_cooldown=self.person_switch_cooldown,
            min_face_confidence=self.min_face_confidence
        )

        speaking_periods = None
        if audio_samples is not None:
            speaking_periods = analyzer.audio_detector.detect_speaking_periods(audio_samples)

        import os
        os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'loglevel;quiet'

        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)

        start_frame = int(start_time * fps)
        end_frame = int(end_time * fps)

        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

        frame_contexts = []
        frame_number = start_frame
        processed_count = 0

        logger.info(f"Analyzing frames {start_frame} to {end_frame} (fps={fps}, skip={self.frame_skip})")

        while frame_number < end_frame:
            ret, frame = cap.read()
            if not ret:
                break

            if processed_count % self.frame_skip == 0:
                timestamp = frame_number / fps
                context = analyzer.analyze_frame(frame, timestamp, frame_number, speaking_periods)
                frame_contexts.append(context)

            frame_number += 1
            processed_count += 1

        source_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        source_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        cap.release()
        analyzer.close()

        layout_modes = [ctx.layout_mode for ctx in frame_contexts]
        if layout_modes:
            overall_layout = max(set(layout_modes), key=layout_modes.count)
        else:
            overall_layout = "single"

        crop_regions = self._calculate_crop_regions(
            frame_contexts,
            source_width,
            source_height,
            fps=fps
        )

        framing_plan = FramingPlan(
            frame_contexts=frame_contexts,
            crop_regions=crop_regions,
            layout_mode=overall_layout,
            fps=fps
        )

        import gc
        gc.collect()

        return framing_plan

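    # NOTE (descriptive, added for clarity): with frame_skip > 1 only every
    # Nth decoded frame is analysed, so frame_contexts / crop_regions hold
    # roughly (end_frame - start_frame) / frame_skip entries. The make_frame
    # closures below invert that mapping with `(t * fps) / frame_skip` when
    # looking up the plan for a timestamp, so both sides must use the same
    # frame_skip value.
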
    def _segment_by_face_detection(
        self,
        has_face_flags: List[bool],
        min_segment_frames: int = 10
    ) -> List[Tuple[int, int, bool]]:
        """
        Segment the video into continuous regions with/without a detected face.
        Returns a list of (start_idx, end_idx, has_face) tuples.
        Small segments are merged with their neighbors.
        """
        if not has_face_flags:
            return []

        segments = []
        start_idx = 0
        current_state = has_face_flags[0]

        for i in range(1, len(has_face_flags)):
            if has_face_flags[i] != current_state:
                segments.append((start_idx, i - 1, current_state))
                start_idx = i
                current_state = has_face_flags[i]

        segments.append((start_idx, len(has_face_flags) - 1, current_state))

        merged = []
        for seg in segments:
            start, end, has_face = seg
            length = end - start + 1

            if length < min_segment_frames and merged:
                # Too short to stand on its own: absorb into the previous segment.
                prev_start, prev_end, prev_has_face = merged[-1]
                merged[-1] = (prev_start, end, prev_has_face)
            else:
                merged.append(seg)

        return merged

    def _interpolate_smooth(
        self,
        positions: List[float],
        segments: List[Tuple[int, int, bool]],
        transition_frames: int = 15
    ) -> List[float]:
        """
        Create smooth transitions between segments using cosine interpolation.
        Within each segment the position is held constant; between segments it
        eases smoothly from one value to the next.
        """
        if not positions or not segments:
            return positions

        result = list(positions)

        segment_values = []
        for start, end, has_face in segments:
            seg_positions = positions[start:end + 1]
            if seg_positions:
                segment_values.append(float(np.median(seg_positions)))
            else:
                segment_values.append(positions[start] if start < len(positions) else 0.0)

        for i, (start, end, has_face) in enumerate(segments):
            value = segment_values[i]
            for j in range(start, end + 1):
                result[j] = value

        for i in range(len(segments) - 1):
            seg1_start, seg1_end, _ = segments[i]
            seg2_start, seg2_end, _ = segments[i + 1]
            val1 = segment_values[i]
            val2 = segment_values[i + 1]

            if abs(val2 - val1) < self.dead_zone * 0.5:
                continue

            trans_start = max(seg1_end - transition_frames // 2, seg1_start)
            trans_end = min(seg2_start + transition_frames // 2, seg2_end)
            trans_length = trans_end - trans_start + 1

            if trans_length < 2:
                continue

            for j in range(trans_length):
                t = j / (trans_length - 1)
                smooth_t = 0.5 - 0.5 * np.cos(t * np.pi)  # cosine ease-in/ease-out, 0 -> 1
                idx = trans_start + j
                if 0 <= idx < len(result):
                    result[idx] = val1 + (val2 - val1) * smooth_t

        return result

    def _apply_savgol_filter(
        self,
        positions: List[float],
        window_length: int = 61,
        polyorder: int = 2
    ) -> List[float]:
        """
        Apply a Savitzky-Golay filter for ultra-smooth position tracking.
        This signal-processing filter preserves trends while removing noise.
        """
        if len(positions) < window_length:
            window_length = len(positions) if len(positions) % 2 == 1 else len(positions) - 1
            if window_length < 3:
                return positions

        if window_length % 2 == 0:
            window_length -= 1

        if window_length <= polyorder:
            polyorder = max(1, window_length - 1)

        try:
            smoothed = signal.savgol_filter(positions, window_length, polyorder, mode='nearest')
            return smoothed.tolist()
        except Exception as e:
            logger.warning(f"Savgol filter failed: {e}, returning original positions")
            return positions

    def _apply_median_filter(self, positions: List[float], window_size: int = 5) -> List[float]:
        """
        Apply a median filter to remove detection noise.

        The median filter is ideal for removing outliers while preserving
        edges (real movements). A window size of 5 means each position
        is replaced by the median of itself and 2 neighbors on each side.

        Args:
            positions: Raw positions from detection
            window_size: Window size (must be odd), default 5

        Returns:
            Filtered positions with noise removed
        """
        if len(positions) < window_size:
            return positions

        if window_size % 2 == 0:
            window_size += 1

        filtered = signal.medfilt(positions, kernel_size=window_size)

        return filtered.tolist()

    def _is_detection_stable(self, has_face_flags: List[bool], window_size: int = 30) -> bool:
        """
        Check whether face detection is stable enough to use smart framing.
        If detection is too unstable (frequent changes), a static center crop
        looks better than a camera that keeps jumping around.

        Args:
            has_face_flags: Boolean flags indicating whether a face was detected per frame
            window_size: Number of frames to analyze for stability

        Returns:
            True if detection is stable, False if too unstable
        """
        if len(has_face_flags) < window_size:
            window_size = len(has_face_flags)

        if window_size == 0:
            return False

        changes = 0
        for i in range(1, len(has_face_flags)):
            if has_face_flags[i] != has_face_flags[i - 1]:
                changes += 1

        change_rate = changes / len(has_face_flags)

        # Fewer than ~30% of frames flipping detection state counts as stable.
        return change_rate < 0.3

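    # Worked example (illustrative input): for 30 analysed frames whose
    # has_face flag flips 8 times, change_rate = 8 / 30 ~= 0.27 < 0.3, so
    # detection counts as stable and smart framing is kept; nine or more
    # flips (rate >= 0.3) would trigger the static center-crop fallback.
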
    def _stabilize_no_face_sequences(
        self,
        positions: List[float],
        has_face_flags: List[bool],
        source_center: Optional[float] = None
    ) -> List[float]:
        """
        Stabilize positions during sequences without face detection.
        Uses the median of all valid positions for maximum stability.
        """
        if len(positions) != len(has_face_flags):
            return positions

        fallback = source_center if source_center is not None else (positions[0] if positions else 0.0)

        face_ratio = sum(has_face_flags) / len(has_face_flags) if has_face_flags else 0
        if face_ratio < 0.15:
            # Almost no detections: lock the whole sequence to the fallback position.
            return [fallback] * len(positions)

        changes = sum(1 for i in range(1, len(has_face_flags)) if has_face_flags[i] != has_face_flags[i - 1])
        instability_ratio = changes / len(has_face_flags) if has_face_flags else 0
        if instability_ratio > 0.25:
            # Detection flickers too much: lock to the median of the valid positions.
            valid_positions = [positions[i] for i, has_face in enumerate(has_face_flags) if has_face]
            if valid_positions:
                return [float(np.median(valid_positions))] * len(positions)
            return [fallback] * len(positions)

        valid_positions = [positions[i] for i, has_face in enumerate(has_face_flags) if has_face]
        if not valid_positions:
            return [fallback] * len(positions)

        global_median = float(np.median(valid_positions))
        stabilized = list(positions)
        i = 0

        while i < len(has_face_flags):
            if not has_face_flags[i]:
                start_idx = i
                recent_valid = []
                for j in range(max(0, start_idx - self.position_history_size), start_idx):
                    if has_face_flags[j]:
                        recent_valid.append(positions[j])

                # Hold the camera at the recent median while the face is missing.
                lock_value = float(np.median(recent_valid)) if len(recent_valid) >= 5 else global_median

                while i < len(has_face_flags) and not has_face_flags[i]:
                    stabilized[i] = lock_value
                    i += 1
            else:
                i += 1

        return stabilized

    def _calculate_crop_regions(
        self,
        contexts: List[FrameContext],
        source_width: int,
        source_height: int,
        fps: Optional[float] = None
    ) -> List[CropRegion]:
        """
        Calculate smooth crop regions for each frame with multi-person support.

        Args:
            contexts: List of frame contexts
            source_width: Source video width
            source_height: Source video height
            fps: Source frame rate, used to derive the smoothing response

        Returns:
            List of crop regions
        """
        if not contexts:
            return []

        source_aspect = source_width / source_height

        # Calculate base crop dimensions for 9:16
        if source_aspect > self.target_aspect:
            base_crop_height = source_height
            base_crop_width = int(base_crop_height / self.target_aspect)

            if base_crop_width > source_width:
                base_crop_width = source_width
                base_crop_height = int(base_crop_width * self.target_aspect)
        else:
            base_crop_width = source_width
            base_crop_height = int(base_crop_width * self.target_aspect)

            if base_crop_height > source_height:
                base_crop_height = source_height
                base_crop_width = int(base_crop_height / self.target_aspect)

        center_xs = []
        center_ys = []
        zoom_factors = []
        has_face_flags = []

        static_center_x = float(source_width // 2)
        static_center_y = float(source_height // 2)

        last_valid_x = static_center_x
        last_valid_y = static_center_y
        last_valid_zoom = 1.0

        for ctx in contexts:
            selected_face = None
            if ctx.selected_people:
                idx = ctx.selected_people[0]
                if 0 <= idx < len(ctx.detected_faces):
                    selected_face = ctx.detected_faces[idx]

            if selected_face:
                center_x = float(selected_face.center_x)
                center_y = float(selected_face.center_y)
                center_xs.append(center_x)
                center_ys.append(center_y)

                required_width = selected_face.width * (1 + self.group_padding * 2)
                required_height = selected_face.height * (1 + self.group_padding * 3)

                zoom_w = required_width / base_crop_width
                zoom_h = required_height / base_crop_height
                zoom = max(zoom_w, zoom_h, 1.0)
                zoom = min(zoom, self.max_zoom_out)
                zoom_factors.append(zoom)

                last_valid_x = center_x
                last_valid_y = center_y
                last_valid_zoom = zoom
                has_face_flags.append(True)
            elif ctx.group_bounds and ctx.group_bounds.face_count > 0:
                group = ctx.group_bounds
                center_x = float(group.center_x)
                center_y = float(group.center_y)
                center_xs.append(center_x)
                center_ys.append(center_y)

                required_width = group.width * (1 + self.group_padding * 2)
                required_height = group.height * (1 + self.group_padding * 3)

                zoom_w = required_width / base_crop_width
                zoom_h = required_height / base_crop_height
                zoom = max(zoom_w, zoom_h, 1.0)
                zoom = min(zoom, self.max_zoom_out)
                zoom_factors.append(zoom)

                last_valid_x = center_x
                last_valid_y = center_y
                last_valid_zoom = zoom
                has_face_flags.append(True)
            elif ctx.primary_focus and len(ctx.detected_faces) > 0:
                center_x = float(ctx.primary_focus[0])
                center_y = float(ctx.primary_focus[1])
                center_xs.append(center_x)
                center_ys.append(center_y)
                zoom_factors.append(1.0)

                last_valid_x = center_x
                last_valid_y = center_y
                last_valid_zoom = 1.0
                has_face_flags.append(True)
            else:
                center_xs.append(last_valid_x)
                center_ys.append(last_valid_y)
                zoom_factors.append(last_valid_zoom)
                has_face_flags.append(False)

        center_x_video = float(source_width // 2)
        center_y_video = float(source_height // 2)

        if not self._is_detection_stable(has_face_flags):
            # Detection flickers too much: use a static center crop.
            final_xs = [center_x_video] * len(center_xs)
            final_ys = [center_y_video] * len(center_ys)
            final_zooms = [1.0] * len(zoom_factors)
        else:
            center_xs = self._stabilize_no_face_sequences(
                center_xs,
                has_face_flags,
                source_center=center_x_video
            )
            center_ys = self._stabilize_no_face_sequences(
                center_ys,
                has_face_flags,
                source_center=center_y_video
            )
            zoom_factors = self._stabilize_no_face_sequences(
                zoom_factors,
                has_face_flags,
                source_center=1.0
            )

            face_count = sum(has_face_flags)
            if face_count < len(has_face_flags) * 0.3:
                final_xs = [center_x_video] * len(center_xs)
                final_ys = [center_y_video] * len(center_ys)
                final_zooms = [1.0] * len(zoom_factors)
            else:
                valid_xs = [center_xs[i] for i, has_face in enumerate(has_face_flags) if has_face]
                valid_ys = [center_ys[i] for i, has_face in enumerate(has_face_flags) if has_face]
                valid_zooms = [zoom_factors[i] for i, has_face in enumerate(has_face_flags) if has_face]

                target_x = float(np.median(valid_xs)) if valid_xs else center_x_video
                target_y = float(np.median(valid_ys)) if valid_ys else center_y_video
                target_zoom = float(np.median(valid_zooms)) if valid_zooms else 1.0

                # Pin frames without a detection to the overall median target.
                for i in range(len(center_xs)):
                    if not has_face_flags[i]:
                        center_xs[i] = target_x
                        center_ys[i] = target_y
                        zoom_factors[i] = target_zoom

                final_xs = self._apply_savgol_filter(center_xs, window_length=61, polyorder=2)
                final_ys = self._apply_savgol_filter(center_ys, window_length=61, polyorder=2)
                final_zooms = self._apply_savgol_filter(zoom_factors, window_length=61, polyorder=2)

                if fps and self.response_time > 0:
                    dt = self.frame_skip / fps
                    alpha = 1 - np.exp(-dt / self.response_time)
                    final_xs = self._apply_exponential_smoothing(final_xs, alpha)
                    final_ys = self._apply_exponential_smoothing(final_ys, alpha)
                    final_zooms = self._apply_exponential_smoothing(final_zooms, alpha)

        # Generate crop regions
        crop_regions = []
        for cx, cy, zoom in zip(final_xs, final_ys, final_zooms):
            # Calculate actual crop size with zoom
            crop_width = int(base_crop_width * zoom)
            crop_height = int(base_crop_height * zoom)

            # Clamp to source dimensions
            crop_width = min(crop_width, source_width)
            crop_height = min(crop_height, source_height)

            # Maintain aspect ratio after clamping
            if crop_width / crop_height > base_crop_width / base_crop_height:
                crop_width = int(crop_height * base_crop_width / base_crop_height)
            else:
                crop_height = int(crop_width * base_crop_height / base_crop_width)

            # Calculate top-left corner
            x = int(cx - crop_width // 2)
            y = int(cy - crop_height // 2)

            # Keep within bounds
            x = max(0, min(x, source_width - crop_width))
            y = max(0, min(y, source_height - crop_height))

            crop_regions.append(CropRegion(
                x=x,
                y=y,
                width=crop_width,
                height=crop_height
            ))

        # Clear temporary lists
        center_xs.clear()
        center_ys.clear()
        zoom_factors.clear()

        return crop_regions

    def _apply_exponential_smoothing(self, positions: List[float], alpha: float) -> List[float]:
        """
        Smooth positions with an exponential moving average.
        """
        if not positions:
            return positions

        alpha = max(0.0, min(alpha, 1.0))
        smoothed = [positions[0]]
        for i in range(1, len(positions)):
            prev = smoothed[-1]
            smoothed.append(prev + alpha * (positions[i] - prev))
        return smoothed

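    # Worked example for the smoothing constant used in _calculate_crop_regions
    # (figures are illustrative, using this module's defaults): with fps=30,
    # frame_skip=1 and response_time=0.6 s, dt = 1/30 s and
    # alpha = 1 - exp(-dt / response_time) ~= 0.054. This is the standard
    # discretisation of a first-order low-pass with time constant
    # response_time, so after ~0.6 s (about 18 frames) the crop centre has
    # covered roughly 63% of a step change in the target position.
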
    def _apply_dead_zone(self, positions: List[float], threshold: float) -> List[float]:
        """
        Apply a dead zone to eliminate micro-movements.
        If a change is smaller than the threshold, keep the previous position.

        Args:
            positions: List of positions
            threshold: Minimum change needed to move (pixels)

        Returns:
            Positions with the dead zone applied
        """
        if len(positions) <= 1:
            return positions

        filtered = [positions[0]]

        for i in range(1, len(positions)):
            delta = abs(positions[i] - filtered[i - 1])
            if delta < threshold:
                filtered.append(filtered[i - 1])
            else:
                filtered.append(positions[i])

        return filtered

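    # Worked example (illustrative input):
    # _apply_dead_zone([100.0, 102.0, 180.0], threshold=10) keeps the camera
    # at 100.0 for the second sample (|102 - 100| = 2 < 10) and only moves on
    # the third (|180 - 100| = 80 >= 10), returning [100.0, 100.0, 180.0].
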
    def _limit_velocity(self, positions: List[float], max_velocity: float) -> List[float]:
        """
        Limit the velocity of position changes.

        Args:
            positions: List of positions
            max_velocity: Maximum allowed change per frame

        Returns:
            Smoothed positions
        """
        if len(positions) <= 1:
            return positions

        limited = [positions[0]]

        for i in range(1, len(positions)):
            delta = positions[i] - limited[i - 1]
            if abs(delta) > max_velocity:
                delta = max_velocity if delta > 0 else -max_velocity

            limited.append(limited[i - 1] + delta)

        return limited

    def apply_framing(
        self,
        video_clip: VideoFileClip,
        framing_plan: FramingPlan
    ) -> VideoClip:
        """
        Apply smart framing to a video clip.
        Automatically selects a layout based on the number of people detected.

        Layouts:
            - 1 person: single framing (follow the person)
            - 2 people: vertical split screen (side by side)
            - 3 people: 1 on top, 2 on the bottom
            - 4+ people: 2x2 grid

        Args:
            video_clip: Source video clip
            framing_plan: Framing plan to apply

        Returns:
            Reframed video clip
        """
        # Determine the predominant number of faces across all frames
        if not framing_plan.frame_contexts:
            return self._apply_single_framing(video_clip, framing_plan)

        face_counts = []
        for ctx in framing_plan.frame_contexts:
            if ctx.active_speakers:
                face_counts.append(len(ctx.active_speakers))
            elif ctx.group_bounds:
                face_counts.append(ctx.group_bounds.face_count)
            else:
                face_counts.append(len(ctx.detected_faces))

        # Use the mode (most common) face count, minimum 1
        if face_counts:
            from collections import Counter
            count_freq = Counter(face_counts)
            # Get the most common count, but ignore 0
            non_zero_counts = {k: v for k, v in count_freq.items() if k > 0}
            if non_zero_counts:
                predominant_faces = max(non_zero_counts, key=non_zero_counts.get)
            else:
                predominant_faces = 1
        else:
            predominant_faces = 1

        logger.info(f"Layout selection: predominant_faces={predominant_faces}")

        if predominant_faces == 1:
            return self._apply_single_framing(video_clip, framing_plan)
        elif predominant_faces == 2:
            return self._apply_split_screen(video_clip, framing_plan)
        elif predominant_faces == 3:
            return self._apply_three_person_layout(video_clip, framing_plan)
        else:  # 4 or more
            return self._apply_grid_layout(video_clip, framing_plan)

    def _apply_single_framing(
        self,
        video_clip: VideoFileClip,
        framing_plan: FramingPlan
    ) -> VideoClip:
        """
        Apply single-focus framing (following one person or the action).

        Args:
            video_clip: Source video clip
            framing_plan: Framing plan

        Returns:
            Reframed video clip
        """
        def make_frame(t):
            frame = video_clip.get_frame(t)

            if not framing_plan.crop_regions:
                # No plan available: fall back to a static center crop.
                h, w = frame.shape[:2]
                crop_h = int(w * self.target_aspect)
                crop_w = w
                if crop_h > h:
                    crop_h = h
                    crop_w = int(h / self.target_aspect)
                y = (h - crop_h) // 2
                x = (w - crop_w) // 2
                cropped = frame[y:y + crop_h, x:x + crop_w]
            else:
                # Map the timestamp to a (fractional) index into the plan and
                # linearly interpolate between neighbouring crop regions.
                exact_frame_idx = (t * framing_plan.fps) / self.frame_skip
                last_idx = len(framing_plan.crop_regions) - 1
                if last_idx <= 0:
                    crop = framing_plan.crop_regions[0]
                    x, y, width, height = crop.x, crop.y, crop.width, crop.height
                else:
                    exact_frame_idx = max(0.0, min(exact_frame_idx, float(last_idx)))
                    low_idx = int(np.floor(exact_frame_idx))
                    high_idx = min(low_idx + 1, last_idx)
                    alpha = exact_frame_idx - low_idx

                    crop_a = framing_plan.crop_regions[low_idx]
                    crop_b = framing_plan.crop_regions[high_idx]

                    x = int(round(crop_a.x + (crop_b.x - crop_a.x) * alpha))
                    y = int(round(crop_a.y + (crop_b.y - crop_a.y) * alpha))
                    width = int(round(crop_a.width + (crop_b.width - crop_a.width) * alpha))
                    height = int(round(crop_a.height + (crop_b.height - crop_a.height) * alpha))

                h, w = frame.shape[:2]
                x = max(0, min(x, w - width))
                y = max(0, min(y, h - height))
                width = min(width, w - x)
                height = min(height, h - y)

                cropped = frame[y:y + height, x:x + width]

            resized = cv2.resize(
                cropped,
                (self.target_width, self.target_height),
                interpolation=cv2.INTER_LINEAR
            )

            return resized

        new_clip = VideoClip(duration=video_clip.duration)
        new_clip.size = (self.target_width, self.target_height)
        new_clip.frame_function = make_frame
        return new_clip

    def _apply_split_screen(
        self,
        video_clip: VideoFileClip,
        framing_plan: FramingPlan
    ) -> VideoClip:
        """
        Apply a split screen for two people (side-by-side vertical split).

        Args:
            video_clip: Source video clip
            framing_plan: Framing plan

        Returns:
            Split-screen video clip
        """
        def make_frame(t):
            frame = video_clip.get_frame(t)
            exact_frame_idx = (t * framing_plan.fps) / self.frame_skip
            frame_idx = int(exact_frame_idx)

            if not framing_plan.frame_contexts:
                h, w = frame.shape[:2]
                crop_h = int(w * self.target_aspect)
                crop_w = w
                if crop_h > h:
                    crop_h = h
                    crop_w = int(h / self.target_aspect)
                y = (h - crop_h) // 2
                x = (w - crop_w) // 2
                cropped = frame[y:y + crop_h, x:x + crop_w]
                return cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)

            frame_idx = max(0, min(frame_idx, len(framing_plan.frame_contexts) - 1))
            context = framing_plan.frame_contexts[frame_idx]

            output = np.zeros((self.target_height, self.target_width, 3), dtype=np.uint8)

            if context.active_speakers:
                faces = [
                    context.detected_faces[idx]
                    for idx in context.active_speakers
                    if 0 <= idx < len(context.detected_faces)
                ][:2]
            else:
                # Use the top faces by confidence for stability
                faces = sorted(context.detected_faces, key=lambda f: f.confidence, reverse=True)[:2]

            if len(faces) >= 2:
                # Sort by X position (left to right)
                faces_sorted = sorted(faces, key=lambda f: f.center_x)
                left_face = faces_sorted[0]
                right_face = faces_sorted[1]

                half_width = self.target_width // 2
                half_aspect = self.target_height / half_width

                for idx, face in enumerate([left_face, right_face]):
                    # Calculate the crop region around the face
                    crop_width = int(face.width * 3)  # 3x face width for good framing
                    crop_height = int(crop_width * half_aspect)

                    # Clamp to reasonable limits
                    crop_width = max(crop_width, frame.shape[1] // 4)
                    crop_width = min(crop_width, frame.shape[1])
                    crop_height = min(crop_height, frame.shape[0])

                    # Ensure proper aspect ratio
                    if crop_height / crop_width > half_aspect:
                        crop_height = int(crop_width * half_aspect)
                    else:
                        crop_width = int(crop_height / half_aspect)

                    # Center the crop on the face
                    x = max(0, min(face.center_x - crop_width // 2, frame.shape[1] - crop_width))
                    y = max(0, min(face.center_y - crop_height // 2, frame.shape[0] - crop_height))

                    # Extract and resize
                    cropped = frame[y:y + crop_height, x:x + crop_width]
                    resized = cv2.resize(cropped, (half_width, self.target_height), interpolation=cv2.INTER_LINEAR)

                    x_offset = idx * half_width
                    output[:, x_offset:x_offset + half_width] = resized
            else:
                # Fall back to single framing
                if framing_plan.crop_regions:
                    crop_idx = max(0, min(frame_idx, len(framing_plan.crop_regions) - 1))
                    crop = framing_plan.crop_regions[crop_idx]
                    cropped = frame[crop.y:crop.y + crop.height, crop.x:crop.x + crop.width]
                else:
                    h, w = frame.shape[:2]
                    crop_h = int(w * self.target_aspect)
                    crop_w = w
                    if crop_h > h:
                        crop_h = h
                        crop_w = int(h / self.target_aspect)
                    y = (h - crop_h) // 2
                    x = (w - crop_w) // 2
                    cropped = frame[y:y + crop_h, x:x + crop_w]
                output = cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)

            return output

        new_clip = VideoClip(duration=video_clip.duration)
        new_clip.size = (self.target_width, self.target_height)
        new_clip.frame_function = make_frame
        return new_clip

    def _apply_three_person_layout(
        self,
        video_clip: VideoFileClip,
        framing_plan: FramingPlan
    ) -> VideoClip:
        """
        Apply the layout for 3 people: 1 on top (full width), 2 on the bottom (side by side).

        Args:
            video_clip: Source video clip
            framing_plan: Framing plan

        Returns:
            Three-person layout video clip
        """
        def make_frame(t):
            frame = video_clip.get_frame(t)
            exact_frame_idx = (t * framing_plan.fps) / self.frame_skip
            frame_idx = int(exact_frame_idx)

            if not framing_plan.frame_contexts:
                h, w = frame.shape[:2]
                crop_h = int(w * self.target_aspect)
                crop_w = w
                if crop_h > h:
                    crop_h = h
                    crop_w = int(h / self.target_aspect)
                y = (h - crop_h) // 2
                x = (w - crop_w) // 2
                cropped = frame[y:y + crop_h, x:x + crop_w]
                return cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)

            frame_idx = max(0, min(frame_idx, len(framing_plan.frame_contexts) - 1))
            context = framing_plan.frame_contexts[frame_idx]

            output = np.zeros((self.target_height, self.target_width, 3), dtype=np.uint8)

            if context.active_speakers:
                faces = [
                    context.detected_faces[idx]
                    for idx in context.active_speakers
                    if 0 <= idx < len(context.detected_faces)
                ][:3]
            else:
                faces = sorted(context.detected_faces, key=lambda f: f.confidence, reverse=True)[:3]  # Max 3 faces
            num_faces = len(faces)

            if num_faces >= 3:
                # Sort faces by Y position (top to bottom), then by X for the bottom row
                faces_sorted = sorted(faces, key=lambda f: f.center_y)
                top_face = faces_sorted[0]  # Topmost face
                bottom_faces = sorted(faces_sorted[1:], key=lambda f: f.center_x)  # Sort bottom row by X

                # Top section: full width, half height
                top_height = self.target_height // 2
                top_width = self.target_width
                top_aspect = top_height / top_width

                # Crop around the top face
                crop_w = int(top_face.width * 3)  # 3x face width for context
                crop_h = int(crop_w * top_aspect)
                crop_w = min(crop_w, frame.shape[1])
                crop_h = min(crop_h, frame.shape[0])

                x = max(0, min(top_face.center_x - crop_w // 2, frame.shape[1] - crop_w))
                y = max(0, min(top_face.center_y - crop_h // 2, frame.shape[0] - crop_h))

                cropped_top = frame[y:y + crop_h, x:x + crop_w]
                resized_top = cv2.resize(cropped_top, (top_width, top_height), interpolation=cv2.INTER_LINEAR)
                output[0:top_height, :] = resized_top

                # Bottom section: two halves
                bottom_height = self.target_height - top_height
                half_width = self.target_width // 2
                bottom_aspect = bottom_height / half_width

                for idx, face in enumerate(bottom_faces[:2]):
                    crop_w = int(face.width * 3)
                    crop_h = int(crop_w * bottom_aspect)
                    crop_w = min(crop_w, frame.shape[1] // 2)
                    crop_h = min(crop_h, frame.shape[0])

                    x = max(0, min(face.center_x - crop_w // 2, frame.shape[1] - crop_w))
                    y = max(0, min(face.center_y - crop_h // 2, frame.shape[0] - crop_h))

                    cropped = frame[y:y + crop_h, x:x + crop_w]
                    resized = cv2.resize(cropped, (half_width, bottom_height), interpolation=cv2.INTER_LINEAR)

                    x_offset = idx * half_width
                    output[top_height:, x_offset:x_offset + half_width] = resized
            else:
                # Fall back to single framing
                if framing_plan.crop_regions:
                    crop_idx = max(0, min(frame_idx, len(framing_plan.crop_regions) - 1))
                    crop = framing_plan.crop_regions[crop_idx]
                    cropped = frame[crop.y:crop.y + crop.height, crop.x:crop.x + crop.width]
                else:
                    h, w = frame.shape[:2]
                    crop_h = int(w * self.target_aspect)
                    crop_w = w
                    if crop_h > h:
                        crop_h = h
                        crop_w = int(h / self.target_aspect)
                    y = (h - crop_h) // 2
                    x = (w - crop_w) // 2
                    cropped = frame[y:y + crop_h, x:x + crop_w]
                output = cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)

            return output

        new_clip = VideoClip(duration=video_clip.duration)
        new_clip.size = (self.target_width, self.target_height)
        new_clip.frame_function = make_frame
        return new_clip

    def _apply_grid_layout(
        self,
        video_clip: VideoFileClip,
        framing_plan: FramingPlan
    ) -> VideoClip:
        """
        Apply a grid layout for 4 people (2x2 grid).
        Layout: top-left, top-right, bottom-left, bottom-right.

        Args:
            video_clip: Source video clip
            framing_plan: Framing plan

        Returns:
            Grid layout video clip
        """
        def make_frame(t):
            frame = video_clip.get_frame(t)
            exact_frame_idx = (t * framing_plan.fps) / self.frame_skip
            frame_idx = int(exact_frame_idx)

            if not framing_plan.frame_contexts:
                h, w = frame.shape[:2]
                crop_h = int(w * self.target_aspect)
                crop_w = w
                if crop_h > h:
                    crop_h = h
                    crop_w = int(h / self.target_aspect)
                y = (h - crop_h) // 2
                x = (w - crop_w) // 2
                cropped = frame[y:y + crop_h, x:x + crop_w]
                return cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)

            frame_idx = max(0, min(frame_idx, len(framing_plan.frame_contexts) - 1))
            context = framing_plan.frame_contexts[frame_idx]

            output = np.zeros((self.target_height, self.target_width, 3), dtype=np.uint8)

            if context.active_speakers:
                faces = [
                    context.detected_faces[idx]
                    for idx in context.active_speakers
                    if 0 <= idx < len(context.detected_faces)
                ][:4]
            else:
                faces = sorted(context.detected_faces, key=lambda f: f.confidence, reverse=True)[:4]  # Max 4 faces
            num_faces = len(faces)

            if num_faces >= 4:
                cell_width = self.target_width // 2
                cell_height = self.target_height // 2
                cell_aspect = cell_height / cell_width

                # Sort faces into grid positions by their actual position:
                # first by Y (top row vs bottom row), then by X within each row
                sorted_by_y = sorted(faces, key=lambda f: f.center_y)
                top_row = sorted(sorted_by_y[:2], key=lambda f: f.center_x)
                bottom_row = sorted(sorted_by_y[2:], key=lambda f: f.center_x)
                grid_faces = top_row + bottom_row

                for idx, face in enumerate(grid_faces):
                    row = idx // 2
                    col = idx % 2

                    # Calculate the crop region centered on the face
                    crop_width = int(face.width * 3)  # 3x face width
                    crop_height = int(crop_width * cell_aspect)

                    # Clamp to reasonable limits
                    crop_width = max(crop_width, frame.shape[1] // 4)
                    crop_width = min(crop_width, frame.shape[1])
                    crop_height = min(crop_height, frame.shape[0])

                    # Ensure proper aspect ratio
                    if crop_height / crop_width > cell_aspect:
                        crop_height = int(crop_width * cell_aspect)
                    else:
                        crop_width = int(crop_height / cell_aspect)

                    # Center the crop on the face
                    x = max(0, min(face.center_x - crop_width // 2, frame.shape[1] - crop_width))
                    y = max(0, min(face.center_y - crop_height // 2, frame.shape[0] - crop_height))

                    cropped = frame[y:y + crop_height, x:x + crop_width]
                    resized = cv2.resize(cropped, (cell_width, cell_height), interpolation=cv2.INTER_LINEAR)

                    y_offset = row * cell_height
                    x_offset = col * cell_width
                    output[y_offset:y_offset + cell_height, x_offset:x_offset + cell_width] = resized
            else:
                if framing_plan.crop_regions:
                    crop_idx = max(0, min(frame_idx, len(framing_plan.crop_regions) - 1))
                    crop = framing_plan.crop_regions[crop_idx]
                    cropped = frame[crop.y:crop.y + crop.height, crop.x:crop.x + crop.width]
                else:
                    h, w = frame.shape[:2]
                    crop_h = int(w * self.target_aspect)
                    crop_w = w
                    if crop_h > h:
                        crop_h = h
                        crop_w = int(h / self.target_aspect)
                    y = (h - crop_h) // 2
                    x = (w - crop_w) // 2
                    cropped = frame[y:y + crop_h, x:x + crop_w]
                output = cv2.resize(
                    cropped,
                    (self.target_width, self.target_height),
                    interpolation=cv2.INTER_LINEAR
                )

            return output

        new_clip = VideoClip(duration=video_clip.duration)
        new_clip.size = (self.target_width, self.target_height)
        new_clip.frame_function = make_frame
        return new_clip


def extract_audio_samples(video_path: str, start_time: float, end_time: float) -> Optional[np.ndarray]:
    """
    Extract audio samples from a video for speech detection.

    Args:
        video_path: Path to video file
        start_time: Start time in seconds
        end_time: End time in seconds

    Returns:
        Audio samples array, or None if the video has no usable audio
    """
    try:
        from moviepy.audio.io.AudioFileClip import AudioFileClip

        with AudioFileClip(video_path) as audio:
            segment = audio.subclipped(start_time, end_time)
            fps = getattr(segment, 'fps', 44100)
            samples = segment.to_soundarray(fps=fps)
            return samples
    except Exception as exc:
        logger.warning(f"Failed to extract audio: {exc}")
        return None
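

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the library API): it wires
# together extract_audio_samples, SmartFramer.create_framing_plan and
# SmartFramer.apply_framing as defined above. The input path, time range and
# output settings are hypothetical placeholders.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    source_path = "input_horizontal.mp4"  # hypothetical example file
    clip_start, clip_end = 10.0, 25.0     # hypothetical segment to reframe

    framer = SmartFramer()
    audio = extract_audio_samples(source_path, clip_start, clip_end)
    plan = framer.create_framing_plan(source_path, clip_start, clip_end, audio_samples=audio)

    with VideoFileClip(source_path) as clip:
        segment = clip.subclipped(clip_start, clip_end)
        reframed = framer.apply_framing(segment, plan)
        # Hypothetical output settings; adjust codec/fps to the pipeline's needs.
        reframed.write_videofile("output_vertical.mp4", fps=plan.fps, codec="libx264", audio=False)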
|