"""HTTP API (FastAPI) exposing YouTube transcript, metadata, download,
search and format-listing endpoints.

Transcripts are fetched with ``youtube_transcript_api``; every other endpoint
runs a yt-dlp operation through ``execute_with_proxy_retry``.
"""

import logging
import os
import uuid
from typing import Optional

from fastapi import FastAPI, HTTPException, Query
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import SRTFormatter
from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound
from yt_dlp import YoutubeDL

from utils import extract_video_id, sanitize_title
from proxy_manager import execute_with_proxy_retry, ProxyError

logger = logging.getLogger(__name__)

app = FastAPI(
    title="YouTube Transcript, Download and Metadata API",
    version="1.0.0"
)


@app.get("/get-transcript")
def get_transcript(
    url: Optional[str] = Query(None, description="URL completa do vídeo"),
    videoId: Optional[str] = Query(None, alias="videoId", description="ID do vídeo no YouTube")
):
    """Return the Portuguese ('pt') transcript of a video formatted as SRT.

    Args:
        url: Full video URL; takes precedence over ``videoId`` when given.
        videoId: Video ID, used only when ``url`` is absent.

    Returns:
        A dict with ``video_id`` and ``transcript`` (SRT text).

    Raises:
        HTTPException: 400 when neither parameter is given or the URL cannot
            be parsed; 404 when transcripts are disabled, missing or empty;
            500 for any other failure while fetching.
    """
    if not url and not videoId:
        raise HTTPException(status_code=400, detail="Informe 'url' ou 'videoId'")

    if url:
        try:
            video_id = extract_video_id(url)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e
    else:
        video_id = videoId

    try:
        result = YouTubeTranscriptApi().fetch(video_id, languages=['pt'])
    except TranscriptsDisabled:
        raise HTTPException(status_code=404, detail="Transcrição desativada para este vídeo")
    except NoTranscriptFound:
        raise HTTPException(status_code=404, detail="Nenhuma transcrição encontrada")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erro ao obter transcrição: {e}")

    # Checked outside the try block and raised as an HTTPException directly:
    # a hand-built library exception raised inside the try would be caught by
    # the generic handler above and reported as a 500 instead of a 404.
    if not result:
        raise HTTPException(status_code=404, detail="Nenhuma transcrição encontrada")

    return {
        "video_id": video_id,
        "transcript": SRTFormatter().format_transcript(result),
    }


def _scrape_basic_metadata(
    url: Optional[str], video_id_param: Optional[str], error_msg: str
) -> Optional[dict]:
    """Best-effort fallback that reads the video title from the watch page.

    Returns a reduced metadata dict, or ``None`` when no video ID can be
    determined, the page has no usable title, or anything fails along the way.
    Never raises: the caller reports the original extraction error instead.
    """
    try:
        # Imported lazily so the service still starts when these optional
        # packages are not installed; an ImportError simply disables the
        # fallback.
        import requests
        from bs4 import BeautifulSoup

        video_id = video_id_param
        if not video_id and url:
            # Same parser the other endpoints use; the naive 'v=' split is
            # kept as a second attempt for URLs it rejects.
            try:
                video_id = extract_video_id(url)
            except ValueError:
                video_id = None
            if not video_id and 'v=' in url:
                video_id = url.split('v=')[-1].split('&')[0]
        if not video_id:
            return None

        response = requests.get(f'https://www.youtube.com/watch?v={video_id}', timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')
        title_tag = soup.find('title')
        if title_tag is None:
            return None

        title = title_tag.text.replace(' - YouTube', '').strip()
        # A bare "YouTube" title means the page carried no video-specific title.
        if not title or title == 'YouTube':
            return None

        return {
            'id': video_id,
            'title': title,
            'url': f'https://www.youtube.com/watch?v={video_id}',
            'thumbnail': f'https://img.youtube.com/vi/{video_id}/hqdefault.jpg',
            'error': f'Informações limitadas: {error_msg}'
        }
    except Exception:
        # Deliberately best-effort, but leave a trace instead of hiding it.
        logger.warning("Fallback metadata scrape failed", exc_info=True)
        return None


@app.get("/get-video-metadata")
def get_video_metadata(
    url: Optional[str] = Query(None, description="URL completa do vídeo"),
    videoId: Optional[str] = Query(None, alias="videoId", description="ID do vídeo no YouTube")
):
    """Return yt-dlp metadata for a video, degrading to a scraped title.

    Args:
        url: Full video URL; takes precedence over ``videoId`` when given.
        videoId: Video ID, used to build a watch URL when ``url`` is absent.

    Returns:
        The info dict produced by yt-dlp, or a reduced dict (``id``,
        ``title``, ``url``, ``thumbnail``, ``error``) from the fallback.

    Raises:
        HTTPException: 400 when neither parameter is given; 503 on
            ``ProxyError``; 500 when extraction and the fallback both fail.
    """
    if not url and not videoId:
        raise HTTPException(status_code=400, detail="Informe 'url' ou 'videoId'")

    target = url if url else f"https://www.youtube.com/watch?v={videoId}"

    ydl_opts = {
        'quiet': True,
        'no_warnings': True,
        'skip_download': True,
        'nocheckcertificate': True,
        'ignoreerrors': True,
        'no_color': True,
        'extract_flat': 'in_playlist',
        'force_generic_extractor': True,
        'format': 'best[ext=mp4]/best[ext=webm]/best',
        'allow_unplayable_formats': True,
        'socket_timeout': 8,
        'retries': 0,
    }

    def extract_metadata(ydl):
        """Try progressively simpler extractions until one yields a title."""
        info = ydl.extract_info(target, download=False, process=False)

        if not info or 'title' not in info:
            info = ydl.extract_info(target, download=False)

        if not info or 'title' not in info:
            simple_ydl_opts = {
                'quiet': True,
                'skip_download': True,
                'extract_flat': True,
                'force_generic_extractor': True,
            }
            with YoutubeDL(simple_ydl_opts) as simple_ydl:
                info = simple_ydl.extract_info(target, download=False)

        if not info:
            raise Exception("Não foi possível extrair as informações do vídeo")

        if isinstance(info, dict):
            if 'title' not in info and 'url' in info:
                # With 'ignoreerrors' set this call can return None; keep the
                # partial info in that case rather than failing on it below.
                resolved = ydl.extract_info(info['url'], download=False)
                if resolved:
                    info = resolved
            if 'title' not in info:
                info['title'] = f"Vídeo {videoId or 'desconhecido'}"

        return info

    try:
        info = execute_with_proxy_retry(ydl_opts, extract_metadata)
    except ProxyError as e:
        error_msg = str(e).replace('\n', ' ').strip()
        raise HTTPException(
            status_code=503,
            detail=f"Erro com proxies: {error_msg}"
        )
    except Exception as e:
        error_msg = str(e).replace('\n', ' ').strip()
        limited = _scrape_basic_metadata(url, videoId, error_msg)
        if limited is not None:
            return limited
        raise HTTPException(
            status_code=500,
            detail=f"Erro ao processar o vídeo: {error_msg}"
        )

    return info


@app.get("/download-video")
def download_video(
    url: Optional[str] = Query(None, description="URL completa do vídeo"),
    videoId: Optional[str] = Query(None, alias="videoId", description="ID do vídeo no YouTube"),
    qualidade: str = Query("high", description="Qualidade do vídeo: low, medium, high")
):
    """Download a video into ``/app/videos`` and report the stored filename.

    Args:
        url: Full video URL; takes precedence over ``videoId`` when given.
        videoId: Video ID, used when ``url`` is absent.
        qualidade: One of ``low``, ``medium`` or ``high`` (case-insensitive).

    Returns:
        A dict with ``videoId``, ``filename`` and ``cached`` (``True`` when a
        file with the target name already existed and nothing was downloaded).

    Raises:
        HTTPException: 400 for missing/invalid parameters; 503 on
            ``ProxyError``; 500 for any other download failure.
    """
    if not url and not videoId:
        raise HTTPException(status_code=400, detail="Informe 'url' ou 'videoId'")

    if url:
        target = url
        try:
            video_id = extract_video_id(url)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e
    else:
        target = f"https://www.youtube.com/watch?v={videoId}"
        video_id = videoId

    quality_map = {
        "low": "bestvideo[height<=480]+bestaudio/best[height<=480]/bestvideo[height<=480]/best[height<=480]/best",
        "medium": "bestvideo[height<=720]+bestaudio/best[height<=720]/bestvideo[height<=720]/best[height<=720]/best",
        "high": "bestvideo+bestaudio/best"
    }

    qualidade = qualidade.lower()
    if qualidade not in quality_map:
        raise HTTPException(status_code=400, detail="Qualidade deve ser: low, medium ou high")

    videos_dir = "/app/videos"
    os.makedirs(videos_dir, exist_ok=True)

    # Download under a unique temporary name, then move to the final name.
    unique_id = str(uuid.uuid4())
    output_template = os.path.join(videos_dir, f"{unique_id}.%(ext)s")

    ydl_opts = {
        "format": quality_map[qualidade],
        "outtmpl": output_template,
        "quiet": True,
        "noplaylist": True,
        "merge_output_format": "mp4",
        "cookiefile": "/app/cookies.txt",
        "socket_timeout": 8,
        "retries": 0,
        "extractor_retries": 0,
    }

    def download_operation(ydl):
        """Resolve the title, reuse an existing file or download a new one."""
        base = ydl.extract_info(target, download=False)
        title = base.get("title", unique_id)
        clean_title = sanitize_title(title)
        # NOTE(review): the cache key is title + quality only, so two
        # different videos with the same sanitized title share one file.
        # Left unchanged because the filename is part of the response.
        filename = f"{clean_title}_{qualidade}.mp4"
        final_path = os.path.join(videos_dir, filename)
        logger.info("Metadata resolved for %s", video_id)

        if os.path.exists(final_path):
            return {
                "videoId": video_id,
                "filename": filename,
                "cached": True
            }

        logger.info("Starting download for %s", video_id)
        result = ydl.extract_info(target, download=True)

        requested = result.get("requested_downloads")
        if requested:
            real_file_path = requested[0]["filepath"]
        elif "filepath" in result:
            real_file_path = result["filepath"]
        else:
            real_file_path = output_template.replace("%(ext)s", "mp4")

        # os.replace overwrites an existing destination on every platform, so
        # a concurrent request that finished first does not make this fail.
        os.replace(real_file_path, final_path)

        return {
            "videoId": video_id,
            "filename": filename,
            "cached": False
        }

    try:
        return execute_with_proxy_retry(ydl_opts, download_operation)
    except ProxyError as e:
        raise HTTPException(status_code=503, detail=f"Erro com proxies: {e}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erro ao baixar vídeo: {e}")


@app.get("/search")
def search_youtube_yt_dlp(
    q: str = Query(..., description="Termo de busca"),
    max_results: int = Query(5, ge=1, le=10, description="Número de resultados (máx 10)")
):
    """Search YouTube through yt-dlp's ``ytsearch`` pseudo-URL.

    Args:
        q: Search term.
        max_results: Number of results to return (1-10).

    Returns:
        ``{"results": [...]}`` where each item has ``videoId``, ``title``,
        ``duration``, ``url``, ``channel`` and ``thumbnail``.

    Raises:
        HTTPException: 503 on ``ProxyError``; 500 for any other failure.
    """
    ydl_opts = {
        "quiet": True,
        "extract_flat": "in_playlist",
        "skip_download": True,
        "socket_timeout": 8,
        "retries": 0,
    }

    search_query = f"ytsearch{max_results}:{q}"

    def search_operation(ydl):
        """Run the search and project each entry onto the response shape."""
        search_result = ydl.extract_info(search_query, download=False) or {}
        entries = (search_result.get("entries") or [])[:max_results]
        results = [
            {
                "videoId": item.get("id"),
                "title": item.get("title"),
                "duration": item.get("duration"),
                # Fall back to 'url' when the entry carries no 'webpage_url'.
                "url": item.get("webpage_url") or item.get("url"),
                "channel": item.get("uploader"),
                "thumbnail": item.get("thumbnail"),
            }
            for item in entries
            if item  # skip empty/None entries
        ]
        return {"results": results}

    try:
        return execute_with_proxy_retry(ydl_opts, search_operation)
    except ProxyError as e:
        raise HTTPException(status_code=503, detail=f"Erro com proxies: {e}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erro ao buscar vídeos: {e}")


@app.get("/list-formats")
def list_formats(url: str):
    """List the formats yt-dlp reports for a video.

    Args:
        url: Video URL passed straight to yt-dlp.

    Returns:
        ``{"total": <number of formats>, "formats": [...]}`` where the list is
        capped at the first 60 entries and each entry carries ``id``, ``ext``,
        ``h``, ``fps``, ``v``, ``a`` and ``tbr``.

    Raises:
        HTTPException: 503 on ``ProxyError``; 500 for any other failure.
    """
    opts = {
        "quiet": False,
        "no_warnings": False,
        "noplaylist": True,
        "force_ipv4": True,
        "geo_bypass": True,
        "extractor_args": {"youtube": {"player_client": ["android"], "player_skip": ["webpage"]}},
        "http_headers": {
            "Accept-Language": "pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7",
            "User-Agent": "com.google.android.youtube/19.17.36 (Linux; U; Android 13) gzip",
        },
        "socket_timeout": 8,
        "retries": 0,
    }

    def list_formats_operation(ydl):
        """Extract the video info and summarise each available format."""
        info = ydl.extract_info(url, download=False)
        available = info.get("formats") or []
        summary = []
        for fmt in available:
            summary.append({
                "id": fmt.get("format_id"),
                "ext": fmt.get("ext"),
                "h": fmt.get("height"),
                "fps": fmt.get("fps"),
                "v": fmt.get("vcodec"),
                "a": fmt.get("acodec"),
                "tbr": fmt.get("tbr"),
            })
        # 'total' counts every format; only the first 60 are returned.
        return {"total": len(summary), "formats": summary[:60]}

    try:
        return execute_with_proxy_retry(opts, list_formats_operation)
    except ProxyError as e:
        raise HTTPException(status_code=503, detail=f"Erro com proxies: {e}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Erro ao listar formatos: {e}")