Files
video-render/video_render/context_detection.py
LeoMortari c5d3e83a5f #v2 - Inicia testes da v2
- Adiciona rastreamento de objetos
- Facial detection
- Legenda interativa
- Cortes mais precisos
- Refinamento do Prompt
2025-11-12 11:38:09 -03:00

399 lines
12 KiB
Python

"""
Context detection module for video analysis.
This module provides functionality to detect faces, track people,
and identify who is speaking in video content using MediaPipe and audio analysis.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from typing import List, Optional, Tuple
import cv2
import mediapipe as mp
import numpy as np
from scipy import signal
logger = logging.getLogger(__name__)
@dataclass
class FaceDetection:
"""Represents a detected face in a frame."""
x: int
y: int
width: int
height: int
confidence: float
center_x: int
center_y: int
landmarks: Optional[List[Tuple[int, int]]] = None
@dataclass
class PersonTracking:
"""Tracks a person across frames."""
person_id: int
face: FaceDetection
is_speaking: bool
speaking_confidence: float
frame_number: int
@dataclass
class FrameContext:
"""Context information for a video frame."""
frame_number: int
timestamp: float
detected_faces: List[FaceDetection]
active_speakers: List[int] # indices of speaking faces
primary_focus: Optional[Tuple[int, int]] # (x, y) center point
layout_mode: str # "single", "dual_split", "grid"
class MediaPipeDetector:
"""Face and pose detection using MediaPipe."""
def __init__(self, min_detection_confidence: float = 0.5, min_tracking_confidence: float = 0.5):
self.min_detection_confidence = min_detection_confidence
self.min_tracking_confidence = min_tracking_confidence
self.mp_face_detection = mp.solutions.face_detection
self.mp_face_mesh = mp.solutions.face_mesh
self.face_detection = self.mp_face_detection.FaceDetection(
min_detection_confidence=min_detection_confidence,
model_selection=1
)
self.face_mesh = self.mp_face_mesh.FaceMesh(
max_num_faces=5,
min_detection_confidence=min_detection_confidence,
min_tracking_confidence=min_tracking_confidence,
static_image_mode=False
)
logger.info("MediaPipe detector initialized")
def detect_faces(self, frame: np.ndarray) -> List[FaceDetection]:
"""
Detect faces in a frame.
Args:
frame: RGB image array
Returns:
List of detected faces
"""
height, width = frame.shape[:2]
if len(frame.shape) == 2:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
elif frame.shape[2] == 4:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
else:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.face_detection.process(frame_rgb)
faces = []
if results.detections:
for detection in results.detections:
bbox = detection.location_data.relative_bounding_box
x = int(bbox.xmin * width)
y = int(bbox.ymin * height)
w = int(bbox.width * width)
h = int(bbox.height * height)
x = max(0, min(x, width - 1))
y = max(0, min(y, height - 1))
w = min(w, width - x)
h = min(h, height - y)
center_x = x + w // 2
center_y = y + h // 2
confidence = detection.score[0] if detection.score else 0.0
faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=confidence,
center_x=center_x,
center_y=center_y
))
return faces
def detect_face_landmarks(self, frame: np.ndarray) -> List[FaceDetection]:
"""
Detect faces with landmarks for lip sync detection.
Args:
frame: RGB image array
Returns:
List of detected faces with landmark information
"""
height, width = frame.shape[:2]
if len(frame.shape) == 2:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
elif frame.shape[2] == 4:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGRA2RGB)
else:
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(frame_rgb)
faces = []
if results.multi_face_landmarks:
for face_landmarks in results.multi_face_landmarks:
xs = [lm.x for lm in face_landmarks.landmark]
ys = [lm.y for lm in face_landmarks.landmark]
x_min, x_max = min(xs), max(xs)
y_min, y_max = min(ys), max(ys)
x = int(x_min * width)
y = int(y_min * height)
w = int((x_max - x_min) * width)
h = int((y_max - y_min) * height)
center_x = x + w // 2
center_y = y + h // 2
lip_landmarks = []
for idx in [13, 14, 78, 308]:
lm = face_landmarks.landmark[idx]
lip_landmarks.append((int(lm.x * width), int(lm.y * height)))
faces.append(FaceDetection(
x=x,
y=y,
width=w,
height=h,
confidence=1.0,
center_x=center_x,
center_y=center_y,
landmarks=lip_landmarks
))
return faces
def close(self):
"""Release MediaPipe resources."""
self.face_detection.close()
self.face_mesh.close()
class AudioActivityDetector:
"""Detects speech activity in audio."""
def __init__(self, sample_rate: int = 44100, frame_duration_ms: int = 30):
self.sample_rate = sample_rate
self.frame_duration_ms = frame_duration_ms
self.frame_size = int(sample_rate * frame_duration_ms / 1000)
logger.info(f"Audio activity detector initialized (sr={sample_rate}, frame={frame_duration_ms}ms)")
def detect_speaking_periods(
self,
audio_samples: np.ndarray,
threshold: float = 0.02,
min_speech_duration: float = 0.1
) -> List[Tuple[float, float]]:
"""
Detect periods of speech in audio.
Args:
audio_samples: Audio samples array
threshold: Energy threshold for speech detection
min_speech_duration: Minimum duration of speech in seconds
Returns:
List of (start_time, end_time) tuples in seconds
"""
if audio_samples.ndim > 1:
audio_samples = audio_samples.mean(axis=1)
energies = []
for i in range(0, len(audio_samples), self.frame_size):
frame = audio_samples[i:i + self.frame_size]
if len(frame) > 0:
energy = np.sqrt(np.mean(frame ** 2))
energies.append(energy)
speaking_frames = [e > threshold for e in energies]
periods = []
start_frame = None
for i, is_speaking in enumerate(speaking_frames):
if is_speaking and start_frame is None:
start_frame = i
elif not is_speaking and start_frame is not None:
start_time = start_frame * self.frame_duration_ms / 1000
end_time = i * self.frame_duration_ms / 1000
if end_time - start_time >= min_speech_duration:
periods.append((start_time, end_time))
start_frame = None
if start_frame is not None:
start_time = start_frame * self.frame_duration_ms / 1000
end_time = len(speaking_frames) * self.frame_duration_ms / 1000
if end_time - start_time >= min_speech_duration:
periods.append((start_time, end_time))
return periods
def is_speaking_at_time(self, speaking_periods: List[Tuple[float, float]], time: float) -> bool:
"""Check if there is speech activity at a given time."""
for start, end in speaking_periods:
if start <= time <= end:
return True
return False
class ContextAnalyzer:
"""Analyzes video context to determine focus and layout."""
def __init__(self):
self.detector = MediaPipeDetector()
self.audio_detector = AudioActivityDetector()
self.previous_faces: List[FaceDetection] = []
logger.info("Context analyzer initialized")
def analyze_frame(
self,
frame: np.ndarray,
timestamp: float,
frame_number: int,
speaking_periods: Optional[List[Tuple[float, float]]] = None
) -> FrameContext:
"""
Analyze a single frame to extract context information.
Args:
frame: Video frame (BGR format from OpenCV)
timestamp: Frame timestamp in seconds
frame_number: Frame index
speaking_periods: List of (start, end) times where speech is detected
Returns:
FrameContext with detection results
"""
faces = self.detector.detect_face_landmarks(frame)
if not faces:
faces = self.detector.detect_faces(frame)
# Determine who is speaking
active_speakers = []
for i, face in enumerate(faces):
is_speaking = False
if speaking_periods and self.audio_detector.is_speaking_at_time(speaking_periods, timestamp):
is_speaking = True
if face.landmarks and len(self.previous_faces) > i:
is_speaking = is_speaking or self._detect_lip_movement(face, self.previous_faces[i])
if is_speaking:
active_speakers.append(i)
num_faces = len(faces)
num_speakers = len(active_speakers)
if num_faces == 0:
layout_mode = "single"
elif num_faces == 1:
layout_mode = "single"
elif num_faces == 2:
layout_mode = "dual_split"
elif num_faces >= 3:
layout_mode = "dual_split"
else:
layout_mode = "single"
primary_focus = self._calculate_focus_point(faces, active_speakers)
self.previous_faces = faces
return FrameContext(
frame_number=frame_number,
timestamp=timestamp,
detected_faces=faces,
active_speakers=active_speakers,
primary_focus=primary_focus,
layout_mode=layout_mode
)
def _detect_lip_movement(self, current_face: FaceDetection, previous_face: FaceDetection) -> bool:
"""
Detect lip movement by comparing landmarks between frames.
Args:
current_face: Current frame face detection
previous_face: Previous frame face detection
Returns:
True if significant lip movement detected
"""
if not current_face.landmarks or not previous_face.landmarks:
return False
def lip_distance(landmarks):
if len(landmarks) < 4:
return 0
upper = np.array(landmarks[0:2])
lower = np.array(landmarks[2:4])
return np.linalg.norm(upper.mean(axis=0) - lower.mean(axis=0))
current_dist = lip_distance(current_face.landmarks)
previous_dist = lip_distance(previous_face.landmarks)
threshold = 2.0
return abs(current_dist - previous_dist) > threshold
def _calculate_focus_point(
self,
faces: List[FaceDetection],
active_speakers: List[int]
) -> Optional[Tuple[int, int]]:
"""
Calculate the primary focus point based on detected faces and speakers.
IMPORTANT: This focuses on ONE person to avoid focusing on empty space (table).
When multiple people are present, we pick the most relevant person, not average positions.
Args:
faces: List of detected faces
active_speakers: Indices of faces that are speaking
Returns:
(x, y) tuple of focus center, or None if no faces
"""
if not faces:
return None
if active_speakers:
speaker_faces = [faces[i] for i in active_speakers if i < len(faces)]
if speaker_faces:
primary_speaker = max(speaker_faces, key=lambda f: f.confidence)
return (primary_speaker.center_x, primary_speaker.center_y)
most_confident = max(faces, key=lambda f: f.confidence)
return (most_confident.center_x, most_confident.center_y)
def close(self):
"""Release resources."""
self.detector.close()