EXAM_RAG_API / routes /assisstant_rag.py
MinaNasser's picture
1st
1bc3f18
from fastapi import APIRouter , UploadFile, File
from routes.schemas.Requests_Models import ChatRequest
from generation.AssistantRagGenerator import AssistantRagGen
from indexing.indexingController import IndexingController
from uuid import uuid4
from worker.tasks import process_file_task
from celery.result import AsyncResult
from celery_app import celery_app
assisstant_router = APIRouter(tags=["assistant_rag"])
@assisstant_router.get("/jobs/{job_id}")
def get_job_status(job_id: str):
result = AsyncResult(job_id, app=celery_app)
if result.state == "PENDING":
return {"job_id": job_id,"state": result.state,"message": "Job is waiting in queue",}
if result.state == "STARTED":
return {
"job_id": job_id,
"state": result.state,
"message": "Job is currently processing",
}
if result.state == "SUCCESS":
return {
"job_id": job_id,
"state": result.state,
"result": result.result,
}
if result.state == "FAILURE":
return {
"job_id": job_id,
"state": result.state,
"error": str(result.result),
}
return {
"job_id": job_id,
"state": result.state,
}
@assisstant_router.post("/process-file")
async def process_file_endpoint(course: str , username: str , file: UploadFile = File(...)):
job_id = uuid4().hex
temp_path = f"./temp_{job_id}_{file.filename}"
with open(temp_path, "wb") as f:
f.write(await file.read())
task = process_file_task.delay(temp_path, file.filename, username, course)
return {
"job_id": task.id,
"filename": file.filename,
"status": "queued",
}
@assisstant_router.post("/chat/complete")
async def chat_complete_endpoint(request: ChatRequest):
indexing_controller = IndexingController()
rag_gen = AssistantRagGen()
user_query = request.prompt if request.prompt else "no question provided"
route = rag_gen.robust_router({"question": user_query})
results = []
context_text = ""
filters = []
# Kda Kda pdf :)
if request.source_file or request.bookmark:
if request.bookmark and not request.source_file:
request.bookmark=None
route = "pdf_query"
if route == "user_info":
if request.role == "instructor" or request.role == "admin":
context_text = (
f"User Profile Info: {request.user_info.model_dump()}\n"
f"Role: {request.role}\n"
f"Username: {request.username}"
)
elif request.role == "student":
request.user_info=request.user_info.copy(update={"instructor_owned_files": None})
context_text = (
f"User Profile Info: {request.user_info.model_dump()}\n"
f"Role: {request.role}\n"
f"Username: {request.username}"
)
elif route == "site_query":
filters = [
{"field": "course", "op": "eq", "value": "Instructions", "clause": "must"},
{"field": "username", "op": "eq", "value": "ADMIN", "clause": "must"}
]
embedding = indexing_controller.embedder.embed_text(user_query)
results = indexing_controller.vector_store.query_qdrant(
filters=filters,
embedding=embedding,
top_k=request.top_k
)
elif route == "pdf_query":
if request.role == "student":
enrolled = request.user_info.courses or []
print(f"[DEBUG] Student {request.username} is enrolled in courses: {enrolled}")
filters.append({"field": "course", "op": "in", "value": enrolled, "clause": "must"})
elif request.role == "instructor":
owned = request.user_info.courses
# if owned == []:
# owned = indexing_controller.vector_store.all_user_files_bookmarks(request.username)
# owned = owned.keys()
print(f"[DEBUG] Instructor {request.username} owns courses/files: {owned}")
filters.append({"field": "course", "op": "in", "value": owned, "clause": "must"})
if request.source_file:
filters.append({"field": "source", "op": "eq", "value": request.source_file, "clause": "must"})
if request.bookmark:
filters.append({"field": "bookmark_path", "op": "text", "value": request.bookmark, "clause": "must"})
embedding = indexing_controller.embedder.embed_text(user_query)
results = indexing_controller.vector_store.query_qdrant(
filters=filters,
embedding=embedding,
top_k=request.top_k
)
if not context_text and results:
context_text = "\n\n".join([r["content"] for r in results if r.get("content")])
history_str = "\n".join(
f"Human: {turn.Human_msg}\nAssistant: {turn.LLM_response}"
for turn in request.history
) if request.history else "None"
if route == "user_info":
final_prompt = rag_gen.build_user_info_prompt(
question=user_query,
conversation_history=history_str,
User_Info=str(request.user_info.model_dump()),
)
elif route == "site_query":
final_prompt = rag_gen.build_site_query_prompt(
question=user_query,
context=context_text,
conversation_history=history_str
)
else:
final_prompt = rag_gen.build_unified_prompt(
context=context_text,
question=user_query,
conversation_history=history_str,
User_Info=str(request.user_info.model_dump()),
)
llm_response = rag_gen.generator.generate_text(prompt=final_prompt)
return {
"session_id": request.session_id, # Return as is
"route": route,
"query": user_query,
"history": request.history, # Return as is
"results": results,
"LLM_answer": llm_response,
}