Spaces:
Running
Running
File size: 8,949 Bytes
import os
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import httpx
from ddgs import DDGS
from dotenv import load_dotenv
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
# Load environment variables (e.g. GROQ_API_KEY) from a local .env file, if present.
load_dotenv()
app = FastAPI(title="Edyx Situation Aware AI Pipeline")
# Allow requests from the Edyx gateway/frontend
# NOTE(review): browsers reject allow_origins=["*"] combined with
# allow_credentials=True per the CORS spec — pin concrete origins if
# credentialed requests are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class ChatRequest(BaseModel):
    """Request payload for the /chat/completions endpoint."""

    # The current user message to answer.
    message: str
    # Prior conversation turns as OpenAI-style dicts ({"role": ..., "content": ...}).
    # Pydantic copies mutable defaults per instance, so [] is safe here.
    messages: list = []
# Groq API key; when absent, search routing is skipped and the local
# fallback model (if initialized) handles all requests.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")

# --- Local Fallback LLM Setup ---
# Small CPU-friendly quantized model used when the Groq API is unavailable.
fallback_llm = None
try:
    print("Initializing Local Fallback Model (Qwen 0.5B GGUF)...")
    # Download (or reuse a cached copy of) the quantized GGUF weights.
    model_path = hf_hub_download(
        repo_id="Qwen/Qwen1.5-0.5B-Chat-GGUF",
        filename="qwen1_5-0_5b-chat-q4_k_m.gguf",
        cache_dir="./models"
    )
    fallback_llm = Llama(
        model_path=model_path,
        n_ctx=2048,
        n_gpu_layers=0, # CPU only on basic spaces
        verbose=False
    )
    print("Local Fallback Model ready.")
except Exception as e:
    # Startup continues without a fallback; the chat endpoint degrades
    # gracefully (503) if the primary API also fails.
    print(f"Failed to initialize local fallback LLM: {e}")
import datetime
async def evaluate_needs_search(query: str) -> bool:
    """Ask a small, fast router model whether *query* needs live web data.

    Returns True only when the router answers "YES"; any failure (missing
    key, network error, malformed response) defaults to False so the main
    pipeline is never blocked by the routing step.
    """
    # No API key means no router model to consult — skip search entirely.
    if not GROQ_API_KEY:
        return False

    today = datetime.datetime.now().strftime("%B %d, %Y")
    router_instructions = f"""You are a highly efficient classification router.
Today's date is {today}.
Determine if the user's query requires up-to-date, real-time information or current events data from the internet to answer accurately.
If the user asks about an event, person, software, or fact that changes frequently or occurred near or after {today}, it requires search.
Respond ONLY with "YES" if it requires search, or "NO" if it can be answered with general, static knowledge.
DO NOT provide any other text."""

    payload = {
        "model": "llama-3.1-8b-instant",  # Fast and cheap for routing
        "messages": [
            {"role": "system", "content": router_instructions},
            {"role": "user", "content": query},
        ],
        "temperature": 0.1,
        "max_tokens": 10,
    }

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                json=payload,
                timeout=10.0,
            )
        resp.raise_for_status()
        verdict = resp.json()['choices'][0]['message']['content'].strip().upper()
        return "YES" in verdict
    except Exception as e:
        print(f"Routing evaluation error: {e}")
        return False  # Default to no search on error to save latency
import time
def perform_search(query: str, max_results: int = 4) -> str:
    """Fetch DuckDuckGo text results for *query* and format them as context.

    Makes up to two attempts with a one-second backoff. On persistent
    failure or an empty result set, returns an instruction string telling
    the model to fall back on internal knowledge instead of raising.
    """
    for attempt in (0, 1):
        may_retry = attempt == 0
        try:
            with DDGS() as search_client:
                hits = list(search_client.text(query, max_results=max_results))
            if not hits:
                if may_retry:
                    time.sleep(1)  # short backoff
                    continue
                return "Search returned no direct results. Please rely on your internal knowledge."
            # Assemble one numbered snippet per result.
            pieces = ["Here is real-time web search data regarding the user's query:\n\n"]
            for idx, hit in enumerate(hits):
                pieces.append(
                    f"Source {idx+1} [{hit.get('title', 'Unknown Title')}]: {hit.get('body', '')}\nURL: {hit.get('href', 'N/A')}\n\n"
                )
            return "".join(pieces)
        except Exception as e:
            print(f"Search attempt {attempt + 1} failed: {e}")
            if may_retry:
                time.sleep(1)
            else:
                return f"Search engine was temporarily unavailable ({e}). Please rely on your internal knowledge instead."
    return ""
@app.post("/chat/completions")
async def situation_aware_chat(request: ChatRequest):
    """Answer a chat request, optionally augmented with live web search.

    Pipeline:
      1. A small router model decides whether the query needs real-time data.
      2. If so, DuckDuckGo results are injected into the system prompt.
      3. The primary Groq model (llama-3.3-70b) produces the answer.
      4. On any primary failure, the local GGUF model responds instead,
         shaped to match the OpenAI chat-completions schema.

    Raises:
        HTTPException 500: neither Groq key nor local fallback is available.
        HTTPException 503: primary (and, if present, fallback) inference failed.
    """
    if not GROQ_API_KEY and not fallback_llm:
        raise HTTPException(status_code=500, detail="No AI service is currently available.")
    # 1. Evaluate if search is needed
    user_query = request.message
    needs_search = await evaluate_needs_search(user_query)
    context_injection = ""
    if needs_search:
        print(f"Query '{user_query}' requires search. Fetching data...")
        # May also return an "unavailable / no results" instruction string,
        # which is injected verbatim so the model knows search failed.
        context_injection = perform_search(user_query)
        print("Search complete.")
    # 2. Prepare the final prompt
    current_time_str = datetime.datetime.now().strftime("%A, %B %d, %Y %I:%M %p")
    system_base = f"You are 'Situation Aware AI', an advanced assistant integrated into the Edyx platform. Today's current date and time is {current_time_str}. Use this date as your absolute frame of reference for 'now'."
    if context_injection:
        system_base += f"\n\nThe user has asked a question that requires current knowledge. You have been provided with real-time web search results below. Synthesize a comprehensive, highly accurate, state-of-the-art response using ONLY the provided facts. Cite your sources naturally in your response.\n\n--- WEB SEARCH RESULTS ---\n{context_injection}\n--- END RESULTS ---"
    # Construct message array preserving history
    # NOTE(review): history entries are assumed to be plain dicts with
    # "role"/"content" keys (ChatRequest.messages is untyped `list`).
    final_messages = [{"role": "system", "content": system_base}]
    # Add previous history (excluding the current message if it's already in the list)
    for msg in request.messages:
        final_messages.append({"role": msg.get("role", "user"), "content": msg.get("content", "")})
    # Ensure current query is at the end if not provided in history block
    if not request.messages or request.messages[-1].get("content") != user_query:
        final_messages.append({"role": "user", "content": user_query})
    # 3. Call Primary LLM
    try:
        # Deliberate control-flow jump into the fallback path when no key exists.
        if not GROQ_API_KEY:
            raise Exception("GROQ API Key missing, forcing fallback.")
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                json={
                    "model": "llama-3.3-70b-versatile",
                    "messages": final_messages,
                    "temperature": 0.5,
                    "max_tokens": 4096
                },
                timeout=30.0
            )
            response.raise_for_status()
            result = response.json()
            # Groq's response is already OpenAI-shaped; pass it through as-is.
            return result
    except Exception as e:
        print(f"Primary LLM Error: {e}")
    # 4. Execute Local Fallback (only reached when the primary call failed,
    # since success returns directly above).
    if fallback_llm:
        print("Primary API failed. Firing local fallback inference...")
        try:
            # Format for huggingface chat template (basic approximation)
            prompt_text = "\n".join([f"<|im_start|>{m['role']}\n{m['content']}<|im_end|>" for m in final_messages])
            prompt_text += "\n<|im_start|>assistant\n"
            output = fallback_llm(
                prompt_text,
                max_tokens=1024,
                temperature=0.7,
                stop=["<|im_end|>", "<|im_start|>"]
            )
            # Format to match OpenAI API Spec
            return {
                "id": output.get("id", "fallback_id"),
                "object": "chat.completion",
                "created": output.get("created", 0),
                "model": "qwen-0.5b-local-fallback",
                "choices": [
                    {
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": output["choices"][0]["text"].strip()
                        },
                        "finish_reason": "stop"
                    }
                ],
                "usage": output.get("usage", {})
            }
        except Exception as fallback_e:
            print(f"Fallback LLM Error: {fallback_e}")
            raise HTTPException(status_code=503, detail="Primary AI and Local Fallback are currently unavailable.")
    raise HTTPException(status_code=503, detail="Primary AI service is currently unavailable.")
@app.get("/health")
def health_check():
    """Liveness probe used by the platform/gateway."""
    payload = {"status": "ok", "service": "edyx-situation-aware-pipeline"}
    return payload
if __name__ == "__main__":
    # Local development entry point; removed a stray trailing "|" that made
    # the line a syntax error. reload=True is for dev only — the Spaces
    # runtime launches the app through its own process manager.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)