fix RabbitMQ
Browse files- .env +6 -0
- __pycache__/inference_svm_model.cpython-310.pyc +0 -0
- __pycache__/mineru_single.cpython-310.pyc +0 -0
- __pycache__/table_row_extraction.cpython-310.pyc +0 -0
- __pycache__/topic_extraction.cpython-310.pyc +0 -0
- __pycache__/worker.cpython-310.pyc +0 -0
- test_listener.py +33 -0
- topic_extraction.log +0 -0
- worker.py +17 -4
.env
ADDED
|
@@ -0,0 +1,6 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
GEMINI_API_KEY=AIzaSyDtoakpXa2pjJwcQB6TJ5QaXHNSA5JxcrU
|
| 2 |
+
RABBITMQ_URL=amqp://pP4gN4GdD3PiUkQQ:2~i4LeUq8XZayGQc1z8awLcLu_MsJgQp@maglev.proxy.rlwy.net:57635
|
| 3 |
+
AWS_REGION=eu-west-2
|
| 4 |
+
AWS_RESOURCES_NAME=quextro-resources
|
| 5 |
+
AWS_ACCESS_KEY=AKIAXNGUVKHXIIUQZ3OE
|
| 6 |
+
AWS_SECRET_KEY=avg33Z5g8pXODhvDb5d1zSegToN+qN69vF4Z8m4C
|
__pycache__/inference_svm_model.cpython-310.pyc
CHANGED
|
Binary files a/__pycache__/inference_svm_model.cpython-310.pyc and b/__pycache__/inference_svm_model.cpython-310.pyc differ
|
|
|
__pycache__/mineru_single.cpython-310.pyc
CHANGED
|
Binary files a/__pycache__/mineru_single.cpython-310.pyc and b/__pycache__/mineru_single.cpython-310.pyc differ
|
|
|
__pycache__/table_row_extraction.cpython-310.pyc
CHANGED
|
Binary files a/__pycache__/table_row_extraction.cpython-310.pyc and b/__pycache__/table_row_extraction.cpython-310.pyc differ
|
|
|
__pycache__/topic_extraction.cpython-310.pyc
ADDED
|
Binary file (23.3 kB). View file
|
|
|
__pycache__/worker.cpython-310.pyc
CHANGED
|
Binary files a/__pycache__/worker.cpython-310.pyc and b/__pycache__/worker.cpython-310.pyc differ
|
|
|
test_listener.py
ADDED
|
@@ -0,0 +1,33 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import pika
|
| 2 |
+
import os
|
| 3 |
+
import dotenv
|
| 4 |
+
import time
|
| 5 |
+
dotenv.load_dotenv()
|
| 6 |
+
|
| 7 |
+
params = pika.URLParameters(os.getenv("RABBITMQ_URL"))
|
| 8 |
+
params.heartbeat = 5
|
| 9 |
+
params.blocked_connection_timeout = 2
|
| 10 |
+
params.connection_attempts = 3
|
| 11 |
+
params.retry_delay = 5
|
| 12 |
+
|
| 13 |
+
connection = pika.BlockingConnection(params)
|
| 14 |
+
|
| 15 |
+
channel = connection.channel()
|
| 16 |
+
|
| 17 |
+
channel.queue_declare(queue="web_server", durable=True)
|
| 18 |
+
|
| 19 |
+
def callback(ch, method, properties, body):
|
| 20 |
+
try:
|
| 21 |
+
print(f"Received message: {body}")
|
| 22 |
+
print(f"Properties: {properties}")
|
| 23 |
+
print(f"Method: {method}")
|
| 24 |
+
print(f"Channel: {ch}")
|
| 25 |
+
time.sleep(10)
|
| 26 |
+
except Exception as e:
|
| 27 |
+
print(f"Error: {e}")
|
| 28 |
+
|
| 29 |
+
|
| 30 |
+
channel.basic_consume(queue="web_server", on_message_callback=callback, auto_ack=True)
|
| 31 |
+
|
| 32 |
+
print("Waiting for messages...")
|
| 33 |
+
channel.start_consuming()
|
topic_extraction.log
ADDED
|
File without changes
|
worker.py
CHANGED
|
@@ -10,6 +10,8 @@ from typing import Tuple, Dict, Any
|
|
| 10 |
|
| 11 |
from mineru_single import Processor
|
| 12 |
|
|
|
|
|
|
|
| 13 |
import logging
|
| 14 |
|
| 15 |
logging.basicConfig(
|
|
@@ -25,6 +27,11 @@ class RabbitMQWorker:
|
|
| 25 |
logger.info("Initializing RabbitMQWorker")
|
| 26 |
self.processor = Processor()
|
| 27 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 28 |
self.publisher_connection = None
|
| 29 |
self.publisher_channel = None
|
| 30 |
|
|
@@ -124,10 +131,16 @@ class RabbitMQWorker:
|
|
| 124 |
|
| 125 |
elif pattern == "topic_extraction":
|
| 126 |
data = body_dict.get("data")
|
| 127 |
-
|
| 128 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 129 |
try:
|
| 130 |
-
topics_markdown = topic_processor.process(
|
| 131 |
data["topics_markdown"] = topics_markdown
|
| 132 |
body_dict["pattern"] = "topic_extraction_update_from_gpu_server"
|
| 133 |
body_dict["data"] = data
|
|
@@ -136,7 +149,7 @@ class RabbitMQWorker:
|
|
| 136 |
else:
|
| 137 |
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
| 138 |
except Exception as e:
|
| 139 |
-
logger.error(
|
| 140 |
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
| 141 |
|
| 142 |
else:
|
|
|
|
| 10 |
|
| 11 |
from mineru_single import Processor
|
| 12 |
|
| 13 |
+
from topic_extraction import MineruNoTextProcessor
|
| 14 |
+
|
| 15 |
import logging
|
| 16 |
|
| 17 |
logging.basicConfig(
|
|
|
|
| 27 |
logger.info("Initializing RabbitMQWorker")
|
| 28 |
self.processor = Processor()
|
| 29 |
|
| 30 |
+
self.topic_processor = MineruNoTextProcessor(
|
| 31 |
+
output_folder="/tmp/topic_extraction_outputs",
|
| 32 |
+
gemini_api_key=os.getenv("GEMINI_API_KEY")
|
| 33 |
+
)
|
| 34 |
+
|
| 35 |
self.publisher_connection = None
|
| 36 |
self.publisher_channel = None
|
| 37 |
|
|
|
|
| 131 |
|
| 132 |
elif pattern == "topic_extraction":
|
| 133 |
data = body_dict.get("data")
|
| 134 |
+
input_files = data.get("input_files")
|
| 135 |
+
if not input_files or not isinstance(input_files, list):
|
| 136 |
+
logger.error("[Worker %s] No input files provided for topic extraction.", thread_id)
|
| 137 |
+
ch.basic_ack(delivery_tag=method.delivery_tag)
|
| 138 |
+
return
|
| 139 |
+
# Use the first file's URL for topic extraction
|
| 140 |
+
pdf_url = input_files[0].get("url")
|
| 141 |
+
logger.info("[Worker %s] Processing topic extraction for URL: %s", thread_id, pdf_url)
|
| 142 |
try:
|
| 143 |
+
topics_markdown = self.topic_processor.process(pdf_url)
|
| 144 |
data["topics_markdown"] = topics_markdown
|
| 145 |
body_dict["pattern"] = "topic_extraction_update_from_gpu_server"
|
| 146 |
body_dict["data"] = data
|
|
|
|
| 149 |
else:
|
| 150 |
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
| 151 |
except Exception as e:
|
| 152 |
+
logger.error("Error processing topic extraction: %s", e)
|
| 153 |
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
|
| 154 |
|
| 155 |
else:
|