137 lines
5.3 KiB
Python
137 lines
5.3 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
from typing import Any, Callable, Dict
|
|
|
|
import pika
|
|
|
|
from video_render.config import Settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
MessageHandler = Callable[[Dict[str, Any]], Dict[str, Any]]
|
|
|
|
|
|
def _safe_ack(
|
|
channel: pika.adapters.blocking_connection.BlockingChannel, delivery_tag
|
|
) -> bool:
|
|
if not channel.is_open:
|
|
logger.warning(
|
|
"Canal fechado antes do ACK; mensagem sera reprocessada apos reconexao"
|
|
)
|
|
return False
|
|
try:
|
|
channel.basic_ack(delivery_tag=delivery_tag)
|
|
return True
|
|
except Exception:
|
|
logger.exception("Falha ao confirmar mensagem")
|
|
return False
|
|
|
|
|
|
class RabbitMQWorker:
|
|
def __init__(self, settings: Settings) -> None:
|
|
self.settings = settings
|
|
self._params = pika.ConnectionParameters(
|
|
host=settings.rabbitmq.host,
|
|
port=settings.rabbitmq.port,
|
|
credentials=pika.PlainCredentials(
|
|
settings.rabbitmq.user, settings.rabbitmq.password
|
|
),
|
|
heartbeat=settings.rabbitmq.heartbeat,
|
|
blocked_connection_timeout=settings.rabbitmq.blocked_timeout,
|
|
)
|
|
|
|
def consume_forever(self, handler: MessageHandler) -> None:
|
|
while True:
|
|
try:
|
|
with pika.BlockingConnection(self._params) as connection:
|
|
channel = connection.channel()
|
|
channel.queue_declare(
|
|
queue=self.settings.rabbitmq.consume_queue, durable=True
|
|
)
|
|
channel.queue_declare(
|
|
queue=self.settings.rabbitmq.publish_queue, durable=True
|
|
)
|
|
channel.basic_qos(
|
|
prefetch_count=self.settings.rabbitmq.prefetch_count
|
|
)
|
|
|
|
def _on_message(
|
|
ch: pika.adapters.blocking_connection.BlockingChannel,
|
|
method,
|
|
properties,
|
|
body,
|
|
) -> None:
|
|
"""Consume message, ACK immediately, then process."""
|
|
try:
|
|
message = json.loads(body)
|
|
except json.JSONDecodeError:
|
|
logger.error("Mensagem invalida recebida: %s", body)
|
|
_safe_ack(ch, method.delivery_tag)
|
|
return
|
|
|
|
if not _safe_ack(ch, method.delivery_tag):
|
|
logger.warning(
|
|
"Nao foi possivel confirmar mensagem; abortando processamento"
|
|
)
|
|
return
|
|
|
|
logger.info(
|
|
"Mensagem recebida: %s",
|
|
message.get("filename", "<sem_nome>"),
|
|
)
|
|
|
|
try:
|
|
response = handler(message)
|
|
except Exception:
|
|
logger.exception("Erro nao tratado durante o processamento")
|
|
response = {
|
|
"hasError": True,
|
|
"error": "Erro nao tratado no pipeline",
|
|
"filename": message.get("filename"),
|
|
"videoId": message.get("videoId"),
|
|
"url": message.get("url"),
|
|
"processedFiles": [],
|
|
}
|
|
|
|
self._publish_response(response)
|
|
|
|
channel.basic_consume(
|
|
queue=self.settings.rabbitmq.consume_queue,
|
|
on_message_callback=_on_message,
|
|
auto_ack=False,
|
|
)
|
|
logger.info("Consumidor iniciado. Aguardando mensagens...")
|
|
channel.start_consuming()
|
|
except pika.exceptions.AMQPConnectionError:
|
|
logger.exception(
|
|
"Conexao com RabbitMQ perdida. Tentando reconectar..."
|
|
)
|
|
except pika.exceptions.AMQPError:
|
|
logger.exception("Erro AMQP inesperado. Reiniciando consumo...")
|
|
except KeyboardInterrupt:
|
|
logger.info("Encerrando consumidor por interrupcao do usuario.")
|
|
break
|
|
|
|
def _publish_response(self, response: Dict[str, Any]) -> None:
|
|
payload = json.dumps(response)
|
|
try:
|
|
with pika.BlockingConnection(self._params) as publish_connection:
|
|
publish_channel = publish_connection.channel()
|
|
publish_channel.queue_declare(
|
|
queue=self.settings.rabbitmq.publish_queue, durable=True
|
|
)
|
|
publish_channel.basic_publish(
|
|
exchange="",
|
|
routing_key=self.settings.rabbitmq.publish_queue,
|
|
body=payload,
|
|
properties=pika.BasicProperties(delivery_mode=2),
|
|
)
|
|
logger.info(
|
|
"Resposta publicada para '%s'",
|
|
self.settings.rabbitmq.publish_queue,
|
|
)
|
|
except Exception:
|
|
logger.exception("Falha ao publicar a resposta na fila de upload apos ACK")
|