# CodeBuddyAI / app.py
# Author: TahaFawzyElshrif
# (repo-page residue preserved as comments: commit message "debug end", commit hash 116d809)
from urllib import request
from fastapi import FastAPI
from fastapi.responses import JSONResponse
import sentry_sdk
import uvicorn
import sys
import os
import json
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))
from agent.agent_graph.StateTasks import ProblemState
import subprocess
from Queue_Producer import send_message
import redis
from utils import RequestModel, RequestAnswer
from Consumer import redis_send
import psutil
##################################################
# VARIABLES
##################################################
# Redis connection settings are taken from the environment; a missing
# variable raises KeyError at startup on purpose (fail fast, no defaults).
redis_host, redis_port, redis_password = (
    os.environ["REDIS_HOST"],
    os.environ["REDIS_PORT"],
    os.environ["REDIS_PASSWORD"],
)
##################################################
# START CONSUMERS in a separate process
##################################################
# Launch three background consumer processes. ``-u`` keeps each child's
# stdout unbuffered so its log lines show up immediately; each consumer
# gets a distinct ``--id`` (1, 2, 3).
for consumer_id in (1, 2, 3):
    subprocess.Popen(['python', '-u', 'Consumer.py', '--id', str(consumer_id)])
##################################################
# START API and METHODS
##################################################
# Create Redis connection (global to make the GET in /Message/Answer/ very light).
# decode_responses=True means reads return ``str`` rather than ``bytes``.
redis_conn = redis.Redis(
    host=redis_host,
    # Environment variables are strings; redis-py expects an integer port,
    # so convert explicitly instead of passing the raw env value through.
    port=int(redis_port),
    decode_responses=True,
    username="default",
    password=redis_password,
)
# model and rag are not global for better security, at least for this version

# Initialise Sentry monitoring BEFORE creating the app so that errors during
# route registration are also captured.
sentry_sdk.init(
    dsn=os.environ["SENTRY_DSN"],
    # see https://docs.sentry.io/platforms/python/data-management/data-collected/ for more info
    send_default_pii=True,
)

# Create the single FastAPI application instance.
# NOTE: the original code instantiated FastAPI() twice (once before and once
# after sentry_sdk.init); the first instance was dead code because the second
# assignment replaced it before any route was registered.
app = FastAPI()

print("Starting API Server...")
##################################################
# ROUTES
##################################################
@app.get("/")
def read_root():
    """Root endpoint: return a static greeting payload."""
    greeting = {"message": "Hello From CodeBuddyAI!"}
    return greeting
@app.get("/metrics")
def metrics():
    """Report current host CPU and RAM utilisation as percentages (via psutil)."""
    cpu_pct = psutil.cpu_percent()
    ram_pct = psutil.virtual_memory().percent
    return {"cpu": cpu_pct, "ram": ram_pct}
@app.api_route("/health", methods=["GET", "HEAD", "POST", "OPTIONS"])
def get_health():
    """Liveness probe: answer the same OK payload for every allowed method."""
    payload = {"status": "ok"}
    return JSONResponse(payload)
@app.post("/Message/Send/")
def call(request: RequestModel):
    """Queue a user message: record a pending marker, then publish to the queue.

    The pending marker lets /Message/Answer/ report progress while a
    consumer works on the message.
    """
    redis_send(request.user_id, request.msg_id, {"status": "pending"})
    payload = json.dumps(request.model_dump())
    return send_message(payload)
@app.post("/Message/Answer/")
def call(request: RequestAnswer):
    """Poll for a previously queued message's answer.

    MUST stay lightweight: one Redis GET, plus one DELETE once the answer is
    consumed. Possible responses:
      {"status": "error"}               - no entry for this user/message, or a failure
      {"status": "pending"}             - a worker has not produced the answer yet
      {"status": "ready", "data": ...}  - the answer; the Redis key is deleted so
                                          a second fetch fails (intended, more secure)
    """
    key = f'ANSWER_FOR_USER_ID{request.user_id}_OF_{request.msg_id}'
    try:
        answer = redis_conn.get(key)
        if answer is None:
            return {"status": "error"}
        # Parse once. The original code did a substring test ("status" in answer)
        # on the raw JSON string, which mis-classified any real answer whose JSON
        # text merely contained "status" and could then KeyError inside the
        # pending branch (misreported as a generic error).
        data = json.loads(answer)
        if isinstance(data, dict) and data.get("status") == "pending":
            return {"status": "pending"}
        # Clean up after fetching: frees memory and prevents double retrieval.
        redis_conn.delete(key)
        return {"status": "ready", "data": data}
    except Exception as e:
        print(f"Error fetching answer from Redis: {e}")
        return {"status": "error", "message": str(e)}
# Script entry point: serve the app on all interfaces.
# NOTE(review): port 7860 — presumably chosen to match a Hugging Face Spaces
# deployment default; confirm against the deployment config.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)