""" Context detection module for video analysis. This module provides functionality to detect faces, track people, and identify who is speaking in video content using MediaPipe and audio analysis. """ from __future__ import annotations import logging from dataclasses import dataclass, field from typing import List, Optional, Tuple import cv2 import mediapipe as mp import numpy as np from scipy import signal logger = logging.getLogger(__name__) @dataclass class FaceDetection: """Represents a detected face in a frame.""" x: int y: int width: int height: int confidence: float center_x: int center_y: int landmarks: Optional[List[Tuple[int, int]]] = None @dataclass class PersonTracking: """Tracks a person across frames.""" person_id: int face: FaceDetection is_speaking: bool speaking_confidence: float frame_number: int @dataclass class FrameContext: """Context information for a video frame.""" frame_number: int timestamp: float detected_faces: List[FaceDetection] active_speakers: List[int] # indices of speaking faces primary_focus: Optional[Tuple[int, int]] # (x, y) center point layout_mode: str # "single", "dual_split", "grid" selected_people: List[int] = field(default_factory=list) # indices of people selected for display (max 2) class MediaPipeDetector: """Face and pose detection using MediaPipe with OpenCV Haar Cascade fallback.""" def __init__(self, min_detection_confidence: float = 0.3, min_tracking_confidence: float = 0.3): self.min_detection_confidence = min_detection_confidence self.min_tracking_confidence = min_tracking_confidence self.mp_face_detection = mp.solutions.face_detection self.mp_face_mesh = mp.solutions.face_mesh # MediaPipe detectors with lower confidence for better cartoon detection self.face_detection = self.mp_face_detection.FaceDetection( min_detection_confidence=min_detection_confidence, model_selection=0 # Changed to 0 for better detection of varied faces (including cartoons) ) self.face_mesh = self.mp_face_mesh.FaceMesh( max_num_faces=5, min_detection_confidence=min_detection_confidence, min_tracking_confidence=min_tracking_confidence, static_image_mode=False ) # OpenCV Haar Cascade as fallback for cartoon/anime faces self.haar_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') # Alternative cascade for profile/side faces self.haar_cascade_profile = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_profileface.xml') logger.info(f"Hybrid detector initialized (MediaPipe confidence={min_detection_confidence}, OpenCV Haar Cascade enabled)") def detect_faces(self, frame: np.ndarray) -> List[FaceDetection]: """ Detect faces in a frame using hybrid approach (MediaPipe + OpenCV Haar Cascade). 


class MediaPipeDetector:
    """Face and pose detection using MediaPipe with an OpenCV Haar Cascade fallback."""

    def __init__(self, min_detection_confidence: float = 0.3, min_tracking_confidence: float = 0.3):
        self.min_detection_confidence = min_detection_confidence
        self.min_tracking_confidence = min_tracking_confidence

        self.mp_face_detection = mp.solutions.face_detection
        self.mp_face_mesh = mp.solutions.face_mesh

        # MediaPipe detectors with lower confidence for better cartoon detection
        self.face_detection = self.mp_face_detection.FaceDetection(
            min_detection_confidence=min_detection_confidence,
            model_selection=0  # Model 0 (short-range) detects varied faces, including cartoons, better
        )
        self.face_mesh = self.mp_face_mesh.FaceMesh(
            max_num_faces=5,
            min_detection_confidence=min_detection_confidence,
            min_tracking_confidence=min_tracking_confidence,
            static_image_mode=False
        )

        # OpenCV Haar Cascades as a fallback for cartoon/anime faces
        self.haar_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )
        # Alternative cascade for profile/side faces
        self.haar_cascade_profile = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_profileface.xml'
        )

        logger.info(f"Hybrid detector initialized (MediaPipe confidence={min_detection_confidence}, "
                    f"OpenCV Haar Cascade enabled)")

    def detect_faces(self, frame: np.ndarray) -> List[FaceDetection]:
        """
        Detect faces in a frame using a hybrid approach (MediaPipe + OpenCV Haar Cascade).

        Args:
            frame: Image array in BGR format (as read by OpenCV)

        Returns:
            List of detected faces
        """
        height, width = frame.shape[:2]

        if len(frame.shape) == 2:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
        elif frame.shape[2] == 4:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
        else:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Try MediaPipe first
        results = self.face_detection.process(frame_rgb)

        faces = []
        if results.detections:
            for detection in results.detections:
                bbox = detection.location_data.relative_bounding_box
                x = int(bbox.xmin * width)
                y = int(bbox.ymin * height)
                w = int(bbox.width * width)
                h = int(bbox.height * height)

                # Clamp the box to the frame boundaries
                x = max(0, min(x, width - 1))
                y = max(0, min(y, height - 1))
                w = min(w, width - x)
                h = min(h, height - y)

                center_x = x + w // 2
                center_y = y + h // 2
                confidence = detection.score[0] if detection.score else 0.0

                faces.append(FaceDetection(
                    x=x, y=y, width=w, height=h,
                    confidence=confidence,
                    center_x=center_x, center_y=center_y
                ))

        # Fall back to OpenCV Haar Cascade if MediaPipe found nothing
        if not faces:
            faces = self._detect_faces_haar_cascade(frame, width, height)

        return faces

    def _detect_faces_haar_cascade(self, frame: np.ndarray, width: int, height: int) -> List[FaceDetection]:
        """
        Detect faces using OpenCV Haar Cascades (works better with cartoons/anime).

        Args:
            frame: Image frame (BGR format)
            width: Frame width
            height: Frame height

        Returns:
            List of detected faces
        """
        # Convert to grayscale for the Haar Cascades
        if len(frame.shape) == 3:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        else:
            gray = frame

        # Detect frontal faces with more sensitive parameters
        frontal_faces = self.haar_cascade.detectMultiScale(
            gray,
            scaleFactor=1.05,  # More sensitive to size variations
            minNeighbors=3,    # Lower threshold for detection (more permissive)
            minSize=(30, 30),  # Smaller minimum size
            flags=cv2.CASCADE_SCALE_IMAGE
        )

        # Also try profile faces
        profile_faces = self.haar_cascade_profile.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=3,
            minSize=(30, 30),
            flags=cv2.CASCADE_SCALE_IMAGE
        )

        # Combine frontal and profile detections
        all_faces = []
        for (x, y, w, h) in frontal_faces:
            x = max(0, min(x, width - 1))
            y = max(0, min(y, height - 1))
            w = min(w, width - x)
            h = min(h, height - y)
            center_x = x + w // 2
            center_y = y + h // 2
            all_faces.append(FaceDetection(
                x=x, y=y, width=w, height=h,
                confidence=0.7,  # Haar Cascade does not provide a confidence score; use a fixed value
                center_x=center_x, center_y=center_y
            ))

        for (x, y, w, h) in profile_faces:
            # Check whether this face overlaps significantly with any frontal face
            overlap = False
            for existing_face in all_faces:
                # Compute the intersection rectangle between the two boxes
                x1_overlap = max(x, existing_face.x)
                y1_overlap = max(y, existing_face.y)
                x2_overlap = min(x + w, existing_face.x + existing_face.width)
                y2_overlap = min(y + h, existing_face.y + existing_face.height)

                if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
                    overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)
                    face_area = w * h
                    if overlap_area / face_area > 0.3:  # 30% overlap threshold
                        overlap = True
                        break

            if not overlap:
                x = max(0, min(x, width - 1))
                y = max(0, min(y, height - 1))
                w = min(w, width - x)
                h = min(h, height - y)
                center_x = x + w // 2
                center_y = y + h // 2
                all_faces.append(FaceDetection(
                    x=x, y=y, width=w, height=h,
                    confidence=0.6,  # Slightly lower confidence for profile detections
                    center_x=center_x, center_y=center_y
                ))

        if all_faces:
            logger.debug(f"Haar Cascade detected {len(all_faces)} faces (MediaPipe found none)")

        return all_faces

    def detect_face_landmarks(self, frame: np.ndarray) -> List[FaceDetection]:
        """
        Detect faces with landmarks for lip-sync detection.

        Args:
            frame: Image array in BGR format (as read by OpenCV)

        Returns:
            List of detected faces with landmark information
        """
        height, width = frame.shape[:2]

        if len(frame.shape) == 2:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
        elif frame.shape[2] == 4:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
        else:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        results = self.face_mesh.process(frame_rgb)

        faces = []
        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                xs = [lm.x for lm in face_landmarks.landmark]
                ys = [lm.y for lm in face_landmarks.landmark]

                x_min, x_max = min(xs), max(xs)
                y_min, y_max = min(ys), max(ys)

                x = int(x_min * width)
                y = int(y_min * height)
                w = int((x_max - x_min) * width)
                h = int((y_max - y_min) * height)
                center_x = x + w // 2
                center_y = y + h // 2

                # Mouth landmarks used by _detect_lip_movement for lip-movement estimation
                lip_landmarks = []
                for idx in [13, 14, 78, 308]:
                    lm = face_landmarks.landmark[idx]
                    lip_landmarks.append((int(lm.x * width), int(lm.y * height)))

                faces.append(FaceDetection(
                    x=x, y=y, width=w, height=h,
                    confidence=1.0,
                    center_x=center_x, center_y=center_y,
                    landmarks=lip_landmarks
                ))

        return faces

    def close(self):
        """Release MediaPipe resources."""
        self.face_detection.close()
        self.face_mesh.close()
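

# Illustrative sketch (not used by the pipeline): run the hybrid detector on a
# single still image. "sample.jpg" is a hypothetical path; any image readable by
# OpenCV (BGR) will do.
def _example_detect_faces(image_path: str = "sample.jpg") -> List[FaceDetection]:
    detector = MediaPipeDetector(min_detection_confidence=0.3)
    try:
        image = cv2.imread(image_path)
        if image is None:
            logger.warning(f"Could not read {image_path}")
            return []
        faces = detector.detect_faces(image)
        for face in faces:
            logger.info(f"Face at ({face.x}, {face.y}) size {face.width}x{face.height}, "
                        f"confidence={face.confidence:.2f}")
        return faces
    finally:
        detector.close()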


class AudioActivityDetector:
    """Detects speech activity in audio."""

    def __init__(self, sample_rate: int = 44100, frame_duration_ms: int = 30):
        self.sample_rate = sample_rate
        self.frame_duration_ms = frame_duration_ms
        self.frame_size = int(sample_rate * frame_duration_ms / 1000)
        logger.info(f"Audio activity detector initialized (sr={sample_rate}, frame={frame_duration_ms}ms)")

    def detect_speaking_periods(
        self,
        audio_samples: np.ndarray,
        threshold: float = 0.01,           # Reduced from 0.02 for better speech detection
        min_speech_duration: float = 0.05  # Reduced from 0.1 to catch shorter utterances
    ) -> List[Tuple[float, float]]:
        """
        Detect periods of speech in audio.

        Args:
            audio_samples: Audio samples array
            threshold: RMS energy threshold for speech detection
            min_speech_duration: Minimum duration of speech in seconds

        Returns:
            List of (start_time, end_time) tuples in seconds
        """
        # Mix down to mono if needed
        if audio_samples.ndim > 1:
            audio_samples = audio_samples.mean(axis=1)

        # Frame-wise RMS energy
        energies = []
        for i in range(0, len(audio_samples), self.frame_size):
            frame = audio_samples[i:i + self.frame_size]
            if len(frame) > 0:
                energy = np.sqrt(np.mean(frame ** 2))
                energies.append(energy)

        speaking_frames = [e > threshold for e in energies]

        # Merge consecutive speaking frames into (start, end) periods
        periods = []
        start_frame = None
        for i, is_speaking in enumerate(speaking_frames):
            if is_speaking and start_frame is None:
                start_frame = i
            elif not is_speaking and start_frame is not None:
                start_time = start_frame * self.frame_duration_ms / 1000
                end_time = i * self.frame_duration_ms / 1000
                if end_time - start_time >= min_speech_duration:
                    periods.append((start_time, end_time))
                start_frame = None

        if start_frame is not None:
            start_time = start_frame * self.frame_duration_ms / 1000
            end_time = len(speaking_frames) * self.frame_duration_ms / 1000
            if end_time - start_time >= min_speech_duration:
                periods.append((start_time, end_time))

        # Log detected speech periods for debugging
        if periods:
            total_speech_time = sum(end - start for start, end in periods)
            logger.info(f"Audio speech detection: {len(periods)} periods found, "
                        f"total {total_speech_time:.1f}s of speech (threshold={threshold})")
        else:
            max_energy = max(energies) if energies else 0
            logger.warning(f"No speech detected! Max energy={max_energy:.4f}, threshold={threshold} "
                           f"(try lowering the threshold if speech should be present)")

        return periods

    def is_speaking_at_time(self, speaking_periods: List[Tuple[float, float]], time: float) -> bool:
        """Check whether there is speech activity at a given time."""
        for start, end in speaking_periods:
            if start <= time <= end:
                return True
        return False
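

# Illustrative sketch (not used by the pipeline): detect_speaking_periods() flags
# 30 ms frames whose RMS energy exceeds the threshold. The input here is synthetic:
# 1 s of silence followed by 1 s of a 440 Hz tone at amplitude 0.1 (RMS ~= 0.07,
# well above the 0.01 default), so roughly one period near (1.0, 2.0) s is expected.
def _example_speech_periods() -> List[Tuple[float, float]]:
    sr = 44100
    t = np.linspace(0, 1.0, sr, endpoint=False)
    audio = np.concatenate([np.zeros(sr), 0.1 * np.sin(2 * np.pi * 440 * t)])
    detector = AudioActivityDetector(sample_rate=sr)
    return detector.detect_speaking_periods(audio, threshold=0.01)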


class ContextAnalyzer:
    """Analyzes video context to determine focus and layout."""

    def __init__(self, person_switch_cooldown: int = 30):
        self.detector = MediaPipeDetector()
        self.audio_detector = AudioActivityDetector()
        self.previous_faces: List[FaceDetection] = []

        # Person tracking state
        self.current_selected_people: List[int] = []  # Indices of people currently on screen
        self.last_switch_frame: int = -999            # Frame when we last switched people
        self.person_switch_cooldown = person_switch_cooldown  # Minimum frames before switching

        # Stability tracking to prevent flip-flopping
        self.desired_people_history: List[List[int]] = []  # Track recent desired selections
        self.stability_threshold = 20  # Frames needed to confirm a switch (increased for more stability)
        self.last_switched_people: List[int] = []  # People we just switched FROM

        # Focus stability: track recent focus points for temporal smoothing
        self.focus_history: List[Tuple[int, int]] = []
        self.focus_history_size: int = 5  # Keep the last 5 focus points for smoothing

        # Debug logging
        self.frame_log_interval = 30  # Log every N frames

        logger.info(f"Context analyzer initialized (cooldown={person_switch_cooldown} frames, "
                    f"focus_smoothing={self.focus_history_size})")

    def analyze_frame(
        self,
        frame: np.ndarray,
        timestamp: float,
        frame_number: int,
        speaking_periods: Optional[List[Tuple[float, float]]] = None
    ) -> FrameContext:
        """
        Analyze a single frame to extract context information.

        Args:
            frame: Video frame (BGR format from OpenCV)
            timestamp: Frame timestamp in seconds
            frame_number: Frame index
            speaking_periods: List of (start, end) times where speech is detected

        Returns:
            FrameContext with detection results
        """
        faces = self.detector.detect_face_landmarks(frame)
        if not faces:
            faces = self.detector.detect_faces(frame)

        # Determine who is speaking
        active_speakers = []
        has_audio_speech = speaking_periods and self.audio_detector.is_speaking_at_time(speaking_periods, timestamp)

        for i, face in enumerate(faces):
            is_speaking = False

            # Check audio-based speech detection
            if has_audio_speech:
                is_speaking = True

            # Check lip movement (visual speech detection)
            if face.landmarks and len(self.previous_faces) > i:
                is_speaking = is_speaking or self._detect_lip_movement(face, self.previous_faces[i])

            if is_speaking:
                active_speakers.append(i)

        # Debug: log speech detection
        if frame_number % 30 == 0:  # Every second at 30 fps
            logger.info(f"Speech detection - Frame {frame_number}: audio_active={has_audio_speech}, "
                        f"speakers={active_speakers}, total_faces={len(faces)}")

        # Select THE person to focus on (always a single person).
        # Priority: 1) who is speaking, 2) who is most centered.
        selected_people = self._select_person_to_focus(
            faces,
            active_speakers,
            frame_number,
            frame.shape[1],  # frame width for center calculation
            frame.shape[0]   # frame height for center calculation
        )

        # Always use the single-person layout (no split screen)
        layout_mode = "single"

        primary_focus = self._calculate_focus_point(faces, selected_people)

        # Debug logging every N frames
        if frame_number % self.frame_log_interval == 0:
            focus_reason = "speaker" if active_speakers else "no_speech_detected"
            logger.info(f"Frame {frame_number}: {len(faces)} faces, "
                        f"{len(active_speakers)} speakers, focus={selected_people}, reason={focus_reason}")

        self.previous_faces = faces

        return FrameContext(
            frame_number=frame_number,
            timestamp=timestamp,
            detected_faces=faces,
            active_speakers=active_speakers,
            primary_focus=primary_focus,
            layout_mode=layout_mode,
            selected_people=selected_people
        )

    def _detect_lip_movement(self, current_face: FaceDetection, previous_face: FaceDetection) -> bool:
        """
        Detect lip movement by comparing landmarks between frames.

        Args:
            current_face: Current frame face detection
            previous_face: Previous frame face detection

        Returns:
            True if significant lip movement is detected
        """
        if not current_face.landmarks or not previous_face.landmarks:
            return False

        def lip_distance(landmarks):
            if len(landmarks) < 4:
                return 0
            upper = np.array(landmarks[0:2])
            lower = np.array(landmarks[2:4])
            return np.linalg.norm(upper.mean(axis=0) - lower.mean(axis=0))

        current_dist = lip_distance(current_face.landmarks)
        previous_dist = lip_distance(previous_face.landmarks)

        # A change in mouth opening (in pixels) above this threshold counts as movement
        threshold = 2.0
        return abs(current_dist - previous_dist) > threshold

    def _select_person_to_focus(
        self,
        faces: List[FaceDetection],
        active_speakers: List[int],
        frame_number: int,
        frame_width: int,
        frame_height: int
    ) -> List[int]:
        """
        Select THE single person to focus on.

        Priority: 1) who is speaking, 2) who is most centered in the frame.

        Args:
            faces: List of detected faces
            active_speakers: Indices of people currently speaking
            frame_number: Current frame number
            frame_width: Frame width for center calculation
            frame_height: Frame height for center calculation

        Returns:
            List with a single person index [idx], or an empty list if no faces
        """
        if not faces:
            self.current_selected_people = []
            return []

        # If there is only one person, always focus on them
        if len(faces) == 1:
            self.current_selected_people = [0]
            return [0]

        # Check if we can switch people (cooldown period); not used in ultra-stable mode
        frames_since_last_switch = frame_number - self.last_switch_frame
        can_switch = frames_since_last_switch >= self.person_switch_cooldown

        # Calculate the frame center for distance comparison
        frame_center_x = frame_width / 2
        frame_center_y = frame_height / 2

        # ULTRA-STABLE MODE: select ONE person at the start and NEVER switch.
        # This completely eliminates switching-related instability.
        desired_person_idx = None

        # If we already have someone selected, ALWAYS KEEP THEM (never switch)
        if self.current_selected_people and len(self.current_selected_people) > 0:
            current_idx = self.current_selected_people[0]
            if current_idx < len(faces):
                # Current person still detected - keep them
                desired_person_idx = current_idx
            else:
                # Current person lost - try to find them again by position/size similarity.
                # This handles temporary detection failures.
                current_person_found = False
                if self.previous_faces and current_idx < len(self.previous_faces):
                    prev_face = self.previous_faces[current_idx]

                    # Find the most similar face by position and size
                    best_match_idx = None
                    best_match_score = float('inf')
                    for idx, face in enumerate(faces):
                        # Distance between centers
                        dx = face.center_x - prev_face.center_x
                        dy = face.center_y - prev_face.center_y
                        dist = np.sqrt(dx**2 + dy**2)
                        # Size similarity
                        size_diff = abs(face.width - prev_face.width) + abs(face.height - prev_face.height)
                        score = dist + size_diff * 0.5
                        if score < best_match_score:
                            best_match_score = score
                            best_match_idx = idx

                    if best_match_idx is not None and best_match_score < 1000:
                        desired_person_idx = best_match_idx
                        current_person_found = True

                if not current_person_found:
                    # Really lost - select the most confident face
                    face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
                    face_confidences.sort(key=lambda x: x[1], reverse=True)
                    desired_person_idx = face_confidences[0][0]
                    logger.warning(f"Current person permanently lost - selecting new: {desired_person_idx}")
        else:
            # First frame - select the most confident person ONCE
            face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
            face_confidences.sort(key=lambda x: x[1], reverse=True)
            desired_person_idx = face_confidences[0][0]
            logger.info(f"INITIAL SELECTION - Person {desired_person_idx} (will be tracked throughout the entire video)")

        # IGNORE SPEECH DETECTION - it was causing instability.
        # We now track ONE person from start to finish, regardless of who speaks.
        # OLD LOGIC (disabled below - was causing issues): it switched based on
        # "who is more centered", which caused constant switching.
        if False:  # Disabled
            # Calculate distance from center for each face
            center_distances = []
            for idx, face in enumerate(faces):
                # Euclidean distance from the frame center
                dx = face.center_x - frame_center_x
                dy = face.center_y - frame_center_y
                distance = np.sqrt(dx**2 + dy**2)
                center_distances.append((idx, distance, face.confidence))

            # Sort by distance (closest first), then by confidence as a tiebreaker
            center_distances.sort(key=lambda x: (x[1], -x[2]))
            most_centered_idx = center_distances[0][0]
            most_centered_distance = center_distances[0][1]

            # STICKY BEHAVIOR: if we already have someone selected, only switch if:
            # - the new person is SIGNIFICANTLY more centered (30% closer to center)
            # - OR the current person is now very far from center (>40% of frame width)
            if self.current_selected_people and len(self.current_selected_people) > 0:
                current_idx = self.current_selected_people[0]
                if current_idx < len(faces):
                    current_face = faces[current_idx]
                    current_dx = current_face.center_x - frame_center_x
                    current_dy = current_face.center_y - frame_center_y
                    current_distance = np.sqrt(current_dx**2 + current_dy**2)

                    # Define the "significantly better" thresholds
                    max_acceptable_distance = frame_width * 0.4  # 40% of frame width
                    improvement_threshold = 0.7  # New person must be 30% closer (0.7 ratio)

                    # Keep the current person if they are still reasonably centered
                    if current_distance < max_acceptable_distance:
                        # Current person is still acceptable - only switch if the new one is MUCH better
                        if most_centered_distance < current_distance * improvement_threshold:
                            desired_person_idx = most_centered_idx
                            logger.debug(f"Switching: new person MUCH more centered "
                                         f"({most_centered_distance:.0f} vs {current_distance:.0f})")
                        else:
                            desired_person_idx = current_idx  # Keep current
                            logger.debug(f"Keeping current person: still reasonably centered "
                                         f"({current_distance:.0f} px from center)")
                    else:
                        # Current person is too far from center - switch
                        desired_person_idx = most_centered_idx
                        logger.debug(f"Current person too far from center ({current_distance:.0f} px), switching")
                else:
                    # Current selection invalid
                    desired_person_idx = most_centered_idx
            else:
                # First time - select the most centered person
                desired_person_idx = most_centered_idx

        # Wrap in a list for compatibility with existing code
        desired_people = [desired_person_idx] if desired_person_idx is not None else []

        # ULTRA-STABLE MODE: no switching logic at all.
        # Simply set the person and never change.
        if not self.current_selected_people:
            # First time only
            self.current_selected_people = desired_people
            self.last_switch_frame = frame_number
            logger.info(f"Frame {frame_number}: LOCKED ON person {desired_people} - will never switch")
        else:
            # Already have someone - just update to the desired selection
            # (which is the same person due to the logic above)
            self.current_selected_people = desired_people

        return self.current_selected_people.copy()

    def _ensure_distinct_people(
        self,
        faces: List[FaceDetection],
        people_indices: List[int]
    ) -> List[int]:
        """
        Ensure selected people are distinct by checking the minimum distance between them.

        Prevents showing the same person twice due to duplicate detections.

        Args:
            faces: List of detected faces
            people_indices: Indices of people to validate

        Returns:
            List of distinct people indices (max 2)
        """
        if len(people_indices) <= 1:
            return people_indices

        distinct_people = []
        for idx in people_indices:
            if idx >= len(faces):
                continue

            current_face = faces[idx]
            is_distinct = True

            # Check if this person is too close to any already selected person
            for selected_idx in distinct_people:
                selected_face = faces[selected_idx]

                # Distance between face centers
                dx = current_face.center_x - selected_face.center_x
                dy = current_face.center_y - selected_face.center_y
                distance = np.sqrt(dx**2 + dy**2)

                # Also check overlap (intersection area relative to the smaller face)
                x1_overlap = max(current_face.x, selected_face.x)
                y1_overlap = max(current_face.y, selected_face.y)
                x2_overlap = min(current_face.x + current_face.width, selected_face.x + selected_face.width)
                y2_overlap = min(current_face.y + current_face.height, selected_face.y + selected_face.height)

                overlap_area = 0
                if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
                    overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)

                # Face areas
                area1 = current_face.width * current_face.height
                area2 = selected_face.width * selected_face.height
                min_area = min(area1, area2)

                # If the faces are very close OR significantly overlapping, they are likely the same person.
                # Minimum distance: 1/4 of the average face width.
                min_distance = (current_face.width + selected_face.width) / 8
                overlap_threshold = 0.3  # 30% overlap

                if distance < min_distance or (min_area > 0 and overlap_area / min_area > overlap_threshold):
                    is_distinct = False
                    logger.debug(f"Person {idx} too similar to person {selected_idx} "
                                 f"(dist={distance:.1f}, "
                                 f"overlap={overlap_area / min_area if min_area > 0 else 0:.2%})")
                    break

            if is_distinct:
                distinct_people.append(idx)
                # Stop at 2 distinct people
                if len(distinct_people) >= 2:
                    break

        # If we couldn't find 2 distinct people, return at most 1
        if len(distinct_people) < 2 and len(people_indices) >= 2:
            logger.debug(f"Only {len(distinct_people)} distinct person(s) found from "
                         f"{len(people_indices)} detections")

        return distinct_people

    def _calculate_focus_point(
        self,
        faces: List[FaceDetection],
        selected_people: List[int]
    ) -> Optional[Tuple[int, int]]:
        """
        Calculate the primary focus point based on the selected people, with temporal smoothing.

        Args:
            faces: List of detected faces
            selected_people: Indices of people selected for display

        Returns:
            (x, y) tuple of the focus center, or None if there are no faces
        """
        if not faces or not selected_people:
            return None

        # Calculate the raw focus point
        raw_focus_x = 0
        raw_focus_y = 0

        if len(selected_people) == 1:
            # Single person - focus on them
            if selected_people[0] < len(faces):
                primary = faces[selected_people[0]]
                raw_focus_x = primary.center_x
                raw_focus_y = primary.center_y
            else:
                # Fallback: most confident face
                most_confident = max(faces, key=lambda f: f.confidence)
                raw_focus_x = most_confident.center_x
                raw_focus_y = most_confident.center_y
        else:
            # Multiple people - focus on the CENTER between them for stability.
            # This prevents jarring movements when switching focus between people.
            valid_people = [idx for idx in selected_people if idx < len(faces)]
            if valid_people:
                centers_x = [faces[idx].center_x for idx in valid_people]
                centers_y = [faces[idx].center_y for idx in valid_people]
                raw_focus_x = int(np.mean(centers_x))
                raw_focus_y = int(np.mean(centers_y))
            else:
                # Fallback: most confident face
                most_confident = max(faces, key=lambda f: f.confidence)
                raw_focus_x = most_confident.center_x
                raw_focus_y = most_confident.center_y

        # Apply temporal smoothing using the focus history
        self.focus_history.append((raw_focus_x, raw_focus_y))
        if len(self.focus_history) > self.focus_history_size:
            self.focus_history.pop(0)

        # Smoothed focus as a weighted average (more weight on recent frames)
        if len(self.focus_history) > 1:
            # Exponential weights: recent frames have more influence
            weights = [2 ** i for i in range(len(self.focus_history))]
            total_weight = sum(weights)
            smoothed_x = sum(x * w for (x, y), w in zip(self.focus_history, weights)) / total_weight
            smoothed_y = sum(y * w for (x, y), w in zip(self.focus_history, weights)) / total_weight
            return (int(smoothed_x), int(smoothed_y))
        else:
            return (raw_focus_x, raw_focus_y)

    def close(self):
        """Release resources."""
        self.detector.close()
        # Clear tracking state to free memory
        self.previous_faces.clear()
        self.current_selected_people.clear()
        self.focus_history.clear()
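

# Minimal end-to-end sketch, assuming a local clip at "input.mp4" (hypothetical
# path). Speech periods would normally come from AudioActivityDetector run on the
# extracted audio track (e.g., pulled out with ffmpeg); that step is omitted here,
# so speaker detection falls back to lip movement only.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    analyzer = ContextAnalyzer(person_switch_cooldown=30)
    cap = cv2.VideoCapture("input.mp4")  # hypothetical path
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

    frame_number = 0
    try:
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            context = analyzer.analyze_frame(frame, timestamp=frame_number / fps,
                                             frame_number=frame_number)
            if context.primary_focus is not None and frame_number % 30 == 0:
                logger.info(f"Frame {frame_number}: focus at {context.primary_focus}, "
                            f"faces={len(context.detected_faces)}")
            frame_number += 1
    finally:
        cap.release()
        analyzer.close()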