334 lines
12 KiB
Python
334 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
import numpy as np
|
|
from faster_whisper import WhisperModel
|
|
|
|
from video_render.config import Settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass(frozen=True)
class WordTiming:
    """Start/end timestamps for a single transcribed word."""

    # Times are in seconds from the start of the full audio
    # (chunked transcription adds the chunk offset before constructing these).
    start: float
    end: float
    # Whitespace-stripped word text; empty words are filtered out by callers.
    word: str
|
|
|
|
|
|
@dataclass(frozen=True)
class TranscriptSegment:
    """A contiguous transcribed segment together with its word-level timings."""

    # Sequential id assigned during transcription (position in the transcript).
    id: int
    # Segment boundaries in seconds from the start of the audio.
    start: float
    end: float
    # Whitespace-stripped segment text.
    text: str
    # Word timings for this segment; may be empty when the model yields none.
    words: List[WordTiming]
|
|
|
|
|
|
@dataclass(frozen=True)
class TranscriptionResult:
    """Complete transcription: ordered segments plus the concatenated text."""

    segments: List[TranscriptSegment]
    # All segment texts joined with single spaces and stripped.
    full_text: str
|
|
|
|
|
|
class TranscriptionService:
    """Produces word-level transcriptions of audio files via Faster-Whisper."""

    def __init__(self, settings: Settings) -> None:
        # The Whisper model is created lazily by _load_model on first use.
        self._model: Optional[WhisperModel] = None
        self.settings = settings
|
|
|
|
def _load_model(self) -> WhisperModel:
    """Return the cached Whisper model, instantiating it on first call."""
    if self._model is not None:
        return self._model

    # Fall back to faster-whisper's own defaults when the settings are unset.
    device = self.settings.whisper.device or "auto"
    compute_type = self.settings.whisper.compute_type or "default"
    logger.info(
        "Carregando modelo Faster-Whisper '%s' (device=%s, compute_type=%s)",
        self.settings.whisper.model_size,
        device,
        compute_type,
    )
    self._model = WhisperModel(
        self.settings.whisper.model_size,
        device=device,
        compute_type=compute_type,
        download_root=str(self.settings.whisper.download_root),
    )
    return self._model
|
|
|
|
def unload_model(self) -> None:
    """Unload the Whisper model to free memory (reduces RAM usage by 1-3GB).

    Safe to call when no model is loaded; does nothing in that case.
    """
    if self._model is not None:
        logger.info("Descarregando modelo Whisper para liberar memória...")
        # Rebinding to None drops the only reference; the previous
        # `del self._model` right before this assignment was redundant.
        self._model = None
        # Force garbage collection to immediately free GPU/CPU memory
        import gc
        gc.collect()
        logger.info("Modelo Whisper descarregado com sucesso")
|
|
|
|
def transcribe(self, audio_path: Path, output_dir: Optional[Path] = None) -> TranscriptionResult:
    """Transcribe ``audio_path``, reusing a cached result when available.

    Args:
        audio_path: Audio file to transcribe.
        output_dir: Optional directory checked for a previously persisted
            transcription; when one loads successfully it is returned as-is.

    Returns:
        The parsed transcription. Audio longer than 30 minutes is processed
        in chunks to bound memory usage.
    """
    if output_dir is not None:
        existing_transcription = self.load(output_dir)
        if existing_transcription is not None:
            logger.info("Transcrição já existe em %s, reutilizando...", output_dir)
            return existing_transcription

    # Get audio duration to decide if we need chunked processing
    audio_duration = self._get_audio_duration(audio_path)
    chunk_duration_minutes = 30  # Process in 30-minute chunks for long videos
    chunk_duration_seconds = chunk_duration_minutes * 60

    # For videos longer than 30 minutes, use chunked processing to avoid OOM
    if audio_duration > chunk_duration_seconds:
        # Lazy %-style args keep formatting out of the hot path and match
        # the logging style used elsewhere in this module.
        logger.info(
            "Áudio longo detectado (%.1f min). "
            "Processando em chunks de %d min para evitar erro de memória...",
            audio_duration / 60,
            chunk_duration_minutes,
        )
        return self._transcribe_chunked(audio_path, chunk_duration_seconds)

    logger.info(
        "Iniciando transcrição do áudio (%.1f min) com FasterWhisper...",
        audio_duration / 60,
    )
    return self._transcribe_full(audio_path)
|
|
|
|
def _get_audio_duration(self, audio_path: Path) -> float:
    """Return the audio duration in seconds, or 0.0 when it cannot be read."""
    try:
        from moviepy.audio.io.AudioFileClip import AudioFileClip
        with AudioFileClip(str(audio_path)) as audio:
            return audio.duration or 0.0
    except Exception as e:
        # Best effort: lazy %-style logging for consistency with the module,
        # and 0.0 routes the caller to the cheaper non-chunked path.
        logger.warning("Falha ao obter duração do áudio, assumindo curto: %s", e)
        return 0.0  # Assume short if we can't determine
|
|
|
|
def _transcribe_full(self, audio_path: Path) -> TranscriptionResult:
    """Transcribe entire audio at once (for shorter videos)."""
    model = self._load_model()
    raw_segments, _ = model.transcribe(
        str(audio_path),
        beam_size=5,
        word_timestamps=True,
    )

    collected: List[TranscriptSegment] = []
    texts: List[str] = []

    for index, raw in enumerate(raw_segments):
        # Drop words that are empty after stripping whitespace.
        timed_words = [
            WordTiming(start=w.start, end=w.end, word=w.word.strip())
            for w in raw.words or []
            if w.word.strip()
        ]
        stripped_text = raw.text.strip()
        texts.append(stripped_text)
        collected.append(
            TranscriptSegment(
                id=index,
                start=raw.start,
                end=raw.end,
                text=stripped_text,
                words=timed_words,
            )
        )

    return TranscriptionResult(
        segments=collected,
        full_text=" ".join(texts).strip(),
    )
|
|
|
|
def _transcribe_chunked(self, audio_path: Path, chunk_duration: float) -> TranscriptionResult:
    """Transcribe audio in chunks to avoid OOM on long videos.

    Each chunk is extracted with ffmpeg to a temporary WAV next to the
    source file, transcribed, and its timestamps shifted by the chunk's
    start offset so the merged result uses absolute times.

    Raises:
        subprocess.CalledProcessError: if ffmpeg fails to extract a chunk.
    """
    # Hoisted out of the per-chunk loop; the unused moviepy AudioFileClip
    # import that used to live here has been removed.
    import gc
    import subprocess

    model = self._load_model()
    all_segments: List[TranscriptSegment] = []
    full_text_parts: List[str] = []
    segment_id_counter = 0

    # Get total duration
    total_duration = self._get_audio_duration(audio_path)
    num_chunks = int(np.ceil(total_duration / chunk_duration))

    logger.info("Processando áudio em %d chunks...", num_chunks)

    for chunk_idx in range(num_chunks):
        start_time = chunk_idx * chunk_duration
        end_time = min((chunk_idx + 1) * chunk_duration, total_duration)

        logger.info(
            "Processando chunk %d/%d (%.1fmin - %.1fmin)...",
            chunk_idx + 1,
            num_chunks,
            start_time / 60,
            end_time / 60,
        )

        # Extract chunk using ffmpeg directly (more reliable than moviepy subclip)
        temp_chunk_path = audio_path.parent / f"temp_chunk_{chunk_idx}.wav"
        try:
            chunk_duration_actual = end_time - start_time
            ffmpeg_cmd = [
                'ffmpeg',
                '-y',  # Overwrite output file
                '-ss', str(start_time),  # Start time
                '-i', str(audio_path),  # Input file
                '-t', str(chunk_duration_actual),  # Duration
                '-acodec', 'pcm_s16le',  # Audio codec
                '-ar', '44100',  # Sample rate
                '-ac', '2',  # Stereo
                '-loglevel', 'error',  # Only show errors
                str(temp_chunk_path),
            ]

            subprocess.run(ffmpeg_cmd, check=True, capture_output=True)

            # Transcribe chunk
            segments, _ = model.transcribe(
                str(temp_chunk_path),
                beam_size=5,
                word_timestamps=True,
            )

            # Process segments with time offset so timestamps are absolute
            for segment in segments:
                words = [
                    WordTiming(
                        start=w.start + start_time,
                        end=w.end + start_time,
                        word=w.word.strip(),
                    )
                    for w in segment.words or []
                    if w.word.strip()
                ]
                text = segment.text.strip()
                full_text_parts.append(text)
                all_segments.append(
                    TranscriptSegment(
                        id=segment_id_counter,
                        start=segment.start + start_time,
                        end=segment.end + start_time,
                        text=text,
                        words=words,
                    )
                )
                segment_id_counter += 1

            # Force garbage collection after each chunk
            gc.collect()

        except subprocess.CalledProcessError as e:
            logger.error(
                "Erro ao extrair chunk %d: %s",
                chunk_idx,
                e.stderr.decode() if e.stderr else str(e),
            )
            raise
        finally:
            # Clean up temp chunk
            if temp_chunk_path.exists():
                temp_chunk_path.unlink()

    logger.info(
        "Transcrição em chunks concluída: %d segmentos processados",
        len(all_segments),
    )

    return TranscriptionResult(
        segments=all_segments,
        full_text=" ".join(full_text_parts).strip(),
    )
|
|
|
|
@staticmethod
def persist(result: TranscriptionResult, destination: Path) -> None:
    """Write ``result`` to ``destination`` as transcription.json / transcription.txt."""
    serialized_segments = []
    for segment in result.segments:
        serialized_words = [
            {"start": word.start, "end": word.end, "text": word.word}
            for word in segment.words
        ]
        serialized_segments.append(
            {
                "id": segment.id,
                "start": segment.start,
                "end": segment.end,
                "text": segment.text,
                "words": serialized_words,
            }
        )

    payload = {
        "segments": serialized_segments,
        "full_text": result.full_text,
    }

    json_path = destination / "transcription.json"
    with json_path.open("w", encoding="utf-8") as fp:
        json.dump(payload, fp, ensure_ascii=False, indent=2)

    text_path = destination / "transcription.txt"
    with text_path.open("w", encoding="utf-8") as fp:
        fp.write(result.full_text)

    logger.info("Transcricao salva em %s", destination)
|
|
|
|
@staticmethod
def load(source: Path) -> Optional[TranscriptionResult]:
    """Load a previously persisted transcription from ``source``.

    Returns None when transcription.json is missing, unreadable, or
    malformed at the top level. Individual malformed segments/words are
    skipped with a debug log instead of failing the whole load.
    """
    json_path = source / "transcription.json"
    if not json_path.exists():
        return None

    try:
        with json_path.open("r", encoding="utf-8") as fp:
            payload = json.load(fp)
    except (OSError, json.JSONDecodeError) as exc:
        # Corrupt or unreadable cache: treat as absent so callers re-transcribe.
        logger.warning(
            "Falha ao carregar transcricao existente de %s: %s", json_path, exc
        )
        return None

    segments_payload = payload.get("segments", [])
    if not isinstance(segments_payload, list):
        logger.warning(
            "Formato inesperado ao carregar transcricao de %s: 'segments' invalido",
            json_path,
        )
        return None

    segments: List[TranscriptSegment] = []
    for idx, segment_data in enumerate(segments_payload):
        if not isinstance(segment_data, dict):
            logger.debug("Segmento invalido ignorado ao carregar: %s", segment_data)
            continue
        try:
            # "id" falls back to the list position when absent;
            # "start"/"end" are mandatory and drop the segment when invalid.
            segment_id = int(segment_data.get("id", idx))
            start = float(segment_data["start"])
            end = float(segment_data["end"])
        except (KeyError, TypeError, ValueError):
            logger.debug("Segmento sem dados obrigatorios ignorado: %s", segment_data)
            continue

        text = str(segment_data.get("text", "")).strip()
        words_payload = segment_data.get("words", [])
        words: List[WordTiming] = []

        # A non-list "words" value is silently treated as no words.
        if isinstance(words_payload, list):
            for word_data in words_payload:
                if not isinstance(word_data, dict):
                    continue
                try:
                    w_start = float(word_data["start"])
                    w_end = float(word_data["end"])
                except (KeyError, TypeError, ValueError):
                    logger.debug(
                        "Palavra sem dados obrigatorios ignorada: %s", word_data
                    )
                    continue
                # The persisted key is "text"; the in-memory field is "word".
                word_text = str(word_data.get("text", "")).strip()
                if not word_text:
                    continue
                words.append(WordTiming(start=w_start, end=w_end, word=word_text))

        segments.append(
            TranscriptSegment(
                id=segment_id,
                start=start,
                end=end,
                text=text,
                words=words,
            )
        )

    full_text = str(payload.get("full_text", "")).strip()
    return TranscriptionResult(segments=segments, full_text=full_text)
|
|
|