aamirtaymoor commited on
Commit
adec1a1
·
verified ·
1 Parent(s): 007be26

Update main.py

Browse files
Files changed (1) hide show
  1. main.py +31 -31
main.py CHANGED
@@ -13,39 +13,39 @@ import json
13
 
14
  app = FastAPI()
15
 
16
- async def kafka_consumer():
17
- consumer_topic = 'hugging-face-raw-data-topic'
18
- producer_topic = 'hugging-face-processed-data-topic'
19
- bootstrap_server = '138.197.1.52:9092'
20
- consumer = KafkaConsumer(
21
- consumer_topic,
22
- group_id='hugging-face-group',
23
- bootstrap_servers=bootstrap_server,
24
- auto_offset_reset='earliest', max_poll_records=50,
25
- enable_auto_commit=False
26
- )
27
 
28
- try:
29
- for message in consumer:
30
- data_str = message.value.decode('utf-8')
31
- data = json.loads(data_str)
32
- obj_id = data.get('obj_id')
33
- raw_data = {"text":data.get('text', str()), 'star_rating':data.get('rating', 5), "skip":False}
34
- processed_data, has_sentiments = process_single_comment(raw_data)
35
- print(f"Processed message with id : {obj_id}")
36
- processed_data.pop('color_map', None)
37
- processed_data_dict = {"processed_data":processed_data, "has_sentiments":has_sentiments,"obj_id":obj_id}
38
- producer = KafkaProducer(bootstrap_servers=bootstrap_server)
39
- msg = json.dumps(processed_data_dict)
40
- producer.send(producer_topic, msg.encode('utf-8'))
41
- producer.flush()
42
- finally:
43
- consumer.close()
44
 
45
- @app.on_event("startup")
46
- async def startup_event():
47
- # Start the Kafka consumer on application startup
48
- asyncio.create_task(kafka_consumer())
49
 
50
  class TextRatingRequest(BaseModel):
51
  text: str
 
13
 
14
  app = FastAPI()
15
 
16
+ # async def kafka_consumer():
17
+ # consumer_topic = 'hugging-face-raw-data-topic'
18
+ # producer_topic = 'hugging-face-processed-data-topic'
19
+ # bootstrap_server = '138.197.1.52:9092'
20
+ # consumer = KafkaConsumer(
21
+ # consumer_topic,
22
+ # group_id='hugging-face-group',
23
+ # bootstrap_servers=bootstrap_server,
24
+ # auto_offset_reset='earliest', max_poll_records=50,
25
+ # enable_auto_commit=False
26
+ # )
27
 
28
+ # try:
29
+ # for message in consumer:
30
+ # data_str = message.value.decode('utf-8')
31
+ # data = json.loads(data_str)
32
+ # obj_id = data.get('obj_id')
33
+ # raw_data = {"text":data.get('text', str()), 'star_rating':data.get('rating', 5), "skip":False}
34
+ # processed_data, has_sentiments = process_single_comment(raw_data)
35
+ # print(f"Processed message with id : {obj_id}")
36
+ # processed_data.pop('color_map', None)
37
+ # processed_data_dict = {"processed_data":processed_data, "has_sentiments":has_sentiments,"obj_id":obj_id}
38
+ # producer = KafkaProducer(bootstrap_servers=bootstrap_server)
39
+ # msg = json.dumps(processed_data_dict)
40
+ # producer.send(producer_topic, msg.encode('utf-8'))
41
+ # producer.flush()
42
+ # finally:
43
+ # consumer.close()
44
 
45
+ # @app.on_event("startup")
46
+ # async def startup_event():
47
+ # # Start the Kafka consumer on application startup
48
+ # asyncio.create_task(kafka_consumer())
49
 
50
  class TextRatingRequest(BaseModel):
51
  text: str