# github-actions
# Sync from GitHub 2025-12-17T12:18:53Z
# 5a3b322
"""
Chat-style agent using Gemini for planning + explanation, deterministic tools for retrieval/rerank.
Set GOOGLE_API_KEY in your environment.
"""
from __future__ import annotations
import json
import os
from typing import Callable
import pandas as pd
from data.catalog_loader import load_catalog
from recommenders.bm25 import BM25Recommender
from recommenders.vector_recommender import VectorRecommender
from retrieval.vector_index import VectorIndex
from models.embedding_model import EmbeddingModel
from rerankers.cross_encoder import CrossEncoderReranker
from tools.query_plan_tool_llm import build_query_plan_llm
from tools.query_plan_tool import build_query_plan as deterministic_plan
from tools.retrieve_tool import retrieve_candidates
from tools.rerank_tool import rerank_candidates
from tools.constraints_tool import apply_constraints
from tools.explain_tool import explain
from schemas.query_plan import QueryPlan
def load_resources():
    """Load the catalog plus sparse/dense retrievers and an id->row mapping.

    Returns:
        Tuple of (catalog DataFrame, BM25 recommender, vector recommender,
        dict mapping assessment_id -> catalog row).
    """
    catalog_df, _, _ = load_catalog("data/catalog_docs_rich.jsonl")
    sparse = BM25Recommender(catalog_df)
    embedder = EmbeddingModel("BAAI/bge-small-en-v1.5")
    faiss_index = VectorIndex.load("data/faiss_index/index_bge.faiss")
    with open("data/embeddings_bge/assessment_ids.json") as fh:
        assessment_ids = json.load(fh)
    dense = VectorRecommender(embedder, faiss_index, catalog_df, assessment_ids, k_candidates=200)
    # Row lookup keyed by assessment_id for the constraints stage.
    by_id = {}
    for _, row in catalog_df.iterrows():
        by_id[row["assessment_id"]] = row
    return catalog_df, sparse, dense, by_id
def make_catalog_lookup(df_catalog: pd.DataFrame) -> Callable[[str], dict]:
    """Build a closure that maps an assessment_id to its catalog row as a dict.

    Unknown ids resolve to an empty dict.
    """
    indexed = df_catalog.set_index("assessment_id")

    def lookup(aid: str) -> dict:
        # EAFP: .loc raises KeyError for ids absent from the index.
        try:
            return indexed.loc[aid].to_dict()
        except KeyError:
            return {}

    return lookup
def _maybe_clarify(plan: QueryPlan, cand_count: int, topn: int) -> str | None:
    """Decide whether a clarifying question should be asked before reranking.

    Returns the question text, or None when coverage looks adequate.
    """
    # Honor a clarification the planner itself flagged.
    if plan.needs_clarification and plan.clarifying_question:
        return plan.clarifying_question
    # Very thin candidate pool: always ask.
    thin_floor = max(10, int(0.25 * topn))
    if cand_count < thin_floor:
        return "Results look thin. Clarify: are you looking for (1) personality/culture fit, (2) leadership judgment (SJT), or (3) role capability?"
    # Ambiguous behavioral intent with only moderate coverage: ask the user to narrow.
    fuzzy_floor = max(20, int(0.5 * topn))
    ambiguous = plan.intent in {"BEHAVIORAL", "UNKNOWN", "MIXED"}
    if ambiguous and cand_count < fuzzy_floor:
        return "For culture/behavioral focus, choose: (1) personality/culture fit, (2) leadership judgment (SJT), or (3) role capability. Please pick one."
    return None
def run_chat(
    user_text: str,
    vocab_path: str = "data/catalog_role_vocab.json",
    model_name: str = "gemini-2.5-flash-lite",
    clarification_answer: str | None = None,
    topn: int = 200,
    verbose: bool = False,
):
    """Run one chat turn: plan -> retrieve -> (clarify?) -> rerank -> constrain -> explain.

    Args:
        user_text: Raw user query.
        vocab_path: Optional JSON file with role vocabulary for the planner.
        model_name: Gemini model used for LLM planning.
        clarification_answer: The user's answer to a prior clarifying question, if any.
        topn: Candidate pool size requested from retrieval.
        verbose: When True, dump the full trace log to stdout.

    Returns:
        A natural-language summary of the final slate, or a
        "Clarification needed: ..." prompt when more input is required.
    """
    # Fix: close the vocab file handle (previously json.load(open(...)) leaked it).
    vocab = {}
    if vocab_path and os.path.exists(vocab_path):
        with open(vocab_path) as fh:
            vocab = json.load(fh)
    df_catalog, bm25, vec, catalog_by_id = load_resources()
    catalog_lookup = make_catalog_lookup(df_catalog)
    # NOTE(review): hash() is salted per process, so trace ids are not stable
    # across runs — fine for ad-hoc tracing, not for cross-run correlation.
    trace_id = f"trace-{abs(hash(user_text))}"
    log = {"trace_id": trace_id, "raw_query": user_text}
    # Plan with the LLM; fall back to the deterministic planner on any failure.
    # Fix: use Pydantic v2 model_dump() (was deprecated v1 .dict()), consistent
    # with model_validate()/model_dump() used elsewhere in this function.
    try:
        plan = build_query_plan_llm(user_text, vocab=vocab, model_name=model_name)
        QueryPlan.model_validate(plan.model_dump())  # schema guard
        log["plan_source"] = "llm"
    except Exception as e:
        plan = deterministic_plan(user_text, vocab=vocab)
        log["plan_source"] = f"deterministic (llm_fail={str(e)})"
    log["query_plan"] = plan.model_dump()
    # Retrieve the union of sparse (BM25) and dense (vector) candidates.
    cand_set = retrieve_candidates(plan, bm25, vec, topn=topn, catalog_df=df_catalog)
    if verbose:
        log["candidates"] = [c.model_dump() for c in cand_set.candidates[:10]]
    # Clarification loop: ask once, then re-plan using the user's answer.
    question = _maybe_clarify(plan, cand_count=len(cand_set.candidates), topn=topn)
    if question and not clarification_answer:
        log["clarification"] = question
        if verbose:
            print(json.dumps(log, indent=2))
        return f"Clarification needed: {question}"
    if question and clarification_answer:
        clarified_text = f"{user_text}\nUser clarification: {clarification_answer}"
        try:
            plan = build_query_plan_llm(clarified_text, vocab=vocab, model_name=model_name)
            QueryPlan.model_validate(plan.model_dump())
        except Exception:
            plan = deterministic_plan(clarified_text, vocab=vocab)
        log["query_plan_clarified"] = plan.model_dump()
        cand_set = retrieve_candidates(plan, bm25, vec, topn=topn, catalog_df=df_catalog)
        if verbose:
            log["candidates_clarified"] = [c.model_dump() for c in cand_set.candidates[:10]]
    # Rerank with the cross-encoder, keeping the top 10.
    reranker = CrossEncoderReranker(model_name="models/reranker_crossenc/v0.1.0")
    ranked = rerank_candidates(plan, cand_set, reranker, df_catalog, k=10)
    log["rerank"] = [item.model_dump() for item in ranked.items]
    # Apply hard constraints from the plan against the catalog rows.
    final_list = apply_constraints(plan, ranked, catalog_by_id, k=10)
    log["final"] = [item.model_dump() for item in final_list.items]
    # Natural-language explanation of the final slate.
    summary = explain(plan, final_list, catalog_lookup)
    log["summary"] = summary
    # Compact output: top-10 with display metadata pulled from the catalog.
    final_results = []
    for item in final_list.items:
        meta = catalog_lookup(item.assessment_id)
        final_results.append(
            {
                "assessment_id": item.assessment_id,
                "score": item.score,
                "name": meta.get("name"),
                "url": meta.get("url"),
                # Catalog schema varies; prefer the richer field, fall back to the short one.
                "test_type_full": meta.get("test_type_full") or meta.get("test_type"),
                "duration": meta.get("duration_minutes") or meta.get("duration"),
            }
        )
    if verbose:
        log["final_results"] = final_results
        print(json.dumps(log, indent=2))
    else:
        print(json.dumps({"trace_id": trace_id, "final_results": final_results}, indent=2))
    return summary
if __name__ == "__main__":
    import sys

    # Warn but do not abort: the pipeline can still fall back to the
    # deterministic planner when the LLM is unavailable.
    if "GOOGLE_API_KEY" not in os.environ:
        print("Please set GOOGLE_API_KEY for Gemini.")
    cli_query = " ".join(sys.argv[1:]) or "Find a 1 hour culture fit assessment for a COO"
    print(run_chat(cli_query, verbose=False))