Spaces:
Sleeping
Sleeping
| import os | |
| os.environ["POSTHOG_DISABLED"] = "true" # Disable PostHog telemetry | |
| import requests | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| #from kb_embed import search_knowledge_base | |
| from services.kb_creation import collection, ingest_documents, search_knowledge_base | |
| from contextlib import asynccontextmanager | |
| import google.generativeai as genai | |
| # --- 0. Config --- | |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
| #print("GEMINI API KEY",GEMINI_API_KEY) | |
| if not GEMINI_API_KEY: | |
| raise RuntimeError("GEMINI_API_KEY is not set in environment.") | |
| # Configure the SDK | |
| genai.configure(api_key=GEMINI_API_KEY) | |
| # Choose the model | |
| MODEL_NAME = "gemini-2.5-flash-lite" | |
| model = genai.GenerativeModel(MODEL_NAME) | |
| # Load environment variables from the .env file | |
| load_dotenv() | |
| # --- 1. Initialize FastAPI --- | |
| #app = FastAPI() | |
| async def lifespan(app: FastAPI): | |
| try: | |
| folder_path = os.path.join(os.getcwd(), "documents") | |
| if collection.count() == 0: | |
| print("π KB empty. Running ingestion...") | |
| ingest_documents(folder_path) | |
| else: | |
| print(f"β KB already populated with {collection.count()} entries. Skipping ingestion.") | |
| except Exception as e: | |
| print(f"β οΈ KB ingestion failed: {e}") | |
| yield | |
| app = FastAPI(lifespan=lifespan) | |
| # --- Configure CORS --- | |
| origins = [ | |
| "https://jaita-chatbot-react-frontend-v1.hf.space" | |
| "https://jaita-chatbot-fastapi-backend.hf.space/chat", | |
| ] | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=origins, | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # --- 3. Define the Request Data Structure --- | |
| class ChatInput(BaseModel): | |
| user_message: str | |
| # --- 4. Gemini API Setup --- | |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
| GEMINI_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-lite:generateContent?key={GEMINI_API_KEY}" | |
| # --- 5. Endpoints --- | |
| async def health_check(): | |
| return {"status": "ok"} | |
| #@app.on_event("startup") | |
| #async def startup(): | |
| # try: | |
| # folder_path = os.path.join(os.getcwd(), "documents") | |
| # if collection.count() == 0: | |
| # print("π KB empty. Running ingestion...") | |
| # ingest_all_documents(folder_path) | |
| # else: | |
| # print(f"β KB already populated with {collection.count()} entries. Skipping ingestion.") | |
| # except Exception as e: | |
| # print(f"β οΈ KB ingestion failed: {e}") | |
| async def chat_with_ai(input_data: ChatInput): | |
| """Handle chat interactions using Google Generative AI via requests.""" | |
| try: | |
| #folder_path = os.path.join(os.getcwd(), "documents") | |
| #print("folder_path",folder_path) | |
| # Retrieve relevant documents from knowledge base | |
| kb_results = search_knowledge_base(input_data.user_message, top_k=10) | |
| #print(f"kb_results are: {kb_results}") | |
| # Extract relevant context from search results | |
| context = "" | |
| relevant_docs=[] | |
| if kb_results and kb_results.get('documents'): | |
| # Limit context to avoid token limits - take top 2 most relevant | |
| relevant_docs = kb_results['documents'][0][:2] | |
| context = "\n\n".join(relevant_docs) | |
| # Construct enhanced prompt with context | |
| if context: | |
| enhanced_prompt = f"""Use the following knowledge base context to answer the user's question accurately. | |
| If the context contains relevant information, base your answer on it. | |
| If the context doesn't help, say you're raising a ticket and provide a ticket number. | |
| Knowledge Base Context: | |
| {context} | |
| User Question: {input_data.user_message} | |
| Answer:""" | |
| else: | |
| enhanced_prompt = f"User Question: {input_data.user_message}\n\nAnswer:" | |
| headers = {"Content-Type": "application/json"} | |
| payload = { | |
| "contents": [ | |
| { | |
| "parts": [{"text": enhanced_prompt}] | |
| } | |
| ] | |
| } | |
| response = requests.post(GEMINI_URL, headers=headers, json=payload, verify=False) # SSL disabled for testing | |
| result = response.json() | |
| #print("result",result) | |
| # Extract Gemini's response | |
| bot_response = result["candidates"][0]["content"]["parts"][0]["text"] | |
| # Include debug info in response | |
| debug_info = f"Context found: {'Yes' if context else 'No'}" | |
| if context: | |
| debug_info += f" (Top {len(relevant_docs)} documents used)" | |
| return {"bot_response": bot_response, "debug": debug_info} | |
| # Make POST request to Gemini API | |
| #response = requests.post(GEMINI_URL, json=payload,verify=False) | |
| #if(response.status_code==200): | |
| # print("response",response.status_code) | |
| # data = response.json() | |
| # #print("data",data) | |
| # Extract text from response | |
| #bot_response = "" | |
| #if "candidates" in data and data["candidates"]: | |
| # parts = data["candidates"][0].get("content", {}).get("parts", []) | |
| # for part in parts: | |
| # if "text" in part: | |
| # bot_response += part["text"] | |
| # | |
| #return {"bot_response": bot_response or "No response text."} | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |