import os import uvicorn from fastapi import FastAPI, Request, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import httpx from ddgs import DDGS from dotenv import load_dotenv from huggingface_hub import hf_hub_download from llama_cpp import Llama load_dotenv() app = FastAPI(title="Edyx Situation Aware AI Pipeline") # Allow requests from the Edyx gateway/frontend app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class ChatRequest(BaseModel): message: str messages: list = [] GROQ_API_KEY = os.environ.get("GROQ_API_KEY") # --- Local Fallback LLM Setup --- fallback_llm = None try: print("Initializing Local Fallback Model (Qwen 0.5B GGUF)...") model_path = hf_hub_download( repo_id="Qwen/Qwen1.5-0.5B-Chat-GGUF", filename="qwen1_5-0_5b-chat-q4_k_m.gguf", cache_dir="./models" ) fallback_llm = Llama( model_path=model_path, n_ctx=2048, n_gpu_layers=0, # CPU only on basic spaces verbose=False ) print("Local Fallback Model ready.") except Exception as e: print(f"Failed to initialize local fallback LLM: {e}") import datetime async def evaluate_needs_search(query: str) -> bool: """Uses a fast, small model to determine if the query requires real-time data.""" if not GROQ_API_KEY: return False current_date = datetime.datetime.now().strftime("%B %d, %Y") system_prompt = f"""You are a highly efficient classification router. Today's date is {current_date}. Determine if the user's query requires up-to-date, real-time information or current events data from the internet to answer accurately. If the user asks about an event, person, software, or fact that changes frequently or occurred near or after {current_date}, it requires search. Respond ONLY with "YES" if it requires search, or "NO" if it can be answered with general, static knowledge. DO NOT provide any other text.""" try: async with httpx.AsyncClient() as client: response = await client.post( "https://api.groq.com/openai/v1/chat/completions", headers={"Authorization": f"Bearer {GROQ_API_KEY}"}, json={ "model": "llama-3.1-8b-instant", # Fast and cheap for routing "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": query} ], "temperature": 0.1, "max_tokens": 10 }, timeout=10.0 ) response.raise_for_status() result = response.json() answer = result['choices'][0]['message']['content'].strip().upper() return "YES" in answer except Exception as e: print(f"Routing evaluation error: {e}") return False # Default to no search on error to save latency import time def perform_search(query: str, max_results: int = 4) -> str: """Performs a web search using DuckDuckGo with basic retry logic.""" for attempt in range(2): try: with DDGS() as ddgs: results = list(ddgs.text(query, max_results=max_results)) if not results: if attempt == 0: time.sleep(1) # short backoff continue return "Search returned no direct results. Please rely on your internal knowledge." context = "Here is real-time web search data regarding the user's query:\n\n" for i, r in enumerate(results): context += f"Source {i+1} [{r.get('title', 'Unknown Title')}]: {r.get('body', '')}\nURL: {r.get('href', 'N/A')}\n\n" return context except Exception as e: print(f"Search attempt {attempt + 1} failed: {e}") if attempt == 0: time.sleep(1) else: return f"Search engine was temporarily unavailable ({e}). Please rely on your internal knowledge instead." return "" @app.post("/chat/completions") async def situation_aware_chat(request: ChatRequest): if not GROQ_API_KEY and not fallback_llm: raise HTTPException(status_code=500, detail="No AI service is currently available.") # 1. Evaluate if search is needed user_query = request.message needs_search = await evaluate_needs_search(user_query) context_injection = "" if needs_search: print(f"Query '{user_query}' requires search. Fetching data...") context_injection = perform_search(user_query) print("Search complete.") # 2. Prepare the final prompt current_time_str = datetime.datetime.now().strftime("%A, %B %d, %Y %I:%M %p") system_base = f"You are 'Situation Aware AI', an advanced assistant integrated into the Edyx platform. Today's current date and time is {current_time_str}. Use this date as your absolute frame of reference for 'now'." if context_injection: system_base += f"\n\nThe user has asked a question that requires current knowledge. You have been provided with real-time web search results below. Synthesize a comprehensive, highly accurate, state-of-the-art response using ONLY the provided facts. Cite your sources naturally in your response.\n\n--- WEB SEARCH RESULTS ---\n{context_injection}\n--- END RESULTS ---" # Construct message array preserving history final_messages = [{"role": "system", "content": system_base}] # Add previous history (excluding the current message if it's already in the list) for msg in request.messages: final_messages.append({"role": msg.get("role", "user"), "content": msg.get("content", "")}) # Ensure current query is at the end if not provided in history block if not request.messages or request.messages[-1].get("content") != user_query: final_messages.append({"role": "user", "content": user_query}) # 3. Call Primary LLM try: if not GROQ_API_KEY: raise Exception("GROQ API Key missing, forcing fallback.") async with httpx.AsyncClient() as client: response = await client.post( "https://api.groq.com/openai/v1/chat/completions", headers={"Authorization": f"Bearer {GROQ_API_KEY}"}, json={ "model": "llama-3.3-70b-versatile", "messages": final_messages, "temperature": 0.5, "max_tokens": 4096 }, timeout=30.0 ) response.raise_for_status() result = response.json() return result except Exception as e: print(f"Primary LLM Error: {e}") # 4. Execute Local Fallback if fallback_llm: print("Primary API failed. Firing local fallback inference...") try: # Format for huggingface chat template (basic approximation) prompt_text = "\n".join([f"<|im_start|>{m['role']}\n{m['content']}<|im_end|>" for m in final_messages]) prompt_text += "\n<|im_start|>assistant\n" output = fallback_llm( prompt_text, max_tokens=1024, temperature=0.7, stop=["<|im_end|>", "<|im_start|>"] ) # Format to match OpenAI API Spec return { "id": output.get("id", "fallback_id"), "object": "chat.completion", "created": output.get("created", 0), "model": "qwen-0.5b-local-fallback", "choices": [ { "index": 0, "message": { "role": "assistant", "content": output["choices"][0]["text"].strip() }, "finish_reason": "stop" } ], "usage": output.get("usage", {}) } except Exception as fallback_e: print(f"Fallback LLM Error: {fallback_e}") raise HTTPException(status_code=503, detail="Primary AI and Local Fallback are currently unavailable.") raise HTTPException(status_code=503, detail="Primary AI service is currently unavailable.") @app.get("/health") def health_check(): return {"status": "ok", "service": "edyx-situation-aware-pipeline"} if __name__ == "__main__": uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)