# BPO-Bench / app.py — uploaded by haroldshipibm via huggingface_hub (revision 3cfa5b5, verified).
"""Gradio UI for BPO Benchmark evaluation using CUGA SDK."""
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import gradio as gr
from agent import (
CUGAAgent,
LangfuseTracker,
LLMJudge,
check_keywords,
compare_api_calls,
compute_string_similarity,
compute_exact_match,
compute_final_score,
get_llm_judge,
get_provider_models,
get_provider_placeholder,
get_default_model,
is_langfuse_configured,
get_langfuse_host,
PROVIDER_CONFIGS,
)
# Module-level logging at INFO so per-task progress is visible in Space logs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def ensure_mcp_config():
    """Create the MCP servers config file next to this module if absent.

    Returns:
        Path to the ``mcp_servers/bpo.yaml`` config file.
    """
    config_dir = Path(__file__).parent / "mcp_servers"
    config_dir.mkdir(exist_ok=True)
    config_path = config_dir / "bpo.yaml"
    if config_path.exists():
        return config_path
    # Default config pointing the agent at the local BPO OpenAPI endpoint.
    config_path.write_text(
        "services:\n"
        "  - bpo:\n"
        "      url: http://127.0.0.1:8000/openapi.json\n"
        "      description: BPO recruiting analytics API\n"
    )
    return config_path
# Create the MCP config at import time so the agent can start without manual setup.
ensure_mcp_config()
# Test suite definitions: UI checkbox label -> task filename in the data directory.
TEST_SUITES = {
    "Core (26 tasks)": "tasks.json",
    "Type Mismatch (3 tasks)": "tasks_type_mismatch.json",
    "HTTP Errors (4 tasks)": "tasks_http_errors.json",
    "Schema Violations (4 tasks)": "tasks_schema_violations.json",
    "Edge Cases (5 tasks)": "tasks_edge_cases.json",
    "Undocumented Behaviors (3 tasks)": "tasks_undocumented.json",
}
def _find_data_dir() -> Optional[Path]:
"""Locate the data directory."""
candidates = [
Path(__file__).parent / "data",
Path("./data"),
Path("/home/user/app/data"),
]
for p in candidates:
if p.is_dir():
return p
return None
def _load_tasks_from_file(path: Path) -> List[Dict[str, Any]]:
"""Load test cases from a single JSON file."""
if not path.exists():
logger.warning(f"Task file not found: {path}")
return []
with open(path) as f:
data = json.load(f)
cases = []
if isinstance(data, list):
for item in data:
if isinstance(item, dict) and "test_cases" in item:
cases.extend(item["test_cases"])
elif isinstance(item, dict):
cases.append(item)
return cases
def load_tasks(suite_labels: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Load tasks from one or more test suite files.

    Args:
        suite_labels: List of suite labels to load (keys from TEST_SUITES).
            If None, loads only the core suite.

    Returns:
        Concatenated task dicts from every requested suite that exists;
        [] when no data directory can be located.
    """
    data_dir = _find_data_dir()
    if data_dir is None:
        logger.warning("Data directory not found")
        return []
    if suite_labels is None:
        suite_labels = ["Core (26 tasks)"]
    tasks: List[Dict[str, Any]] = []
    for label in suite_labels:
        filename = TEST_SUITES.get(label)
        if filename:
            loaded = _load_tasks_from_file(data_dir / filename)
            # Previous message logged a literal "(unknown)"; name the file instead.
            logger.info(f"Loaded {len(loaded)} tasks from {filename}")
            tasks.extend(loaded)
    return tasks
def get_available_suites() -> List[str]:
    """Return labels of test suites that actually exist on disk."""
    data_dir = _find_data_dir()
    if data_dir is None:
        return []
    available = []
    for label, filename in TEST_SUITES.items():
        if (data_dir / filename).exists():
            available.append(label)
    return available
# Load every available suite once at import time; the cache backs the task-list
# accordion and the suite checkbox counts in the UI.
AVAILABLE_SUITES = get_available_suites()
ALL_TASKS_CACHE: Dict[str, List[Dict[str, Any]]] = {}
for label in AVAILABLE_SUITES:
    ALL_TASKS_CACHE[label] = load_tasks([label])
# Core suite tasks, kept separate for the startup log line below.
TASKS = ALL_TASKS_CACHE.get("Core (26 tasks)", [])
total_available = sum(len(v) for v in ALL_TASKS_CACHE.values())
logger.info(f"Loaded {len(TASKS)} core tasks, {total_available} total across {len(AVAILABLE_SUITES)} suites")
async def _setup_agent(api_key: str, provider: str, model: str) -> CUGAAgent:
    """Construct a CUGA agent for the given provider/model and run its setup."""
    # A blank/whitespace model means "use the provider default" (None).
    chosen_model = model if model.strip() else None
    agent = CUGAAgent(api_key=api_key, provider=provider.lower(), model=chosen_model)
    await agent.setup()
    return agent
async def _run_single_task(
    agent: CUGAAgent, task: Dict, task_index: int,
    llm_judge: Any, llm_judge_requested: bool,
    langfuse: Any,
) -> Dict[str, Any]:
    """Run a single evaluation task and return the result.

    Args:
        agent: Initialized CUGA agent used to execute the task query.
        task: Task definition dict; reads "name", "intent", "difficulty"
            and "expected_output".
        task_index: Zero-based position in the run; used for fallback naming
            and the per-task thread id.
        llm_judge: Optional judge object with an async judge(response,
            expected, query) method, or None to skip judge scoring.
        llm_judge_requested: Whether judge scoring was requested; forwarded
            to compute_final_score.
        langfuse: Tracker whose score_task() records the per-task metrics.

    Returns:
        A result dict with the response, metric scores and API comparison;
        on any exception, a zeroed result dict containing an "error" key.
    """
    task_name = task.get("name", f"task_{task_index+1}")
    query = task.get("intent", "")
    # Unique thread id per task keeps agent conversations isolated.
    thread_id = f"eval_{task_name}_{task_index}"
    try:
        response, tool_calls = await agent.run(query, thread_id=thread_id)
        # Get expected output and keywords. Both "response"/"answer" and
        # "tool_calls"/"apis" spellings appear in the task files.
        expected_output = task.get("expected_output", {})
        expected_keywords = expected_output.get("keywords", [])
        expected_answer = expected_output.get("response", "") or expected_output.get("answer", "")
        tool_calls_expected = expected_output.get("tool_calls", []) or expected_output.get("apis", [])
        expected_apis = []
        for tc in tool_calls_expected:
            if isinstance(tc, dict):
                expected_apis.append(tc.get("name", ""))
            elif isinstance(tc, str):
                expected_apis.append(tc)
        # Compute metrics (similarity/exact-match only when an answer exists)
        keyword_check = check_keywords(response, expected_keywords)
        similarity = compute_string_similarity(response, expected_answer) if expected_answer else 0.0
        exact_match = compute_exact_match(response, expected_answer) if expected_answer else False
        # Extract tool names from the agent's tool-call records
        tool_names = []
        for tc in tool_calls:
            if isinstance(tc, dict):
                tool_names.append(tc.get("name", str(tc)))
            else:
                tool_names.append(str(tc))
        # Compare API calls; with no expectations, report a vacuous pass.
        api_comparison = compare_api_calls(tool_names, expected_apis) if expected_apis else {
            "missing": [], "extra": [], "correct": 0, "expected_count": 0,
            "called_count": len(tool_names), "all_expected_called": True,
        }
        # LLM Judge evaluation — best-effort: a judge failure only logs,
        # leaving llm_judge_score as None.
        llm_judge_score = None
        llm_judge_rationale = None
        if llm_judge and expected_answer:
            try:
                judge_result = await llm_judge.judge(response, expected_answer, query)
                llm_judge_score = judge_result.get("score")
                llm_judge_rationale = judge_result.get("rationale", "")
            except Exception as e:
                logger.warning(f"LLM judge failed for {task_name}: {e}")
        # Compute final score (matches main repo logic)
        final_score = compute_final_score(
            exact_match=exact_match,
            similarity=similarity,
            llm_judge_score=llm_judge_score,
            llm_judge_requested=llm_judge_requested,
            agent_output=response,
            apis_missing=api_comparison["missing"],
            require_api_match=True,
        )
        result = {
            "task_id": task_name,
            "difficulty": task.get("difficulty", "unknown"),
            "intent": query,
            "response": response,
            "expected_answer": expected_answer,
            "expected_keywords": expected_keywords,
            "found_keywords": keyword_check["found_keywords"],
            "missing_keywords": keyword_check["missing_keywords"],
            "match_rate": keyword_check["match_rate"],
            "similarity": similarity,
            "exact_match": exact_match,
            "llm_judge_score": llm_judge_score,
            "llm_judge_rationale": llm_judge_rationale,
            "final_score": final_score,
            "passed": final_score == 1,
            "tool_calls": tool_names,
            "expected_apis": expected_apis,
            "apis_missing": api_comparison["missing"],
            "apis_extra": api_comparison["extra"],
            "apis_correct": api_comparison["correct"],
        }
        # Record per-task scores in Langfuse
        scores = {
            "similarity": similarity,
            "keyword_match": keyword_check["match_rate"],
            "final_score": float(final_score),
        }
        if llm_judge_score is not None:
            scores["llm_judge"] = llm_judge_score
        langfuse.score_task(task_name, scores)
        logger.info(
            f"Task {task_name}: {'PASS' if result['passed'] else 'FAIL'} "
            f"(sim={similarity:.2f}, kw={keyword_check['match_rate']:.1%}"
            f"{f', judge={llm_judge_score:.2f}' if llm_judge_score is not None else ''})"
        )
        return result
    except Exception as e:
        logger.exception(f"Error in task {task_name}")
        # Zeroed result so aggregation in _build_results_markdown still works.
        return {
            "task_id": task_name,
            "difficulty": task.get("difficulty", "unknown"),
            "intent": task.get("intent", ""),
            "response": f"Error: {e}",
            "passed": False,
            "final_score": 0,
            "similarity": 0.0,
            "exact_match": False,
            "match_rate": 0.0,
            "tool_calls": [],
            "error": str(e),
        }
def _build_results_markdown(results: List[Dict], langfuse: Any) -> str:
"""Build markdown summary from evaluation results."""
total = len(results)
passed = sum(1 for r in results if r.get("passed", False))
avg_similarity = sum(r.get("similarity", 0) for r in results) / total if total else 0
avg_match = sum(r.get("match_rate", 0) for r in results) / total if total else 0
exact_matches = sum(1 for r in results if r.get("exact_match", False))
final_score_passes = sum(1 for r in results if r.get("final_score") == 1)
keyword_full_matches = sum(1 for r in results if r.get("match_rate", 0) == 1.0)
tasks_with_tools = sum(1 for r in results if r.get("tool_calls"))
# LLM Judge metrics
judge_scores = [r.get("llm_judge_score") for r in results if r.get("llm_judge_score") is not None]
avg_judge_score = sum(judge_scores) / len(judge_scores) if judge_scores else None
judge_passes = sum(1 for s in judge_scores if s >= 0.85) if judge_scores else 0
# API metrics
tasks_with_expected_apis = [r for r in results if r.get("expected_apis")]
api_correct = sum(1 for r in tasks_with_expected_apis if not r.get("apis_missing"))
api_accuracy = api_correct / len(tasks_with_expected_apis) if tasks_with_expected_apis else None
summary = {
"total_tasks": total,
"passed": passed,
"pass_rate": passed / total if total else 0,
"avg_similarity": avg_similarity,
"avg_keyword_match": avg_match,
"exact_matches": exact_matches,
"final_score_passes": final_score_passes,
"keyword_full_matches": keyword_full_matches,
"avg_llm_judge_score": avg_judge_score,
"api_accuracy": api_accuracy,
}
# End Langfuse trace
langfuse.end_trace(summary)
# Group by difficulty
by_difficulty = {}
for r in results:
diff = r.get("difficulty", "unknown")
if diff not in by_difficulty:
by_difficulty[diff] = {"total": 0, "passed": 0}
by_difficulty[diff]["total"] += 1
if r.get("passed", False):
by_difficulty[diff]["passed"] += 1
# Build markdown output
md = "## Evaluation Complete\n\n"
md += f"**Total Tasks:** {total}\n"
md += f"**Final Score:** {final_score_passes}/{total} ({100*final_score_passes/total:.1f}%)\n"
md += f"**Exact Matches:** {exact_matches} ({100*exact_matches/total:.1f}%)\n"
md += f"**Avg Similarity:** {avg_similarity:.2f}\n"
md += f"**Keyword Match:** {avg_match*100:.1f}% avg ({keyword_full_matches}/{total} full matches)\n"
if avg_judge_score is not None:
md += f"**LLM Judge:** {len(judge_scores)} tasks, avg={avg_judge_score:.2f} ({judge_passes}/{len(judge_scores)} pass)\n"
if api_accuracy is not None:
md += f"**API Accuracy:** {api_correct}/{len(tasks_with_expected_apis)} ({api_accuracy*100:.1f}%)\n"
md += f"**Tasks with Tool Calls:** {tasks_with_tools}/{total}\n"
if langfuse.enabled:
md += "\n*Langfuse tracking enabled*\n"
elif langfuse.init_error:
md += f"\n*Langfuse error: {langfuse.init_error}*\n"
md += "\n"
# By difficulty breakdown
if by_difficulty:
md += "### By Difficulty\n"
for diff, stats in sorted(by_difficulty.items()):
rate = stats["passed"] / stats["total"] * 100 if stats["total"] else 0
md += f"- **{diff}**: {stats['passed']}/{stats['total']} ({rate:.1f}%)\n"
md += "\n"
md += "---\n\n"
# Individual results
for r in results:
status = "PASS" if r.get("passed") else "FAIL"
md += f"### {status} - {r.get('task_id', 'unknown')} ({r.get('difficulty', 'unknown')})\n\n"
md += f"**Query:** {r.get('intent', '')}\n\n"
response_text = r.get("response", "")
if len(response_text) > 500:
response_text = response_text[:500] + "..."
md += f"**Response:** {response_text}\n\n"
# Enhanced metrics display
md += "**Metrics:**\n"
md += f"- **Final Score: {'PASS' if r.get('final_score') == 1 else 'FAIL'}**\n"
md += f"- Similarity: {r.get('similarity', 0)*100:.1f}%\n"
md += f"- Exact Match: {'Yes' if r.get('exact_match') else 'No'}\n"
if r.get("llm_judge_score") is not None:
md += f"- LLM Judge: {r['llm_judge_score']:.2f}\n"
md += f"- Keyword Match: {r.get('match_rate', 0)*100:.1f}%\n"
md += "\n"
if r.get("missing_keywords"):
missing = r["missing_keywords"][:5]
md += f"**Missing keywords:** {', '.join(missing)}"
if len(r.get("missing_keywords", [])) > 5:
md += f" (+{len(r['missing_keywords']) - 5} more)"
md += "\n\n"
# API metrics
if r.get("expected_apis"):
correct = r.get("apis_correct", 0)
expected = len(r.get("expected_apis", []))
api_status = "PASS" if not r.get("apis_missing") else "FAIL"
md += f"- API Accuracy: {correct}/{expected} ({api_status})\n"
if r.get("tool_calls"):
md += f"- Tools used: {', '.join(r['tool_calls'])}\n"
if r.get("apis_missing"):
md += f"- Missing APIs: {', '.join(r['apis_missing'])}\n"
md += "\n"
if r.get("error"):
md += f"**Error:** {r['error']}\n\n"
md += "---\n\n"
return md
def run_evaluation(api_key, provider, model, task_ids, test_suites):
    """Run CUGA SDK evaluation, yielding live progress to the UI.

    Args:
        api_key: Provider API key entered by the user.
        provider: Provider name ("Groq" / "OpenAI").
        model: Model id; blank means the provider default.
        task_ids: Optional whitespace/comma-separated ID filter, or "all".
        test_suites: Selected suite labels (keys of TEST_SUITES).

    Yields:
        (markdown, raw_json) tuples: progress text while running, then the
        final summary markdown and a JSON dump of per-task results.
    """
    if not api_key:
        yield "Please provide an API key", ""
        return
    # Load tasks from selected suites (default to the core suite)
    if not test_suites:
        test_suites = ["Core (26 tasks)"]
    all_tasks = load_tasks(test_suites)
    if not all_tasks:
        yield "No tasks loaded. Check that task files exist in the data directory.", ""
        return
    # Parse task IDs to filter within loaded tasks
    task_ids_str = task_ids.strip()
    if task_ids_str.lower() == "all" or not task_ids_str:
        tasks_to_run = all_tasks
    else:
        try:
            ids = [s.strip() for s in task_ids_str.replace(",", " ").split()]
            tasks_to_run = []
            for task in all_tasks:
                task_name = task.get("name", "")
                # Accept both "task_27" and bare "27" style identifiers.
                task_num = task_name.replace("task_", "") if task_name.startswith("task_") else task_name
                if task_name in ids or task_num in ids:
                    tasks_to_run.append(task)
        except Exception as e:
            yield f"Error parsing task IDs: {e}", ""
            return
    if not tasks_to_run:
        yield "No matching tasks found.", ""
        return
    total = len(tasks_to_run)
    yield f"**Initializing CUGA agent...** (0/{total} tasks)", ""
    # Gradio generator callbacks run in a worker thread without a running
    # event loop, so create a private loop for the async agent calls.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    agent = None  # lets the finally block detect whether setup succeeded
    try:
        agent = loop.run_until_complete(_setup_agent(api_key, provider, model))
        logger.info("CUGA agent initialized successfully")
        langfuse = LangfuseTracker()
        langfuse.start_trace(
            name="bpo_evaluation",
            metadata={
                "provider": provider,
                "model": model or get_default_model(provider),
                "num_tasks": total,
            },
        )
        # Initialize LLM judge (only for Groq provider currently)
        llm_judge = None
        llm_judge_requested = False
        if provider.lower() == "groq":
            try:
                llm_judge = get_llm_judge(api_key=api_key, provider="groq")
                llm_judge_requested = True
                logger.info("LLM judge initialized")
            except Exception as e:
                logger.warning(f"Failed to initialize LLM judge: {e}")
        # Run tasks, yielding progress after each one
        results = []
        for i, task in enumerate(tasks_to_run):
            task_name = task.get("name", f"task_{i+1}")
            logger.info(f"Evaluating task {i+1}/{total}: {task_name}")
            yield f"**Running {task_name}...** ({i}/{total} complete)", ""
            result = loop.run_until_complete(
                _run_single_task(agent, task, i, llm_judge, llm_judge_requested, langfuse)
            )
            results.append(result)
            # Small delay between tasks
            if len(results) < total:
                loop.run_until_complete(asyncio.sleep(0.5))
        md = _build_results_markdown(results, langfuse)
        yield md, json.dumps(results, indent=2)
    except Exception as e:
        logger.exception("Evaluation failed")
        yield f"Evaluation failed: {e}", ""
    finally:
        # Close the agent even when a task raised (previously it leaked on
        # failure), then tear down the private event loop.
        if agent is not None:
            try:
                agent.close()
            except Exception:
                logger.warning("Agent close failed", exc_info=True)
        loop.close()
def get_task_list():
    """Get a formatted markdown list of available tasks grouped by suite."""
    if not ALL_TASKS_CACHE:
        return "No tasks loaded"
    sections = []
    for label in AVAILABLE_SUITES:
        suite_tasks = ALL_TASKS_CACHE.get(label, [])
        if not suite_tasks:
            continue
        sections.append(f"### {label}\n")
        for entry in suite_tasks:
            name = entry.get("name", "unknown")
            difficulty = entry.get("difficulty", "unknown")
            intent = entry.get("intent", "")
            # Truncate long intents so the accordion stays compact.
            if len(intent) > 60:
                intent = intent[:60] + "..."
            sections.append(f"- **{name}** ({difficulty}): {intent}")
        sections.append("")
    return "\n".join(sections)
def update_model_choices(provider: str):
    """Refresh the model dropdown choices when the provider changes."""
    return gr.update(
        choices=get_provider_models(provider),
        value=get_default_model(provider),
    )
def update_api_key_placeholder(provider: str):
    """Swap the API-key textbox placeholder to match the selected provider."""
    return gr.update(placeholder=get_provider_placeholder(provider))
# Gradio Interface: config/controls in the left column, results on the right.
with gr.Blocks(title="BPO Benchmark") as demo:
    gr.Markdown("# BPO Benchmark Evaluation")
    gr.Markdown(
        "Evaluate **CUGA SDK** on BPO recruiting analytics tasks with 32 tool APIs. "
        "Enter your API key, select tasks, and run the evaluation."
    )
    with gr.Row():
        with gr.Column(scale=1):
            # Provider/model/key configuration and the run button.
            provider = gr.Dropdown(
                choices=["Groq", "OpenAI"],
                value="Groq",
                label="LLM Provider"
            )
            api_key = gr.Textbox(
                label="API Key",
                type="password",
                placeholder="gsk_... (Groq)"
            )
            model = gr.Dropdown(
                choices=get_provider_models("groq"),
                value=get_default_model("groq"),
                label="Model",
                allow_custom_value=True,
            )
            test_suites = gr.CheckboxGroup(
                choices=AVAILABLE_SUITES,
                value=["Core (26 tasks)"],
                label="Test Suites",
                info=f"{total_available} tasks across {len(AVAILABLE_SUITES)} suites",
            )
            task_ids = gr.Textbox(
                label="Task IDs (optional filter)",
                placeholder="1 2 3 or task_27 task_28 (leave empty for all in selected suites)",
                info="Filter within selected suites by ID"
            )
            run_btn = gr.Button("Run Evaluation", variant="primary", size="lg")
            with gr.Accordion("Available Tasks", open=False):
                gr.Markdown(get_task_list())
            with gr.Accordion("Environment Info", open=False):
                # Surface Langfuse env-var status so users can debug missing traces.
                langfuse_status = "Configured" if is_langfuse_configured() else "Not configured"
                public_key_set = "Yes" if os.environ.get("LANGFUSE_PUBLIC_KEY") else "No"
                secret_key_set = "Yes" if os.environ.get("LANGFUSE_SECRET_KEY") else "No"
                langfuse_host = get_langfuse_host()
                gr.Markdown(f"""
**Langfuse Tracking:** {langfuse_status}
- LANGFUSE_PUBLIC_KEY set: {public_key_set}
- LANGFUSE_SECRET_KEY set: {secret_key_set}
- Host: {langfuse_host}
To enable Langfuse tracking in HuggingFace:
1. Go to Space Settings > Variables and secrets
2. Add **Secrets** (not variables):
- `LANGFUSE_PUBLIC_KEY`
- `LANGFUSE_SECRET_KEY`
- `LANGFUSE_HOST` (e.g., `https://us.cloud.langfuse.com`)
3. Restart the Space for changes to take effect
*Connection will be tested when you run an evaluation*
""")
        with gr.Column(scale=2):
            # Results panel plus a collapsible raw-JSON dump.
            output = gr.Markdown(label="Results")
            with gr.Accordion("Raw JSON Results", open=False):
                raw_json = gr.Code(label="Raw JSON", language="json")
    # Event handlers: provider changes cascade to the model list and key hint.
    provider.change(
        fn=update_model_choices,
        inputs=[provider],
        outputs=[model]
    )
    provider.change(
        fn=update_api_key_placeholder,
        inputs=[provider],
        outputs=[api_key]
    )
    # run_evaluation is a generator, so the UI streams progress updates.
    run_btn.click(
        fn=run_evaluation,
        inputs=[api_key, provider, model, task_ids, test_suites],
        outputs=[output, raw_json]
    )
    gr.Markdown("""
---
**Agent:** [CUGA SDK](https://pypi.org/project/cuga/)
| **Dataset:** [ibm-research/BPO-Bench](https://huggingface.co/datasets/ibm-research/BPO-Bench)
""")
if __name__ == "__main__":
    # Bind on all interfaces; 7860 is the port HuggingFace Spaces expects.
    demo.launch(server_name="0.0.0.0", server_port=7860)