Spaces:
Running
Running
File size: 4,545 Bytes
e27c97c 13dae7e e27c97c 13dae7e e27c97c 13dae7e e27c97c 13dae7e e27c97c 13dae7e e27c97c 13dae7e e27c97c 13dae7e e27c97c 13dae7e e27c97c e6b1ea5 e27c97c e6b1ea5 e27c97c e6b1ea5 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | from fastapi import FastAPI, UploadFile, File, status
import os
from fastapi.exceptions import HTTPException
import shutil
from rag.smart_chunking import get_chunked_docs
from rag.chain import store_documents, load_documents, get_rag_chain
from langchain_huggingface import HuggingFaceEmbeddings
from datetime import datetime
from fastapi.middleware.cors import CORSMiddleware
from functools import lru_cache
from pathlib import Path
@lru_cache
def get_embeddings():
    """Build the HuggingFace embedding model used by this service.

    The result is memoized by ``lru_cache``: the model object is constructed
    on the first call and the same instance is returned on later calls.
    """
    model_id = "sentence-transformers/all-MiniLM-L6-v2"
    return HuggingFaceEmbeddings(model_name=model_id)
@lru_cache
def get_vectorstore():
    """Load the vector store using the shared embedding model.

    Memoized by ``lru_cache``: ``load_documents`` runs on the first call and
    the same object is returned afterwards.

    NOTE(review): because the result is cached, documents indexed after the
    first call may not be visible until the cache is cleared -- confirm
    against how ``load_documents`` / ``store_documents`` persist data.
    """
    embedder = get_embeddings()
    return load_documents(embedding_model=embedder)
# Root directory of the deployment; uploaded PDFs are kept in "<BASE_DIR>/uploads".
BASE_DIR = Path("/app")
upload_dir = BASE_DIR.joinpath("uploads")
# Created at import time (with any missing parents); a no-op when it already exists.
upload_dir.mkdir(exist_ok=True, parents=True)
# The ASGI application object; the keyword arguments are the API metadata
# (title, description, version) handed to FastAPI.
app = FastAPI(
    version="V1",
    description="This is Api for Multi Rag System",
    title="Multi_Rag_System_API",
)

# CORS middleware: every origin, method and header is accepted, with
# credentials enabled.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive -- confirm this is intended for the deployed frontend.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
# In-memory counters for this process. "start_time" is captured once, when the
# module is imported; the two totals are incremented by the endpoints below.
system_stats = dict(
    total_uploads=0,
    total_queries=0,
    start_time=datetime.now().isoformat(),
)
@app.on_event("startup")
def startup_event():
    """Warm the embedding-model cache when the application starts.

    ``get_embeddings`` is memoized, so calling it here constructs the model
    once at startup instead of during the first request that needs it.
    Progress is printed to stdout.
    """
    # The emoji in these messages had been mis-decoded, which split the second
    # string literal across two lines; both are restored as valid literals.
    print("🚀 Preloading embedding model...")
    get_embeddings()
    print("✅ Embedding model loaded")
# Landing endpoint: describes the API and lists its routes.
@app.get("/")
async def root():
    """Return the API's display name, version and endpoint paths."""
    # Each route name maps to the path "/<name>".
    route_names = ("health", "upload", "query", "stats", "docs")
    endpoint_paths = {name: f"/{name}" for name in route_names}
    return {
        "message": "Multi-Modal RAG System API",
        "version": "v1.0.0",
        "endpoints": endpoint_paths,
    }
@app.get("/health")
async def health_check():
    """Report service health for monitoring.

    Returns the status, the current timestamp, whether the upload directory
    exists, how many ``*.pdf`` files it holds, and the embedding model name.
    Any exception raised while gathering this is reported as HTTP 503.
    """
    try:
        # Only count PDFs when the upload directory is actually there.
        dir_present = upload_dir.exists()
        if dir_present:
            pdf_count = sum(1 for _ in upload_dir.glob("*.pdf"))
        else:
            pdf_count = 0

        report = {
            "status": "healthy",
            "timestamp": datetime.now().isoformat(),
            "upload_directory": dir_present,
            "uploaded_documents": pdf_count,
            "embeddings_model": "sentence-transformers/all-MiniLM-L6-v2",
        }
        return report
    except Exception as e:
        failure_detail = f"Health check failed: {e!s}"
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=failure_detail,
        )
# Exposes the in-memory system_stats counters.
@app.get("/stats")
async def get_stats():
    """Return the usage counters, the stored PDF count and the current time."""
    pdf_total = len(list(upload_dir.glob("*.pdf")))
    now_iso = datetime.now().isoformat()
    return {
        "stats": system_stats,
        "uploaded_documents": pdf_total,
        "current_time": now_iso,
    }
# This endpoint uploads a PDF and stores its chunks in the vector database.
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    """Save an uploaded PDF, split it into chunks and index the chunks.

    Args:
        file: The uploaded file; its filename must end with ``.pdf``.

    Returns:
        A dict with a success ``message`` and ``chunks_created``, the number
        of chunks produced from the PDF.

    Raises:
        HTTPException: 400 when the filename is missing or does not end with
            ``.pdf``; 500 when no content could be extracted or when any
            other step fails (the detail carries the error text).
    """
    # The filename comes from the client and is untrusted: keep only its final
    # path component so values such as "../x.pdf" or "/etc/x.pdf" cannot place
    # the file outside upload_dir. A missing filename becomes "" and is
    # rejected by the extension check below.
    safe_name = Path(file.filename or "").name
    if not safe_name.endswith(".pdf"):
        raise HTTPException(status_code=400, detail="Only PDF files are supported")

    file_path = upload_dir / safe_name
    try:
        with open(file_path, "wb") as f:
            shutil.copyfileobj(file.file, f)

        chunked_docs = get_chunked_docs(file_path)
        if not chunked_docs:
            raise HTTPException(status_code=500, detail="No content extracted from PDF")

        store_documents(chunked_docs, get_embeddings())

        # get_vectorstore() is memoized; drop the cached object so the next
        # query reloads the store.
        # NOTE(review): presumably needed for new documents to become
        # searchable -- confirm against load_documents/store_documents.
        get_vectorstore.cache_clear()

        system_stats["total_uploads"] += 1
        return {
            "message": "PDF uploaded and indexed successfully",
            "chunks_created": len(chunked_docs),
        }
    except HTTPException:
        # Deliberate HTTP errors must reach the client with their own status
        # code and detail instead of being re-wrapped as a generic 500.
        raise
    except Exception as e:
        print("❌ UPLOAD ERROR:", str(e))  # <-- shows in HF logs
        raise HTTPException(status_code=500, detail=str(e)) from e
from pydantic import BaseModel
# Request body accepted by POST /query.
class QueryRequest(BaseModel):
    # The user's question; it is passed as-is to the RAG chain and echoed back
    # in the response under "question".
    input: str
# This endpoint loads the vector database and answers the user's question.
@app.post("/query")
async def get_response(req: QueryRequest):
    """Answer a question with the RAG chain over the vector store.

    Builds an MMR retriever (k=3) from the vector store, runs the RAG chain on
    ``req.input`` and returns the question together with the ``content`` of
    the chain's response. The query counter is incremented after the chain
    has run. Any exception is reported as HTTP 500.
    """
    try:
        mmr_retriever = get_vectorstore().as_retriever(
            search_kwargs={"k": 3},
            search_type="mmr",
        )
        answer = get_rag_chain(mmr_retriever).invoke(req.input)

        system_stats["total_queries"] += 1

        return {
            "question": req.input,
            "response": answer.content,
        }
    except Exception as e:
        failure_detail = f"Query processing failed: {e!s}"
        raise HTTPException(status_code=500, detail=failure_detail)
|