# video-render/video_render/smart_framing.py
"""
Smart framing module for intelligent video cropping and composition.
This module provides functionality to create 9:16 vertical videos with
intelligent framing that follows the action and speakers.
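
Example (illustrative sketch; assumes a local "input.mp4" and the MoviePy 2.x
API used throughout this module):

    from moviepy.video.io.VideoFileClip import VideoFileClip

    framer = SmartFramer()
    samples = extract_audio_samples("input.mp4", 0.0, 12.0)
    plan = framer.create_framing_plan("input.mp4", 0.0, 12.0, samples)
    with VideoFileClip("input.mp4") as clip:
        vertical = framer.apply_framing(clip.subclipped(0.0, 12.0), plan)
        vertical.write_videofile("vertical.mp4", fps=plan.fps)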
"""
from __future__ import annotations
import gc
import logging
import os
from dataclasses import dataclass
from typing import List, Optional, Tuple
import cv2
import numpy as np
from moviepy.video.VideoClip import VideoClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from scipy import signal
from video_render.context_detection import ContextAnalyzer, FrameContext, FaceDetection, GroupBoundingBox
logger = logging.getLogger(__name__)
@dataclass
class CropRegion:
"""Defines a crop region for a frame."""
x: int
y: int
width: int
height: int
@dataclass
class FramingPlan:
"""Complete framing plan for a video segment."""
frame_contexts: List[FrameContext]
crop_regions: List[CropRegion]
layout_mode: str
fps: float
class SmartFramer:
"""Creates intelligent 9:16 framing for horizontal videos with multi-person support."""
def __init__(
self,
target_width: int = 1080,
target_height: int = 1920,
frame_skip: int = 1,
smoothing_window: int = 30,
max_velocity: int = 25,
person_switch_cooldown: int = 30,
response_time: float = 0.6,
group_padding: float = 0.15,
max_zoom_out: float = 2.0,
dead_zone: int = 100,
min_face_confidence: float = 0.3
):
self.target_width = target_width
self.target_height = target_height
self.target_aspect = target_height / target_width
self.frame_skip = frame_skip
self.smoothing_window = smoothing_window
self.max_velocity = max_velocity
self.person_switch_cooldown = person_switch_cooldown
self.response_time = response_time
self.group_padding = group_padding
self.max_zoom_out = max_zoom_out
self.dead_zone = dead_zone
self.min_face_confidence = min_face_confidence
self.position_history_size = 45
self.hysteresis_frames = 8
logger.info(f"Smart framer initialized (target: {target_width}x{target_height}, response_time={response_time}s, max_velocity={max_velocity}, dead_zone={dead_zone})")
def create_framing_plan(
self,
video_path: str,
start_time: float,
end_time: float,
audio_samples: Optional[np.ndarray] = None
) -> FramingPlan:
"""
Analyze video and create a complete framing plan.
Args:
video_path: Path to video file
start_time: Start time in seconds
end_time: End time in seconds
audio_samples: Optional audio samples for speech detection
Returns:
FramingPlan with all frame contexts and crop regions
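        Example (illustrative sketch; assumes a local "input.mp4"):

            framer = SmartFramer(frame_skip=2)
            samples = extract_audio_samples("input.mp4", 0.0, 10.0)
            plan = framer.create_framing_plan("input.mp4", 0.0, 10.0, samples)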
"""
analyzer = ContextAnalyzer(
person_switch_cooldown=self.person_switch_cooldown,
min_face_confidence=self.min_face_confidence
)
speaking_periods = None
if audio_samples is not None:
speaking_periods = analyzer.audio_detector.detect_speaking_periods(audio_samples)
        # Silence FFmpeg log noise from OpenCV's capture backend.
        os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'loglevel;quiet'
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
start_frame = int(start_time * fps)
end_frame = int(end_time * fps)
cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
frame_contexts = []
frame_number = start_frame
processed_count = 0
logger.info(f"Analyzing frames {start_frame} to {end_frame} (fps={fps}, skip={self.frame_skip})")
while frame_number < end_frame:
ret, frame = cap.read()
if not ret:
break
if processed_count % self.frame_skip == 0:
timestamp = frame_number / fps
context = analyzer.analyze_frame(frame, timestamp, frame_number, speaking_periods)
frame_contexts.append(context)
frame_number += 1
processed_count += 1
source_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
source_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
cap.release()
analyzer.close()
layout_modes = [ctx.layout_mode for ctx in frame_contexts]
if layout_modes:
overall_layout = max(set(layout_modes), key=layout_modes.count)
else:
overall_layout = "single"
crop_regions = self._calculate_crop_regions(
frame_contexts,
source_width,
source_height,
fps=fps
)
framing_plan = FramingPlan(
frame_contexts=frame_contexts,
crop_regions=crop_regions,
layout_mode=overall_layout,
fps=fps
)
        # Encourage prompt release of per-frame analysis buffers.
        gc.collect()
return framing_plan
def _segment_by_face_detection(
self,
has_face_flags: List[bool],
min_segment_frames: int = 10
) -> List[Tuple[int, int, bool]]:
"""
Segment the video into continuous regions with/without face.
Returns list of (start_idx, end_idx, has_face) tuples.
        Segments shorter than min_segment_frames are absorbed into the
        preceding segment, which keeps that segment's has_face state.
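        Example (doctest-style; a 1-frame gap is absorbed into the run before it):

            >>> SmartFramer()._segment_by_face_detection(
            ...     [True] * 3 + [False] + [True] * 2, min_segment_frames=2)
            [(0, 3, True), (4, 5, True)]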
"""
if not has_face_flags:
return []
segments = []
start_idx = 0
current_state = has_face_flags[0]
for i in range(1, len(has_face_flags)):
if has_face_flags[i] != current_state:
segments.append((start_idx, i - 1, current_state))
start_idx = i
current_state = has_face_flags[i]
segments.append((start_idx, len(has_face_flags) - 1, current_state))
merged = []
for seg in segments:
start, end, has_face = seg
length = end - start + 1
if length < min_segment_frames and merged:
prev_start, prev_end, prev_has_face = merged[-1]
merged[-1] = (prev_start, end, prev_has_face)
else:
merged.append(seg)
return merged
def _interpolate_smooth(
self,
positions: List[float],
segments: List[Tuple[int, int, bool]],
transition_frames: int = 15
) -> List[float]:
"""
Create smooth transitions between segments using cosine interpolation.
        Within each segment the position is held constant at the segment's
        median; between segments whose medians differ by more than half the
        dead zone, a cosine-eased transition is applied.
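        Example (doctest-style; dead_zone=10 so a 0 -> 100 jump is eased):

            >>> sf = SmartFramer(dead_zone=10)
            >>> out = sf._interpolate_smooth(
            ...     [0.0] * 6 + [100.0] * 6,
            ...     [(0, 5, True), (6, 11, True)], transition_frames=4)
            >>> [round(float(v), 1) for v in out[3:9]]
            [0.0, 9.5, 34.5, 65.5, 90.5, 100.0]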
"""
if not positions or not segments:
return positions
result = list(positions)
segment_values = []
for start, end, has_face in segments:
seg_positions = positions[start:end + 1]
if seg_positions:
segment_values.append(float(np.median(seg_positions)))
else:
segment_values.append(positions[start] if start < len(positions) else 0.0)
for i, (start, end, has_face) in enumerate(segments):
value = segment_values[i]
for j in range(start, end + 1):
result[j] = value
for i in range(len(segments) - 1):
seg1_start, seg1_end, _ = segments[i]
seg2_start, seg2_end, _ = segments[i + 1]
val1 = segment_values[i]
val2 = segment_values[i + 1]
if abs(val2 - val1) < self.dead_zone * 0.5:
continue
trans_start = max(seg1_end - transition_frames // 2, seg1_start)
trans_end = min(seg2_start + transition_frames // 2, seg2_end)
trans_length = trans_end - trans_start + 1
if trans_length < 2:
continue
for j in range(trans_length):
t = j / (trans_length - 1)
smooth_t = 0.5 - 0.5 * np.cos(t * np.pi)
idx = trans_start + j
if 0 <= idx < len(result):
result[idx] = val1 + (val2 - val1) * smooth_t
return result
def _apply_savgol_filter(
self,
positions: List[float],
window_length: int = 61,
polyorder: int = 2
) -> List[float]:
"""
Apply Savitzky-Golay filter for ultra-smooth position tracking.
This is a signal processing filter that preserves trends while removing noise.
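        Example (doctest-style; inputs shorter than a usable window are
        returned unchanged, and a constant signal passes through exactly):

            >>> SmartFramer()._apply_savgol_filter([1.0, 2.0])
            [1.0, 2.0]
            >>> [round(float(v), 6) for v in SmartFramer()._apply_savgol_filter([5.0] * 8)]
            [5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0, 5.0]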
"""
if len(positions) < window_length:
window_length = len(positions) if len(positions) % 2 == 1 else len(positions) - 1
if window_length < 3:
return positions
if window_length % 2 == 0:
window_length -= 1
if window_length <= polyorder:
polyorder = max(1, window_length - 1)
try:
smoothed = signal.savgol_filter(positions, window_length, polyorder, mode='nearest')
return smoothed.tolist()
except Exception as e:
logger.warning(f"Savgol filter failed: {e}, returning original positions")
return positions
def _apply_median_filter(self, positions: List[float], window_size: int = 5) -> List[float]:
"""
Apply median filter to remove detection noise.
Median filter is ideal for removing outliers while preserving
edges (real movements). Window size of 5 means each position
is replaced by the median of itself and 2 neighbors on each side.
Args:
positions: Raw positions from detection
window_size: Window size (must be odd), default 5
Returns:
Filtered positions with noise removed
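        Example (doctest-style; a single-frame spike is removed):

            >>> SmartFramer()._apply_median_filter([0.0, 0.0, 100.0, 0.0, 0.0])
            [0.0, 0.0, 0.0, 0.0, 0.0]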
"""
if len(positions) < window_size:
return positions
from scipy.signal import medfilt
if window_size % 2 == 0:
window_size += 1
filtered = medfilt(positions, kernel_size=window_size)
return filtered.tolist()
def _is_detection_stable(self, has_face_flags: List[bool], window_size: int = 30) -> bool:
"""
Check if face detection is stable enough to use smart framing.
If detection is too unstable (frequent changes), it's better to use static center crop.
Args:
has_face_flags: Boolean flags indicating if face was detected per frame
            window_size: Kept for API compatibility; stability is measured over the full sequence
Returns:
True if detection is stable, False if too unstable
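        Example (doctest-style; one state change in 30 frames is stable,
        alternating frames are not):

            >>> framer = SmartFramer()
            >>> framer._is_detection_stable([True] * 20 + [False] * 10)
            True
            >>> framer._is_detection_stable([True, False] * 15)
            False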
"""
        if not has_face_flags:
            return False
changes = 0
for i in range(1, len(has_face_flags)):
if has_face_flags[i] != has_face_flags[i-1]:
changes += 1
change_rate = changes / len(has_face_flags)
return change_rate < 0.3
def _stabilize_no_face_sequences(
self,
positions: List[float],
has_face_flags: List[bool],
        source_center: Optional[float] = None
) -> List[float]:
"""
Stabilize positions during sequences without face detection.
Uses median of all valid positions for maximum stability.
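        Example (doctest-style; with no detections at all, every frame locks
        to the supplied source center):

            >>> SmartFramer()._stabilize_no_face_sequences(
            ...     [10.0, 20.0, 30.0], [False, False, False], source_center=50.0)
            [50.0, 50.0, 50.0]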
"""
if len(positions) != len(has_face_flags):
return positions
        fallback = source_center if source_center is not None else (positions[0] if positions else 0.0)
face_ratio = sum(has_face_flags) / len(has_face_flags) if has_face_flags else 0
if face_ratio < 0.15:
return [fallback] * len(positions)
changes = sum(1 for i in range(1, len(has_face_flags)) if has_face_flags[i] != has_face_flags[i-1])
instability_ratio = changes / len(has_face_flags) if has_face_flags else 0
if instability_ratio > 0.25:
valid_positions = [positions[i] for i, has_face in enumerate(has_face_flags) if has_face]
if valid_positions:
return [float(np.median(valid_positions))] * len(positions)
return [fallback] * len(positions)
valid_positions = [positions[i] for i, has_face in enumerate(has_face_flags) if has_face]
if not valid_positions:
return [fallback] * len(positions)
global_median = float(np.median(valid_positions))
stabilized = list(positions)
i = 0
while i < len(has_face_flags):
if not has_face_flags[i]:
start_idx = i
recent_valid = []
for j in range(max(0, start_idx - self.position_history_size), start_idx):
if has_face_flags[j]:
recent_valid.append(positions[j])
lock_value = float(np.median(recent_valid)) if len(recent_valid) >= 5 else global_median
while i < len(has_face_flags) and not has_face_flags[i]:
stabilized[i] = lock_value
i += 1
else:
i += 1
return stabilized
def _calculate_crop_regions(
self,
contexts: List[FrameContext],
source_width: int,
source_height: int,
fps: Optional[float] = None
) -> List[CropRegion]:
"""
Calculate smooth crop regions for each frame with multi-person support.
Args:
contexts: List of frame contexts
source_width: Source video width
            source_height: Source video height
            fps: Source frame rate; when provided, enables time-based exponential smoothing
Returns:
List of crop regions
"""
if not contexts:
return []
        source_aspect = source_width / source_height
        # Base crop for the 9:16 target. Note target_aspect is height/width,
        # so compare the source's w/h against the target's w/h: wider sources
        # are height-limited, narrower sources are width-limited.
        if source_aspect > self.target_width / self.target_height:
base_crop_height = source_height
base_crop_width = int(base_crop_height / self.target_aspect)
if base_crop_width > source_width:
base_crop_width = source_width
base_crop_height = int(base_crop_width * self.target_aspect)
else:
base_crop_width = source_width
base_crop_height = int(base_crop_width * self.target_aspect)
if base_crop_height > source_height:
base_crop_height = source_height
base_crop_width = int(base_crop_height / self.target_aspect)
center_xs = []
center_ys = []
zoom_factors = []
has_face_flags = []
static_center_x = float(source_width // 2)
static_center_y = float(source_height // 2)
last_valid_x = static_center_x
last_valid_y = static_center_y
last_valid_zoom = 1.0
for ctx in contexts:
selected_face = None
if ctx.selected_people:
idx = ctx.selected_people[0]
if 0 <= idx < len(ctx.detected_faces):
selected_face = ctx.detected_faces[idx]
if selected_face:
center_x = float(selected_face.center_x)
center_y = float(selected_face.center_y)
center_xs.append(center_x)
center_ys.append(center_y)
                required_width = selected_face.width * (1 + self.group_padding * 2)
                # Extra vertical padding (3x) leaves headroom above and below the face.
                required_height = selected_face.height * (1 + self.group_padding * 3)
zoom_w = required_width / base_crop_width
zoom_h = required_height / base_crop_height
zoom = max(zoom_w, zoom_h, 1.0)
zoom = min(zoom, self.max_zoom_out)
zoom_factors.append(zoom)
last_valid_x = center_x
last_valid_y = center_y
last_valid_zoom = zoom
has_face_flags.append(True)
elif ctx.group_bounds and ctx.group_bounds.face_count > 0:
group = ctx.group_bounds
center_x = float(group.center_x)
center_y = float(group.center_y)
center_xs.append(center_x)
center_ys.append(center_y)
required_width = group.width * (1 + self.group_padding * 2)
required_height = group.height * (1 + self.group_padding * 3)
zoom_w = required_width / base_crop_width
zoom_h = required_height / base_crop_height
zoom = max(zoom_w, zoom_h, 1.0)
zoom = min(zoom, self.max_zoom_out)
zoom_factors.append(zoom)
last_valid_x = center_x
last_valid_y = center_y
last_valid_zoom = zoom
has_face_flags.append(True)
elif ctx.primary_focus and len(ctx.detected_faces) > 0:
center_x = float(ctx.primary_focus[0])
center_y = float(ctx.primary_focus[1])
center_xs.append(center_x)
center_ys.append(center_y)
zoom_factors.append(1.0)
last_valid_x = center_x
last_valid_y = center_y
last_valid_zoom = 1.0
has_face_flags.append(True)
else:
center_xs.append(last_valid_x)
center_ys.append(last_valid_y)
zoom_factors.append(last_valid_zoom)
has_face_flags.append(False)
center_x_video = float(source_width // 2)
center_y_video = float(source_height // 2)
if not self._is_detection_stable(has_face_flags):
final_xs = [center_x_video] * len(center_xs)
final_ys = [center_y_video] * len(center_ys)
final_zooms = [1.0] * len(zoom_factors)
else:
center_xs = self._stabilize_no_face_sequences(
center_xs,
has_face_flags,
source_center=center_x_video
)
center_ys = self._stabilize_no_face_sequences(
center_ys,
has_face_flags,
source_center=center_y_video
)
zoom_factors = self._stabilize_no_face_sequences(
zoom_factors,
has_face_flags,
source_center=1.0
)
face_count = sum(has_face_flags)
if face_count < len(has_face_flags) * 0.3:
final_xs = [center_x_video] * len(center_xs)
final_ys = [center_y_video] * len(center_ys)
final_zooms = [1.0] * len(zoom_factors)
else:
valid_xs = [center_xs[i] for i, has_face in enumerate(has_face_flags) if has_face]
valid_ys = [center_ys[i] for i, has_face in enumerate(has_face_flags) if has_face]
valid_zooms = [zoom_factors[i] for i, has_face in enumerate(has_face_flags) if has_face]
target_x = float(np.median(valid_xs)) if valid_xs else center_x_video
target_y = float(np.median(valid_ys)) if valid_ys else center_y_video
target_zoom = float(np.median(valid_zooms)) if valid_zooms else 1.0
for i in range(len(center_xs)):
if not has_face_flags[i]:
center_xs[i] = target_x
center_ys[i] = target_y
zoom_factors[i] = target_zoom
final_xs = self._apply_savgol_filter(center_xs, window_length=61, polyorder=2)
final_ys = self._apply_savgol_filter(center_ys, window_length=61, polyorder=2)
final_zooms = self._apply_savgol_filter(zoom_factors, window_length=61, polyorder=2)
if fps and self.response_time > 0:
dt = self.frame_skip / fps
alpha = 1 - np.exp(-dt / self.response_time)
final_xs = self._apply_exponential_smoothing(final_xs, alpha)
final_ys = self._apply_exponential_smoothing(final_ys, alpha)
final_zooms = self._apply_exponential_smoothing(final_zooms, alpha)
# Generate crop regions
crop_regions = []
for cx, cy, zoom in zip(final_xs, final_ys, final_zooms):
# Calculate actual crop size with zoom
crop_width = int(base_crop_width * zoom)
crop_height = int(base_crop_height * zoom)
# Clamp to source dimensions
crop_width = min(crop_width, source_width)
crop_height = min(crop_height, source_height)
# Maintain aspect ratio after clamping
if crop_width / crop_height > base_crop_width / base_crop_height:
crop_width = int(crop_height * base_crop_width / base_crop_height)
else:
crop_height = int(crop_width * base_crop_height / base_crop_width)
# Calculate top-left corner
x = int(cx - crop_width // 2)
y = int(cy - crop_height // 2)
# Keep within bounds
x = max(0, min(x, source_width - crop_width))
y = max(0, min(y, source_height - crop_height))
crop_regions.append(CropRegion(
x=x,
y=y,
width=crop_width,
height=crop_height
))
# Clear temporary lists
center_xs.clear()
center_ys.clear()
zoom_factors.clear()
return crop_regions
def _apply_exponential_smoothing(self, positions: List[float], alpha: float) -> List[float]:
"""
Smooth positions with exponential moving average.
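        Example (doctest-style; alpha=0.5 moves halfway toward each new value):

            >>> SmartFramer()._apply_exponential_smoothing([0.0, 10.0, 10.0], 0.5)
            [0.0, 5.0, 7.5]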
"""
if not positions:
return positions
alpha = max(0.0, min(alpha, 1.0))
smoothed = [positions[0]]
for i in range(1, len(positions)):
prev = smoothed[-1]
smoothed.append(prev + alpha * (positions[i] - prev))
return smoothed
def _apply_dead_zone(self, positions: List[float], threshold: float) -> List[float]:
"""
Apply dead zone to eliminate micro-movements.
If change is smaller than threshold, keep previous position.
Args:
positions: List of positions
threshold: Minimum change needed to move (pixels)
Returns:
Positions with dead zone applied
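        Example (doctest-style; the 50 px move is ignored, the 200 px move is kept):

            >>> SmartFramer()._apply_dead_zone([0.0, 50.0, 200.0], 100.0)
            [0.0, 0.0, 200.0]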
"""
if len(positions) <= 1:
return positions
filtered = [positions[0]]
for i in range(1, len(positions)):
delta = abs(positions[i] - filtered[i - 1])
if delta < threshold:
filtered.append(filtered[i - 1])
else:
filtered.append(positions[i])
return filtered
def _limit_velocity(self, positions: List[float], max_velocity: float) -> List[float]:
"""
Limit the velocity of position changes.
Args:
positions: List of positions
max_velocity: Maximum allowed change per frame
Returns:
Smoothed positions
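        Example (doctest-style; a 100 px jump is spread across frames at 25 px/frame):

            >>> SmartFramer()._limit_velocity([0.0, 100.0, 100.0], 25.0)
            [0.0, 25.0, 50.0]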
"""
if len(positions) <= 1:
return positions
limited = [positions[0]]
for i in range(1, len(positions)):
delta = positions[i] - limited[i - 1]
if abs(delta) > max_velocity:
delta = max_velocity if delta > 0 else -max_velocity
limited.append(limited[i - 1] + delta)
return limited
def apply_framing(
self,
video_clip: VideoFileClip,
framing_plan: FramingPlan
) -> VideoClip:
"""
Apply smart framing to a video clip.
Automatically selects layout based on number of people detected.
Layouts:
- 1 person: Single framing (follow person)
- 2 people: Vertical split screen (side by side)
- 3 people: 1 on top, 2 on bottom
- 4 people: 2x2 grid
Args:
video_clip: Source video clip
framing_plan: Framing plan to apply
Returns:
Reframed video clip
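        Example (illustrative sketch; assumes ``plan`` was produced by
        create_framing_plan over the same time range, using the MoviePy 2.x
        ``subclipped`` API already used in this module):

            segment = clip.subclipped(10.0, 25.0)
            vertical = framer.apply_framing(segment, plan)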
"""
# Determine predominant number of faces across all frames
if not framing_plan.frame_contexts:
return self._apply_single_framing(video_clip, framing_plan)
face_counts = []
for ctx in framing_plan.frame_contexts:
if ctx.active_speakers:
face_counts.append(len(ctx.active_speakers))
elif ctx.group_bounds:
face_counts.append(ctx.group_bounds.face_count)
else:
face_counts.append(len(ctx.detected_faces))
# Use mode (most common) face count, minimum 1
if face_counts:
from collections import Counter
count_freq = Counter(face_counts)
# Get the most common count, but ignore 0
non_zero_counts = {k: v for k, v in count_freq.items() if k > 0}
if non_zero_counts:
predominant_faces = max(non_zero_counts, key=non_zero_counts.get)
else:
predominant_faces = 1
else:
predominant_faces = 1
logger.info(f"Layout selection: predominant_faces={predominant_faces}")
if predominant_faces == 1:
return self._apply_single_framing(video_clip, framing_plan)
elif predominant_faces == 2:
return self._apply_split_screen(video_clip, framing_plan)
elif predominant_faces == 3:
return self._apply_three_person_layout(video_clip, framing_plan)
else: # 4 or more
return self._apply_grid_layout(video_clip, framing_plan)
def _apply_single_framing(
self,
video_clip: VideoFileClip,
framing_plan: FramingPlan
) -> VideoClip:
"""
Apply single-focus framing (following one person or action).
Args:
video_clip: Source video clip
framing_plan: Framing plan
Returns:
Reframed video clip
"""
def make_frame(t):
frame = video_clip.get_frame(t)
if not framing_plan.crop_regions:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
else:
                # Map clip time to an index into the (possibly subsampled) crop list.
                exact_frame_idx = (t * framing_plan.fps) / self.frame_skip
last_idx = len(framing_plan.crop_regions) - 1
if last_idx <= 0:
crop = framing_plan.crop_regions[0]
x, y, width, height = crop.x, crop.y, crop.width, crop.height
else:
exact_frame_idx = max(0.0, min(exact_frame_idx, float(last_idx)))
                    # Interpolate between the two nearest planned crop regions.
                    low_idx = int(np.floor(exact_frame_idx))
high_idx = min(low_idx + 1, last_idx)
alpha = exact_frame_idx - low_idx
crop_a = framing_plan.crop_regions[low_idx]
crop_b = framing_plan.crop_regions[high_idx]
x = int(round(crop_a.x + (crop_b.x - crop_a.x) * alpha))
y = int(round(crop_a.y + (crop_b.y - crop_a.y) * alpha))
width = int(round(crop_a.width + (crop_b.width - crop_a.width) * alpha))
height = int(round(crop_a.height + (crop_b.height - crop_a.height) * alpha))
h, w = frame.shape[:2]
x = max(0, min(x, w - width))
y = max(0, min(y, h - height))
width = min(width, w - x)
height = min(height, h - y)
cropped = frame[y:y + height, x:x + width]
resized = cv2.resize(
cropped,
(self.target_width, self.target_height),
interpolation=cv2.INTER_LINEAR
)
return resized
new_clip = VideoClip(duration=video_clip.duration)
new_clip.size = (self.target_width, self.target_height)
new_clip.frame_function = make_frame
return new_clip
def _apply_split_screen(
self,
video_clip: VideoFileClip,
framing_plan: FramingPlan
) -> VideoClip:
"""
Apply split screen for two people (side by side vertical split).
Args:
video_clip: Source video clip
framing_plan: Framing plan
Returns:
Split screen video clip
"""
def make_frame(t):
frame = video_clip.get_frame(t)
exact_frame_idx = (t * framing_plan.fps) / self.frame_skip
frame_idx = int(exact_frame_idx)
if not framing_plan.frame_contexts:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
return cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)
frame_idx = max(0, min(frame_idx, len(framing_plan.frame_contexts) - 1))
context = framing_plan.frame_contexts[frame_idx]
output = np.zeros((self.target_height, self.target_width, 3), dtype=np.uint8)
if context.active_speakers:
faces = [
context.detected_faces[idx]
for idx in context.active_speakers
if 0 <= idx < len(context.detected_faces)
][:2]
else:
# Use top faces by confidence for stability
faces = sorted(context.detected_faces, key=lambda f: f.confidence, reverse=True)[:2]
if len(faces) >= 2:
# Sort by X position (left to right)
faces_sorted = sorted(faces, key=lambda f: f.center_x)
left_face = faces_sorted[0]
right_face = faces_sorted[1]
half_width = self.target_width // 2
half_aspect = self.target_height / half_width
for idx, face in enumerate([left_face, right_face]):
# Calculate crop region around face
crop_width = int(face.width * 3) # 3x face width for good framing
crop_height = int(crop_width * half_aspect)
# Clamp to reasonable limits
crop_width = max(crop_width, frame.shape[1] // 4)
crop_width = min(crop_width, frame.shape[1])
crop_height = min(crop_height, frame.shape[0])
# Ensure proper aspect ratio
if crop_height / crop_width > half_aspect:
crop_height = int(crop_width * half_aspect)
else:
crop_width = int(crop_height / half_aspect)
# Center crop on face
x = max(0, min(face.center_x - crop_width // 2, frame.shape[1] - crop_width))
y = max(0, min(face.center_y - crop_height // 2, frame.shape[0] - crop_height))
# Extract and resize
cropped = frame[y:y + crop_height, x:x + crop_width]
resized = cv2.resize(cropped, (half_width, self.target_height), interpolation=cv2.INTER_LINEAR)
x_offset = idx * half_width
output[:, x_offset:x_offset + half_width] = resized
else:
# Fallback to single framing
if framing_plan.crop_regions:
crop_idx = max(0, min(frame_idx, len(framing_plan.crop_regions) - 1))
crop = framing_plan.crop_regions[crop_idx]
cropped = frame[crop.y:crop.y + crop.height, crop.x:crop.x + crop.width]
else:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
output = cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)
return output
new_clip = VideoClip(duration=video_clip.duration)
new_clip.size = (self.target_width, self.target_height)
new_clip.frame_function = make_frame
return new_clip
def _apply_three_person_layout(
self,
video_clip: VideoFileClip,
framing_plan: FramingPlan
) -> VideoClip:
"""
Apply layout for 3 people: 1 on top (full width), 2 on bottom (side by side).
Args:
video_clip: Source video clip
framing_plan: Framing plan
Returns:
Three-person layout video clip
"""
def make_frame(t):
frame = video_clip.get_frame(t)
exact_frame_idx = (t * framing_plan.fps) / self.frame_skip
frame_idx = int(exact_frame_idx)
if not framing_plan.frame_contexts:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
return cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)
frame_idx = max(0, min(frame_idx, len(framing_plan.frame_contexts) - 1))
context = framing_plan.frame_contexts[frame_idx]
output = np.zeros((self.target_height, self.target_width, 3), dtype=np.uint8)
if context.active_speakers:
faces = [
context.detected_faces[idx]
for idx in context.active_speakers
if 0 <= idx < len(context.detected_faces)
][:3]
else:
faces = sorted(context.detected_faces, key=lambda f: f.confidence, reverse=True)[:3] # Max 3 faces
num_faces = len(faces)
if num_faces >= 3:
# Sort faces by Y position (top to bottom), then X for bottom row
faces_sorted = sorted(faces, key=lambda f: f.center_y)
top_face = faces_sorted[0] # Topmost face
bottom_faces = sorted(faces_sorted[1:], key=lambda f: f.center_x) # Sort bottom by X
# Top section: full width, half height
top_height = self.target_height // 2
top_width = self.target_width
top_aspect = top_height / top_width
# Crop around top face
crop_w = int(top_face.width * 3) # 3x face width for context
crop_h = int(crop_w * top_aspect)
crop_w = min(crop_w, frame.shape[1])
crop_h = min(crop_h, frame.shape[0])
x = max(0, min(top_face.center_x - crop_w // 2, frame.shape[1] - crop_w))
y = max(0, min(top_face.center_y - crop_h // 2, frame.shape[0] - crop_h))
cropped_top = frame[y:y + crop_h, x:x + crop_w]
resized_top = cv2.resize(cropped_top, (top_width, top_height), interpolation=cv2.INTER_LINEAR)
output[0:top_height, :] = resized_top
# Bottom section: two halves
bottom_height = self.target_height - top_height
half_width = self.target_width // 2
bottom_aspect = bottom_height / half_width
for idx, face in enumerate(bottom_faces[:2]):
crop_w = int(face.width * 3)
crop_h = int(crop_w * bottom_aspect)
crop_w = min(crop_w, frame.shape[1] // 2)
crop_h = min(crop_h, frame.shape[0])
x = max(0, min(face.center_x - crop_w // 2, frame.shape[1] - crop_w))
y = max(0, min(face.center_y - crop_h // 2, frame.shape[0] - crop_h))
cropped = frame[y:y + crop_h, x:x + crop_w]
resized = cv2.resize(cropped, (half_width, bottom_height), interpolation=cv2.INTER_LINEAR)
x_offset = idx * half_width
output[top_height:, x_offset:x_offset + half_width] = resized
else:
# Fallback to single framing
if framing_plan.crop_regions:
crop_idx = max(0, min(frame_idx, len(framing_plan.crop_regions) - 1))
crop = framing_plan.crop_regions[crop_idx]
cropped = frame[crop.y:crop.y + crop.height, crop.x:crop.x + crop.width]
else:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
output = cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)
return output
new_clip = VideoClip(duration=video_clip.duration)
new_clip.size = (self.target_width, self.target_height)
new_clip.frame_function = make_frame
return new_clip
def _apply_grid_layout(
self,
video_clip: VideoFileClip,
framing_plan: FramingPlan
) -> VideoClip:
"""
Apply grid layout for 4 people (2x2 grid).
Layout: top-left, top-right, bottom-left, bottom-right
Args:
video_clip: Source video clip
framing_plan: Framing plan
Returns:
Grid layout video clip
"""
def make_frame(t):
frame = video_clip.get_frame(t)
exact_frame_idx = (t * framing_plan.fps) / self.frame_skip
frame_idx = int(exact_frame_idx)
if not framing_plan.frame_contexts:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
return cv2.resize(cropped, (self.target_width, self.target_height), interpolation=cv2.INTER_LINEAR)
frame_idx = max(0, min(frame_idx, len(framing_plan.frame_contexts) - 1))
context = framing_plan.frame_contexts[frame_idx]
output = np.zeros((self.target_height, self.target_width, 3), dtype=np.uint8)
if context.active_speakers:
faces = [
context.detected_faces[idx]
for idx in context.active_speakers
if 0 <= idx < len(context.detected_faces)
][:4]
else:
faces = sorted(context.detected_faces, key=lambda f: f.confidence, reverse=True)[:4] # Max 4 faces
num_faces = len(faces)
if num_faces >= 4:
cell_width = self.target_width // 2
cell_height = self.target_height // 2
cell_aspect = cell_height / cell_width
# Sort faces into grid positions by their actual position
# First sort by Y (top row vs bottom row), then by X within each row
sorted_by_y = sorted(faces, key=lambda f: f.center_y)
top_row = sorted(sorted_by_y[:2], key=lambda f: f.center_x)
bottom_row = sorted(sorted_by_y[2:], key=lambda f: f.center_x)
grid_faces = top_row + bottom_row
for idx, face in enumerate(grid_faces):
row = idx // 2
col = idx % 2
# Calculate crop region centered on face
crop_width = int(face.width * 3) # 3x face width
crop_height = int(crop_width * cell_aspect)
# Clamp to reasonable limits
crop_width = max(crop_width, frame.shape[1] // 4)
crop_width = min(crop_width, frame.shape[1])
crop_height = min(crop_height, frame.shape[0])
# Ensure proper aspect ratio
if crop_height / crop_width > cell_aspect:
crop_height = int(crop_width * cell_aspect)
else:
crop_width = int(crop_height / cell_aspect)
# Center crop on face
x = max(0, min(face.center_x - crop_width // 2, frame.shape[1] - crop_width))
y = max(0, min(face.center_y - crop_height // 2, frame.shape[0] - crop_height))
cropped = frame[y:y + crop_height, x:x + crop_width]
resized = cv2.resize(cropped, (cell_width, cell_height), interpolation=cv2.INTER_LINEAR)
y_offset = row * cell_height
x_offset = col * cell_width
output[y_offset:y_offset + cell_height, x_offset:x_offset + cell_width] = resized
else:
if framing_plan.crop_regions:
crop_idx = max(0, min(frame_idx, len(framing_plan.crop_regions) - 1))
crop = framing_plan.crop_regions[crop_idx]
cropped = frame[crop.y:crop.y + crop.height, crop.x:crop.x + crop.width]
else:
h, w = frame.shape[:2]
crop_h = int(w * self.target_aspect)
crop_w = w
if crop_h > h:
crop_h = h
crop_w = int(h / self.target_aspect)
y = (h - crop_h) // 2
x = (w - crop_w) // 2
cropped = frame[y:y + crop_h, x:x + crop_w]
output = cv2.resize(
cropped,
(self.target_width, self.target_height),
interpolation=cv2.INTER_LINEAR
)
return output
new_clip = VideoClip(duration=video_clip.duration)
new_clip.size = (self.target_width, self.target_height)
new_clip.frame_function = make_frame
return new_clip
def extract_audio_samples(video_path: str, start_time: float, end_time: float) -> Optional[np.ndarray]:
"""
Extract audio samples from video for speech detection.
Args:
video_path: Path to video file
start_time: Start time in seconds
end_time: End time in seconds
Returns:
Audio samples array or None if no audio
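    Example (illustrative sketch; returns None when the file has no usable
    audio track, so callers should handle that case):

        samples = extract_audio_samples("input.mp4", 0.0, 10.0)
        if samples is not None:
            plan = SmartFramer().create_framing_plan("input.mp4", 0.0, 10.0, samples)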
"""
try:
from moviepy.audio.io.AudioFileClip import AudioFileClip
with AudioFileClip(video_path) as audio:
segment = audio.subclipped(start_time, end_time)
fps = getattr(segment, 'fps', 44100)
samples = segment.to_soundarray(fps=fps)
return samples
except Exception as exc:
logger.warning(f"Failed to extract audio: {exc}")
return None
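

if __name__ == "__main__":
    # Minimal smoke test (illustrative; assumes a local "input.mp4" exists,
    # so point it at a real file and adjust the time range before running).
    logging.basicConfig(level=logging.INFO)
    demo_path = "input.mp4"
    demo_samples = extract_audio_samples(demo_path, 0.0, 10.0)
    demo_framer = SmartFramer()
    demo_plan = demo_framer.create_framing_plan(demo_path, 0.0, 10.0, demo_samples)
    with VideoFileClip(demo_path) as demo_clip:
        demo_vertical = demo_framer.apply_framing(
            demo_clip.subclipped(0.0, 10.0), demo_plan
        )
        demo_vertical.write_videofile("vertical_demo.mp4", fps=demo_plan.fps)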