""" Context detection module for video analysis. This module provides functionality to detect faces, track people, and identify who is speaking in video content using MediaPipe and audio analysis. """ from __future__ import annotations import logging from dataclasses import dataclass from typing import List, Optional, Tuple import cv2 import mediapipe as mp import numpy as np from scipy import signal logger = logging.getLogger(__name__) @dataclass class FaceDetection: """Represents a detected face in a frame.""" x: int y: int width: int height: int confidence: float center_x: int center_y: int landmarks: Optional[List[Tuple[int, int]]] = None @dataclass class PersonTracking: """Tracks a person across frames.""" person_id: int face: FaceDetection is_speaking: bool speaking_confidence: float frame_number: int @dataclass class FrameContext: """Context information for a video frame.""" frame_number: int timestamp: float detected_faces: List[FaceDetection] active_speakers: List[int] # indices of speaking faces primary_focus: Optional[Tuple[int, int]] # (x, y) center point layout_mode: str # "single", "dual_split", "grid" class MediaPipeDetector: """Face and pose detection using MediaPipe.""" def __init__(self, min_detection_confidence: float = 0.5, min_tracking_confidence: float = 0.5): self.min_detection_confidence = min_detection_confidence self.min_tracking_confidence = min_tracking_confidence self.mp_face_detection = mp.solutions.face_detection self.mp_face_mesh = mp.solutions.face_mesh self.face_detection = self.mp_face_detection.FaceDetection( min_detection_confidence=min_detection_confidence, model_selection=1 ) self.face_mesh = self.mp_face_mesh.FaceMesh( max_num_faces=5, min_detection_confidence=min_detection_confidence, min_tracking_confidence=min_tracking_confidence, static_image_mode=False ) logger.info("MediaPipe detector initialized") def detect_faces(self, frame: np.ndarray) -> List[FaceDetection]: """ Detect faces in a frame. Args: frame: RGB image array Returns: List of detected faces """ height, width = frame.shape[:2] if len(frame.shape) == 2: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB) elif frame.shape[2] == 4: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB) else: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) results = self.face_detection.process(frame_rgb) faces = [] if results.detections: for detection in results.detections: bbox = detection.location_data.relative_bounding_box x = int(bbox.xmin * width) y = int(bbox.ymin * height) w = int(bbox.width * width) h = int(bbox.height * height) x = max(0, min(x, width - 1)) y = max(0, min(y, height - 1)) w = min(w, width - x) h = min(h, height - y) center_x = x + w // 2 center_y = y + h // 2 confidence = detection.score[0] if detection.score else 0.0 faces.append(FaceDetection( x=x, y=y, width=w, height=h, confidence=confidence, center_x=center_x, center_y=center_y )) return faces def detect_face_landmarks(self, frame: np.ndarray) -> List[FaceDetection]: """ Detect faces with landmarks for lip sync detection. 
        Args:
            frame: BGR image array (as read by OpenCV); grayscale and BGRA
                frames are converted automatically.

        Returns:
            List of detected faces with landmark information
        """
        height, width = frame.shape[:2]

        if len(frame.shape) == 2:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
        elif frame.shape[2] == 4:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
        else:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        results = self.face_mesh.process(frame_rgb)

        faces = []
        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                xs = [lm.x for lm in face_landmarks.landmark]
                ys = [lm.y for lm in face_landmarks.landmark]
                x_min, x_max = min(xs), max(xs)
                y_min, y_max = min(ys), max(ys)

                x = int(x_min * width)
                y = int(y_min * height)
                w = int((x_max - x_min) * width)
                h = int((y_max - y_min) * height)
                center_x = x + w // 2
                center_y = y + h // 2

                # Mouth landmarks: 13 = inner upper lip, 14 = inner lower lip,
                # 78 = left mouth corner, 308 = right mouth corner.
                lip_landmarks = []
                for idx in [13, 14, 78, 308]:
                    lm = face_landmarks.landmark[idx]
                    lip_landmarks.append((int(lm.x * width), int(lm.y * height)))

                faces.append(FaceDetection(
                    x=x, y=y, width=w, height=h,
                    confidence=1.0,
                    center_x=center_x, center_y=center_y,
                    landmarks=lip_landmarks
                ))

        return faces

    def close(self):
        """Release MediaPipe resources."""
        self.face_detection.close()
        self.face_mesh.close()


class AudioActivityDetector:
    """Detects speech activity in audio."""

    def __init__(self, sample_rate: int = 44100, frame_duration_ms: int = 30):
        self.sample_rate = sample_rate
        self.frame_duration_ms = frame_duration_ms
        self.frame_size = int(sample_rate * frame_duration_ms / 1000)
        logger.info(
            f"Audio activity detector initialized (sr={sample_rate}, frame={frame_duration_ms}ms)"
        )

    def detect_speaking_periods(
        self,
        audio_samples: np.ndarray,
        threshold: float = 0.02,
        min_speech_duration: float = 0.1
    ) -> List[Tuple[float, float]]:
        """
        Detect periods of speech in audio.

        Args:
            audio_samples: Audio samples array (mono, or multi-channel to be downmixed)
            threshold: RMS energy threshold for speech detection
            min_speech_duration: Minimum duration of speech in seconds

        Returns:
            List of (start_time, end_time) tuples in seconds
        """
        # Mix multi-channel audio down to mono.
        if audio_samples.ndim > 1:
            audio_samples = audio_samples.mean(axis=1)

        # Compute RMS energy per analysis frame.
        energies = []
        for i in range(0, len(audio_samples), self.frame_size):
            frame = audio_samples[i:i + self.frame_size]
            if len(frame) > 0:
                energy = np.sqrt(np.mean(frame ** 2))
                energies.append(energy)

        speaking_frames = [e > threshold for e in energies]

        # Merge consecutive speaking frames into (start, end) periods.
        periods = []
        start_frame = None
        for i, is_speaking in enumerate(speaking_frames):
            if is_speaking and start_frame is None:
                start_frame = i
            elif not is_speaking and start_frame is not None:
                start_time = start_frame * self.frame_duration_ms / 1000
                end_time = i * self.frame_duration_ms / 1000
                if end_time - start_time >= min_speech_duration:
                    periods.append((start_time, end_time))
                start_frame = None

        # Close a period that is still open at the end of the audio.
        if start_frame is not None:
            start_time = start_frame * self.frame_duration_ms / 1000
            end_time = len(speaking_frames) * self.frame_duration_ms / 1000
            if end_time - start_time >= min_speech_duration:
                periods.append((start_time, end_time))

        return periods

    def is_speaking_at_time(self, speaking_periods: List[Tuple[float, float]],
                            time: float) -> bool:
        """Check if there is speech activity at a given time."""
        for start, end in speaking_periods:
            if start <= time <= end:
                return True
        return False
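
# Illustrative sketch of AudioActivityDetector usage (kept as a comment so it
# does not run at import time; numbers are indicative only): one second of a
# 440 Hz tone followed by one second of silence at 16 kHz should yield roughly
# one speaking period near (0.0, 1.0) at the default threshold.
#
#   detector = AudioActivityDetector(sample_rate=16000)
#   t = np.linspace(0, 1, 16000, endpoint=False)
#   samples = np.concatenate([0.1 * np.sin(2 * np.pi * 440 * t), np.zeros(16000)])
#   periods = detector.detect_speaking_periods(samples)  # roughly [(0.0, ~1.0)]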
class ContextAnalyzer:
    """Analyzes video context to determine focus and layout."""

    def __init__(self):
        self.detector = MediaPipeDetector()
        self.audio_detector = AudioActivityDetector()
        self.previous_faces: List[FaceDetection] = []
        logger.info("Context analyzer initialized")

    def analyze_frame(
        self,
        frame: np.ndarray,
        timestamp: float,
        frame_number: int,
        speaking_periods: Optional[List[Tuple[float, float]]] = None
    ) -> FrameContext:
        """
        Analyze a single frame to extract context information.

        Args:
            frame: Video frame (BGR format from OpenCV)
            timestamp: Frame timestamp in seconds
            frame_number: Frame index
            speaking_periods: List of (start, end) times where speech is detected

        Returns:
            FrameContext with detection results
        """
        # Prefer landmark detection (needed for lip movement); fall back to
        # plain face detection when the face mesh finds nothing.
        faces = self.detector.detect_face_landmarks(frame)
        if not faces:
            faces = self.detector.detect_faces(frame)

        # Determine who is speaking: audio activity marks every visible face as
        # a candidate speaker, and detected lip movement adds faces as well.
        active_speakers = []
        for i, face in enumerate(faces):
            is_speaking = False

            if speaking_periods and self.audio_detector.is_speaking_at_time(speaking_periods, timestamp):
                is_speaking = True

            if face.landmarks and len(self.previous_faces) > i:
                is_speaking = is_speaking or self._detect_lip_movement(face, self.previous_faces[i])

            if is_speaking:
                active_speakers.append(i)

        # Choose a layout: zero or one face gets a single focus, two or more
        # faces get a split layout.
        layout_mode = "single" if len(faces) <= 1 else "dual_split"

        primary_focus = self._calculate_focus_point(faces, active_speakers)

        self.previous_faces = faces

        return FrameContext(
            frame_number=frame_number,
            timestamp=timestamp,
            detected_faces=faces,
            active_speakers=active_speakers,
            primary_focus=primary_focus,
            layout_mode=layout_mode
        )

    def _detect_lip_movement(self, current_face: FaceDetection,
                             previous_face: FaceDetection) -> bool:
        """
        Detect lip movement by comparing mouth opening between frames.

        Args:
            current_face: Current frame face detection
            previous_face: Previous frame face detection

        Returns:
            True if significant lip movement detected
        """
        if not current_face.landmarks or not previous_face.landmarks:
            return False

        def mouth_opening(landmarks):
            # Distance between the inner upper lip (landmark 13) and the inner
            # lower lip (landmark 14), stored as the first two lip landmarks.
            if len(landmarks) < 2:
                return 0.0
            upper = np.array(landmarks[0], dtype=float)
            lower = np.array(landmarks[1], dtype=float)
            return float(np.linalg.norm(upper - lower))

        current_dist = mouth_opening(current_face.landmarks)
        previous_dist = mouth_opening(previous_face.landmarks)

        # A change of more than ~2 pixels in mouth opening counts as movement.
        threshold = 2.0
        return abs(current_dist - previous_dist) > threshold

    def _calculate_focus_point(
        self,
        faces: List[FaceDetection],
        active_speakers: List[int]
    ) -> Optional[Tuple[int, int]]:
        """
        Calculate the primary focus point based on detected faces and speakers.

        IMPORTANT: This focuses on ONE person to avoid focusing on empty space
        (e.g. a table). When multiple people are present, we pick the most
        relevant person rather than averaging positions.

        Args:
            faces: List of detected faces
            active_speakers: Indices of faces that are speaking

        Returns:
            (x, y) tuple of focus center, or None if no faces
        """
        if not faces:
            return None

        # Prefer the most confident active speaker, if any.
        if active_speakers:
            speaker_faces = [faces[i] for i in active_speakers if i < len(faces)]
            if speaker_faces:
                primary_speaker = max(speaker_faces, key=lambda f: f.confidence)
                return (primary_speaker.center_x, primary_speaker.center_y)

        # Otherwise fall back to the most confident face.
        most_confident = max(faces, key=lambda f: f.confidence)
        return (most_confident.center_x, most_confident.center_y)

    def close(self):
        """Release resources."""
        self.detector.close()
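

if __name__ == "__main__":
    # Minimal usage sketch, not part of the module's public API: walk a video
    # frame by frame and print the detected context. The default video path is
    # a placeholder; pass any readable video file as the first argument.
    import sys

    logging.basicConfig(level=logging.INFO)
    video_path = sys.argv[1] if len(sys.argv) > 1 else "sample.mp4"

    analyzer = ContextAnalyzer()
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

    frame_number = 0
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            context = analyzer.analyze_frame(frame, frame_number / fps, frame_number)
            print(
                f"frame {context.frame_number}: {len(context.detected_faces)} face(s), "
                f"speakers={context.active_speakers}, layout={context.layout_mode}"
            )
            frame_number += 1
    finally:
        cap.release()
        analyzer.close()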