# situationaware / main.py
# (Hugging Face Space file header — uploaded by Adi362, commit ce3bf47, "Update main.py")
import asyncio
import datetime
import os
import time

import httpx
import uvicorn
from ddgs import DDGS
from dotenv import load_dotenv
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
from pydantic import BaseModel
load_dotenv()
# FastAPI application instance; all routes below attach to this object.
app = FastAPI(title="Edyx Situation Aware AI Pipeline")

# Allow requests from the Edyx gateway/frontend.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# permissive — confirm this is intended for the deployment environment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class ChatRequest(BaseModel):
    """Request body for the /chat/completions endpoint."""

    # The user's current query text.
    message: str
    # Optional prior conversation turns; presumably OpenAI-style
    # {"role": ..., "content": ...} dicts (the endpoint reads those keys
    # with .get()). Mutable default is safe here: pydantic copies field
    # defaults per instance.
    messages: list = []
# API key for the primary hosted LLM (Groq); empty/None disables the primary path.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")

# --- Local Fallback LLM Setup ---
# Best-effort download and load of a small local GGUF model used when the
# Groq API is unavailable. Failure here is non-fatal: fallback_llm stays
# None and the chat endpoint degrades to primary-only.
fallback_llm = None
try:
    print("Initializing Local Fallback Model (Qwen 0.5B GGUF)...")
    model_path = hf_hub_download(
        repo_id="Qwen/Qwen1.5-0.5B-Chat-GGUF",
        filename="qwen1_5-0_5b-chat-q4_k_m.gguf",
        cache_dir="./models"
    )
    fallback_llm = Llama(
        model_path=model_path,
        n_ctx=2048,
        n_gpu_layers=0,  # CPU only on basic spaces
        verbose=False
    )
    print("Local Fallback Model ready.")
except Exception as e:
    # Broad catch is deliberate: the app must still start without the fallback.
    print(f"Failed to initialize local fallback LLM: {e}")
import datetime
async def evaluate_needs_search(query: str) -> bool:
    """Decide whether *query* needs real-time web data to answer.

    Delegates the YES/NO classification to a small, fast Groq-hosted
    model. Returns False when no API key is configured or when the
    routing call fails for any reason, so the pipeline defaults to
    skipping the search step.
    """
    if not GROQ_API_KEY:
        return False

    today = datetime.datetime.now().strftime("%B %d, %Y")
    system_prompt = f"""You are a highly efficient classification router.
Today's date is {today}.
Determine if the user's query requires up-to-date, real-time information or current events data from the internet to answer accurately.
If the user asks about an event, person, software, or fact that changes frequently or occurred near or after {today}, it requires search.
Respond ONLY with "YES" if it requires search, or "NO" if it can be answered with general, static knowledge.
DO NOT provide any other text."""

    payload = {
        "model": "llama-3.1-8b-instant",  # Fast and cheap for routing
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ],
        "temperature": 0.1,
        "max_tokens": 10,
    }

    try:
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                "https://api.groq.com/openai/v1/chat/completions",
                headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                json=payload,
                timeout=10.0,
            )
        resp.raise_for_status()
        verdict = resp.json()['choices'][0]['message']['content'].strip().upper()
        return "YES" in verdict
    except Exception as e:
        print(f"Routing evaluation error: {e}")
        return False  # Default to no search on error to save latency
import time
def perform_search(query: str, max_results: int = 4) -> str:
    """Run a DuckDuckGo text search and format the hits as LLM context.

    Retries once (after a 1-second pause) on either an empty result set
    or an engine error. Always returns a string: formatted sources on
    success, otherwise an advisory message the model can act on.
    """
    for attempt in range(2):
        try:
            with DDGS() as ddgs:
                hits = list(ddgs.text(query, max_results=max_results))
        except Exception as e:
            print(f"Search attempt {attempt + 1} failed: {e}")
            if attempt == 0:
                time.sleep(1)  # brief backoff, then one retry
                continue
            return f"Search engine was temporarily unavailable ({e}). Please rely on your internal knowledge instead."

        if hits:
            sources = [
                f"Source {idx} [{hit.get('title', 'Unknown Title')}]: {hit.get('body', '')}\nURL: {hit.get('href', 'N/A')}\n\n"
                for idx, hit in enumerate(hits, start=1)
            ]
            return "Here is real-time web search data regarding the user's query:\n\n" + "".join(sources)

        if attempt == 0:
            time.sleep(1)  # short backoff
            continue
        return "Search returned no direct results. Please rely on your internal knowledge."
    return ""
@app.post("/chat/completions")
async def situation_aware_chat(request: ChatRequest):
    """Situation-aware chat endpoint.

    Pipeline:
      1. Route: ask a small LLM whether the query needs live web data.
      2. Search: if so, fetch DuckDuckGo results and inject them into
         the system prompt as grounded context.
      3. Answer: call the primary Groq model; on any failure, fall back
         to the local GGUF model and emulate the OpenAI response shape.

    Raises:
        HTTPException 500: no backend (neither API key nor local model).
        HTTPException 503: every available backend failed.
    """
    if not GROQ_API_KEY and not fallback_llm:
        raise HTTPException(status_code=500, detail="No AI service is currently available.")

    # 1. Evaluate if search is needed
    user_query = request.message
    needs_search = await evaluate_needs_search(user_query)

    context_injection = ""
    if needs_search:
        print(f"Query '{user_query}' requires search. Fetching data...")
        # perform_search blocks on network I/O; run it in a worker thread
        # so the event loop stays responsive to concurrent requests.
        context_injection = await asyncio.to_thread(perform_search, user_query)
        print("Search complete.")

    # 2. Prepare the final prompt
    current_time_str = datetime.datetime.now().strftime("%A, %B %d, %Y %I:%M %p")
    system_base = f"You are 'Situation Aware AI', an advanced assistant integrated into the Edyx platform. Today's current date and time is {current_time_str}. Use this date as your absolute frame of reference for 'now'."
    if context_injection:
        system_base += f"\n\nThe user has asked a question that requires current knowledge. You have been provided with real-time web search results below. Synthesize a comprehensive, highly accurate, state-of-the-art response using ONLY the provided facts. Cite your sources naturally in your response.\n\n--- WEB SEARCH RESULTS ---\n{context_injection}\n--- END RESULTS ---"

    # Construct message array preserving history
    final_messages = [{"role": "system", "content": system_base}]
    for msg in request.messages:
        final_messages.append({"role": msg.get("role", "user"), "content": msg.get("content", "")})
    # Ensure current query is at the end if not provided in history block
    if not request.messages or request.messages[-1].get("content") != user_query:
        final_messages.append({"role": "user", "content": user_query})

    # 3. Call Primary LLM. A missing key or any request/HTTP error falls
    # through to the local fallback instead of aborting the request.
    if GROQ_API_KEY:
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    "https://api.groq.com/openai/v1/chat/completions",
                    headers={"Authorization": f"Bearer {GROQ_API_KEY}"},
                    json={
                        "model": "llama-3.3-70b-versatile",
                        "messages": final_messages,
                        "temperature": 0.5,
                        "max_tokens": 4096
                    },
                    timeout=30.0
                )
                response.raise_for_status()
                return response.json()
        except Exception as e:
            print(f"Primary LLM Error: {e}")
    else:
        # Same log line the old exception-based control flow produced.
        print("Primary LLM Error: GROQ API Key missing, forcing fallback.")

    # 4. Execute Local Fallback
    if fallback_llm:
        print("Primary API failed. Firing local fallback inference...")
        try:
            # ChatML-style prompt (basic approximation of the Qwen chat template).
            prompt_text = "\n".join(
                f"<|im_start|>{m['role']}\n{m['content']}<|im_end|>" for m in final_messages
            )
            prompt_text += "\n<|im_start|>assistant\n"
            # llama_cpp inference is CPU-bound and blocking; keep it off
            # the event loop as well.
            output = await asyncio.to_thread(
                fallback_llm,
                prompt_text,
                max_tokens=1024,
                temperature=0.7,
                stop=["<|im_end|>", "<|im_start|>"],
            )
            # Format to match OpenAI API spec so callers see one shape.
            return {
                "id": output.get("id", "fallback_id"),
                "object": "chat.completion",
                "created": output.get("created", 0),
                "model": "qwen-0.5b-local-fallback",
                "choices": [
                    {
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": output["choices"][0]["text"].strip()
                        },
                        "finish_reason": "stop"
                    }
                ],
                "usage": output.get("usage", {})
            }
        except Exception as fallback_e:
            print(f"Fallback LLM Error: {fallback_e}")
            raise HTTPException(status_code=503, detail="Primary AI and Local Fallback are currently unavailable.")
    raise HTTPException(status_code=503, detail="Primary AI service is currently unavailable.")
@app.get("/health")
def health_check():
    """Liveness probe: reports the service identifier and an OK status."""
    payload = {"status": "ok", "service": "edyx-situation-aware-pipeline"}
    return payload
if __name__ == "__main__":
    # Dev-style entry point (reload=True); binds all interfaces on port 8000.
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)