Spaces:
Sleeping
Sleeping
| import os | |
| import shutil | |
| from pathlib import Path | |
| from fastapi import FastAPI, Form | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import JSONResponse, FileResponse | |
| from dotenv import load_dotenv | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from langchain_community.document_loaders import PyMuPDFLoader | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from llama_api import ask_ollama | |
# Load environment variables from a local .env file, if one is present.
load_dotenv()

# Paths and identifiers used throughout the app.
BASE_DIR = Path(__file__).resolve().parent
PDF_PATH = BASE_DIR / "src/data"            # local cache of downloaded PDFs
DB_DIR = BASE_DIR / "chroma_digital_icfai"  # persisted Chroma vector store
HF_DATASET = "Chaitu2112/ifhe-assets"       # HF dataset repo holding the PDFs
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

# Ensure the PDF directory exists before any download is attempted.
PDF_PATH.mkdir(parents=True, exist_ok=True)
# -----------------------------------------------------------
# DOWNLOAD PDFs FROM HUGGINGFACE
# -----------------------------------------------------------
def download_pdfs(repo_id: str, local_dir: Path) -> None:
    """Mirror every PDF from a Hugging Face dataset repo into *local_dir*.

    Files that already exist locally (and are non-empty) are skipped, so
    repeated startups do not re-download. All failures are logged and
    swallowed: the app should still boot with whatever PDFs are available.

    Args:
        repo_id: Hugging Face dataset repo id (e.g. "user/name").
        local_dir: Existing local directory to copy the PDFs into.
    """
    api = HfApi()
    try:
        files = api.list_repo_files(repo_id=repo_id, repo_type="dataset")
    except Exception as e:
        # Network/auth problems: warn and continue with local files only.
        print(f"β οΈ Cannot list dataset files: {e}")
        return

    pdfs = [f for f in files if f.lower().endswith(".pdf")]
    if not pdfs:
        print("β οΈ No PDFs found in dataset.")
        return

    for f in pdfs:
        # Flatten any repo sub-directories: keep only the file name locally.
        local_path = local_dir / Path(f).name
        if local_path.exists() and local_path.stat().st_size > 0:
            continue  # already cached and non-empty
        try:
            print(f"π₯ Downloading: {f}")
            cached = hf_hub_download(repo_id=repo_id, filename=f, repo_type="dataset")
            shutil.copy(cached, local_path)
            print(f"β Saved to {local_path}")
        except Exception as e:
            print(f"β οΈ Download failed for {f}: {e}")


download_pdfs(HF_DATASET, PDF_PATH)
# -----------------------------------------------------------
# BUILD CHROMA VECTOR DB
# -----------------------------------------------------------
def build_chroma_db() -> Chroma:
    """Build and persist a Chroma vector DB from every PDF in PDF_PATH.

    Returns a ready-to-query ``Chroma`` instance. If no PDFs are found,
    a placeholder DB containing a single dummy document is created so
    the rest of the app can still start.
    """
    print("πΉ Building vector DB from PDFs.")

    # One embeddings instance serves both the empty and the normal path
    # (the original constructed the model twice).
    embeddings = HuggingFaceEmbeddings(model_name=EMBED_MODEL_NAME)

    # Sorted iteration makes chunk ordering deterministic across runs
    # (os.listdir order is arbitrary).
    documents = []
    for path in sorted(p for p in PDF_PATH.iterdir() if p.name.lower().endswith(".pdf")):
        print(f" Loading PDF: {path}")
        loader = PyMuPDFLoader(str(path))
        documents.extend(loader.load())

    if not documents:
        print(" No PDFs found. Creating empty DB.")
        return Chroma.from_texts(
            texts=["No content available"],
            embedding=embeddings,
            persist_directory=str(DB_DIR),
        )

    # Overlapping chunks preserve sentence context across boundaries.
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=150,
        separators=["\n\n", "\n", ".", "!", "?", " "],
    )
    docs = splitter.split_documents(documents)
    print(f"πΉ Total chunks after splitting: {len(docs)}")

    vectordb = Chroma.from_documents(
        docs,
        embedding=embeddings,
        persist_directory=str(DB_DIR),
    )
    print(" Vector DB built and persisted.")
    return vectordb
# -----------------------------------------------------------
# LOAD OR CREATE CHROMA
# -----------------------------------------------------------
def load_or_create_chroma() -> Chroma:
    """Return the persisted Chroma DB if one exists, otherwise build it."""
    db_present = DB_DIR.exists() and any(DB_DIR.iterdir())
    if not db_present:
        print("πΉ No existing DB found; building a new one...")
        return build_chroma_db()

    print("πΉ Loading existing Chroma DB.")
    return Chroma(
        embedding_function=HuggingFaceEmbeddings(model_name=EMBED_MODEL_NAME),
        persist_directory=str(DB_DIR),
    )


vectordb = load_or_create_chroma()
retriever = vectordb.as_retriever(search_kwargs={"k": 6})
print("β Retriever is ready.")
# -----------------------------------------------------------
# FASTAPI SETUP
# -----------------------------------------------------------
app = FastAPI()

# Wide-open CORS: the API is consumed from arbitrary front-end hosts.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve the bundled front-end build (dist/) when it is present.
DIST_DIR = BASE_DIR / "dist"
if DIST_DIR.exists():
    assets_dir = DIST_DIR / "assets"
    if assets_dir.exists():
        app.mount("/assets", StaticFiles(directory=str(assets_dir)), name="assets")

    # NOTE(review): serve_index had no route decorator, so it was never
    # registered with FastAPI. "/" is the conventional SPA entry point —
    # confirm this matches the intended route.
    @app.get("/")
    async def serve_index():
        return FileResponse(DIST_DIR / "index.html")
else:
    print(" dist/ folder not found β frontend not served by backend.")
# -----------------------------------------------------------
# MAIN CHAT API
# -----------------------------------------------------------
# NOTE(review): this handler had no route decorator, so it was never
# registered; the log line below names the intended path — confirm.
@app.post("/digital_icfai_chat")
async def digital_icfai_chat_post(user_message: str = Form(...)):
    """Answer a student question via RAG over the ingested PDFs.

    Retrieves the top-k relevant chunks, stuffs them into a prompt, and
    asks the local LLM. Errors are reported in the JSON body (never
    raised) so the front-end always receives an answer payload.
    """
    query = user_message.strip()
    print(f" /digital_icfai_chat POST query: {query!r}")

    # -------------------------------------------------------
    # ROBUST RETRIEVER LOGIC: tolerate several retriever API
    # generations (sync, async, and direct vector-store access).
    # -------------------------------------------------------
    try:
        docs = None
        # Preferred method
        if hasattr(retriever, "get_relevant_documents"):
            maybe = retriever.get_relevant_documents(query)
            docs = await maybe if hasattr(maybe, "__await__") else maybe
        # Async alternatives
        elif hasattr(retriever, "aget_relevant_documents"):
            maybe = retriever.aget_relevant_documents(query)
            docs = await maybe if hasattr(maybe, "__await__") else maybe
        elif hasattr(retriever, "get_relevant_documents_async"):
            maybe = retriever.get_relevant_documents_async(query)
            docs = await maybe if hasattr(maybe, "__await__") else maybe
        # Fallback: use Chroma directly
        elif hasattr(vectordb, "similarity_search"):
            maybe = vectordb.similarity_search(query, k=6)
            docs = await maybe if hasattr(maybe, "__await__") else maybe
        else:
            msg = "Retriever does not support document search in this environment."
            print("β", msg)
            return {"answer": msg}
        print(f" Retrieved {len(docs) if docs else 0} docs")
    except Exception as e:
        print(f"β Retriever error: {e}")
        return {"answer": f"Retriever error: {e}"}

    context = "\n\n".join([d.page_content for d in docs]) if docs else ""
    print(f" Context length: {len(context)} chars")

    prompt = f"""
You are the Digital ICFAI Assistant.
Use ONLY the context below to answer.
If the context does not contain the answer, say so politely.
Give a detailed, clear, student-friendly explanation with 4β6 lines. Add examples wherever helpful.
Context:
{context}
Question:
{query}
Answer (clear, student-friendly):
"""

    try:
        answer = ask_ollama(prompt)
    except Exception as e:
        print(f"β LLM error: {e}")
        answer = f"LLM error: {e}"
    return {"answer": answer}
# -----------------------------------------------------------
# DEV SERVER
# -----------------------------------------------------------
if __name__ == "__main__":
    import uvicorn

    # Hosting platforms inject PORT; default to 7860 for local runs.
    serve_port = int(os.environ.get("PORT", 7860))
    print(f"π Starting IBS Assistant at http://0.0.0.0:{serve_port}")
    uvicorn.run("app:app", host="0.0.0.0", port=serve_port, reload=False)