"""
|
|
Context detection module for video analysis.
|
|
|
|
This module provides functionality to detect faces, track people,
|
|
and identify who is speaking in video content using MediaPipe and audio analysis.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Optional, Tuple
|
|
|
|
import cv2
|
|
import mediapipe as mp
|
|
import numpy as np
|
|
from scipy import signal
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|


@dataclass
class FaceDetection:
    """Represents a detected face in a frame."""
    x: int
    y: int
    width: int
    height: int
    confidence: float
    center_x: int
    center_y: int
    landmarks: Optional[List[Tuple[int, int]]] = None


@dataclass
class PersonTracking:
    """Tracks a person across frames."""
    person_id: int
    face: FaceDetection
    is_speaking: bool
    speaking_confidence: float
    frame_number: int


@dataclass
class GroupBoundingBox:
    """Bounding box containing all tracked faces."""
    x: int
    y: int
    width: int
    height: int
    center_x: int
    center_y: int
    face_count: int


@dataclass
class FrameContext:
    """Context information for a video frame."""
    frame_number: int
    timestamp: float
    detected_faces: List[FaceDetection]
    active_speakers: List[int]  # indices of speaking faces
    primary_focus: Optional[Tuple[int, int]]  # (x, y) center point
    layout_mode: str  # "single", "dual_split", "grid"
    selected_people: List[int] = field(default_factory=list)  # indices of people selected for display
    group_bounds: Optional[GroupBoundingBox] = None  # bounding box for all detected faces
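

# Illustrative sketch only (not part of the detection pipeline): shows how downstream
# rendering code might consume a FrameContext to pick a crop center, falling back to the
# frame center when nothing was detected. The function name and fallback behaviour are
# assumptions for this example, not an API defined elsewhere in this project.
def example_choose_crop_center(context: FrameContext, frame_width: int, frame_height: int) -> Tuple[int, int]:
    """Illustrative helper: derive a crop center from a FrameContext."""
    if context.group_bounds is not None and context.group_bounds.face_count > 1:
        # Multiple people: keep the whole group in view.
        return (context.group_bounds.center_x, context.group_bounds.center_y)
    if context.primary_focus is not None:
        # Single person (or smoothed focus point).
        return context.primary_focus
    # No faces detected: default to the geometric center of the frame.
    return (frame_width // 2, frame_height // 2)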


class MediaPipeDetector:
    """Face detection and landmark tracking using MediaPipe, with an OpenCV Haar Cascade fallback."""

    def __init__(self, min_detection_confidence: float = 0.3, min_tracking_confidence: float = 0.3):
        self.min_detection_confidence = min_detection_confidence
        self.min_tracking_confidence = min_tracking_confidence
        self.mp_face_detection = mp.solutions.face_detection
        self.mp_face_mesh = mp.solutions.face_mesh

        # MediaPipe detectors with lower confidence for better cartoon detection
        self.face_detection = self.mp_face_detection.FaceDetection(
            min_detection_confidence=min_detection_confidence,
            model_selection=0  # Model 0 handles varied faces (including cartoons) better
        )

        self.face_mesh = self.mp_face_mesh.FaceMesh(
            max_num_faces=5,
            min_detection_confidence=min_detection_confidence,
            min_tracking_confidence=min_tracking_confidence,
            static_image_mode=False
        )

        # OpenCV Haar Cascade as fallback for cartoon/anime faces
        self.haar_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

        # Alternative cascade for profile/side faces
        self.haar_cascade_profile = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_profileface.xml')

        logger.info(f"Hybrid detector initialized (MediaPipe confidence={min_detection_confidence}, OpenCV Haar Cascade enabled)")

    def detect_faces(self, frame: np.ndarray) -> List[FaceDetection]:
        """
        Detect faces in a frame using a hybrid approach (MediaPipe + OpenCV Haar Cascade).

        Args:
            frame: Image frame (BGR, BGRA, or grayscale, as read by OpenCV)

        Returns:
            List of detected faces
        """
        height, width = frame.shape[:2]

        if len(frame.shape) == 2:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
        elif frame.shape[2] == 4:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
        else:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Try MediaPipe first
        results = self.face_detection.process(frame_rgb)

        faces = []
        if results.detections:
            for detection in results.detections:
                bbox = detection.location_data.relative_bounding_box

                x = int(bbox.xmin * width)
                y = int(bbox.ymin * height)
                w = int(bbox.width * width)
                h = int(bbox.height * height)

                x = max(0, min(x, width - 1))
                y = max(0, min(y, height - 1))
                w = min(w, width - x)
                h = min(h, height - y)

                center_x = x + w // 2
                center_y = y + h // 2

                confidence = detection.score[0] if detection.score else 0.0

                faces.append(FaceDetection(
                    x=x,
                    y=y,
                    width=w,
                    height=h,
                    confidence=confidence,
                    center_x=center_x,
                    center_y=center_y
                ))

        # Fall back to OpenCV Haar Cascade if MediaPipe found nothing
        if not faces:
            faces = self._detect_faces_haar_cascade(frame, width, height)

        return faces

    def _detect_faces_haar_cascade(self, frame: np.ndarray, width: int, height: int) -> List[FaceDetection]:
        """
        Detect faces using OpenCV Haar Cascade (works better with cartoons/anime).

        Args:
            frame: Image frame (BGR format)
            width: Frame width
            height: Frame height

        Returns:
            List of detected faces
        """
        # Convert to grayscale for Haar Cascade
        if len(frame.shape) == 3:
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        else:
            gray = frame

        # Detect frontal faces with more sensitive parameters
        frontal_faces = self.haar_cascade.detectMultiScale(
            gray,
            scaleFactor=1.05,  # More sensitive to size variations
            minNeighbors=3,  # Lower threshold for detection (more permissive)
            minSize=(30, 30),  # Smaller minimum size
            flags=cv2.CASCADE_SCALE_IMAGE
        )

        # Also try profile faces
        profile_faces = self.haar_cascade_profile.detectMultiScale(
            gray,
            scaleFactor=1.1,
            minNeighbors=3,
            minSize=(30, 30),
            flags=cv2.CASCADE_SCALE_IMAGE
        )

        # Combine frontal and profile detections
        all_faces = []

        for (x, y, w, h) in frontal_faces:
            x = max(0, min(x, width - 1))
            y = max(0, min(y, height - 1))
            w = min(w, width - x)
            h = min(h, height - y)

            center_x = x + w // 2
            center_y = y + h // 2

            all_faces.append(FaceDetection(
                x=x,
                y=y,
                width=w,
                height=h,
                confidence=0.7,  # Haar Cascade doesn't provide confidence, use a fixed value
                center_x=center_x,
                center_y=center_y
            ))

        for (x, y, w, h) in profile_faces:
            # Check if this face overlaps significantly with any frontal face
            overlap = False
            for existing_face in all_faces:
                # Calculate the overlapping region between the two boxes
                x1_overlap = max(x, existing_face.x)
                y1_overlap = max(y, existing_face.y)
                x2_overlap = min(x + w, existing_face.x + existing_face.width)
                y2_overlap = min(y + h, existing_face.y + existing_face.height)

                if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
                    overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)
                    face_area = w * h
                    if overlap_area / face_area > 0.3:  # 30% overlap threshold
                        overlap = True
                        break

            if not overlap:
                x = max(0, min(x, width - 1))
                y = max(0, min(y, height - 1))
                w = min(w, width - x)
                h = min(h, height - y)

                center_x = x + w // 2
                center_y = y + h // 2

                all_faces.append(FaceDetection(
                    x=x,
                    y=y,
                    width=w,
                    height=h,
                    confidence=0.6,  # Slightly lower confidence for profile faces
                    center_x=center_x,
                    center_y=center_y
                ))

        if all_faces:
            logger.debug(f"Haar Cascade detected {len(all_faces)} faces (MediaPipe found none)")

        return all_faces

    def detect_face_landmarks(self, frame: np.ndarray) -> List[FaceDetection]:
        """
        Detect faces with landmarks for lip sync detection.

        Args:
            frame: Image frame (BGR, BGRA, or grayscale, as read by OpenCV)

        Returns:
            List of detected faces with landmark information
        """
        height, width = frame.shape[:2]

        if len(frame.shape) == 2:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
        elif frame.shape[2] == 4:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
        else:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        results = self.face_mesh.process(frame_rgb)

        faces = []
        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                xs = [lm.x for lm in face_landmarks.landmark]
                ys = [lm.y for lm in face_landmarks.landmark]

                x_min, x_max = min(xs), max(xs)
                y_min, y_max = min(ys), max(ys)

                x = int(x_min * width)
                y = int(y_min * height)
                w = int((x_max - x_min) * width)
                h = int((y_max - y_min) * height)

                center_x = x + w // 2
                center_y = y + h // 2

                # Mouth landmarks (MediaPipe Face Mesh indices) used for lip-movement estimation
                lip_landmarks = []
                for idx in [13, 14, 78, 308]:
                    lm = face_landmarks.landmark[idx]
                    lip_landmarks.append((int(lm.x * width), int(lm.y * height)))

                faces.append(FaceDetection(
                    x=x,
                    y=y,
                    width=w,
                    height=h,
                    confidence=1.0,
                    center_x=center_x,
                    center_y=center_y,
                    landmarks=lip_landmarks
                ))

        return faces

    def close(self):
        """Release MediaPipe resources."""
        self.face_detection.close()
        self.face_mesh.close()
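

# Illustrative usage sketch (not called anywhere in this module): run the hybrid face
# detector on a single still image. The image path is a placeholder, and logging the
# results is just for demonstration; real callers would feed frames from a video loop.
def example_detect_faces_in_image(image_path: str = "example_frame.jpg") -> List[FaceDetection]:
    """Illustrative helper: detect faces in one image file and report them."""
    frame = cv2.imread(image_path)  # BGR image, or None if the path is invalid
    if frame is None:
        logger.warning(f"Could not read image: {image_path}")
        return []

    detector = MediaPipeDetector(min_detection_confidence=0.3)
    try:
        faces = detector.detect_faces(frame)
        for i, face in enumerate(faces):
            logger.info(
                f"Face {i}: bbox=({face.x}, {face.y}, {face.width}, {face.height}), "
                f"confidence={face.confidence:.2f}"
            )
    finally:
        detector.close()
    return faces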


class AudioActivityDetector:
    """Detects speech activity in audio."""

    def __init__(self, sample_rate: int = 44100, frame_duration_ms: int = 30):
        self.sample_rate = sample_rate
        self.frame_duration_ms = frame_duration_ms
        self.frame_size = int(sample_rate * frame_duration_ms / 1000)

        logger.info(f"Audio activity detector initialized (sr={sample_rate}, frame={frame_duration_ms}ms)")

    def detect_speaking_periods(
        self,
        audio_samples: np.ndarray,
        threshold: float = 0.01,  # Reduced from 0.02 for better speech detection
        min_speech_duration: float = 0.05  # Reduced from 0.1 to catch shorter utterances
    ) -> List[Tuple[float, float]]:
        """
        Detect periods of speech in audio.

        Args:
            audio_samples: Audio samples array (mono or multi-channel; the default
                threshold assumes normalized float samples in [-1, 1])
            threshold: Energy threshold for speech detection
            min_speech_duration: Minimum duration of speech in seconds

        Returns:
            List of (start_time, end_time) tuples in seconds
        """
        if audio_samples.ndim > 1:
            audio_samples = audio_samples.mean(axis=1)

        energies = []
        for i in range(0, len(audio_samples), self.frame_size):
            frame = audio_samples[i:i + self.frame_size]
            if len(frame) > 0:
                energy = np.sqrt(np.mean(frame ** 2))
                energies.append(energy)

        speaking_frames = [e > threshold for e in energies]

        periods = []
        start_frame = None

        for i, is_speaking in enumerate(speaking_frames):
            if is_speaking and start_frame is None:
                start_frame = i
            elif not is_speaking and start_frame is not None:
                start_time = start_frame * self.frame_duration_ms / 1000
                end_time = i * self.frame_duration_ms / 1000

                if end_time - start_time >= min_speech_duration:
                    periods.append((start_time, end_time))

                start_frame = None

        if start_frame is not None:
            start_time = start_frame * self.frame_duration_ms / 1000
            end_time = len(speaking_frames) * self.frame_duration_ms / 1000
            if end_time - start_time >= min_speech_duration:
                periods.append((start_time, end_time))

        # Log detected speech periods for debugging
        if periods:
            total_speech_time = sum(end - start for start, end in periods)
            logger.info(f"Audio speech detection: {len(periods)} periods found, "
                        f"total {total_speech_time:.1f}s of speech (threshold={threshold})")
        else:
            max_energy = max(energies) if energies else 0
            logger.warning(f"No speech detected! Max energy={max_energy:.4f}, threshold={threshold} "
                           f"(try lowering the threshold if speech should be present)")

        return periods

    def is_speaking_at_time(self, speaking_periods: List[Tuple[float, float]], time: float) -> bool:
        """Check if there is speech activity at a given time."""
        for start, end in speaking_periods:
            if start <= time <= end:
                return True
        return False
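

# Illustrative usage sketch (not exercised by this module): build one second of silence
# followed by one second of white noise as a stand-in for speech, then run the RMS-energy
# detector over it. The synthetic signal and the 0.1 amplitude are arbitrary choices for
# the demo, not values used by the real pipeline.
def example_detect_speaking_periods() -> List[Tuple[float, float]]:
    """Illustrative helper: run AudioActivityDetector on a synthetic signal."""
    sample_rate = 44100
    rng = np.random.default_rng(0)

    silence = np.zeros(sample_rate, dtype=np.float64)      # 1 s of silence
    noise = 0.1 * rng.standard_normal(sample_rate)          # 1 s of speech-like energy
    samples = np.concatenate([silence, noise])

    detector = AudioActivityDetector(sample_rate=sample_rate, frame_duration_ms=30)
    periods = detector.detect_speaking_periods(samples, threshold=0.01)
    for start, end in periods:
        logger.info(f"Speech-like activity from {start:.2f}s to {end:.2f}s")
    return periods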


class ContextAnalyzer:
    """Analyzes video context to determine focus and layout."""

    def __init__(self, person_switch_cooldown: int = 30, min_face_confidence: float = 0.3):
        self.detector = MediaPipeDetector()
        self.audio_detector = AudioActivityDetector()
        self.previous_faces: List[FaceDetection] = []
        self.min_face_confidence = min_face_confidence

        # Person tracking state
        self.current_selected_people: List[int] = []  # Indices of people currently on screen
        self.last_switch_frame: int = -999  # Frame when we last switched people
        self.person_switch_cooldown = person_switch_cooldown  # Minimum frames before switching

        # Stability tracking to prevent flip-flopping
        self.desired_people_history: List[List[int]] = []  # Track recent desired selections
        self.stability_threshold = 20  # Frames needed to confirm a switch (increased for more stability)
        self.last_switched_people: List[int] = []  # People we just switched FROM

        self.focus_history: List[Tuple[int, int]] = []
        self.focus_history_size: int = 20
        self.focus_dead_zone: int = 60

        # Debug logging
        self.frame_log_interval = 30  # Log every N frames

        logger.info(f"Context analyzer initialized (cooldown={person_switch_cooldown} frames, focus_smoothing={self.focus_history_size})")

    def analyze_frame(
        self,
        frame: np.ndarray,
        timestamp: float,
        frame_number: int,
        speaking_periods: Optional[List[Tuple[float, float]]] = None
    ) -> FrameContext:
        """
        Analyze a single frame to extract context information.

        Args:
            frame: Video frame (BGR format from OpenCV)
            timestamp: Frame timestamp in seconds
            frame_number: Frame index
            speaking_periods: List of (start, end) times where speech is detected

        Returns:
            FrameContext with detection results
        """
        faces = self.detector.detect_face_landmarks(frame)
        faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else []

        if not faces:
            faces = self.detector.detect_faces(frame)
            faces = [face for face in faces if face.confidence >= self.min_face_confidence] if faces else []

        # Determine who is speaking
        active_speakers = []
        has_audio_speech = speaking_periods and self.audio_detector.is_speaking_at_time(speaking_periods, timestamp)

        for i, face in enumerate(faces):
            is_speaking = False

            # Prefer visual cues when multiple faces are present.
            if face.landmarks and len(self.previous_faces) > i:
                is_speaking = self._detect_lip_movement(face, self.previous_faces[i])

            # Audio can confirm speech when there's only one face.
            if has_audio_speech and len(faces) == 1:
                is_speaking = True

            if is_speaking:
                active_speakers.append(i)

        # Debug: log speech detection
        if frame_number % 30 == 0:  # Every second at 30 fps
            logger.info(f"Speech detection - Frame {frame_number}: audio_active={has_audio_speech}, "
                        f"speakers={active_speakers}, total_faces={len(faces)}")

        if active_speakers:
            selected_people = active_speakers[:4]
            if len(selected_people) == 1:
                layout_mode = "single"
            elif len(selected_people) == 2:
                layout_mode = "dual_split"
            else:
                layout_mode = "grid"
        else:
            # Select THE person to focus on (always a single person).
            # Priority: 1) who is speaking, 2) continuity with the current selection, 3) detection confidence.
            selected_people = self._select_person_to_focus(
                faces,
                active_speakers,
                frame_number,
                frame.shape[1],  # frame width for center calculation
                frame.shape[0]   # frame height for center calculation
            )
            layout_mode = "single"

        # Calculate group bounding box for ALL detected faces (multi-person support)
        group_bounds = self._calculate_group_bounding_box(faces)

        # For multi-person mode, use the group center as the primary focus
        if group_bounds and group_bounds.face_count > 1:
            primary_focus = (group_bounds.center_x, group_bounds.center_y)
        else:
            primary_focus = self._calculate_focus_point(faces, selected_people)

        # Debug logging every N frames
        if frame_number % self.frame_log_interval == 0:
            focus_reason = "speaker" if active_speakers else "no_speech_detected"
            group_info = f", group={group_bounds.face_count} faces" if group_bounds else ""
            logger.info(f"Frame {frame_number}: {len(faces)} faces, "
                        f"{len(active_speakers)} speakers, focus={selected_people}, reason={focus_reason}{group_info}")

        self.previous_faces = faces

        return FrameContext(
            frame_number=frame_number,
            timestamp=timestamp,
            detected_faces=faces,
            active_speakers=active_speakers,
            primary_focus=primary_focus,
            layout_mode=layout_mode,
            selected_people=selected_people,
            group_bounds=group_bounds
        )

    def _detect_lip_movement(self, current_face: FaceDetection, previous_face: FaceDetection) -> bool:
        """
        Detect lip movement by comparing landmarks between frames.

        Args:
            current_face: Current frame face detection
            previous_face: Previous frame face detection

        Returns:
            True if significant lip movement detected
        """
        if not current_face.landmarks or not previous_face.landmarks:
            return False

        def lip_distance(landmarks):
            if len(landmarks) < 4:
                return 0

            upper = np.array(landmarks[0:2])
            lower = np.array(landmarks[2:4])
            return np.linalg.norm(upper.mean(axis=0) - lower.mean(axis=0))

        current_dist = lip_distance(current_face.landmarks)
        previous_dist = lip_distance(previous_face.landmarks)

        threshold = 2.0
        return abs(current_dist - previous_dist) > threshold

    def _select_person_to_focus(
        self,
        faces: List[FaceDetection],
        active_speakers: List[int],
        frame_number: int,
        frame_width: int,
        frame_height: int
    ) -> List[int]:
        """
        Select THE single person to focus on.
        Priority: 1) who is speaking, 2) continuity with the previously tracked person,
        3) highest detection confidence.

        Args:
            faces: List of detected faces
            active_speakers: Indices of people currently speaking
            frame_number: Current frame number
            frame_width: Frame width (reserved for center-based selection, currently unused)
            frame_height: Frame height (reserved for center-based selection, currently unused)

        Returns:
            List with a single person index [idx], or an empty list if no faces
        """
        if not faces:
            self.current_selected_people = []
            return []

        if len(faces) == 1:
            self.current_selected_people = [0]
            return [0]

        frames_since_last_switch = frame_number - self.last_switch_frame
        can_switch = frames_since_last_switch >= self.person_switch_cooldown

        desired_person_idx = None

        if active_speakers:
            if self.current_selected_people and self.current_selected_people[0] in active_speakers:
                # Keep the current person while they are still speaking.
                desired_person_idx = self.current_selected_people[0]
            else:
                if can_switch or not self.current_selected_people:
                    desired_person_idx = active_speakers[0]
                    if self.current_selected_people and desired_person_idx != self.current_selected_people[0]:
                        logger.info(f"Switching focus to speaker: {desired_person_idx}")
                        self.last_switch_frame = frame_number
                else:
                    desired_person_idx = self.current_selected_people[0] if self.current_selected_people else active_speakers[0]
        else:
            if self.current_selected_people and len(self.current_selected_people) > 0:
                current_idx = self.current_selected_people[0]
                if current_idx < len(faces):
                    desired_person_idx = current_idx
                else:
                    if self.previous_faces and current_idx < len(self.previous_faces):
                        # The tracked index no longer exists; match the previous face to the
                        # closest current detection by position and size.
                        prev_face = self.previous_faces[current_idx]
                        best_match_idx = None
                        best_match_score = float('inf')
                        for idx, face in enumerate(faces):
                            dx = face.center_x - prev_face.center_x
                            dy = face.center_y - prev_face.center_y
                            dist = np.sqrt(dx**2 + dy**2)
                            size_diff = abs(face.width - prev_face.width) + abs(face.height - prev_face.height)
                            score = dist + size_diff * 0.5
                            if score < best_match_score:
                                best_match_score = score
                                best_match_idx = idx

                        if best_match_idx is not None and best_match_score < 1000:
                            desired_person_idx = best_match_idx
                        else:
                            face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
                            face_confidences.sort(key=lambda x: x[1], reverse=True)
                            desired_person_idx = face_confidences[0][0]
                    else:
                        face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
                        face_confidences.sort(key=lambda x: x[1], reverse=True)
                        desired_person_idx = face_confidences[0][0]
            else:
                face_confidences = [(idx, face.confidence) for idx, face in enumerate(faces)]
                face_confidences.sort(key=lambda x: x[1], reverse=True)
                desired_person_idx = face_confidences[0][0]

        desired_people = [desired_person_idx] if desired_person_idx is not None else []

        if not self.current_selected_people:
            self.current_selected_people = desired_people
            self.last_switch_frame = frame_number
            logger.info(f"Frame {frame_number}: Locked on person {desired_people}")
        else:
            self.current_selected_people = desired_people

        return self.current_selected_people.copy()

    def _ensure_distinct_people(
        self,
        faces: List[FaceDetection],
        people_indices: List[int]
    ) -> List[int]:
        """
        Ensure selected people are distinct by checking the minimum distance between them.
        Prevents showing the same person twice due to duplicate detections.

        Args:
            faces: List of detected faces
            people_indices: Indices of people to validate

        Returns:
            List of distinct people indices (max 2)
        """
        if len(people_indices) <= 1:
            return people_indices

        distinct_people = []

        for idx in people_indices:
            if idx >= len(faces):
                continue

            current_face = faces[idx]
            is_distinct = True

            # Check if this person is too close to any already selected person
            for selected_idx in distinct_people:
                selected_face = faces[selected_idx]

                # Calculate distance between face centers
                dx = current_face.center_x - selected_face.center_x
                dy = current_face.center_y - selected_face.center_y
                distance = np.sqrt(dx**2 + dy**2)

                # Also check overlap relative to the smaller face
                x1_overlap = max(current_face.x, selected_face.x)
                y1_overlap = max(current_face.y, selected_face.y)
                x2_overlap = min(current_face.x + current_face.width, selected_face.x + selected_face.width)
                y2_overlap = min(current_face.y + current_face.height, selected_face.y + selected_face.height)

                overlap_area = 0
                if x1_overlap < x2_overlap and y1_overlap < y2_overlap:
                    overlap_area = (x2_overlap - x1_overlap) * (y2_overlap - y1_overlap)

                # Calculate areas
                area1 = current_face.width * current_face.height
                area2 = selected_face.width * selected_face.height
                min_area = min(area1, area2)

                # If faces are very close OR significantly overlapping, they're likely the same person.
                # Minimum distance: 1/4 of the average face width.
                min_distance = (current_face.width + selected_face.width) / 8
                overlap_threshold = 0.3  # 30% overlap

                if distance < min_distance or (min_area > 0 and overlap_area / min_area > overlap_threshold):
                    is_distinct = False
                    logger.debug(f"Person {idx} too similar to person {selected_idx} (dist={distance:.1f}, overlap={overlap_area/min_area if min_area > 0 else 0:.2%})")
                    break

            if is_distinct:
                distinct_people.append(idx)

                # Stop at 2 distinct people
                if len(distinct_people) >= 2:
                    break

        # If we couldn't find 2 distinct people, return at most 1
        if len(distinct_people) < 2 and len(people_indices) >= 2:
            logger.debug(f"Only {len(distinct_people)} distinct person(s) found from {len(people_indices)} detections")

        return distinct_people

    def _calculate_focus_point(
        self,
        faces: List[FaceDetection],
        selected_people: List[int]
    ) -> Optional[Tuple[int, int]]:
        """
        Calculate the primary focus point based on selected people, with temporal smoothing.

        Args:
            faces: List of detected faces
            selected_people: Indices of people selected for display

        Returns:
            (x, y) tuple of focus center, or None if no faces
        """
        if not faces or not selected_people:
            return None

        # Calculate raw focus point
        raw_focus_x = 0
        raw_focus_y = 0

        if len(selected_people) == 1:
            # Single person - focus on them
            if selected_people[0] < len(faces):
                primary = faces[selected_people[0]]
                raw_focus_x = primary.center_x
                raw_focus_y = primary.center_y
            else:
                # Fallback
                most_confident = max(faces, key=lambda f: f.confidence)
                raw_focus_x = most_confident.center_x
                raw_focus_y = most_confident.center_y
        else:
            # Multiple people - focus on the CENTER between them for stability.
            # This prevents jarring movements when switching focus between people.
            valid_people = [idx for idx in selected_people if idx < len(faces)]
            if valid_people:
                centers_x = [faces[idx].center_x for idx in valid_people]
                centers_y = [faces[idx].center_y for idx in valid_people]
                raw_focus_x = int(np.mean(centers_x))
                raw_focus_y = int(np.mean(centers_y))
            else:
                # Fallback
                most_confident = max(faces, key=lambda f: f.confidence)
                raw_focus_x = most_confident.center_x
                raw_focus_y = most_confident.center_y

        # Dead zone: ignore small movements to avoid jitter
        if self.focus_history:
            last_x, last_y = self.focus_history[-1]
            dx = abs(raw_focus_x - last_x)
            dy = abs(raw_focus_y - last_y)
            if dx < self.focus_dead_zone and dy < self.focus_dead_zone:
                return self.focus_history[-1]

        self.focus_history.append((raw_focus_x, raw_focus_y))
        if len(self.focus_history) > self.focus_history_size:
            self.focus_history.pop(0)

        # Median smoothing once enough history has accumulated
        if len(self.focus_history) >= 5:
            xs = [x for x, y in self.focus_history]
            ys = [y for x, y in self.focus_history]
            median_x = int(np.median(xs))
            median_y = int(np.median(ys))
            return (median_x, median_y)
        else:
            return (raw_focus_x, raw_focus_y)

    def _calculate_group_bounding_box(
        self,
        faces: List[FaceDetection],
        padding_percent: float = 0.15,
        max_faces: int = 6
    ) -> Optional[GroupBoundingBox]:
        """
        Calculate bounding box containing all detected faces with padding.

        Args:
            faces: List of detected faces
            padding_percent: Padding around group as percentage of bbox dimensions
            max_faces: Maximum faces to include (use most confident if exceeded)

        Returns:
            GroupBoundingBox or None if no faces
        """
        if not faces:
            return None

        # If too many faces, use most confident ones
        if len(faces) > max_faces:
            faces = sorted(faces, key=lambda f: f.confidence, reverse=True)[:max_faces]

        # Calculate bounding box containing all faces
        min_x = min(f.x for f in faces)
        max_x = max(f.x + f.width for f in faces)
        min_y = min(f.y for f in faces)
        max_y = max(f.y + f.height for f in faces)

        # Add padding
        width = max_x - min_x
        height = max_y - min_y
        pad_x = int(width * padding_percent)
        pad_y = int(height * padding_percent)

        final_x = max(0, min_x - pad_x)
        final_y = max(0, min_y - pad_y)
        final_width = width + 2 * pad_x
        final_height = height + 2 * pad_y

        return GroupBoundingBox(
            x=final_x,
            y=final_y,
            width=final_width,
            height=final_height,
            center_x=final_x + final_width // 2,
            center_y=final_y + final_height // 2,
            face_count=len(faces)
        )

    def close(self):
        """Release resources."""
        self.detector.close()
        # Clear tracking state to free memory
        self.previous_faces.clear()
        self.current_selected_people.clear()
        self.focus_history.clear()
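

# Illustrative end-to-end sketch (assumption: this module may be run directly for a quick
# manual check). It samples frames from a video file given on the command line and runs
# ContextAnalyzer.analyze_frame without audio (speaking_periods=None), printing a short
# summary per sampled frame. The sampling stride of 30 frames is an arbitrary demo choice.
if __name__ == "__main__":
    import sys

    logging.basicConfig(level=logging.INFO)

    if len(sys.argv) < 2:
        print(f"usage: python {sys.argv[0]} <video_file>")
        sys.exit(1)

    video_path = sys.argv[1]
    capture = cv2.VideoCapture(video_path)
    fps = capture.get(cv2.CAP_PROP_FPS) or 30.0

    analyzer = ContextAnalyzer(person_switch_cooldown=30)
    frame_number = 0
    try:
        while True:
            ok, frame = capture.read()
            if not ok:
                break
            if frame_number % 30 == 0:  # sample roughly once per second at 30 fps
                context = analyzer.analyze_frame(frame, frame_number / fps, frame_number)
                print(f"frame {frame_number}: {len(context.detected_faces)} faces, "
                      f"layout={context.layout_mode}, focus={context.primary_focus}")
            frame_number += 1
    finally:
        analyzer.close()
        capture.release()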