Files
video-render/video_render/transcription.py
2025-10-28 17:34:13 -03:00

193 lines
6.2 KiB
Python

from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
from faster_whisper import WhisperModel
from video_render.config import Settings
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class WordTiming:
    """Immutable timing record for a single transcribed word."""

    # Position where the word begins, as reported by the transcriber.
    start: float
    # Position where the word ends, on the same scale as ``start``.
    end: float
    # The word text itself.
    word: str
@dataclass(frozen=True)
class TranscriptSegment:
    """Immutable record for one transcribed segment and its words."""

    # Identifier of the segment within its transcription.
    id: int
    # Position where the segment begins.
    start: float
    # Position where the segment ends, on the same scale as ``start``.
    end: float
    # Text of the whole segment.
    text: str
    # Per-word timings belonging to this segment (may be empty).
    words: List[WordTiming]
@dataclass(frozen=True)
class TranscriptionResult:
    """Immutable container for a complete transcription."""

    # Segments in the order they were produced or loaded.
    segments: List[TranscriptSegment]
    # Text of the whole transcription as a single string.
    full_text: str
class TranscriptionService:
    """Transcribe audio with Faster-Whisper and persist/reload the result.

    The Whisper model is created lazily on the first call that needs it and
    is then reused by later calls on the same instance.
    """

    def __init__(self, settings: Settings) -> None:
        """Store *settings*; the model itself is not loaded here."""
        self.settings = settings
        self._model: Optional[WhisperModel] = None

    def _load_model(self) -> WhisperModel:
        """Return the cached Whisper model, creating it on first use."""
        if self._model is not None:
            return self._model

        whisper = self.settings.whisper
        # Unset device/compute_type fall back to the "auto"/"default" values.
        device = whisper.device or "auto"
        compute_type = whisper.compute_type or "default"
        logger.info(
            "Carregando modelo Faster-Whisper '%s' (device=%s, compute_type=%s)",
            whisper.model_size,
            device,
            compute_type,
        )
        self._model = WhisperModel(
            whisper.model_size,
            device=device,
            compute_type=compute_type,
            download_root=str(whisper.download_root),
        )
        return self._model

    def transcribe(self, audio_path: Path) -> TranscriptionResult:
        """Transcribe *audio_path* and return its segments and full text.

        Segment ids are the zero-based position of each segment in the model
        output. Words whose text is empty after stripping are dropped.
        """
        model = self._load_model()
        raw_segments, _ = model.transcribe(
            str(audio_path),
            beam_size=5,
            word_timestamps=True,
        )

        parsed_segments: List[TranscriptSegment] = []
        for idx, segment in enumerate(raw_segments):
            words = [
                WordTiming(start=w.start, end=w.end, word=w.word.strip())
                for w in segment.words or []
                if w.word.strip()
            ]
            parsed_segments.append(
                TranscriptSegment(
                    id=idx,
                    start=segment.start,
                    end=segment.end,
                    text=segment.text.strip(),
                    words=words,
                )
            )

        full_text = " ".join(seg.text for seg in parsed_segments).strip()
        return TranscriptionResult(segments=parsed_segments, full_text=full_text)

    @staticmethod
    def persist(result: TranscriptionResult, destination: Path) -> None:
        """Write *result* into *destination* as JSON and plain text.

        Creates ``transcription.json`` (segments, words and full text) and
        ``transcription.txt`` (full text only). *destination* is created,
        including missing parents, when it does not exist yet.
        """
        destination.mkdir(parents=True, exist_ok=True)
        json_path = destination / "transcription.json"
        text_path = destination / "transcription.txt"

        payload = {
            "segments": [
                {
                    "id": segment.id,
                    "start": segment.start,
                    "end": segment.end,
                    "text": segment.text,
                    # The word text is stored under "text"; load() reads the
                    # same key back.
                    "words": [
                        {"start": word.start, "end": word.end, "text": word.word}
                        for word in segment.words
                    ],
                }
                for segment in result.segments
            ],
            "full_text": result.full_text,
        }

        with json_path.open("w", encoding="utf-8") as fp:
            json.dump(payload, fp, ensure_ascii=False, indent=2)
        with text_path.open("w", encoding="utf-8") as fp:
            fp.write(result.full_text)
        logger.info("Transcricao salva em %s", destination)

    @staticmethod
    def load(source: Path) -> Optional[TranscriptionResult]:
        """Load a transcription previously written by ``persist``.

        Returns ``None`` when ``transcription.json`` is missing from
        *source*, cannot be read or decoded, or does not have the expected
        top-level shape. Individual malformed segments and words are skipped
        rather than failing the whole load.
        """
        json_path = source / "transcription.json"
        if not json_path.exists():
            return None

        try:
            with json_path.open("r", encoding="utf-8") as fp:
                payload = json.load(fp)
        except (OSError, ValueError) as exc:
            # ValueError covers both json.JSONDecodeError and the
            # UnicodeDecodeError raised for files that are not valid UTF-8.
            logger.warning(
                "Falha ao carregar transcricao existente de %s: %s", json_path, exc
            )
            return None

        # Valid JSON may still be a list, string or number at the root.
        if not isinstance(payload, dict):
            logger.warning(
                "Formato inesperado ao carregar transcricao de %s: raiz nao e um objeto",
                json_path,
            )
            return None

        segments_payload = payload.get("segments", [])
        if not isinstance(segments_payload, list):
            logger.warning(
                "Formato inesperado ao carregar transcricao de %s: 'segments' invalido",
                json_path,
            )
            return None

        segments: List[TranscriptSegment] = []
        for idx, segment_data in enumerate(segments_payload):
            segment = TranscriptionService._parse_segment(idx, segment_data)
            if segment is not None:
                segments.append(segment)

        full_text = TranscriptionService._as_text(payload.get("full_text"))
        return TranscriptionResult(segments=segments, full_text=full_text)

    @staticmethod
    def _as_text(value: object) -> str:
        """Return *value* as stripped text, mapping ``None`` to ``""``."""
        # A JSON null must not become the literal string "None".
        return "" if value is None else str(value).strip()

    @staticmethod
    def _parse_word(word_data: object) -> Optional[WordTiming]:
        """Build a ``WordTiming`` from one JSON word entry, or ``None`` if invalid."""
        if not isinstance(word_data, dict):
            return None
        try:
            w_start = float(word_data["start"])
            w_end = float(word_data["end"])
        except (KeyError, TypeError, ValueError):
            logger.debug("Palavra sem dados obrigatorios ignorada: %s", word_data)
            return None
        word_text = TranscriptionService._as_text(word_data.get("text"))
        if not word_text:
            return None
        return WordTiming(start=w_start, end=w_end, word=word_text)

    @staticmethod
    def _parse_segment(idx: int, segment_data: object) -> Optional[TranscriptSegment]:
        """Build a ``TranscriptSegment`` from one JSON entry, or ``None`` if invalid.

        *idx* is the entry's position in the list and is used as the segment
        id when the entry has no ``"id"`` key.
        """
        if not isinstance(segment_data, dict):
            logger.debug("Segmento invalido ignorado ao carregar: %s", segment_data)
            return None
        try:
            segment_id = int(segment_data.get("id", idx))
            start = float(segment_data["start"])
            end = float(segment_data["end"])
        except (KeyError, TypeError, ValueError, OverflowError):
            # OverflowError: int() of a non-finite float such as Infinity.
            logger.debug("Segmento sem dados obrigatorios ignorado: %s", segment_data)
            return None

        words: List[WordTiming] = []
        words_payload = segment_data.get("words", [])
        if isinstance(words_payload, list):
            for word_data in words_payload:
                word = TranscriptionService._parse_word(word_data)
                if word is not None:
                    words.append(word)

        return TranscriptSegment(
            id=segment_id,
            start=start,
            end=end,
            text=TranscriptionService._as_text(segment_data.get("text")),
            words=words,
        )