# Hugging Face Spaces page residue ("Spaces: Running") removed; file starts below.
| import os | |
| import uvicorn | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| import httpx | |
| from ddgs import DDGS | |
| from dotenv import load_dotenv | |
| from huggingface_hub import hf_hub_download | |
| from llama_cpp import Llama | |
# Load environment variables (e.g. GROQ_API_KEY) from a local .env file.
load_dotenv()

app = FastAPI(title="Edyx Situation Aware AI Pipeline")

# Allow requests from the Edyx gateway/frontend
# NOTE(review): browsers reject wildcard origins combined with
# allow_credentials=True per the CORS spec — consider listing the gateway's
# explicit origin(s) if cookies/credentials are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class ChatRequest(BaseModel):
    """Request payload for the situation-aware chat pipeline."""

    # The latest user message to answer.
    message: str
    # Prior conversation turns; downstream code treats each entry as a dict
    # with "role" and "content" keys (OpenAI chat format). Pydantic deep-copies
    # field defaults, so the mutable-default pitfall does not apply here.
    messages: list = []
# Groq API key; when absent, requests fall through to the local model only.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")

# --- Local Fallback LLM Setup ---
# Loaded once at import time so requests never pay the model-load cost.
# Remains None if initialization fails; callers must check before use.
fallback_llm = None
try:
    print("Initializing Local Fallback Model (Qwen 0.5B GGUF)...")
    # Fetch the quantized GGUF weights from the Hugging Face Hub, cached under
    # ./models so container restarts skip the download.
    model_path = hf_hub_download(
        repo_id="Qwen/Qwen1.5-0.5B-Chat-GGUF",
        filename="qwen1_5-0_5b-chat-q4_k_m.gguf",
        cache_dir="./models"
    )
    fallback_llm = Llama(
        model_path=model_path,
        n_ctx=2048,
        n_gpu_layers=0,  # CPU only on basic spaces
        verbose=False
    )
    print("Local Fallback Model ready.")
except Exception as e:
    # Non-fatal: the Groq API path can still serve requests without the
    # local fallback.
    print(f"Failed to initialize local fallback LLM: {e}")
| import datetime | |
async def evaluate_needs_search(query: str) -> bool:
    """Classify whether *query* needs live web data to answer accurately.

    Sends the query to a small, fast Groq model acting as a YES/NO router.
    Returns False whenever no API key is configured or the call fails, so
    the pipeline degrades to static-knowledge answers rather than erroring.
    """
    if not GROQ_API_KEY:
        # No key means no router; skip search and answer from static knowledge.
        return False

    today = datetime.datetime.now().strftime("%B %d, %Y")
    router_instructions = f"""You are a highly efficient classification router.
Today's date is {today}.
Determine if the user's query requires up-to-date, real-time information or current events data from the internet to answer accurately.
If the user asks about an event, person, software, or fact that changes frequently or occurred near or after {today}, it requires search.
Respond ONLY with "YES" if it requires search, or "NO" if it can be answered with general, static knowledge.
DO NOT provide any other text."""

    payload = {
        "model": "llama-3.1-8b-instant",  # Fast and cheap for routing
        "messages": [
            {"role": "system", "content": router_instructions},
            {"role": "user", "content": query},
        ],
        "temperature": 0.1,
        "max_tokens": 10,
    }

    try:
        async with httpx.AsyncClient() as http:
            resp = await http.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                json=payload,
                timeout=10.0,
            )
            resp.raise_for_status()
            verdict = resp.json()['choices'][0]['message']['content'].strip().upper()
        return "YES" in verdict
    except Exception as e:
        print(f"Routing evaluation error: {e}")
        return False  # Default to no search on error to save latency
| import time | |
def perform_search(query: str, max_results: int = 4) -> str:
    """Performs a web search using DuckDuckGo with basic retry logic.

    Tries at most twice with a one-second backoff between attempts. Returns a
    formatted context string of results, or an explanatory fallback message
    when the search yields nothing or the engine is unavailable.
    """
    LAST_ATTEMPT = 1
    for attempt in (0, LAST_ATTEMPT):
        try:
            with DDGS() as engine:
                hits = list(engine.text(query, max_results=max_results))

            if not hits:
                if attempt != LAST_ATTEMPT:
                    time.sleep(1)  # short backoff
                    continue
                return "Search returned no direct results. Please rely on your internal knowledge."

            # Assemble a numbered source list the LLM can cite from.
            parts = ["Here is real-time web search data regarding the user's query:\n\n"]
            for idx, hit in enumerate(hits):
                parts.append(
                    f"Source {idx+1} [{hit.get('title', 'Unknown Title')}]: {hit.get('body', '')}\nURL: {hit.get('href', 'N/A')}\n\n"
                )
            return "".join(parts)
        except Exception as e:
            print(f"Search attempt {attempt + 1} failed: {e}")
            if attempt == LAST_ATTEMPT:
                return f"Search engine was temporarily unavailable ({e}). Please rely on your internal knowledge instead."
            time.sleep(1)
    return ""  # unreachable: the second attempt always returns above
async def situation_aware_chat(request: ChatRequest):
    """Answer a chat request, optionally grounding it in live web search.

    Pipeline:
      1. A small router model decides whether the query needs real-time data.
      2. If so, DuckDuckGo results are injected into the system prompt.
      3. The primary Groq model (llama-3.3-70b-versatile) generates the answer
         and the raw OpenAI-format response is returned.
      4. On any primary failure, the local GGUF model responds in an
         OpenAI-compatible shape.

    Raises:
        HTTPException: 500 when neither backend is available at all; 503 when
            the configured backend(s) fail during the request.

    NOTE(review): no @app.post(...) decorator is visible in this file, so this
    coroutine does not appear to be registered as a route here — confirm it is
    wired up elsewhere (or add the decorator).
    """
    import asyncio  # local import: used to offload blocking work below

    if not GROQ_API_KEY and not fallback_llm:
        raise HTTPException(status_code=500, detail="No AI service is currently available.")

    # 1. Evaluate if search is needed
    user_query = request.message
    needs_search = await evaluate_needs_search(user_query)

    context_injection = ""
    if needs_search:
        print(f"Query '{user_query}' requires search. Fetching data...")
        # FIX: perform_search does synchronous network I/O plus time.sleep;
        # calling it directly in an async endpoint stalls the event loop for
        # every concurrent request, so run it in a worker thread instead.
        context_injection = await asyncio.to_thread(perform_search, user_query)
        print("Search complete.")

    # 2. Prepare the final prompt
    current_time_str = datetime.datetime.now().strftime("%A, %B %d, %Y %I:%M %p")
    system_base = f"You are 'Situation Aware AI', an advanced assistant integrated into the Edyx platform. Today's current date and time is {current_time_str}. Use this date as your absolute frame of reference for 'now'."
    if context_injection:
        system_base += f"\n\nThe user has asked a question that requires current knowledge. You have been provided with real-time web search results below. Synthesize a comprehensive, highly accurate, state-of-the-art response using ONLY the provided facts. Cite your sources naturally in your response.\n\n--- WEB SEARCH RESULTS ---\n{context_injection}\n--- END RESULTS ---"

    # Construct message array preserving history
    final_messages = [{"role": "system", "content": system_base}]
    # Add previous history (excluding the current message if it's already in the list)
    for msg in request.messages:
        final_messages.append({"role": msg.get("role", "user"), "content": msg.get("content", "")})
    # Ensure current query is at the end if not provided in history block
    if not request.messages or request.messages[-1].get("content") != user_query:
        final_messages.append({"role": "user", "content": user_query})

    # 3. Call Primary LLM
    try:
        if not GROQ_API_KEY:
            raise Exception("GROQ API Key missing, forcing fallback.")
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                json={
                    "model": "llama-3.3-70b-versatile",
                    "messages": final_messages,
                    "temperature": 0.5,
                    "max_tokens": 4096
                },
                timeout=30.0
            )
            response.raise_for_status()
            return response.json()
    except Exception as e:
        # Any primary failure (missing key, HTTP error, timeout) falls through
        # to the local fallback below.
        print(f"Primary LLM Error: {e}")

    # 4. Execute Local Fallback
    if fallback_llm:
        print("Primary API failed. Firing local fallback inference...")
        try:
            # Format for huggingface chat template (basic approximation)
            prompt_text = "\n".join([f"<|im_start|>{m['role']}\n{m['content']}<|im_end|>" for m in final_messages])
            prompt_text += "\n<|im_start|>assistant\n"
            # FIX: llama.cpp inference is CPU-bound and blocking; run it in a
            # worker thread so the event loop keeps serving other requests.
            output = await asyncio.to_thread(
                fallback_llm,
                prompt_text,
                max_tokens=1024,
                temperature=0.7,
                stop=["<|im_end|>", "<|im_start|>"]
            )
            # Format to match OpenAI API Spec
            return {
                "id": output.get("id", "fallback_id"),
                "object": "chat.completion",
                "created": output.get("created", 0),
                "model": "qwen-0.5b-local-fallback",
                "choices": [
                    {
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": output["choices"][0]["text"].strip()
                        },
                        "finish_reason": "stop"
                    }
                ],
                "usage": output.get("usage", {})
            }
        except Exception as fallback_e:
            print(f"Fallback LLM Error: {fallback_e}")
            raise HTTPException(status_code=503, detail="Primary AI and Local Fallback are currently unavailable.")
    raise HTTPException(status_code=503, detail="Primary AI service is currently unavailable.")
def health_check():
    """Return a static liveness payload identifying this service.

    NOTE(review): no @app.get(...) decorator is visible here — confirm this is
    registered as a route elsewhere.
    """
    return dict(status="ok", service="edyx-situation-aware-pipeline")
if __name__ == "__main__":
    # Development entry point: reload=True restarts the server on code changes,
    # which is not suitable for production deployments.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)