""" Context detection module for video analysis. This module provides functionality to detect faces, track people, and identify who is speaking in video content using MediaPipe and audio analysis. """ from __future__ import annotations import logging from dataclasses import dataclass, field from typing import List, Optional, Tuple import cv2 import mediapipe as mp import numpy as np from scipy import signal logger = logging.getLogger(__name__) @dataclass class FaceDetection: """Represents a detected face in a frame.""" x: int y: int width: int height: int confidence: float center_x: int center_y: int landmarks: Optional[List[Tuple[int, int]]] = None @dataclass class PersonTracking: """Tracks a person across frames.""" person_id: int face: FaceDetection is_speaking: bool speaking_confidence: float frame_number: int @dataclass class GroupBoundingBox: """Bounding box containing all tracked faces.""" x: int y: int width: int height: int center_x: int center_y: int face_count: int @dataclass class FrameContext: """Context information for a video frame.""" frame_number: int timestamp: float detected_faces: List[FaceDetection] active_speakers: List[int] # indices of speaking faces primary_focus: Optional[Tuple[int, int]] # (x, y) center point layout_mode: str # "single", "dual_split", "grid" selected_people: List[int] = field(default_factory=list) # indices of people selected for display group_bounds: Optional[GroupBoundingBox] = None # bounding box for all detected faces class MediaPipeDetector: """Face and pose detection using MediaPipe with OpenCV Haar Cascade fallback.""" def __init__(self, min_detection_confidence: float = 0.3, min_tracking_confidence: float = 0.3): self.min_detection_confidence = min_detection_confidence self.min_tracking_confidence = min_tracking_confidence self.mp_face_detection = mp.solutions.face_detection self.mp_face_mesh = mp.solutions.face_mesh # MediaPipe detectors with lower confidence for better cartoon detection self.face_detection = self.mp_face_detection.FaceDetection( min_detection_confidence=min_detection_confidence, model_selection=0 # Changed to 0 for better detection of varied faces (including cartoons) ) self.face_mesh = self.mp_face_mesh.FaceMesh( max_num_faces=5, min_detection_confidence=min_detection_confidence, min_tracking_confidence=min_tracking_confidence, static_image_mode=False ) # OpenCV Haar Cascade as fallback for cartoon/anime faces self.haar_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') # Alternative cascade for profile/side faces self.haar_cascade_profile = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_profileface.xml') logger.info(f"Hybrid detector initialized (MediaPipe confidence={min_detection_confidence}, OpenCV Haar Cascade enabled)") def detect_faces(self, frame: np.ndarray) -> List[FaceDetection]: """ Detect faces in a frame using hybrid approach (MediaPipe + OpenCV Haar Cascade). Args: frame: RGB image array Returns: List of detected faces """ height, width = frame.shape[:2] if len(frame.shape) == 2: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB) elif frame.shape[2] == 4: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB) else: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # Try MediaPipe first results = self.face_detection.process(frame_rgb) faces = [] if results.detections: for detection in results.detections: bbox = detection.location_data.relative_bounding_box x = int(bbox.xmin * width) y = int(bbox.ymin * height) w = int(bbox.width * width) h = int(bbox.height * height) x = max(0, min(x, width - 1)) y = max(0, min(y, height - 1)) w = min(w, width - x) h = min(h, height - y) center_x = x + w // 2 center_y = y + h // 2 confidence = detection.score[0] if detection.score else 0.0 faces.append(FaceDetection( x=x, y=y, width=w, height=h, confidence=confidence, center_x=center_x, center_y=center_y )) # Fallback to OpenCV Haar Cascade if MediaPipe found nothing if not faces: faces = self._detect_faces_haar_cascade(frame, width, height) return faces def _detect_faces_haar_cascade(self, frame: np.ndarray, width: int, height: int) -> List[FaceDetection]: """ Detect faces using OpenCV Haar Cascade (works better with cartoons/anime). Args: frame: Image frame (BGR format) width: Frame width height: Frame height Returns: List of detected faces """ # Convert to grayscale for Haar Cascade if len(frame.shape) == 3: gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) else: gray = frame # Detect frontal faces with more sensitive parameters frontal_faces = self.haar_cascade.detectMultiScale( gray, scaleFactor=1.05, # More sensitive to size variations minNeighbors=3, # Lower threshold for detection (more permissive) minSize=(30, 30), # Smaller minimum size flags=cv2.CASCADE_SCALE_IMAGE ) # Also try profile faces profile_faces = self.haar_cascade_profile.detectMultiScale( gray, scaleFactor=1.1, minNeighbors=3, minSize=(30, 30), flags=cv2.CASCADE_SCALE_IMAGE ) # Combine frontal and profile detections all_faces = [] for (x, y, w, h) in frontal_faces: x = max(0, min(x, width - 1)) y = max(0, min(y, height - 1)) w = min(w, width - x) h = min(h, height - y) center_x = x + w // 2 center_y = y + h // 2 all_faces.append(FaceDetection( x=x, y=y, width=w, height=h, confidence=0.7, # Haar Cascade doesn't provide confidence, use fixed value center_x=center_x, center_y=center_y )) for (x, y, w, h) in profile_faces: # Check if this face overlaps significantly with any frontal face overlap = False for existing_face in all_faces: # Calculate IoU (Intersection over Union) x1_overlap = max(x, existing_face.x) y1_overlap = max(y, existing_face.y) x2_overlap = min(x + w, existing_face.x + existing_face.width) y2_overlap = min(y + h, existing_face.y + existing_face.height) if x1_overlap < x2_overlap and y1_overlap < y2_overlap: overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap) face_area = w * h if overlap_area / face_area > 0.3: # 30% overlap threshold overlap = True break if not overlap: x = max(0, min(x, width - 1)) y = max(0, min(y, height - 1)) w = min(w, width - x) h = min(h, height - y) center_x = x + w // 2 center_y = y + h // 2 all_faces.append(FaceDetection( x=x, y=y, width=w, height=h, confidence=0.6, # Slightly lower confidence for profile center_x=center_x, center_y=center_y )) if all_faces: logger.debug(f"Haar Cascade detected {len(all_faces)} faces (MediaPipe failed)") return all_faces def detect_face_landmarks(self, frame: np.ndarray) -> List[FaceDetection]: """ Detect faces with landmarks for lip sync detection. Args: frame: RGB image array Returns: List of detected faces with landmark information """ height, width = frame.shape[:2] if len(frame.shape) == 2: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB) elif frame.shape[2] == 4: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB) else: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) results = self.face_mesh.process(frame_rgb) faces = [] if results.multi_face_landmarks: for face_landmarks in results.multi_face_landmarks: xs = [lm.x for lm in face_landmarks.landmark] ys = [lm.y for lm in face_landmarks.landmark] x_min, x_max = min(xs), max(xs) y_min, y_max = min(ys), max(ys) x = int(x_min * width) y = int(y_min * height) w = int((x_max - x_min) * width) h = int((y_max - y_min) * height) center_x = x + w // 2 center_y = y + h // 2 lip_landmarks = [] for idx in [13, 14, 78, 308]: lm = face_landmarks.landmark[idx] lip_landmarks.append((int(lm.x * width), int(lm.y * height))) faces.append(FaceDetection( x=x, y=y, width=w, height=h, confidence=1.0, center_x=center_x, center_y=center_y, landmarks=lip_landmarks )) return faces def close(self): """Release MediaPipe resources.""" self.face_detection.close() self.face_mesh.close() class AudioActivityDetector: """Detects speech activity in audio.""" def __init__(self, sample_rate: int = 44100, frame_duration_ms: int = 30): self.sample_rate = sample_rate self.frame_duration_ms = frame_duration_ms self.frame_size = int(sample_rate * frame_duration_ms / 1000) logger.info(f"Audio activity detector initialized (sr={sample_rate}, frame={frame_duration_ms}ms)") def detect_speaking_periods( self, audio_samples: np.ndarray, threshold: float = 0.01, # Reduced from 0.02 for better speech detection min_speech_duration: float = 0.05 # Reduced from 0.1 to catch shorter utterances ) -> List[Tuple[float, float]]: """ Detect periods of speech in audio. Args: audio_samples: Audio samples array threshold: Energy threshold for speech detection min_speech_duration: Minimum duration of speech in seconds Returns: List of (start_time, end_time) tuples in seconds """ if audio_samples.ndim > 1: audio_samples = audio_samples.mean(axis=1) energies = [] for i in range(0, len(audio_samples), self.frame_size): frame = audio_samples[i:i + self.frame_size] if len(frame) > 0: energy = np.sqrt(np.mean(frame ** 2)) energies.append(energy) speaking_frames = [e > threshold for e in energies] periods = [] start_frame = None for i, is_speaking in enumerate(speaking_frames): if is_speaking and start_frame is None: start_frame = i elif not is_speaking and start_frame is not None: start_time = start_frame * self.frame_duration_ms / 1000 end_time = i * self.frame_duration_ms / 1000 if end_time - start_time >= min_speech_duration: periods.append((start_time, end_time)) start_frame = None if start_frame is not None: start_time = start_frame * self.frame_duration_ms / 1000 end_time = len(speaking_frames) * self.frame_duration_ms / 1000 if end_time - start_time >= min_speech_duration: periods.append((start_time, end_time)) # Log detected speech periods for debugging if periods: total_speech_time = sum(end - start for start, end in periods) logger.info(f"Audio speech detection: {len(periods)} periods found, " f"total {total_speech_time:.1f}s of speech (threshold={threshold})") else: max_energy = max(energies) if energies else 0 logger.warning(f"No speech detected! Max energy={max_energy:.4f}, threshold={threshold} " f"(try lowering threshold if speech should be present)") return periods def is_speaking_at_time(self, speaking_periods: List[Tuple[float, float]], time: float) -> bool: """Check if there is speech activity at a given time.""" for start, end in speaking_periods: if start <= time <= end: return True return False class ContextAnalyzer: """Analyzes video context to determine focus and layout.""" def __init__(self, person_switch_cooldown: int = 30, min_face_confidence: float = 0.3): self.detector = MediaPipeDetector() self.audio_detector = AudioActivityDetector() self.previous_faces: List[FaceDetection] = [] self.min_face_confidence = min_face_confidence # Person tracking state self.current_selected_people: List[int] = [] # Indices of people currently on screen self.last_switch_frame: int = -999 # Frame when we last switched people self.person_switch_cooldown = person_switch_cooldown # Minimum frames before switching # Stability tracking to prevent flip-flopping self.desired_people_history: List[List[int]] = [] # Track recent desired selections self.stability_threshold = 20 # Frames needed to confirm a switch (increased for more stability) self.last_switched_people: List[int] = [] # People we just switched FROM self.focus_history: List[Tuple[int, int]] = [] self.focus_history_size: int = 20 self.focus_dead_zone: int = 60 # Debug logging self.frame_log_interval = 30 # Log every N frames logger.info(f"Context analyzer initialized (cooldown={person_switch_cooldown} frames, focus_smoothing={self.focus_history_size})") def analyze_frame( self, frame: np.ndarray, timestamp: float, frame_number: int, speaking_periods: Optional[List[Tuple[float, float]]] = None ) -> FrameContext: """ Analyze a single frame to extract context information. Args: frame: Video frame (BGR format from OpenCV) timestamp: Frame timestamp in seconds frame_number: Frame index speaking_periods: List of (start, end) times where speech is detected Returns: FrameContext with detection results """ faces = self.detector.detect_face_landmarks(frame) faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else [] if not faces: faces = self.detector.detect_faces(frame) faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else [] # Determine who is speaking active_speakers = [] has_audio_speech = speaking_periods and self.audio_detector.is_speaking_at_time(speaking_periods, timestamp) for i, face in enumerate(faces): is_speaking = False # Prefer visual cues when multiple faces are present. if face.landmarks and len(self.previous_faces) > i: is_speaking = self._detect_lip_movement(face, self.previous_faces[i]) # Audio can confirm speech when there's only one face. if has_audio_speech and len(faces) == 1: is_speaking = True if is_speaking: active_speakers.append(i) # Debug: Log speech detection if frame_number % 30 == 0: # Every second at 30fps logger.info(f"Speech detection - Frame {frame_number}: audio_active={has_audio_speech}, " f"speakers={active_speakers}, total_faces={len(faces)}") if active_speakers: selected_people = active_speakers[:4] if len(selected_people) == 1: layout_mode = "single" elif len(selected_people) == 2: layout_mode = "dual_split" else: layout_mode = "grid" else: # Select THE person to focus on (always single person) # Priority: 1) Who is speaking, 2) Who is most centered selected_people = self._select_person_to_focus( faces, active_speakers, frame_number, frame.shape[1], # frame width for center calculation frame.shape[0] # frame height for center calculation ) layout_mode = "single" # Calculate group bounding box for ALL detected faces (multi-person support) group_bounds = self._calculate_group_bounding_box(faces) # For multi-person mode, use group center as primary focus if group_bounds and group_bounds.face_count > 1: primary_focus = (group_bounds.center_x, group_bounds.center_y) else: primary_focus = self._calculate_focus_point(faces, selected_people) # Debug logging every N frames if frame_number % self.frame_log_interval == 0: focus_reason = "speaker" if active_speakers else "no_speech_detected" group_info = f", group={group_bounds.face_count} faces" if group_bounds else "" logger.info(f"Frame {frame_number}: {len(faces)} faces, " f"{len(active_speakers)} speakers, focus={selected_people}, reason={focus_reason}{group_info}") self.previous_faces = faces return FrameContext( frame_number=frame_number, timestamp=timestamp, detected_faces=faces, active_speakers=active_speakers, primary_focus=primary_focus, layout_mode=layout_mode, selected_people=selected_people, group_bounds=group_bounds ) def _detect_lip_movement(self, current_face: FaceDetection, previous_face: FaceDetection) -> bool: """ Detect lip movement by comparing landmarks between frames. Args: current_face: Current frame face detection previous_face: Previous frame face detection Returns: True if significant lip movement detected """ if not current_face.landmarks or not previous_face.landmarks: return False def lip_distance(landmarks): if len(landmarks) < 4: return 0 upper = np.array(landmarks[0:2]) lower = np.array(landmarks[2:4]) return np.linalg.norm(upper.mean(axis=0) - lower.mean(axis=0)) current_dist = lip_distance(current_face.landmarks) previous_dist = lip_distance(previous_face.landmarks) threshold = 2.0 return abs(current_dist - previous_dist) > threshold def _select_person_to_focus( self, faces: List[FaceDetection], active_speakers: List[int], frame_number: int, frame_width: int, frame_height: int ) -> List[int]: """ Select THE single person to focus on. Priority: 1) Who is speaking, 2) Who is most centered in frame Args: faces: List of detected faces active_speakers: Indices of people currently speaking frame_number: Current frame number frame_width: Frame width for center calculation frame_height: Frame height for center calculation Returns: List with single person index [idx], or empty list if no faces """ if not faces: self.current_selected_people = [] return [] if len(faces) == 1: self.current_selected_people = [0] return [0] frames_since_last_switch = frame_number - self.last_switch_frame can_switch = frames_since_last_switch >= self.person_switch_cooldown desired_person_idx = None if active_speakers: if self.current_selected_people and self.current_selected_people[0] in active_speakers: desired_person_idx = self.current_selected_people[0] else: if can_switch or not self.current_selected_people: desired_person_idx = active_speakers[0] if self.current_selected_people and desired_person_idx != self.current_selected_people[0]: logger.info(f"Switching focus to speaker: {desired_person_idx}") self.last_switch_frame = frame_number else: desired_person_idx = self.current_selected_people[0] if self.current_selected_people else active_speakers[0] else: if self.current_selected_people and len(self.current_selected_people) > 0: current_idx = self.current_selected_people[0] if current_idx < len(faces): desired_person_idx = current_idx else: if self.previous_faces and current_idx < len(self.previous_faces): prev_face = self.previous_faces[current_idx] best_match_idx = None best_match_score = float('inf') for idx, face in enumerate(faces): dx = face.center_x - prev_face.center_x dy = face.center_y - prev_face.center_y dist = np.sqrt(dx**2 + dy**2) size_diff = abs(face.width - prev_face.width) + abs(face.height - prev_face.height) score = dist + size_diff * 0.5 if score < best_match_score: best_match_score = score best_match_idx = idx if best_match_idx is not None and best_match_score < 1000: desired_person_idx = best_match_idx else: face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)] face_confidences.sort(key=lambda x: x[1], reverse=True) desired_person_idx = face_confidences[0][0] else: face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)] face_confidences.sort(key=lambda x: x[1], reverse=True) desired_person_idx = face_confidences[0][0] else: face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)] face_confidences.sort(key=lambda x: x[1], reverse=True) desired_person_idx = face_confidences[0][0] desired_people = [desired_person_idx] if desired_person_idx is not None else [] if not self.current_selected_people: self.current_selected_people = desired_people self.last_switch_frame = frame_number logger.info(f"Frame {frame_number}: Locked on person {desired_people}") else: self.current_selected_people = desired_people return self.current_selected_people.copy() def _ensure_distinct_people( self, faces: List[FaceDetection], people_indices: List[int] ) -> List[int]: """ Ensure selected people are distinct by checking minimum distance between them. Prevents showing the same person twice due to duplicate detection. Args: faces: List of detected faces people_indices: Indices of people to validate Returns: List of distinct people indices (max 2) """ if len(people_indices) <= 1: return people_indices distinct_people = [] for idx in people_indices: if idx >= len(faces): continue current_face = faces[idx] is_distinct = True # Check if this person is too close to any already selected person for selected_idx in distinct_people: selected_face = faces[selected_idx] # Calculate distance between face centers dx = current_face.center_x - selected_face.center_x dy = current_face.center_y - selected_face.center_y distance = np.sqrt(dx**2 + dy**2) # Also check overlap via IoU (Intersection over Union) x1_overlap = max(current_face.x, selected_face.x) y1_overlap = max(current_face.y, selected_face.y) x2_overlap = min(current_face.x + current_face.width, selected_face.x + selected_face.width) y2_overlap = min(current_face.y + current_face.height, selected_face.y + selected_face.height) overlap_area = 0 if x1_overlap < x2_overlap and y1_overlap < y2_overlap: overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap) # Calculate areas area1 = current_face.width * current_face.height area2 = selected_face.width * selected_face.height min_area = min(area1, area2) # If faces are very close OR significantly overlapping, they're likely the same person # Minimum distance: 1/4 of average face width min_distance = (current_face.width + selected_face.width) / 8 overlap_threshold = 0.3 # 30% overlap if distance < min_distance or (min_area > 0 and overlap_area / min_area > overlap_threshold): is_distinct = False logger.debug(f"Person {idx} too similar to person {selected_idx} (dist={distance:.1f}, overlap={overlap_area/min_area if min_area > 0 else 0:.2%})") break if is_distinct: distinct_people.append(idx) # Stop at 2 distinct people if len(distinct_people) >= 2: break # If we couldn't find 2 distinct people, return at most 1 if len(distinct_people) < 2 and len(people_indices) >= 2: logger.debug(f"Only {len(distinct_people)} distinct person(s) found from {len(people_indices)} detections") return distinct_people def _calculate_focus_point( self, faces: List[FaceDetection], selected_people: List[int] ) -> Optional[Tuple[int, int]]: """ Calculate the primary focus point based on selected people with temporal smoothing. Args: faces: List of detected faces selected_people: Indices of people selected for display Returns: (x, y) tuple of focus center, or None if no faces """ if not faces or not selected_people: return None # Calculate raw focus point raw_focus_x = 0 raw_focus_y = 0 if len(selected_people) == 1: # Single person - focus on them if selected_people[0] < len(faces): primary = faces[selected_people[0]] raw_focus_x = primary.center_x raw_focus_y = primary.center_y else: # Fallback most_confident = max(faces, key=lambda f: f.confidence) raw_focus_x = most_confident.center_x raw_focus_y = most_confident.center_y else: # Multiple people - focus on the CENTER between them for stability # This prevents jarring movements when switching focus between people valid_people = [idx for idx in selected_people if idx < len(faces)] if valid_people: centers_x = [faces[idx].center_x for idx in valid_people] centers_y = [faces[idx].center_y for idx in valid_people] raw_focus_x = int(np.mean(centers_x)) raw_focus_y = int(np.mean(centers_y)) else: # Fallback most_confident = max(faces, key=lambda f: f.confidence) raw_focus_x = most_confident.center_x raw_focus_y = most_confident.center_y if self.focus_history: last_x, last_y = self.focus_history[-1] dx = abs(raw_focus_x - last_x) dy = abs(raw_focus_y - last_y) if dx < self.focus_dead_zone and dy < self.focus_dead_zone: return self.focus_history[-1] self.focus_history.append((raw_focus_x, raw_focus_y)) if len(self.focus_history) > self.focus_history_size: self.focus_history.pop(0) if len(self.focus_history) >= 5: xs = [x for x, y in self.focus_history] ys = [y for x, y in self.focus_history] median_x = int(np.median(xs)) median_y = int(np.median(ys)) return (median_x, median_y) else: return (raw_focus_x, raw_focus_y) def _calculate_group_bounding_box( self, faces: List[FaceDetection], padding_percent: float = 0.15, max_faces: int = 6 ) -> Optional[GroupBoundingBox]: """ Calculate bounding box containing all detected faces with padding. Args: faces: List of detected faces padding_percent: Padding around group as percentage of bbox dimensions max_faces: Maximum faces to include (use most confident if exceeded) Returns: GroupBoundingBox or None if no faces """ if not faces: return None # If too many faces, use most confident ones if len(faces) > max_faces: faces = sorted(faces, key=lambda f: f.confidence, reverse=True)[:max_faces] # Calculate bounding box containing all faces min_x = min(f.x for f in faces) max_x = max(f.x + f.width for f in faces) min_y = min(f.y for f in faces) max_y = max(f.y + f.height for f in faces) # Add padding width = max_x - min_x height = max_y - min_y pad_x = int(width * padding_percent) pad_y = int(height * padding_percent) final_x = max(0, min_x - pad_x) final_y = max(0, min_y - pad_y) final_width = width + 2 * pad_x final_height = height + 2 * pad_y return GroupBoundingBox( x=final_x, y=final_y, width=final_width, height=final_height, center_x=final_x + final_width // 2, center_y=final_y + final_height // 2, face_count=len(faces) ) def close(self): """Release resources.""" self.detector.close() # Clear tracking state to free memory self.previous_faces.clear() self.current_selected_people.clear() self.focus_history.clear()