Spaces:
Sleeping
Sleeping
Update main.py
Browse files
main.py
CHANGED
|
@@ -8,6 +8,7 @@ from typing import List, Dict, Union
|
|
| 8 |
import time
|
| 9 |
from kafka import KafkaConsumer, KafkaProducer
|
| 10 |
import asyncio
|
|
|
|
| 11 |
|
| 12 |
|
| 13 |
app = FastAPI()
|
|
@@ -31,11 +32,12 @@ async def kafka_consumer():
|
|
| 31 |
obj_id = data.get('obj_id')
|
| 32 |
raw_data = {"text":data.get('text', str()), 'star_rating':data.get('rating', 5), "skip":False}
|
| 33 |
processed_data, has_sentiments = process_single_comment(raw_data)
|
|
|
|
| 34 |
processed_data.pop('color_map', None)
|
| 35 |
processed_data_dict = {"processed_data":processed_data, "has_sentiments":has_sentiments,"obj_id":obj_id}
|
| 36 |
producer = KafkaProducer(bootstrap_servers=bootstrap_server)
|
| 37 |
msg = json.dumps(processed_data_dict)
|
| 38 |
-
producer.send(
|
| 39 |
producer.flush()
|
| 40 |
finally:
|
| 41 |
consumer.close()
|
|
|
|
| 8 |
import time
|
| 9 |
from kafka import KafkaConsumer, KafkaProducer
|
| 10 |
import asyncio
|
| 11 |
+
import json
|
| 12 |
|
| 13 |
|
| 14 |
app = FastAPI()
|
|
|
|
| 32 |
obj_id = data.get('obj_id')
|
| 33 |
raw_data = {"text":data.get('text', str()), 'star_rating':data.get('rating', 5), "skip":False}
|
| 34 |
processed_data, has_sentiments = process_single_comment(raw_data)
|
| 35 |
+
print(f"Processed message with id : {obj_id}")
|
| 36 |
processed_data.pop('color_map', None)
|
| 37 |
processed_data_dict = {"processed_data":processed_data, "has_sentiments":has_sentiments,"obj_id":obj_id}
|
| 38 |
producer = KafkaProducer(bootstrap_servers=bootstrap_server)
|
| 39 |
msg = json.dumps(processed_data_dict)
|
| 40 |
+
producer.send(producer_topic, msg.encode('utf-8'))
|
| 41 |
producer.flush()
|
| 42 |
finally:
|
| 43 |
consumer.close()
|