"""Gradio UI for BPO Benchmark evaluation using CUGA SDK.""" import asyncio import json import logging import os from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import gradio as gr from agent import ( CUGAAgent, LangfuseTracker, LLMJudge, check_keywords, compare_api_calls, compute_string_similarity, compute_exact_match, compute_final_score, get_llm_judge, get_provider_models, get_provider_placeholder, get_default_model, is_langfuse_configured, get_langfuse_host, PROVIDER_CONFIGS, ) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def ensure_mcp_config(): """Ensure MCP servers config file exists.""" mcp_dir = Path(__file__).parent / "mcp_servers" mcp_dir.mkdir(exist_ok=True) config_file = mcp_dir / "bpo.yaml" if not config_file.exists(): config_file.write_text("""services: - bpo: url: http://127.0.0.1:8000/openapi.json description: BPO recruiting analytics API """) return config_file # Ensure config exists ensure_mcp_config() # Test suite definitions: label -> filename TEST_SUITES = { "Core (26 tasks)": "tasks.json", "Type Mismatch (3 tasks)": "tasks_type_mismatch.json", "HTTP Errors (4 tasks)": "tasks_http_errors.json", "Schema Violations (4 tasks)": "tasks_schema_violations.json", "Edge Cases (5 tasks)": "tasks_edge_cases.json", "Undocumented Behaviors (3 tasks)": "tasks_undocumented.json", } def _find_data_dir() -> Optional[Path]: """Locate the data directory.""" candidates = [ Path(__file__).parent / "data", Path("./data"), Path("/home/user/app/data"), ] for p in candidates: if p.is_dir(): return p return None def _load_tasks_from_file(path: Path) -> List[Dict[str, Any]]: """Load test cases from a single JSON file.""" if not path.exists(): logger.warning(f"Task file not found: {path}") return [] with open(path) as f: data = json.load(f) cases = [] if isinstance(data, list): for item in data: if isinstance(item, dict) and "test_cases" in item: cases.extend(item["test_cases"]) elif isinstance(item, dict): cases.append(item) return cases def load_tasks(suite_labels: Optional[List[str]] = None) -> List[Dict[str, Any]]: """Load tasks from one or more test suite files. Args: suite_labels: List of suite labels to load (keys from TEST_SUITES). If None, loads only the core suite. """ data_dir = _find_data_dir() if data_dir is None: logger.warning("Data directory not found") return [] if suite_labels is None: suite_labels = ["Core (26 tasks)"] tasks = [] for label in suite_labels: filename = TEST_SUITES.get(label) if filename: loaded = _load_tasks_from_file(data_dir / filename) logger.info(f"Loaded {len(loaded)} tasks from {filename}") tasks.extend(loaded) return tasks def get_available_suites() -> List[str]: """Return labels of test suites that actually exist on disk.""" data_dir = _find_data_dir() if data_dir is None: return [] return [label for label, fn in TEST_SUITES.items() if (data_dir / fn).exists()] # Load core tasks at startup for the task list display AVAILABLE_SUITES = get_available_suites() ALL_TASKS_CACHE: Dict[str, List[Dict[str, Any]]] = {} for label in AVAILABLE_SUITES: ALL_TASKS_CACHE[label] = load_tasks([label]) TASKS = ALL_TASKS_CACHE.get("Core (26 tasks)", []) total_available = sum(len(v) for v in ALL_TASKS_CACHE.values()) logger.info(f"Loaded {len(TASKS)} core tasks, {total_available} total across {len(AVAILABLE_SUITES)} suites") async def _setup_agent(api_key: str, provider: str, model: str) -> CUGAAgent: """Initialize and return CUGA agent.""" agent = CUGAAgent( api_key=api_key, provider=provider.lower(), model=model if model.strip() else None, ) await agent.setup() return agent async def _run_single_task( agent: CUGAAgent, task: Dict, task_index: int, llm_judge: Any, llm_judge_requested: bool, langfuse: Any, ) -> Dict[str, Any]: """Run a single evaluation task and return the result.""" task_name = task.get("name", f"task_{task_index+1}") query = task.get("intent", "") thread_id = f"eval_{task_name}_{task_index}" try: response, tool_calls = await agent.run(query, thread_id=thread_id) # Get expected output and keywords expected_output = task.get("expected_output", {}) expected_keywords = expected_output.get("keywords", []) expected_answer = expected_output.get("response", "") or expected_output.get("answer", "") tool_calls_expected = expected_output.get("tool_calls", []) or expected_output.get("apis", []) expected_apis = [] for tc in tool_calls_expected: if isinstance(tc, dict): expected_apis.append(tc.get("name", "")) elif isinstance(tc, str): expected_apis.append(tc) # Compute metrics keyword_check = check_keywords(response, expected_keywords) similarity = compute_string_similarity(response, expected_answer) if expected_answer else 0.0 exact_match = compute_exact_match(response, expected_answer) if expected_answer else False # Extract tool names tool_names = [] for tc in tool_calls: if isinstance(tc, dict): tool_names.append(tc.get("name", str(tc))) else: tool_names.append(str(tc)) # Compare API calls api_comparison = compare_api_calls(tool_names, expected_apis) if expected_apis else { "missing": [], "extra": [], "correct": 0, "expected_count": 0, "called_count": len(tool_names), "all_expected_called": True, } # LLM Judge evaluation llm_judge_score = None llm_judge_rationale = None if llm_judge and expected_answer: try: judge_result = await llm_judge.judge(response, expected_answer, query) llm_judge_score = judge_result.get("score") llm_judge_rationale = judge_result.get("rationale", "") except Exception as e: logger.warning(f"LLM judge failed for {task_name}: {e}") # Compute final score (matches main repo logic) final_score = compute_final_score( exact_match=exact_match, similarity=similarity, llm_judge_score=llm_judge_score, llm_judge_requested=llm_judge_requested, agent_output=response, apis_missing=api_comparison["missing"], require_api_match=True, ) result = { "task_id": task_name, "difficulty": task.get("difficulty", "unknown"), "intent": query, "response": response, "expected_answer": expected_answer, "expected_keywords": expected_keywords, "found_keywords": keyword_check["found_keywords"], "missing_keywords": keyword_check["missing_keywords"], "match_rate": keyword_check["match_rate"], "similarity": similarity, "exact_match": exact_match, "llm_judge_score": llm_judge_score, "llm_judge_rationale": llm_judge_rationale, "final_score": final_score, "passed": final_score == 1, "tool_calls": tool_names, "expected_apis": expected_apis, "apis_missing": api_comparison["missing"], "apis_extra": api_comparison["extra"], "apis_correct": api_comparison["correct"], } # Score in Langfuse scores = { "similarity": similarity, "keyword_match": keyword_check["match_rate"], "final_score": float(final_score), } if llm_judge_score is not None: scores["llm_judge"] = llm_judge_score langfuse.score_task(task_name, scores) logger.info( f"Task {task_name}: {'PASS' if result['passed'] else 'FAIL'} " f"(sim={similarity:.2f}, kw={keyword_check['match_rate']:.1%}" f"{f', judge={llm_judge_score:.2f}' if llm_judge_score is not None else ''})" ) return result except Exception as e: logger.exception(f"Error in task {task_name}") return { "task_id": task_name, "difficulty": task.get("difficulty", "unknown"), "intent": task.get("intent", ""), "response": f"Error: {e}", "passed": False, "final_score": 0, "similarity": 0.0, "exact_match": False, "match_rate": 0.0, "tool_calls": [], "error": str(e), } def _build_results_markdown(results: List[Dict], langfuse: Any) -> str: """Build markdown summary from evaluation results.""" total = len(results) passed = sum(1 for r in results if r.get("passed", False)) avg_similarity = sum(r.get("similarity", 0) for r in results) / total if total else 0 avg_match = sum(r.get("match_rate", 0) for r in results) / total if total else 0 exact_matches = sum(1 for r in results if r.get("exact_match", False)) final_score_passes = sum(1 for r in results if r.get("final_score") == 1) keyword_full_matches = sum(1 for r in results if r.get("match_rate", 0) == 1.0) tasks_with_tools = sum(1 for r in results if r.get("tool_calls")) # LLM Judge metrics judge_scores = [r.get("llm_judge_score") for r in results if r.get("llm_judge_score") is not None] avg_judge_score = sum(judge_scores) / len(judge_scores) if judge_scores else None judge_passes = sum(1 for s in judge_scores if s >= 0.85) if judge_scores else 0 # API metrics tasks_with_expected_apis = [r for r in results if r.get("expected_apis")] api_correct = sum(1 for r in tasks_with_expected_apis if not r.get("apis_missing")) api_accuracy = api_correct / len(tasks_with_expected_apis) if tasks_with_expected_apis else None summary = { "total_tasks": total, "passed": passed, "pass_rate": passed / total if total else 0, "avg_similarity": avg_similarity, "avg_keyword_match": avg_match, "exact_matches": exact_matches, "final_score_passes": final_score_passes, "keyword_full_matches": keyword_full_matches, "avg_llm_judge_score": avg_judge_score, "api_accuracy": api_accuracy, } # End Langfuse trace langfuse.end_trace(summary) # Group by difficulty by_difficulty = {} for r in results: diff = r.get("difficulty", "unknown") if diff not in by_difficulty: by_difficulty[diff] = {"total": 0, "passed": 0} by_difficulty[diff]["total"] += 1 if r.get("passed", False): by_difficulty[diff]["passed"] += 1 # Build markdown output md = "## Evaluation Complete\n\n" md += f"**Total Tasks:** {total}\n" md += f"**Final Score:** {final_score_passes}/{total} ({100*final_score_passes/total:.1f}%)\n" md += f"**Exact Matches:** {exact_matches} ({100*exact_matches/total:.1f}%)\n" md += f"**Avg Similarity:** {avg_similarity:.2f}\n" md += f"**Keyword Match:** {avg_match*100:.1f}% avg ({keyword_full_matches}/{total} full matches)\n" if avg_judge_score is not None: md += f"**LLM Judge:** {len(judge_scores)} tasks, avg={avg_judge_score:.2f} ({judge_passes}/{len(judge_scores)} pass)\n" if api_accuracy is not None: md += f"**API Accuracy:** {api_correct}/{len(tasks_with_expected_apis)} ({api_accuracy*100:.1f}%)\n" md += f"**Tasks with Tool Calls:** {tasks_with_tools}/{total}\n" if langfuse.enabled: md += "\n*Langfuse tracking enabled*\n" elif langfuse.init_error: md += f"\n*Langfuse error: {langfuse.init_error}*\n" md += "\n" # By difficulty breakdown if by_difficulty: md += "### By Difficulty\n" for diff, stats in sorted(by_difficulty.items()): rate = stats["passed"] / stats["total"] * 100 if stats["total"] else 0 md += f"- **{diff}**: {stats['passed']}/{stats['total']} ({rate:.1f}%)\n" md += "\n" md += "---\n\n" # Individual results for r in results: status = "PASS" if r.get("passed") else "FAIL" md += f"### {status} - {r.get('task_id', 'unknown')} ({r.get('difficulty', 'unknown')})\n\n" md += f"**Query:** {r.get('intent', '')}\n\n" response_text = r.get("response", "") if len(response_text) > 500: response_text = response_text[:500] + "..." md += f"**Response:** {response_text}\n\n" # Enhanced metrics display md += "**Metrics:**\n" md += f"- **Final Score: {'PASS' if r.get('final_score') == 1 else 'FAIL'}**\n" md += f"- Similarity: {r.get('similarity', 0)*100:.1f}%\n" md += f"- Exact Match: {'Yes' if r.get('exact_match') else 'No'}\n" if r.get("llm_judge_score") is not None: md += f"- LLM Judge: {r['llm_judge_score']:.2f}\n" md += f"- Keyword Match: {r.get('match_rate', 0)*100:.1f}%\n" md += "\n" if r.get("missing_keywords"): missing = r["missing_keywords"][:5] md += f"**Missing keywords:** {', '.join(missing)}" if len(r.get("missing_keywords", [])) > 5: md += f" (+{len(r['missing_keywords']) - 5} more)" md += "\n\n" # API metrics if r.get("expected_apis"): correct = r.get("apis_correct", 0) expected = len(r.get("expected_apis", [])) api_status = "PASS" if not r.get("apis_missing") else "FAIL" md += f"- API Accuracy: {correct}/{expected} ({api_status})\n" if r.get("tool_calls"): md += f"- Tools used: {', '.join(r['tool_calls'])}\n" if r.get("apis_missing"): md += f"- Missing APIs: {', '.join(r['apis_missing'])}\n" md += "\n" if r.get("error"): md += f"**Error:** {r['error']}\n\n" md += "---\n\n" return md def run_evaluation(api_key, provider, model, task_ids, test_suites): """Run CUGA SDK evaluation, yielding live progress to the UI.""" if not api_key: yield "Please provide an API key", "" return # Load tasks from selected suites if not test_suites: test_suites = ["Core (26 tasks)"] all_tasks = load_tasks(test_suites) if not all_tasks: yield "No tasks loaded. Check that task files exist in the data directory.", "" return # Parse task IDs to filter within loaded tasks task_ids_str = task_ids.strip() if task_ids_str.lower() == "all" or not task_ids_str: tasks_to_run = all_tasks else: try: ids = [s.strip() for s in task_ids_str.replace(",", " ").split()] tasks_to_run = [] for task in all_tasks: task_name = task.get("name", "") task_num = task_name.replace("task_", "") if task_name.startswith("task_") else task_name if task_name in ids or task_num in ids: tasks_to_run.append(task) except Exception as e: yield f"Error parsing task IDs: {e}", "" return if not tasks_to_run: yield "No matching tasks found.", "" return total = len(tasks_to_run) yield f"**Initializing CUGA agent...** (0/{total} tasks)", "" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: agent = loop.run_until_complete(_setup_agent(api_key, provider, model)) logger.info("CUGA agent initialized successfully") langfuse = LangfuseTracker() langfuse.start_trace( name="bpo_evaluation", metadata={ "provider": provider, "model": model or get_default_model(provider), "num_tasks": total, }, ) # Initialize LLM judge (only for Groq provider currently) llm_judge = None llm_judge_requested = False if provider.lower() == "groq": try: llm_judge = get_llm_judge(api_key=api_key, provider="groq") llm_judge_requested = True logger.info("LLM judge initialized") except Exception as e: logger.warning(f"Failed to initialize LLM judge: {e}") # Run tasks, yielding progress after each one results = [] for i, task in enumerate(tasks_to_run): task_name = task.get("name", f"task_{i+1}") logger.info(f"Evaluating task {i+1}/{total}: {task_name}") yield f"**Running {task_name}...** ({i}/{total} complete)", "" result = loop.run_until_complete( _run_single_task(agent, task, i, llm_judge, llm_judge_requested, langfuse) ) results.append(result) # Small delay between tasks if len(results) < total: loop.run_until_complete(asyncio.sleep(0.5)) # Clean up agent.close() md = _build_results_markdown(results, langfuse) yield md, json.dumps(results, indent=2) except Exception as e: logger.exception("Evaluation failed") yield f"Evaluation failed: {e}", "" finally: loop.close() def get_task_list(): """Get a formatted list of available tasks grouped by suite.""" if not ALL_TASKS_CACHE: return "No tasks loaded" lines = [] for label in AVAILABLE_SUITES: tasks = ALL_TASKS_CACHE.get(label, []) if not tasks: continue lines.append(f"### {label}\n") for task in tasks: name = task.get("name", "unknown") diff = task.get("difficulty", "unknown") intent = task.get("intent", "") if len(intent) > 60: intent = intent[:60] + "..." lines.append(f"- **{name}** ({diff}): {intent}") lines.append("") return "\n".join(lines) def update_model_choices(provider: str): """Update model dropdown choices based on provider.""" models = get_provider_models(provider) default = get_default_model(provider) return gr.update(choices=models, value=default) def update_api_key_placeholder(provider: str): """Update API key placeholder based on provider.""" placeholder = get_provider_placeholder(provider) return gr.update(placeholder=placeholder) # Gradio Interface with gr.Blocks(title="BPO Benchmark") as demo: gr.Markdown("# BPO Benchmark Evaluation") gr.Markdown( "Evaluate **CUGA SDK** on BPO recruiting analytics tasks with 32 tool APIs. " "Enter your API key, select tasks, and run the evaluation." ) with gr.Row(): with gr.Column(scale=1): provider = gr.Dropdown( choices=["Groq", "OpenAI"], value="Groq", label="LLM Provider" ) api_key = gr.Textbox( label="API Key", type="password", placeholder="gsk_... (Groq)" ) model = gr.Dropdown( choices=get_provider_models("groq"), value=get_default_model("groq"), label="Model", allow_custom_value=True, ) test_suites = gr.CheckboxGroup( choices=AVAILABLE_SUITES, value=["Core (26 tasks)"], label="Test Suites", info=f"{total_available} tasks across {len(AVAILABLE_SUITES)} suites", ) task_ids = gr.Textbox( label="Task IDs (optional filter)", placeholder="1 2 3 or task_27 task_28 (leave empty for all in selected suites)", info="Filter within selected suites by ID" ) run_btn = gr.Button("Run Evaluation", variant="primary", size="lg") with gr.Accordion("Available Tasks", open=False): gr.Markdown(get_task_list()) with gr.Accordion("Environment Info", open=False): langfuse_status = "Configured" if is_langfuse_configured() else "Not configured" public_key_set = "Yes" if os.environ.get("LANGFUSE_PUBLIC_KEY") else "No" secret_key_set = "Yes" if os.environ.get("LANGFUSE_SECRET_KEY") else "No" langfuse_host = get_langfuse_host() gr.Markdown(f""" **Langfuse Tracking:** {langfuse_status} - LANGFUSE_PUBLIC_KEY set: {public_key_set} - LANGFUSE_SECRET_KEY set: {secret_key_set} - Host: {langfuse_host} To enable Langfuse tracking in HuggingFace: 1. Go to Space Settings > Variables and secrets 2. Add **Secrets** (not variables): - `LANGFUSE_PUBLIC_KEY` - `LANGFUSE_SECRET_KEY` - `LANGFUSE_HOST` (e.g., `https://us.cloud.langfuse.com`) 3. Restart the Space for changes to take effect *Connection will be tested when you run an evaluation* """) with gr.Column(scale=2): output = gr.Markdown(label="Results") with gr.Accordion("Raw JSON Results", open=False): raw_json = gr.Code(label="Raw JSON", language="json") # Event handlers provider.change( fn=update_model_choices, inputs=[provider], outputs=[model] ) provider.change( fn=update_api_key_placeholder, inputs=[provider], outputs=[api_key] ) run_btn.click( fn=run_evaluation, inputs=[api_key, provider, model, task_ids, test_suites], outputs=[output, raw_json] ) gr.Markdown(""" --- **Agent:** [CUGA SDK](https://pypi.org/project/cuga/) | **Dataset:** [ibm-research/BPO-Bench](https://huggingface.co/datasets/ibm-research/BPO-Bench) """) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=7860)