Kushalguptaiitb commited on
Commit
dedcc8c
·
verified ·
1 Parent(s): b533173

Upload rabbit_mq_investor_report.py

Browse files
Files changed (1) hide show
  1. rabbit_mq_investor_report.py +141 -0
rabbit_mq_investor_report.py ADDED
@@ -0,0 +1,141 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import pika
3
+ import json
4
+ from datetime import datetime
5
+
6
+ import requests
7
+ import sys
8
+ sys.path.append("/shared_disk/kushal/db_str_chunking/new_ws_structured_code")
9
+ from db_structured_chunking.structure_chunking.API_backend.iqeq_app_latest import upload_documents
10
+ from db_structured_chunking.structure_chunking.API_backend.rabbitmq_config_investor_report import RABBITMQ
11
+
12
+ # RabbitMQ connection parameters
13
+ connection_params = pika.ConnectionParameters(
14
+ host=RABBITMQ["HOST"],
15
+ port=RABBITMQ["PORT"],
16
+ virtual_host=RABBITMQ["VIRTUAL_HOST"],
17
+ credentials=pika.PlainCredentials(
18
+ RABBITMQ["USERNAME"], RABBITMQ["PASSWORD"]
19
+ )
20
+ )
21
+
22
+ # Define the names of the queues
23
+ EXCHANGE_NAME = RABBITMQ["EXCHANGE_NAME"]
24
+ EXCHANGE_TYPE = RABBITMQ["EXCHANGE_TYPE"]
25
+
26
+ INPUT_FILE_QUEUE = RABBITMQ["QUEUES"]["INPUT_FILE_QUEUE"]
27
+ FILE_RESPONSE_QUEUE = RABBITMQ["QUEUES"]["FILE_RESPONSE_QUEUE"]
28
+
29
+ PRIORITY_ARGS = {'x-max-priority': 5}
30
+
31
+ def on_message(channel, method_frame, header_frame, body):
32
+ processingStartTime = datetime.now().isoformat()
33
+
34
+ msg = json.loads(body)
35
+ print(f"::::::::::::::::{msg}")
36
+ filePath = msg.get("path")
37
+ result = upload_documents(filePath)
38
+ response_json_path = result["saved_json_path"]
39
+
40
+ response_message = {
41
+
42
+ "documentId": msg["documentId"],
43
+
44
+ "documentType": msg["documentType"],
45
+
46
+ "documentSubType": msg["documentSubType"],
47
+
48
+ "status": 200,
49
+
50
+ "accuracy": "",
51
+
52
+ "queueInsertTime": msg["queueInsertTime"],
53
+
54
+ "processingStartTime": "",
55
+
56
+ "processingEndTime": "",
57
+
58
+ "manualQaRequired": "",
59
+
60
+ "errorMessages": None,
61
+
62
+ "result": {"path":response_json_path}
63
+
64
+ }
65
+
66
+ print(f"::::::::::::::::::::::::::::::::::\n{response_message}")
67
+
68
+ # Publish the response to the response queue
69
+ channel.basic_publish(
70
+ exchange=EXCHANGE_NAME,
71
+ routing_key=FILE_RESPONSE_QUEUE,
72
+ body=json.dumps(response_message)
73
+ )
74
+
75
+ # Acknowledge the original message
76
+ channel.basic_ack(delivery_tag=method_frame.delivery_tag)
77
+
78
+
79
+ def main():
80
+ # Establish connection to RabbitMQ
81
+ try:
82
+ connection = pika.BlockingConnection(connection_params)
83
+ channel = connection.channel()
84
+ print("Connection to RabbitMQ established successfully!")
85
+ except Exception as e:
86
+ print(f"Failed to connect to RabbitMQ:{e}")
87
+ return
88
+
89
+ channel.exchange_declare(
90
+ exchange=EXCHANGE_NAME,
91
+ exchange_type=EXCHANGE_TYPE,
92
+ durable=True
93
+ )
94
+
95
+ # Declare the queues
96
+ channel.queue_declare(
97
+ queue = INPUT_FILE_QUEUE,
98
+ durable=True,
99
+ arguments=PRIORITY_ARGS
100
+ )
101
+
102
+ channel.queue_declare(
103
+ queue=FILE_RESPONSE_QUEUE,
104
+ durable=True,
105
+ arguments=PRIORITY_ARGS
106
+ )
107
+ print(f"Queues '{INPUT_FILE_QUEUE}' and '{FILE_RESPONSE_QUEUE}' are ready.")
108
+
109
+ # Bind each queue to the exchange with a routing key matching the queue name
110
+ channel.queue_bind(
111
+ queue=INPUT_FILE_QUEUE,
112
+ exchange=EXCHANGE_NAME,
113
+ routing_key=INPUT_FILE_QUEUE
114
+ )
115
+ channel.queue_bind(
116
+ queue=FILE_RESPONSE_QUEUE,
117
+ exchange=EXCHANGE_NAME,
118
+ routing_key=FILE_RESPONSE_QUEUE
119
+ )
120
+
121
+ print(f"Exchange '{EXCHANGE_NAME}' declared and queues bound.")
122
+
123
+ # Start consuming messages from the request queue
124
+ channel.basic_consume(
125
+ queue=INPUT_FILE_QUEUE,
126
+ on_message_callback=on_message
127
+ )
128
+
129
+ print("Waiting for messages. To exit press CTRL+C")
130
+
131
+ try:
132
+ channel.start_consuming()
133
+ except KeyboardInterrupt:
134
+ channel.stop_consuming()
135
+ finally:
136
+ connection.close()
137
+ print("Connection closed.")
138
+
139
+
140
+ if __name__ == '__main__':
141
+ main()