From 98613a000231dd3b4a42a19941c59a7fb0c3bdb1 Mon Sep 17 00:00:00 2001 From: admin Date: Tue, 5 Aug 2025 14:58:44 +0200 Subject: [PATCH] Implementa desacoplamento de I/O --- main.py | 124 +++++++++++++++++++++++++++----------------------------- 1 file changed, 59 insertions(+), 65 deletions(-) diff --git a/main.py b/main.py index 2dbd5d5..43059a5 100644 --- a/main.py +++ b/main.py @@ -14,13 +14,33 @@ RABBITMQ_UPLOAD_QUEUE = os.environ.get('RABBITMQ_UPLOAD_QUEUE', 'to-upload') if not RABBITMQ_PASS: raise RuntimeError("RABBITMQ_PASS não definido no ambiente") +def get_next_message(): + credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) + parameters = pika.ConnectionParameters( + host=RABBITMQ_HOST, + port=RABBITMQ_PORT, + credentials=credentials, + heartbeat=60, + blocked_connection_timeout=300 + ) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + method_frame, header_frame, body = channel.basic_get(RABBITMQ_QUEUE) + if method_frame: + channel.basic_ack(method_frame.delivery_tag) + connection.close() + return body + else: + connection.close() + return None + def publish_to_queue(payload): credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) parameters = pika.ConnectionParameters( host=RABBITMQ_HOST, port=RABBITMQ_PORT, credentials=credentials, - heartbeat=60, # Usando heartbeat menor + heartbeat=60, blocked_connection_timeout=300 ) connection = pika.BlockingConnection(parameters) @@ -36,74 +56,48 @@ def publish_to_queue(payload): ) connection.close() -def callback(ch, method, properties, body): - try: - data = json.loads(body) - filename = data.get("filename") - times = data.get("times", []) - url = data.get("url") - video_id = data.get("videoId") - - print(f"Processando vídeo: {filename}") - - processed_files = process_full_video(filename, times) - - payload = { - "videosProcessedQuantity": len(processed_files), - "filename": filename, - "processedFiles": processed_files, - "url": url, - "videoId": video_id, - "error": False, - } - except Exception as e: - payload = { - "videosProcessedQuantity": 0, - "filename": filename if 'filename' in locals() else None, - "processedFiles": [], - "url": url if 'url' in locals() else None, - "videoId": video_id if 'video_id' in locals() else None, - "error": str(e), - } - - print(f"Erro no processamento: {e}") - - try: - publish_to_queue(payload) - - print(f"Mensagem publicada na fila '{RABBITMQ_UPLOAD_QUEUE}'.") - except Exception as publish_err: - print(f"Erro ao publicar na fila '{RABBITMQ_UPLOAD_QUEUE}': {publish_err}") - finally: - try: - ch.basic_ack(delivery_tag=method.delivery_tag) - except Exception as ack_err: - print(f"Erro ao dar ack: {ack_err}") - def main(): + print(' [*] Esperando mensagens. Para sair: CTRL+C') while True: + body = get_next_message() + if body is None: + time.sleep(5) + continue + try: - credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS) - parameters = pika.ConnectionParameters( - host=RABBITMQ_HOST, - port=RABBITMQ_PORT, - credentials=credentials, - heartbeat=60, # Usando heartbeat menor - blocked_connection_timeout=300 - ) - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True) - channel.basic_qos(prefetch_count=1) - channel.basic_consume(queue=RABBITMQ_QUEUE, on_message_callback=callback) - print(' [*] Esperando mensagens. Para sair: CTRL+C') - channel.start_consuming() - except pika.exceptions.StreamLostError as e: - print(f"Conexão perdida: {e}. Reconectando em 5s...") - time.sleep(5) + data = json.loads(body) + filename = data.get("filename") + times = data.get("times", []) + url = data.get("url") + video_id = data.get("videoId") + print(f"Processando vídeo: {filename}") + + processed_files = process_full_video(filename, times) + + payload = { + "videosProcessedQuantity": len(processed_files), + "filename": filename, + "processedFiles": processed_files, + "url": url, + "videoId": video_id, + "error": False, + } except Exception as e: - print(f"Erro inesperado: {e}. Reconectando em 5s...") - time.sleep(5) + payload = { + "videosProcessedQuantity": 0, + "filename": filename if 'filename' in locals() else None, + "processedFiles": [], + "url": url if 'url' in locals() else None, + "videoId": video_id if 'video_id' in locals() else None, + "error": str(e), + } + print(f"Erro no processamento: {e}") + + try: + publish_to_queue(payload) + print(f"Mensagem publicada na fila '{RABBITMQ_UPLOAD_QUEUE}'.") + except Exception as publish_err: + print(f"Erro ao publicar na fila '{RABBITMQ_UPLOAD_QUEUE}': {publish_err}") if __name__ == "__main__": main()