# CodeBuddyAI / Consumer.py
# TahaFawzyElshrif
# debug end
# 116d809
# Consumer
import time
import pika
import os
from Server import get_response
import json
from agent.agent_graph.StateTasks import ProblemState
import argparse
import redis
from encryption_utils import decrypt_token_from_json
##################################################
# VARIABLES
##################################################
# Command-line arguments for this file: --id labels this consumer in its log output.
argparse_model = argparse.ArgumentParser()
argparse_model.add_argument(
    "--id",
    type=int,
    default=0,
    help="Consumer ID",
)
consumer_id = argparse_model.parse_args().id

# Broker settings. A missing variable raises KeyError while the module loads.
RABBITMQ_URL, QUEUE_NAME = (
    os.environ[key] for key in ("RABBITMQ_URL", "QUEUE_NAME")
)

# Redis settings. Values are kept as the raw strings read from the environment.
redis_host, redis_port, redis_password = (
    os.environ[key] for key in ("REDIS_HOST", "REDIS_PORT", "REDIS_PASSWORD")
)
##################################################
# PROCESSING METHODS
##################################################
def redis_send(user_id, msg_id, answer):
    """Store a model answer in Redis under a per-user, per-message key.

    Args:
        user_id: Identifier of the user the answer belongs to.
        msg_id: Identifier of the message being answered.
        answer: JSON-serializable answer payload; it is stored as a JSON string.

    Returns:
        Whatever ``Redis.set`` returns (truthy when the write succeeded).

    Raises:
        TypeError: If ``answer`` cannot be serialized by ``json.dumps``.
    """
    # Serialize first so a bad payload fails before any connection is opened.
    payload = json.dumps(answer)
    key = f'ANSWER_FOR_USER_ID{user_id}_OF_{msg_id}'

    # A fresh client (and connection pool) is created per call; using it as a
    # context manager closes it on exit so connections do not pile up with
    # every processed message.
    with redis.Redis(
        host=redis_host,
        port=redis_port,
        decode_responses=True,
        username="default",
        password=redis_password,
    ) as client:
        return client.set(key, payload)
def model_call(request, token):
    """Run the model for one request and return a cleaned-up answer.

    Args:
        request: The request as a dict, or as a JSON string that decodes to
            one. The keys read are ``prompt``, ``memory``, ``last_state``,
            ``user_email`` and ``user_name``; all are optional.
        token: Token forwarded unchanged to ``get_response``.

    Returns:
        The object returned by ``get_response``, with the values under
        ``"llm"`` and ``"rag_model"`` (when present) replaced by ``""``.
    """
    # Make sure the request is a dict.
    if isinstance(request, str):
        request = json.loads(request)

    # Restore the previous state when one was sent; any failure while reading
    # or decoding it is treated as "no state".
    try:
        raw_state = request.get('last_state')
        state = json.loads(raw_state) if raw_state else {}
    except Exception:
        state = {}

    prompt = request.get('prompt', "")

    # Fallback when there is no state: start from the prompt and memory.
    if not state:
        state = {"question": prompt, "memory": request.get('memory', [])}

    answer = get_response(
        prompt,
        request.get('memory', []),
        token,
        state,
        request.get('user_email', ""),
        request.get('user_name', ""),
    )

    # These entries are not serializable, so their values are blanked out.
    for unserializable_key in ("llm", "rag_model"):
        if unserializable_key in answer:
            answer[unserializable_key] = ""
    return answer
def process_message(recieved_msg):
    """Handle one decoded queue message end to end.

    Decrypts the token carried in the message, calls the model, and stores
    the answer in Redis.

    Args:
        recieved_msg: Decoded message dict. It must contain
            ``ht_token_encrypted_dumped`` (a JSON string), ``user_id`` and
            ``msg_id``; the whole dict is also passed to ``model_call``.

    Raises:
        KeyError: If one of the required keys is missing.
    """
    # Decrypt the token.
    encrypted_token = json.loads(recieved_msg['ht_token_encrypted_dumped'])
    token = decrypt_token_from_json(encrypted_token)

    # Call the model.
    model_answer = model_call(recieved_msg, token)

    # Send the answer to Redis.
    user_id = recieved_msg["user_id"]
    msg_id = recieved_msg["msg_id"]
    redis_send_res = redis_send(user_id, msg_id, model_answer)

    # Monitoring output that still hides user data. The consumer id is logged
    # as a plain value; it was previously wrapped in braces, which built a
    # one-element set and printed as e.g. {0}.
    print({"STATUS": redis_send_res, "CONSUMER": consumer_id})
##################################################
# CONSUMER METHODS
##################################################
def get_connection():
    """Open and return a blocking pika connection built from RABBITMQ_URL."""
    return pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
def callback(ch, method, properties, body):
    """Message callback: decode the body, process it, then acknowledge it.

    Args:
        ch: Channel the message arrived on; used to send the ack.
        method: Delivery metadata; supplies the delivery tag for the ack.
        properties: Message properties (unused).
        body: Raw message bytes containing a JSON document.

    The ack is only sent after ``process_message`` returns. If decoding or
    processing raises, no ack is sent and the exception propagates to the
    caller.
    """
    # Receive the message and decode it.
    payload = json.loads(body.decode())
    print("-------------------------------------------------")
    print(f"MSG AT CONSUMER {consumer_id}" )

    # Process the message.
    process_message(payload)

    # Finalize: acknowledge this delivery.
    delivery_tag = method.delivery_tag
    ch.basic_ack(delivery_tag=delivery_tag)
def start_consumer():
    """Connect to RabbitMQ and consume messages from QUEUE_NAME until stopped.

    When scaled, each server runs its own consumer. The queue is declared
    durable and prefetch is limited to one unacknowledged message at a time.
    Blocks inside ``start_consuming``; any exception raised there (including
    one raised by ``callback``) propagates after the connection is closed.
    """
    # Reuse the shared factory instead of repeating the connection setup here.
    connection = get_connection()
    try:
        channel = connection.channel()
        channel.queue_declare(queue=QUEUE_NAME, durable=True)
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(
            queue=QUEUE_NAME,
            on_message_callback=callback,
        )
        print("Waiting for messages...")
        channel.start_consuming()
    finally:
        # Close explicitly on every exit path instead of leaving the socket
        # to be torn down with the process. Skip it when the connection is
        # already closed, since closing twice raises in pika.
        if connection.is_open:
            connection.close()
##################################################
# MAIN
##################################################
if __name__ == "__main__":
    # Script entry point: announce this instance, then block in the consume loop.
    print(f"Starting New Consumer {consumer_id}...")
    start_consumer()