| | from fastapi import FastAPI, UploadFile, Request, HTTPException,BackgroundTasks, File, Depends
|
| | from fastapi.responses import JSONResponse
|
| | from fastapi.middleware.cors import CORSMiddleware
|
| | from contextlib import asynccontextmanager
|
| | from functools import lru_cache
|
| | from langchain_huggingface.embeddings import HuggingFaceEmbeddings
|
| | import shutil
|
| | from app.services.document_ingester import Ingester
|
| | from app.services.retriever import Retriever
|
| | from app.services.generation import Generation
|
| | from app.services.database import Database
|
| | from pydantic import BaseModel
|
| | from werkzeug.utils import secure_filename
|
| | from dotenv import load_dotenv
|
| | import os
|
| | import time
|
| | import logging
|
| | import json
|
| |
|
| |
|
| | class JSONFormatter(logging.Formatter):
|
| | def format(self, record):
|
| | log_record = {
|
| | "timestamp": self.formatTime(record, self.datefmt),
|
| | "level": record.levelname,
|
| | "message": record.getMessage(),
|
| | "module": record.module,
|
| | "function": record.funcName,
|
| | }
|
| | if hasattr(record, "extra"):
|
| | log_record.update(record.extra)
|
| | if record.exc_info:
|
| | log_record["exception"] = self.formatException(record.exc_info)
|
| | return json.dumps(log_record)
|
| |
|
| | logger = logging.getLogger("app")
|
| | logger.setLevel(logging.INFO)
|
| | handler = logging.StreamHandler()
|
| | handler.setFormatter(JSONFormatter())
|
| | logger.addHandler(handler)
|
| |
|
| | database = Database()
|
| | @asynccontextmanager
|
| | async def lifespan(app: FastAPI):
|
| | logger.info("Starting up the application...")
|
| | logger.info("Database connection established.")
|
| | load_dotenv()
|
| | ingest_uploaded_docs()
|
| | yield
|
| | database.disconnect()
|
| | logger.info("Database connection closed. Application shutdown complete.")
|
| | logger.info("Application has been stopped.")
|
| |
|
| |
|
| | app = FastAPI(lifespan=lifespan)
|
| |
|
| |
|
| | app.add_middleware(
|
| | CORSMiddleware,
|
| | allow_origins=["*"],
|
| | allow_credentials=True,
|
| | allow_methods=["*"],
|
| | allow_headers=["*"],
|
| | )
|
| |
|
| |
|
| | @app.middleware("http")
|
| | async def log_requests(request: Request, call_next):
|
| | start_time = time.perf_counter()
|
| | response = await call_next(request)
|
| | process_time = (time.perf_counter() - start_time) * 1000
|
| | logger.info(
|
| | f"{request.method} {request.url.path}",
|
| | extra={"extra": {"method": request.method, "path": request.url.path, "status_code": response.status_code, "duration_ms": round(process_time, 2)}}
|
| | )
|
| | return response
|
| |
|
| |
|
| | @app.exception_handler(Exception)
|
| | async def global_exception_handler(request: Request, exc: Exception):
|
| | logger.error(f"Unhandled exception: {exc}", exc_info=True)
|
| | return JSONResponse(
|
| | status_code=500,
|
| | content={"message": "An unexpected internal server error occurred."}
|
| | )
|
| |
|
| | @app.exception_handler(HTTPException)
|
| | async def http_exception_handler(request: Request, exc: HTTPException):
|
| | return JSONResponse(status_code=exc.status_code, content={"message": exc.detail})
|
| |
|
| |
|
| | embed_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
|
| |
|
| | @lru_cache()
|
| | def get_ingester():
|
| | return Ingester(embedding_model=embed_model)
|
| |
|
| | @lru_cache()
|
| | def get_retriever():
|
| | return Retriever(embedding_model=embed_model)
|
| |
|
| | @lru_cache()
|
| | def get_generator():
|
| | return Generation()
|
| |
|
| |
|
| | def ingest_documents(path:str):
|
| | ingester=get_ingester()
|
| | logger.info(f"Starting document ingestion for {path}", extra={"extra": {"document_path": path}})
|
| | ingester.ingest_documents(path)
|
| | logger.info(f"Document ingestion completed for {path}", extra={"extra": {"document_path": path}})
|
| | database.update_document_status(path, "ingested")
|
| | logger.info(f"Document status updated to 'ingested' for {path}", extra={"extra": {"document_path": path}})
|
| |
|
| | def ingest_uploaded_docs():
|
| | to_be_ingested = database.list_documents()
|
| | for doc in to_be_ingested:
|
| | if doc[1] == "uploaded":
|
| | ingest_documents(doc[3])
|
| | logger.info(f"Background ingestion completed for {doc[3]}", extra={"extra": {"document_path": doc[3]}})
|
| |
|
| |
|
| | @app.get("/")
|
| | async def health_check():
|
| | return {"status": "ok"}
|
| |
|
| | @app.post("/document")
|
| | async def upload_file(background_tasks: BackgroundTasks,file:UploadFile=File(...) ):
|
| | base_dir = os.getenv("HF_HOME", "/home/user/app")
|
| | upload_dir = os.path.join(base_dir, "data")
|
| | os.makedirs(upload_dir, exist_ok=True)
|
| |
|
| | safe_filename = secure_filename(file.filename)
|
| | file_path = os.path.join(upload_dir, f"{os.path.splitext(safe_filename)[0]}_{int(time.time())}{os.path.splitext(safe_filename)[1]}")
|
| |
|
| | with open(file_path, "wb") as buffer:
|
| | shutil.copyfileobj(file.file, buffer)
|
| | database.add_document(filename=safe_filename, path=file_path)
|
| | logger.info(f"Uploading file: {file.filename}", extra={"extra": {"original_filename": file.filename, "safe_path": file_path}})
|
| | background_tasks.add_task(ingest_documents, path=file_path)
|
| | return {"filename": file.filename, "message": "File uploaded successfully."}
|
| |
|
| | @app.get("/documents")
|
| | def list_documents():
|
| | documents = database.list_documents()
|
| | logger.info("Fetched document list", extra={"extra": {"document_count": len(documents)}})
|
| | return {"documents": documents}
|
| |
|
| | class DeleteRequest(BaseModel):
|
| | source: str
|
| |
|
| | @app.delete("/document")
|
| | def clear_document(payload: DeleteRequest, ingester: Ingester = Depends(get_ingester)):
|
| | logger.info(f"Deleting document: {payload.source}")
|
| | message = ingester.delete_document(payload.source)
|
| | logger.info(f"Vector deletion completed for: {payload.source,message}")
|
| | db_msg=database.delete_document(payload.source)
|
| | logger.info(f"Document deletion completed for: {payload.source}")
|
| | return {"message": message, "db_msg": db_msg}
|
| |
|
| | class ChatRequest(BaseModel):
|
| | question: str
|
| | history: str
|
| |
|
| | @app.post("/chat")
|
| | async def chat_endpoint(request: ChatRequest, retriever: Retriever = Depends(get_retriever), generator: Generation = Depends(get_generator)):
|
| | logger.info(f"Chat request received", extra={"extra": {"question_length": len(request.question)}})
|
| | context = retriever.retrieve_context(request.question)
|
| | response = generator.generate_response(request.question, context, request.history)
|
| | return {"response": response}
|
| |
|