Implementa desacoplamento de I/O
This commit is contained in:
64
main.py
64
main.py
@@ -14,13 +14,33 @@ RABBITMQ_UPLOAD_QUEUE = os.environ.get('RABBITMQ_UPLOAD_QUEUE', 'to-upload')
|
|||||||
if not RABBITMQ_PASS:
|
if not RABBITMQ_PASS:
|
||||||
raise RuntimeError("RABBITMQ_PASS não definido no ambiente")
|
raise RuntimeError("RABBITMQ_PASS não definido no ambiente")
|
||||||
|
|
||||||
|
def get_next_message():
|
||||||
|
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
|
||||||
|
parameters = pika.ConnectionParameters(
|
||||||
|
host=RABBITMQ_HOST,
|
||||||
|
port=RABBITMQ_PORT,
|
||||||
|
credentials=credentials,
|
||||||
|
heartbeat=60,
|
||||||
|
blocked_connection_timeout=300
|
||||||
|
)
|
||||||
|
connection = pika.BlockingConnection(parameters)
|
||||||
|
channel = connection.channel()
|
||||||
|
method_frame, header_frame, body = channel.basic_get(RABBITMQ_QUEUE)
|
||||||
|
if method_frame:
|
||||||
|
channel.basic_ack(method_frame.delivery_tag)
|
||||||
|
connection.close()
|
||||||
|
return body
|
||||||
|
else:
|
||||||
|
connection.close()
|
||||||
|
return None
|
||||||
|
|
||||||
def publish_to_queue(payload):
|
def publish_to_queue(payload):
|
||||||
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
|
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
|
||||||
parameters = pika.ConnectionParameters(
|
parameters = pika.ConnectionParameters(
|
||||||
host=RABBITMQ_HOST,
|
host=RABBITMQ_HOST,
|
||||||
port=RABBITMQ_PORT,
|
port=RABBITMQ_PORT,
|
||||||
credentials=credentials,
|
credentials=credentials,
|
||||||
heartbeat=60, # Usando heartbeat menor
|
heartbeat=60,
|
||||||
blocked_connection_timeout=300
|
blocked_connection_timeout=300
|
||||||
)
|
)
|
||||||
connection = pika.BlockingConnection(parameters)
|
connection = pika.BlockingConnection(parameters)
|
||||||
@@ -36,14 +56,20 @@ def publish_to_queue(payload):
|
|||||||
)
|
)
|
||||||
connection.close()
|
connection.close()
|
||||||
|
|
||||||
def callback(ch, method, properties, body):
|
def main():
|
||||||
|
print(' [*] Esperando mensagens. Para sair: CTRL+C')
|
||||||
|
while True:
|
||||||
|
body = get_next_message()
|
||||||
|
if body is None:
|
||||||
|
time.sleep(5)
|
||||||
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data = json.loads(body)
|
data = json.loads(body)
|
||||||
filename = data.get("filename")
|
filename = data.get("filename")
|
||||||
times = data.get("times", [])
|
times = data.get("times", [])
|
||||||
url = data.get("url")
|
url = data.get("url")
|
||||||
video_id = data.get("videoId")
|
video_id = data.get("videoId")
|
||||||
|
|
||||||
print(f"Processando vídeo: {filename}")
|
print(f"Processando vídeo: {filename}")
|
||||||
|
|
||||||
processed_files = process_full_video(filename, times)
|
processed_files = process_full_video(filename, times)
|
||||||
@@ -65,45 +91,13 @@ def callback(ch, method, properties, body):
|
|||||||
"videoId": video_id if 'video_id' in locals() else None,
|
"videoId": video_id if 'video_id' in locals() else None,
|
||||||
"error": str(e),
|
"error": str(e),
|
||||||
}
|
}
|
||||||
|
|
||||||
print(f"Erro no processamento: {e}")
|
print(f"Erro no processamento: {e}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
publish_to_queue(payload)
|
publish_to_queue(payload)
|
||||||
|
|
||||||
print(f"Mensagem publicada na fila '{RABBITMQ_UPLOAD_QUEUE}'.")
|
print(f"Mensagem publicada na fila '{RABBITMQ_UPLOAD_QUEUE}'.")
|
||||||
except Exception as publish_err:
|
except Exception as publish_err:
|
||||||
print(f"Erro ao publicar na fila '{RABBITMQ_UPLOAD_QUEUE}': {publish_err}")
|
print(f"Erro ao publicar na fila '{RABBITMQ_UPLOAD_QUEUE}': {publish_err}")
|
||||||
finally:
|
|
||||||
try:
|
|
||||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
|
||||||
except Exception as ack_err:
|
|
||||||
print(f"Erro ao dar ack: {ack_err}")
|
|
||||||
|
|
||||||
def main():
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
credentials = pika.PlainCredentials(RABBITMQ_USER, RABBITMQ_PASS)
|
|
||||||
parameters = pika.ConnectionParameters(
|
|
||||||
host=RABBITMQ_HOST,
|
|
||||||
port=RABBITMQ_PORT,
|
|
||||||
credentials=credentials,
|
|
||||||
heartbeat=60, # Usando heartbeat menor
|
|
||||||
blocked_connection_timeout=300
|
|
||||||
)
|
|
||||||
connection = pika.BlockingConnection(parameters)
|
|
||||||
channel = connection.channel()
|
|
||||||
channel.queue_declare(queue=RABBITMQ_QUEUE, durable=True)
|
|
||||||
channel.basic_qos(prefetch_count=1)
|
|
||||||
channel.basic_consume(queue=RABBITMQ_QUEUE, on_message_callback=callback)
|
|
||||||
print(' [*] Esperando mensagens. Para sair: CTRL+C')
|
|
||||||
channel.start_consuming()
|
|
||||||
except pika.exceptions.StreamLostError as e:
|
|
||||||
print(f"Conexão perdida: {e}. Reconectando em 5s...")
|
|
||||||
time.sleep(5)
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Erro inesperado: {e}. Reconectando em 5s...")
|
|
||||||
time.sleep(5)
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
Reference in New Issue
Block a user