|
|
import asyncio
import os
import secrets
from typing import Optional

import httpx
import requests
from fastapi import FastAPI, HTTPException, Security, Header
from fastembed import TextEmbedding
from llama_cpp import Llama
from pydantic import BaseModel
|
|
|
|
|
app = FastAPI()

# --- Qdrant vector store (required: missing env vars fail fast at import). ---
QDRANT_URL = os.environ["QDRANT_URL"].rstrip("/")
QDRANT_API_KEY = os.environ["QDRANT_API_KEY"]
COLLECTION = "well_vectors"

# --- Primary hosted LLM (Groq, OpenAI-compatible chat completions). ---
# SERVICE_API_KEY is optional: when unset, call_service_api raises and the
# endpoint falls back to the local model.
SERVICE_API_KEY = os.environ.get("SERVICE_API_KEY")
SERVICE_API_URL = "https://api.groq.com/openai/v1/chat/completions"
SERVICE_MODEL = "llama-3.3-70b-versatile"

# Shared access token checked by verify_token. Optional: when unset,
# authentication is disabled and every request is accepted.
EDYX_ACCESS_TOKEN = os.environ.get("EDYX_ACCESS_TOKEN")

# System prompt sent to the primary (chat-style) model. The blank lines are
# part of the prompt text sent over the wire; do not reformat.
PHYSICS_SYSTEM_PROMPT = """You are an expert physics researcher and teacher.


You are given retrieved scientific material from a physics knowledge base.


Your job:


- Use the retrieved material as grounding evidence


- Ignore irrelevant technical artifacts (paths, array shapes, file names)


- If information is incomplete, use your physics knowledge to complete the explanation


- Do NOT invent specific papers, experiments, or citations


- Produce a clean, coherent, human-readable explanation


Style: Clear, structured, graduate-level physics understanding."""

# Lazily-initialized llama.cpp model; populated on first use by get_local_llm().
local_llm = None
|
|
|
|
|
def get_local_llm():
    """Return the process-wide llama.cpp fallback model, loading it on first use.

    The model is cached in the module-level ``local_llm`` so the expensive
    load from disk happens at most once per process.
    """
    global local_llm
    if local_llm is not None:
        return local_llm
    print("Loading local fallback model...")
    local_llm = Llama(
        model_path="/app/model.gguf",
        n_ctx=4096,
        n_threads=2,
        n_batch=128,
    )
    return local_llm
|
|
|
|
|
# Text embedder used to vectorize incoming questions for the Qdrant search.
# NOTE(review): presumably this is the same embedding model used to build the
# "well_vectors" collection — confirm against the indexing pipeline, since a
# mismatch would silently degrade retrieval quality.
embedder = TextEmbedding(
    model_name="BAAI/bge-large-en-v1.5",
)
|
|
|
|
|
class QueryRequest(BaseModel):
    """Request body for POST /v1/query."""

    # The physics question to answer.
    question: str
    # Number of nearest-neighbor hits to retrieve from Qdrant.
    top_k: Optional[int] = 5
    # Generation budget forwarded to the LLM (primary or fallback).
    max_tokens: Optional[int] = 512
|
|
|
|
|
async def verify_token(x_edyx_token: str = Header(None)):
    """FastAPI security dependency validating the ``X-Edyx-Token`` header.

    When ``EDYX_ACCESS_TOKEN`` is unset, authentication is disabled and every
    request passes (open/dev mode — same as the original behavior). When set,
    the header must match exactly; the comparison uses
    ``secrets.compare_digest`` so a secret-vs-attacker-input check is
    constant-time rather than a short-circuiting ``!=``.

    Raises:
        HTTPException: 403 when a token is configured and the header is
            missing or does not match.
    """
    if EDYX_ACCESS_TOKEN:
        # compare_digest requires two str/bytes; a missing header (None)
        # becomes "" and fails closed, matching the original `!=` behavior.
        supplied = x_edyx_token or ""
        if not secrets.compare_digest(supplied, EDYX_ACCESS_TOKEN):
            raise HTTPException(status_code=403, detail="Unauthorized: Invalid Access Token")
    return x_edyx_token
|
|
|
|
|
@app.get("/") |
|
|
def root(): |
|
|
return {"status": "edyx-phy running", "mode": "accelerated-primary"} |
|
|
|
|
|
def search_qdrant(question: str, top_k: int):
    """Embed *question* and run a vector similarity search against Qdrant.

    Args:
        question: Natural-language query to embed and search for.
        top_k: Maximum number of points to retrieve.

    Returns:
        ``(context, n_hits)`` on success, where ``context`` is the
        concatenated payload text (capped at 12 000 chars, possibly empty)
        and ``n_hits`` the number of hits. ``(None, error_message)`` on any
        transport or API failure — callers distinguish errors by
        ``context is None``.
    """
    # fastembed yields one vector per input document; take the single result.
    vector = [float(x) for x in next(embedder.embed(question))]

    try:
        r = requests.post(
            f"{QDRANT_URL}/collections/{COLLECTION}/points/search",
            headers={
                "Content-Type": "application/json",
                "api-key": QDRANT_API_KEY,
            },
            json={
                "vector": vector,
                "limit": top_k,
                "with_payload": True,
            },
            timeout=30,
        )
    except requests.RequestException as e:
        # Previously a timeout/connection error escaped as an unhandled 500;
        # route it through the same (None, message) channel as HTTP failures.
        return None, f"Qdrant search failed: {e}"

    if r.status_code != 200:
        return None, f"Qdrant search failed: {r.text}"

    try:
        hits = r.json().get("result", [])
    except ValueError as e:
        # Non-JSON body (proxy error page, truncated response, ...).
        return None, f"Qdrant search failed: invalid JSON response ({e})"

    collected = []
    for h in hits:
        payload = h.get("payload", {})
        # A point may carry its text under either key; keep both when present.
        if "content" in payload:
            collected.append(str(payload["content"]))
        if "text" in payload:
            collected.append(str(payload["text"]))

    # Cap the combined context so the downstream prompt stays within model
    # context limits.
    context = "\n\n".join(collected)[:12000]
    return context, len(hits)
|
|
|
|
|
async def call_service_api(question: str, context: str, max_tokens: int):
    """Ask the hosted Groq chat-completions endpoint to answer *question*.

    Args:
        question: The user's physics question.
        context: Retrieved evidence text used to ground the answer.
        max_tokens: Generation budget for the completion.

    Raises:
        Exception: when no API key is configured, or the endpoint returns a
            non-200 status.

    Returns:
        The assistant message content of the first completion choice.
    """
    if not SERVICE_API_KEY:
        raise Exception("Service API key not configured")

    # The blank lines inside this prompt are sent to the model verbatim.
    user_prompt = f"""CONTEXT (retrieved evidence):


{context}


QUESTION:


{question}


Now produce a high-quality physics explanation that a serious learner would trust."""

    request_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {SERVICE_API_KEY}",
    }
    request_body = {
        "model": SERVICE_MODEL,
        "messages": [
            {"role": "system", "content": PHYSICS_SYSTEM_PROMPT},
            {"role": "user", "content": user_prompt},
        ],
        "max_tokens": max_tokens,
        "temperature": 0.2,
    }

    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.post(
            SERVICE_API_URL,
            headers=request_headers,
            json=request_body,
        )

    if response.status_code != 200:
        raise Exception(f"Service API error: {response.status_code} - {response.text}")

    data = response.json()
    return data["choices"][0]["message"]["content"]
|
|
|
|
|
def call_local_model(question: str, context: str, max_tokens: int):
    """Generate an answer with the local llama.cpp fallback model.

    Builds one self-contained prompt (instructions + retrieved context +
    question) and runs blocking, CPU-bound inference on the lazily-loaded
    model. Used when the hosted service call fails.

    Args:
        question: The user's physics question.
        context: Retrieved evidence text used to ground the answer.
        max_tokens: Generation budget passed to the model.

    Returns:
        The generated answer text, stripped of surrounding whitespace.
    """
    llm = get_local_llm()

    # NOTE: the prompt below (including its blank lines) is fed to the model
    # verbatim; do not reformat it.
    prompt = f"""


You are an expert physics researcher and teacher.


You are given raw, fragmented scientific material retrieved from a large physics knowledge base.


This material may include:


- incomplete sentences


- dataset paths or filenames


- low-level implementation details


- broken or partial explanations


Your job:


- Use the retrieved material as grounding evidence


- Ignore irrelevant technical artifacts (paths, array shapes, file names)


- If the retrieved information is incomplete, use your physics knowledge to complete the explanation


- Do NOT invent specific papers, experiments, or citations


- Do NOT mention datasets, storage paths, or indexing systems


- Produce a clean, coherent, human-readable explanation


Style rules:


- Clear, structured explanation


- Intuitive where possible


- Graduate-level physics understanding


- Text-first (formulas only if they genuinely help)


- No raw fragments, no broken sentences


CONTEXT (retrieved evidence):


{context}


QUESTION:


{question}


Now produce a high-quality physics explanation that a serious learner would trust.


"""

    out = llm(
        prompt,
        max_tokens=max_tokens,
        temperature=0.2,
        top_p=0.9,
        # Stop sequences keep the model from continuing the template.
        # NOTE(review): "SOURCE:" never appears in the prompt above (it uses
        # "CONTEXT"), so that stop token may be vestigial — confirm intent.
        stop=["SOURCE:", "QUESTION:"],
    )

    return out["choices"][0]["text"].strip()
|
|
|
|
|
@app.post("/v1/query", dependencies=[Security(verify_token)]) |
|
|
async def query(req: QueryRequest): |
|
|
|
|
|
context, sources = search_qdrant(req.question, req.top_k) |
|
|
|
|
|
if context is None: |
|
|
return {"error": "Qdrant search failed", "details": sources} |
|
|
|
|
|
if not context: |
|
|
return {"answer": "No relevant scientific data found.", "sources_used": 0} |
|
|
|
|
|
try: |
|
|
answer = await call_service_api(req.question, context, req.max_tokens) |
|
|
return { |
|
|
"answer": answer, |
|
|
"sources_used": sources, |
|
|
"source": "primary" |
|
|
} |
|
|
except Exception as e: |
|
|
print(f"Service API failed: {e}, falling back to local model...") |
|
|
|
|
|
|
|
|
try: |
|
|
answer = call_local_model(req.question, context, req.max_tokens) |
|
|
return { |
|
|
"answer": answer, |
|
|
"sources_used": sources, |
|
|
"source": "fallback" |
|
|
} |
|
|
except Exception as e: |
|
|
return { |
|
|
"answer": f"Error: Both primary and fallback failed. {str(e)}", |
|
|
"sources_used": 0, |
|
|
"source": "error" |
|
|
} |