Spaces:
Paused
Paused
| from fastapi import APIRouter , UploadFile, File | |
| from routes.schemas.Requests_Models import ChatRequest | |
| from generation.AssistantRagGenerator import AssistantRagGen | |
| from indexing.indexingController import IndexingController | |
| from uuid import uuid4 | |
| from worker.tasks import process_file_task | |
| from celery.result import AsyncResult | |
| from celery_app import celery_app | |
| assisstant_router = APIRouter(tags=["assistant_rag"]) | |
| def get_job_status(job_id: str): | |
| result = AsyncResult(job_id, app=celery_app) | |
| if result.state == "PENDING": | |
| return {"job_id": job_id,"state": result.state,"message": "Job is waiting in queue",} | |
| if result.state == "STARTED": | |
| return { | |
| "job_id": job_id, | |
| "state": result.state, | |
| "message": "Job is currently processing", | |
| } | |
| if result.state == "SUCCESS": | |
| return { | |
| "job_id": job_id, | |
| "state": result.state, | |
| "result": result.result, | |
| } | |
| if result.state == "FAILURE": | |
| return { | |
| "job_id": job_id, | |
| "state": result.state, | |
| "error": str(result.result), | |
| } | |
| return { | |
| "job_id": job_id, | |
| "state": result.state, | |
| } | |
| async def process_file_endpoint(course: str , username: str , file: UploadFile = File(...)): | |
| job_id = uuid4().hex | |
| temp_path = f"./temp_{job_id}_{file.filename}" | |
| with open(temp_path, "wb") as f: | |
| f.write(await file.read()) | |
| task = process_file_task.delay(temp_path, file.filename, username, course) | |
| return { | |
| "job_id": task.id, | |
| "filename": file.filename, | |
| "status": "queued", | |
| } | |
| async def chat_complete_endpoint(request: ChatRequest): | |
| indexing_controller = IndexingController() | |
| rag_gen = AssistantRagGen() | |
| user_query = request.prompt if request.prompt else "no question provided" | |
| route = rag_gen.robust_router({"question": user_query}) | |
| results = [] | |
| context_text = "" | |
| filters = [] | |
| # Kda Kda pdf :) | |
| if request.source_file or request.bookmark: | |
| if request.bookmark and not request.source_file: | |
| request.bookmark=None | |
| route = "pdf_query" | |
| if route == "user_info": | |
| if request.role == "instructor" or request.role == "admin": | |
| context_text = ( | |
| f"User Profile Info: {request.user_info.model_dump()}\n" | |
| f"Role: {request.role}\n" | |
| f"Username: {request.username}" | |
| ) | |
| elif request.role == "student": | |
| request.user_info=request.user_info.copy(update={"instructor_owned_files": None}) | |
| context_text = ( | |
| f"User Profile Info: {request.user_info.model_dump()}\n" | |
| f"Role: {request.role}\n" | |
| f"Username: {request.username}" | |
| ) | |
| elif route == "site_query": | |
| filters = [ | |
| {"field": "course", "op": "eq", "value": "Instructions", "clause": "must"}, | |
| {"field": "username", "op": "eq", "value": "ADMIN", "clause": "must"} | |
| ] | |
| embedding = indexing_controller.embedder.embed_text(user_query) | |
| results = indexing_controller.vector_store.query_qdrant( | |
| filters=filters, | |
| embedding=embedding, | |
| top_k=request.top_k | |
| ) | |
| elif route == "pdf_query": | |
| if request.role == "student": | |
| enrolled = request.user_info.courses or [] | |
| print(f"[DEBUG] Student {request.username} is enrolled in courses: {enrolled}") | |
| filters.append({"field": "course", "op": "in", "value": enrolled, "clause": "must"}) | |
| elif request.role == "instructor": | |
| owned = request.user_info.courses | |
| # if owned == []: | |
| # owned = indexing_controller.vector_store.all_user_files_bookmarks(request.username) | |
| # owned = owned.keys() | |
| print(f"[DEBUG] Instructor {request.username} owns courses/files: {owned}") | |
| filters.append({"field": "course", "op": "in", "value": owned, "clause": "must"}) | |
| if request.source_file: | |
| filters.append({"field": "source", "op": "eq", "value": request.source_file, "clause": "must"}) | |
| if request.bookmark: | |
| filters.append({"field": "bookmark_path", "op": "text", "value": request.bookmark, "clause": "must"}) | |
| embedding = indexing_controller.embedder.embed_text(user_query) | |
| results = indexing_controller.vector_store.query_qdrant( | |
| filters=filters, | |
| embedding=embedding, | |
| top_k=request.top_k | |
| ) | |
| if not context_text and results: | |
| context_text = "\n\n".join([r["content"] for r in results if r.get("content")]) | |
| history_str = "\n".join( | |
| f"Human: {turn.Human_msg}\nAssistant: {turn.LLM_response}" | |
| for turn in request.history | |
| ) if request.history else "None" | |
| if route == "user_info": | |
| final_prompt = rag_gen.build_user_info_prompt( | |
| question=user_query, | |
| conversation_history=history_str, | |
| User_Info=str(request.user_info.model_dump()), | |
| ) | |
| elif route == "site_query": | |
| final_prompt = rag_gen.build_site_query_prompt( | |
| question=user_query, | |
| context=context_text, | |
| conversation_history=history_str | |
| ) | |
| else: | |
| final_prompt = rag_gen.build_unified_prompt( | |
| context=context_text, | |
| question=user_query, | |
| conversation_history=history_str, | |
| User_Info=str(request.user_info.model_dump()), | |
| ) | |
| llm_response = rag_gen.generator.generate_text(prompt=final_prompt) | |
| return { | |
| "session_id": request.session_id, # Return as is | |
| "route": route, | |
| "query": user_query, | |
| "history": request.history, # Return as is | |
| "results": results, | |
| "LLM_answer": llm_response, | |
| } | |