feat: BYO-LLM pipeline — gap detection + Gemini cleaner + their LLM answers
acra.py CHANGED
@@ -265,10 +265,130 @@ async def query_pipeline(query, namespace, top_k, rerank, user_id, use_web=False
    return {"answer": r.text.strip(), "sources": _local_sources(hits[:len(lc)]),
            "complexity": cls, "retrieval_source": "local", "cost": _cost()}

+
+async def byo_llm_pipeline(query, llm_endpoint, llm_api_key, llm_model, namespace, user_id):
+    """
+    BYO-LLM pipeline:
+    1. Ask their LLM what it doesn't know (gap detection)
+    2. Jina fetches exactly those gaps
+    3. Gemini cleans raw web mess → clean bullet points
+    4. Their LLM answers with tiny clean context
+    """
+    global _total_input_tokens, _total_output_tokens
+    _total_input_tokens = _total_output_tokens = 0
+
+    cls = classify_query(query)
+    level = cls["level"]
+
+    def _cost():
+        return calc_cost(_total_input_tokens, _total_output_tokens)
+
+    def _call_their_llm(messages):
+        """Call their OpenAI-compatible endpoint."""
+        r = httpx.post(
+            llm_endpoint,
+            headers={
+                "Authorization": f"Bearer {llm_api_key}",
+                "Content-Type": "application/json"
+            },
+            json={
+                "model": llm_model,
+                "messages": messages,
+                "max_tokens": 300,
+                "temperature": 0.1,
+            },
+            timeout=30.0
+        )
+        r.raise_for_status()
+        return r.json()["choices"][0]["message"]["content"].strip()
+
+    # ── Step 1: Ask their LLM what it doesn't know ──────────────
+    gap_prompt = (
+        f"You will answer a user query. Before answering, identify ONLY what you are "
+        f"uncertain or lack recent data about.\n"
+        f"Reply with max 6 short lines like:\n"
+        f"- I don't know: [specific gap]\n\n"
+        f"Query: {query}\n\n"
+        f"What are your knowledge gaps? 6 lines max, be specific."
+    )
+    try:
+        gaps_text = _call_their_llm([{"role": "user", "content": gap_prompt}])
+        print(f"Gaps detected: {gaps_text[:200]}")
+    except Exception as e:
+        print(f"Gap detection failed: {e} — falling back to full query search")
+        gaps_text = query
+
+    # ── Step 2: Jina fetches exactly those gaps ──────────────────
+    # Extract gap lines and search each one
+    gap_lines = [l.strip().lstrip("- ").replace("I don't know:", "").replace("I am unsure about:", "").strip()
+                 for l in gaps_text.split("\n") if l.strip() and len(l.strip()) > 10][:6]
+
+    all_hits = []
+    seen_urls = set()
+    for gap in gap_lines or [query]:
+        hits = web_search(gap, max_results=2)
+        for h in hits:
+            if h["url"] not in seen_urls:
+                seen_urls.add(h["url"])
+                all_hits.append(h)
+
+    if not all_hits:
+        # No web results — just send the query directly to their LLM
+        try:
+            answer = _call_their_llm([{"role": "user", "content": query}])
+        except Exception as e:
+            answer = f"LLM call failed: {e}"
+        return {"answer": answer, "sources": [], "complexity": cls,
+                "retrieval_source": "model_knowledge", "cost": _cost()}
+
+    # ── Step 3: Gemini cleans raw web mess ───────────────────────
+    raw_ctx = "\n\n---\n\n".join(
+        f"Source: {h['title']}\n{h['snippet']}" for h in all_hits)
+
+    clean_prompt = (
+        f"You are a data cleaner. Extract ONLY facts relevant to this query.\n"
+        f"Format: bullet points, max 15 words per bullet, no fluff, no URLs.\n"
+        f"Output max 10 bullets total.\n\n"
+        f"Query: {query}\n\nRaw web data:\n{raw_ctx}\n\nClean bullets:"
+    )
+    clean_r = _generate(clean_prompt)
+    clean_ctx = clean_r.text.strip()
+    print(f"Cleaned context ({len(clean_ctx)} chars):\n{clean_ctx[:300]}")
+
+    # ── Step 4: Their LLM answers with clean context ─────────────
+    final_messages = [
+        {"role": "system", "content":
+            "You are a helpful assistant. Use the provided context to answer accurately. "
+            "If context doesn't help, use your own knowledge."},
+        {"role": "user", "content":
+            f"Context (verified web facts):\n{clean_ctx}\n\nQuestion: {query}\nAnswer:"}
+    ]
+    try:
+        answer = _call_their_llm(final_messages)
+    except Exception as e:
+        # Fall back to Gemini if their LLM fails
+        print(f"Their LLM failed: {e} — falling back to Gemini")
+        r = _generate(f"Context:\n{clean_ctx}\n\nQuestion: {query}\nAnswer:")
+        answer = r.text.strip()
+
+    return {
+        "answer": answer,
+        "sources": [{"content": h["snippet"][:200],
+                     "metadata": {"title": h["title"], "url": h["url"]},
+                     "score": 1.0, "source": "web"} for h in all_hits],
+        "complexity": cls,
+        "retrieval_source": "byo_llm+web",
+        "cost": _cost(),
+    }
+
async def run_acra_pipeline(mode, **kw):
    if mode == "ingest":
        return await ingest_pipeline(kw["texts"], kw["metadata"],
                                     kw["namespace"], kw["user_id"])
+    if kw.get("llm_endpoint"):
+        return await byo_llm_pipeline(
+            kw["query"], kw["llm_endpoint"], kw["llm_api_key"],
+            kw["llm_model"], kw["namespace"], kw["user_id"])
    return await query_pipeline(kw["query"], kw["namespace"], kw["top_k"],
                                kw["rerank"], kw["user_id"],
                                use_web=kw.get("use_web", False))
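
A minimal caller sketch for the new routing (not part of the diff): run_acra_pipeline dispatches to byo_llm_pipeline whenever llm_endpoint is present in the kwargs, and any mode other than "ingest" reaches that check. The "query" mode string, endpoint URL, API key, model name, and import path below are all placeholder assumptions, not values from this commit.

import asyncio

from acra import run_acra_pipeline  # assumes acra.py is importable as a module

result = asyncio.run(run_acra_pipeline(
    "query",                               # any mode except "ingest" falls through to the BYO-LLM check
    query="What changed in the EU AI Act enforcement timeline?",
    llm_endpoint="https://llm.example.com/v1/chat/completions",  # placeholder OpenAI-compatible endpoint
    llm_api_key="sk-placeholder",          # placeholder key
    llm_model="customer-model-v1",         # placeholder model name
    namespace="default",
    user_id="u_123",
))
print(result["retrieval_source"])  # "byo_llm+web" when web hits were found, else "model_knowledge"

Because the branch keys off kw.get("llm_endpoint"), existing callers that never pass that kwarg still take the query_pipeline path unchanged.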
|