Spaces:
Runtime error
Runtime error
| import os | |
| os.environ["POSTHOG_DISABLED"] = "true" # Disable PostHog telemetry | |
| import requests | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| from kb_embed import collection, ingest_documents, search_knowledge_base, model | |
| from contextlib import asynccontextmanager | |
| from services.login import router as login_router | |
| from services.generate_ticket import create_incident | |
| from states import ChatState | |
| from helpers import get_session, reset_session, get_user_only_text | |
| # Load environment variables from the .env file | |
| load_dotenv() | |
| # --- 1. Initialize FastAPI --- | |
| async def lifespan(app: FastAPI): | |
| try: | |
| folder_path = os.path.join(os.getcwd(), "documents") | |
| ingest_documents(folder_path) | |
| #if collection.count() == 0: | |
| # print("🔍 KB empty. Running ingestion...") | |
| # ingest_documents(folder_path) | |
| #else: | |
| # print(f"✅ KB already populated with {collection.count()} entries. Skipping ingestion.") | |
| except Exception as e: | |
| print(f"⚠️ KB ingestion failed: {e}") | |
| yield | |
| app = FastAPI(lifespan=lifespan) | |
| # Mount the login routes | |
| app.include_router(login_router) | |
| # --- 2. Configure CORS --- | |
| origins = [ | |
| "http://localhost:5173", | |
| "http://localhost:3000", | |
| ] | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=origins, | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # --- 3. Define the Request Data Structure --- | |
| class ChatInput(BaseModel): | |
| user_message: str | |
| session_id: str | |
| # NEW: Request model for incident creation (from frontend) | |
| class IncidentInput(BaseModel): | |
| short_description: str | |
| description: str | |
| # Optional: add username later if you decide to map caller_id | |
| # username: Optional[str] = None | |
| INC_FALLBACK_KEYWORD = "INC_FALLBACK" | |
| MIN_SIMILARITY_THRESHOLD = 0.60 | |
| MAX_CLARIFICATIONS = 2 | |
| # --- 4. Gemini API Setup --- | |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
| GEMINI_URL = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash-lite:generateContent?key={GEMINI_API_KEY}" | |
| # --- 5. Endpoints --- | |
| async def health_check(): | |
| return {"status": "ok"} | |
| CLARIFY_QUESTIONS = [ | |
| "Can you share the exact error message you are seeing?", | |
| "When did this issue start?", | |
| "Is this happening for all users or only you?" | |
| ] | |
| async def chat_with_ai(input_data: ChatInput, request: Request): | |
| try: | |
| # ================================================= | |
| # Session Handling | |
| # ================================================= | |
| session = get_session(input_data.session_id) | |
| user_text = input_data.user_message.strip() | |
| if "miss_count" not in session: | |
| session["miss_count"] = 0 | |
| if "last_answered_docs" not in session: | |
| session["last_answered_docs"] = [] | |
| text = user_text.lower() | |
| action_words = ["raise","create","log","open"] | |
| object_words = ["incident","ticket"] | |
| has_action = any(w in text for w in action_words) | |
| has_object = any(w in text for w in object_words) | |
| if has_action and has_object: | |
| return {"bot_response":"Please provide incident details.","state":"RAISE_INCIDENT"} | |
| session["messages"].append({"role": "user", "text": user_text}) | |
| # Combine all user messages for better query context | |
| combined_query = get_user_only_text(session["messages"]) | |
| #print("combined_query: ",combined_query) | |
| # ================================================= | |
| # RAG: Knowledge Base Search | |
| # ================================================= | |
| kb_results = search_knowledge_base(input_data.user_message, top_k=2) | |
| print("kb_results: ",kb_results) | |
| # Extract relevant context from search results | |
| if not kb_results: | |
| # --- FAILURE PATH --- | |
| session["miss_count"] += 1 | |
| # Check if there was previous context | |
| if session["last_answered_docs"]: | |
| return { | |
| "bot_response": "I didn’t find anything new, but I’m happy to clarify more about what we discussed earlier. Can you specify what you want to know next?", | |
| "debug": f"Context found previously (Top {len(session['last_answered_docs'])} documents)", | |
| "followup": "You can ask more questions or request an incident if needed." | |
| } | |
| if session["miss_count"] >= 2: | |
| # Reset here so if they start a NEW topic after the ticket, | |
| # they get another 2 chances. | |
| session["miss_count"] = 0 | |
| return {"bot_response": "I couldn't find relevant SOP information. Please escalate to WMS Support or raise an incident.", "state": "SUGGEST_INCIDENT"} | |
| else: | |
| return {"bot_response": "I couldn't find a clear resolution in the SOP. Could you rephrase or add more detail?", "state": "REPHRASE"} | |
| session["miss_count"] = 0 # Reset because we found something! | |
| session["last_answered_docs"] = kb_results # store for conversational fallback | |
| context = "\n\n".join(item["doc"] for item in kb_results if "doc" in item) | |
| enhanced_prompt = f"""Use the following knowledge base context to answer the user's question accurately. | |
| If the context contains relevant information, base your answer on it. | |
| If the context doesn't help, say please raise an incident. | |
| Knowledge Base Context: | |
| {context} | |
| User Question: {input_data.user_message} | |
| Answer:""" | |
| headers = {"Content-Type": "application/json"} | |
| payload = { | |
| "contents": [ | |
| { | |
| "parts": [{"text": enhanced_prompt}] | |
| } | |
| ] | |
| } | |
| # ================================================= | |
| # Call Gemini API | |
| # ================================================= | |
| response = requests.post(GEMINI_URL, headers=headers, json=payload, verify=False) | |
| result = response.json() | |
| # Extract Gemini's response safely | |
| try: | |
| bot_response = result["candidates"][0]["content"]["parts"][0]["text"].strip() | |
| except Exception: | |
| print("response.status_code: ",response.status_code,"\nresponse.text: ",response.text) | |
| bot_response = "I encountered an error generating the response." | |
| # ================================================= | |
| # Add Debug Info & Follow-up for Frontend | |
| # ================================================= | |
| debug_info = f"Context found: {'Yes' if context else 'No'}" | |
| if context: | |
| debug_info += f" (Top {len(kb_results)} documents used)" | |
| followup = ( | |
| "Hope this helps! Would you like me to raise an incident for tracking?" | |
| if context else | |
| "I couldn't find a resolution in the knowledge base. Should I raise an incident?" | |
| ) | |
| return { | |
| "bot_response": bot_response, | |
| "debug": debug_info, | |
| "followup":followup | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) | |
| async def raise_incident(input_data: IncidentInput): | |
| """ | |
| Frontend calls this after the user says 'yes' and provides short & long description. | |
| """ | |
| try: | |
| result = create_incident(input_data.short_description, input_data.description) | |
| print("result: ",result) | |
| # Handle dict vs string returns | |
| if isinstance(result, dict): | |
| inc_number = result.get("number", "<unknown>") | |
| sys_id = result.get("sys_id", "<unknown>") | |
| ticket_text = f"Incident created: {inc_number}" | |
| else: | |
| ticket_text = str(result) | |
| return { | |
| "bot_response": f"✅ {ticket_text}\n\nIs there anything else I can assist you with?", | |
| "debug": "Incident created via ServiceNow" | |
| } | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=str(e)) |