File size: 2,995 Bytes
bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d bb05158 9cc7f8d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 | from fastapi import FastAPI, HTTPException, UploadFile, File, BackgroundTasks, Form
from pydantic import BaseModel, Field
import os
from dotenv import load_dotenv
from src.graph import workflow
from src.embedding import upload_file
import shutil
from langgraph.checkpoint.postgres import PostgresSaver
load_dotenv()
app = FastAPI(
title="Enterprise PDF RAG API",
description="A production-grade backend powering an intelligent LangGraph agent.",
version="1.0.0"
)
class ChatRequest(BaseModel):
message: str = Field(
...,
description="The raw message string from the user."
)
user_id: str = Field(
...,
description="The unique identifier for the tenant context."
)
thread_id: str = Field(
...,
description="The unique session ID tracking the short-term chat history."
)
@app.post(
"/chat",
summary="Return an answer using the RAG backend to the user query."
)
async def chat_endpoint(request: ChatRequest):
try:
config = {
"configurable": {
"thread_id": request.thread_id
}
}
initial_state = {
"messages": [("user", request.message)],
"user_id": request.user_id
}
db_uri = os.getenv("DATABASE_URI")
with PostgresSaver.from_conn_string(db_uri) as checkpointer:
checkpointer.setup()
agent = workflow.compile(
checkpointer=checkpointer
)
result = agent.invoke(
initial_state,
config=config
)
output_messages = result.get("messages", [])
if not output_messages:
raise ValueError(
"No messages returned from the graph."
)
ai_response = output_messages[-1].content
return {
"status": "success",
"thread_id": request.thread_id,
"response": ai_response
}
except Exception as e:
print(f"Backend Error: {str(e)}")
raise HTTPException(
status_code=500,
detail=f"Agent Processing Error: {str(e)}"
)
UPLOAD_DIR = "data/uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
@app.post(
"/upload",
summary="Upload a PDF and process its embeddings in the background"
)
async def upload_pdf(
background_tasks: BackgroundTasks,
file: UploadFile = File(...),
user_id: str = Form(...)
):
local_file_path = os.path.join(
UPLOAD_DIR,
file.filename
)
with open(local_file_path, "wb") as buffer:
shutil.copyfileobj(
file.file,
buffer
)
background_tasks.add_task(
upload_file,
local_file_path,
user_id
)
return {
"status": "success",
"message": (
f"'{file.filename}' received successfully. "
"Ingestion pipeline started in the background."
)
} |