# NOTE: the original capture of this file carried a Hugging Face Spaces page
# header ("Spaces: Running / Running") that is not part of the module; removed.
| """Gradio UI for BPO Benchmark evaluation using CUGA SDK.""" | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import gradio as gr | |
| from agent import ( | |
| CUGAAgent, | |
| LangfuseTracker, | |
| LLMJudge, | |
| check_keywords, | |
| compare_api_calls, | |
| compute_string_similarity, | |
| compute_exact_match, | |
| compute_final_score, | |
| get_llm_judge, | |
| get_provider_models, | |
| get_provider_placeholder, | |
| get_default_model, | |
| is_langfuse_configured, | |
| get_langfuse_host, | |
| PROVIDER_CONFIGS, | |
| ) | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
def ensure_mcp_config():
    """Create the MCP servers config file (``mcp_servers/bpo.yaml``) if absent.

    Returns:
        Path to the config file, whether it already existed or was just written.
    """
    config_dir = Path(__file__).parent / "mcp_servers"
    config_dir.mkdir(exist_ok=True)
    config_path = config_dir / "bpo.yaml"
    if config_path.exists():
        return config_path
    # Default config pointing at the local BPO API's OpenAPI spec.
    config_path.write_text("""services:
  - bpo:
      url: http://127.0.0.1:8000/openapi.json
      description: BPO recruiting analytics API
""")
    return config_path


# Make sure the MCP config is present before anything else runs.
ensure_mcp_config()
# Test suite definitions: UI label -> JSON filename in the data directory.
# NOTE: the labels are used verbatim elsewhere in this module (e.g. the
# "Core (26 tasks)" default in load_tasks/run_evaluation and the checkbox
# choices), so renaming a key here requires updating those call sites too.
TEST_SUITES = {
    "Core (26 tasks)": "tasks.json",
    "Type Mismatch (3 tasks)": "tasks_type_mismatch.json",
    "HTTP Errors (4 tasks)": "tasks_http_errors.json",
    "Schema Violations (4 tasks)": "tasks_schema_violations.json",
    "Edge Cases (5 tasks)": "tasks_edge_cases.json",
    "Undocumented Behaviors (3 tasks)": "tasks_undocumented.json",
}
| def _find_data_dir() -> Optional[Path]: | |
| """Locate the data directory.""" | |
| candidates = [ | |
| Path(__file__).parent / "data", | |
| Path("./data"), | |
| Path("/home/user/app/data"), | |
| ] | |
| for p in candidates: | |
| if p.is_dir(): | |
| return p | |
| return None | |
| def _load_tasks_from_file(path: Path) -> List[Dict[str, Any]]: | |
| """Load test cases from a single JSON file.""" | |
| if not path.exists(): | |
| logger.warning(f"Task file not found: {path}") | |
| return [] | |
| with open(path) as f: | |
| data = json.load(f) | |
| cases = [] | |
| if isinstance(data, list): | |
| for item in data: | |
| if isinstance(item, dict) and "test_cases" in item: | |
| cases.extend(item["test_cases"]) | |
| elif isinstance(item, dict): | |
| cases.append(item) | |
| return cases | |
def load_tasks(suite_labels: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Load tasks from one or more test suite files.

    Args:
        suite_labels: List of suite labels to load (keys from TEST_SUITES).
            If None, loads only the core suite. Unknown labels are skipped
            silently.

    Returns:
        Combined list of task dicts from every requested suite; empty when
        the data directory is missing.
    """
    data_dir = _find_data_dir()
    if data_dir is None:
        logger.warning("Data directory not found")
        return []
    if suite_labels is None:
        suite_labels = ["Core (26 tasks)"]
    tasks: List[Dict[str, Any]] = []
    for label in suite_labels:
        filename = TEST_SUITES.get(label)
        if filename:
            loaded = _load_tasks_from_file(data_dir / filename)
            # Include the filename so multi-suite runs are traceable in logs
            # (the message previously logged a dangling "(unknown)" placeholder).
            logger.info(f"Loaded {len(loaded)} tasks from {filename}")
            tasks.extend(loaded)
    return tasks
def get_available_suites() -> List[str]:
    """Return labels of test suites whose task files actually exist on disk."""
    data_dir = _find_data_dir()
    if data_dir is None:
        return []
    available = []
    for label, filename in TEST_SUITES.items():
        if (data_dir / filename).exists():
            available.append(label)
    return available
# Load every available suite once at startup so the UI (task list accordion,
# checkbox-group counts) can render without re-reading files per interaction.
AVAILABLE_SUITES = get_available_suites()
ALL_TASKS_CACHE: Dict[str, List[Dict[str, Any]]] = {}
for label in AVAILABLE_SUITES:
    ALL_TASKS_CACHE[label] = load_tasks([label])
# The core suite doubles as the default task list display.
TASKS = ALL_TASKS_CACHE.get("Core (26 tasks)", [])
total_available = sum(len(v) for v in ALL_TASKS_CACHE.values())
logger.info(f"Loaded {len(TASKS)} core tasks, {total_available} total across {len(AVAILABLE_SUITES)} suites")
async def _setup_agent(api_key: str, provider: str, model: str) -> CUGAAgent:
    """Construct a CUGAAgent, run its async setup, and return it.

    An empty or whitespace-only model string means "use the provider
    default" — the agent then receives ``model=None``.
    """
    selected_model = model if model.strip() else None
    agent = CUGAAgent(
        api_key=api_key,
        provider=provider.lower(),
        model=selected_model,
    )
    await agent.setup()
    return agent
async def _run_single_task(
    agent: CUGAAgent, task: Dict, task_index: int,
    llm_judge: Any, llm_judge_requested: bool,
    langfuse: Any,
) -> Dict[str, Any]:
    """Run a single evaluation task and return the result.

    Args:
        agent: Initialized CUGAAgent that answers the task intent.
        task: Task dict (reads "name", "intent", "difficulty",
            "expected_output").
        task_index: Zero-based position of the task in the run; used for a
            fallback task name and to make the thread id unique.
        llm_judge: Optional judge with an async ``judge()`` method; skipped
            when None or when the task has no expected answer.
        llm_judge_requested: Whether judge scoring was requested (forwarded
            to ``compute_final_score``).
        langfuse: LangfuseTracker used to record per-task scores.

    Returns:
        Result dict with the response, metric fields, and tool-call info;
        on any exception, a reduced dict with an "error" key and zeroed
        metrics (``passed=False``).
    """
    task_name = task.get("name", f"task_{task_index+1}")
    query = task.get("intent", "")
    # Unique thread id per task keeps each agent conversation isolated.
    thread_id = f"eval_{task_name}_{task_index}"
    try:
        response, tool_calls = await agent.run(query, thread_id=thread_id)
        # Get expected output and keywords. Task files use either
        # "response"/"answer" and "tool_calls"/"apis" interchangeably.
        expected_output = task.get("expected_output", {})
        expected_keywords = expected_output.get("keywords", [])
        expected_answer = expected_output.get("response", "") or expected_output.get("answer", "")
        tool_calls_expected = expected_output.get("tool_calls", []) or expected_output.get("apis", [])
        # Expected API entries may be dicts ({"name": ...}) or bare strings.
        expected_apis = []
        for tc in tool_calls_expected:
            if isinstance(tc, dict):
                expected_apis.append(tc.get("name", ""))
            elif isinstance(tc, str):
                expected_apis.append(tc)
        # Compute metrics; similarity/exact-match only make sense when the
        # task declares an expected answer.
        keyword_check = check_keywords(response, expected_keywords)
        similarity = compute_string_similarity(response, expected_answer) if expected_answer else 0.0
        exact_match = compute_exact_match(response, expected_answer) if expected_answer else False
        # Extract tool names (actual tool calls may be dicts or plain values)
        tool_names = []
        for tc in tool_calls:
            if isinstance(tc, dict):
                tool_names.append(tc.get("name", str(tc)))
            else:
                tool_names.append(str(tc))
        # Compare API calls; with no expected APIs the comparison trivially
        # passes (all_expected_called=True).
        api_comparison = compare_api_calls(tool_names, expected_apis) if expected_apis else {
            "missing": [], "extra": [], "correct": 0, "expected_count": 0,
            "called_count": len(tool_names), "all_expected_called": True,
        }
        # LLM Judge evaluation — best effort: a judge failure is logged and
        # must not fail the task itself.
        llm_judge_score = None
        llm_judge_rationale = None
        if llm_judge and expected_answer:
            try:
                judge_result = await llm_judge.judge(response, expected_answer, query)
                llm_judge_score = judge_result.get("score")
                llm_judge_rationale = judge_result.get("rationale", "")
            except Exception as e:
                logger.warning(f"LLM judge failed for {task_name}: {e}")
        # Compute final score (matches main repo logic)
        final_score = compute_final_score(
            exact_match=exact_match,
            similarity=similarity,
            llm_judge_score=llm_judge_score,
            llm_judge_requested=llm_judge_requested,
            agent_output=response,
            apis_missing=api_comparison["missing"],
            require_api_match=True,
        )
        result = {
            "task_id": task_name,
            "difficulty": task.get("difficulty", "unknown"),
            "intent": query,
            "response": response,
            "expected_answer": expected_answer,
            "expected_keywords": expected_keywords,
            "found_keywords": keyword_check["found_keywords"],
            "missing_keywords": keyword_check["missing_keywords"],
            "match_rate": keyword_check["match_rate"],
            "similarity": similarity,
            "exact_match": exact_match,
            "llm_judge_score": llm_judge_score,
            "llm_judge_rationale": llm_judge_rationale,
            "final_score": final_score,
            "passed": final_score == 1,
            "tool_calls": tool_names,
            "expected_apis": expected_apis,
            "apis_missing": api_comparison["missing"],
            "apis_extra": api_comparison["extra"],
            "apis_correct": api_comparison["correct"],
        }
        # Record per-task scores in Langfuse (judge score only when present).
        scores = {
            "similarity": similarity,
            "keyword_match": keyword_check["match_rate"],
            "final_score": float(final_score),
        }
        if llm_judge_score is not None:
            scores["llm_judge"] = llm_judge_score
        langfuse.score_task(task_name, scores)
        logger.info(
            f"Task {task_name}: {'PASS' if result['passed'] else 'FAIL'} "
            f"(sim={similarity:.2f}, kw={keyword_check['match_rate']:.1%}"
            f"{f', judge={llm_judge_score:.2f}' if llm_judge_score is not None else ''})"
        )
        return result
    except Exception as e:
        logger.exception(f"Error in task {task_name}")
        # Reduced error result; downstream code only relies on .get() access,
        # so the missing metric keys are safe.
        return {
            "task_id": task_name,
            "difficulty": task.get("difficulty", "unknown"),
            "intent": task.get("intent", ""),
            "response": f"Error: {e}",
            "passed": False,
            "final_score": 0,
            "similarity": 0.0,
            "exact_match": False,
            "match_rate": 0.0,
            "tool_calls": [],
            "error": str(e),
        }
| def _build_results_markdown(results: List[Dict], langfuse: Any) -> str: | |
| """Build markdown summary from evaluation results.""" | |
| total = len(results) | |
| passed = sum(1 for r in results if r.get("passed", False)) | |
| avg_similarity = sum(r.get("similarity", 0) for r in results) / total if total else 0 | |
| avg_match = sum(r.get("match_rate", 0) for r in results) / total if total else 0 | |
| exact_matches = sum(1 for r in results if r.get("exact_match", False)) | |
| final_score_passes = sum(1 for r in results if r.get("final_score") == 1) | |
| keyword_full_matches = sum(1 for r in results if r.get("match_rate", 0) == 1.0) | |
| tasks_with_tools = sum(1 for r in results if r.get("tool_calls")) | |
| # LLM Judge metrics | |
| judge_scores = [r.get("llm_judge_score") for r in results if r.get("llm_judge_score") is not None] | |
| avg_judge_score = sum(judge_scores) / len(judge_scores) if judge_scores else None | |
| judge_passes = sum(1 for s in judge_scores if s >= 0.85) if judge_scores else 0 | |
| # API metrics | |
| tasks_with_expected_apis = [r for r in results if r.get("expected_apis")] | |
| api_correct = sum(1 for r in tasks_with_expected_apis if not r.get("apis_missing")) | |
| api_accuracy = api_correct / len(tasks_with_expected_apis) if tasks_with_expected_apis else None | |
| summary = { | |
| "total_tasks": total, | |
| "passed": passed, | |
| "pass_rate": passed / total if total else 0, | |
| "avg_similarity": avg_similarity, | |
| "avg_keyword_match": avg_match, | |
| "exact_matches": exact_matches, | |
| "final_score_passes": final_score_passes, | |
| "keyword_full_matches": keyword_full_matches, | |
| "avg_llm_judge_score": avg_judge_score, | |
| "api_accuracy": api_accuracy, | |
| } | |
| # End Langfuse trace | |
| langfuse.end_trace(summary) | |
| # Group by difficulty | |
| by_difficulty = {} | |
| for r in results: | |
| diff = r.get("difficulty", "unknown") | |
| if diff not in by_difficulty: | |
| by_difficulty[diff] = {"total": 0, "passed": 0} | |
| by_difficulty[diff]["total"] += 1 | |
| if r.get("passed", False): | |
| by_difficulty[diff]["passed"] += 1 | |
| # Build markdown output | |
| md = "## Evaluation Complete\n\n" | |
| md += f"**Total Tasks:** {total}\n" | |
| md += f"**Final Score:** {final_score_passes}/{total} ({100*final_score_passes/total:.1f}%)\n" | |
| md += f"**Exact Matches:** {exact_matches} ({100*exact_matches/total:.1f}%)\n" | |
| md += f"**Avg Similarity:** {avg_similarity:.2f}\n" | |
| md += f"**Keyword Match:** {avg_match*100:.1f}% avg ({keyword_full_matches}/{total} full matches)\n" | |
| if avg_judge_score is not None: | |
| md += f"**LLM Judge:** {len(judge_scores)} tasks, avg={avg_judge_score:.2f} ({judge_passes}/{len(judge_scores)} pass)\n" | |
| if api_accuracy is not None: | |
| md += f"**API Accuracy:** {api_correct}/{len(tasks_with_expected_apis)} ({api_accuracy*100:.1f}%)\n" | |
| md += f"**Tasks with Tool Calls:** {tasks_with_tools}/{total}\n" | |
| if langfuse.enabled: | |
| md += "\n*Langfuse tracking enabled*\n" | |
| elif langfuse.init_error: | |
| md += f"\n*Langfuse error: {langfuse.init_error}*\n" | |
| md += "\n" | |
| # By difficulty breakdown | |
| if by_difficulty: | |
| md += "### By Difficulty\n" | |
| for diff, stats in sorted(by_difficulty.items()): | |
| rate = stats["passed"] / stats["total"] * 100 if stats["total"] else 0 | |
| md += f"- **{diff}**: {stats['passed']}/{stats['total']} ({rate:.1f}%)\n" | |
| md += "\n" | |
| md += "---\n\n" | |
| # Individual results | |
| for r in results: | |
| status = "PASS" if r.get("passed") else "FAIL" | |
| md += f"### {status} - {r.get('task_id', 'unknown')} ({r.get('difficulty', 'unknown')})\n\n" | |
| md += f"**Query:** {r.get('intent', '')}\n\n" | |
| response_text = r.get("response", "") | |
| if len(response_text) > 500: | |
| response_text = response_text[:500] + "..." | |
| md += f"**Response:** {response_text}\n\n" | |
| # Enhanced metrics display | |
| md += "**Metrics:**\n" | |
| md += f"- **Final Score: {'PASS' if r.get('final_score') == 1 else 'FAIL'}**\n" | |
| md += f"- Similarity: {r.get('similarity', 0)*100:.1f}%\n" | |
| md += f"- Exact Match: {'Yes' if r.get('exact_match') else 'No'}\n" | |
| if r.get("llm_judge_score") is not None: | |
| md += f"- LLM Judge: {r['llm_judge_score']:.2f}\n" | |
| md += f"- Keyword Match: {r.get('match_rate', 0)*100:.1f}%\n" | |
| md += "\n" | |
| if r.get("missing_keywords"): | |
| missing = r["missing_keywords"][:5] | |
| md += f"**Missing keywords:** {', '.join(missing)}" | |
| if len(r.get("missing_keywords", [])) > 5: | |
| md += f" (+{len(r['missing_keywords']) - 5} more)" | |
| md += "\n\n" | |
| # API metrics | |
| if r.get("expected_apis"): | |
| correct = r.get("apis_correct", 0) | |
| expected = len(r.get("expected_apis", [])) | |
| api_status = "PASS" if not r.get("apis_missing") else "FAIL" | |
| md += f"- API Accuracy: {correct}/{expected} ({api_status})\n" | |
| if r.get("tool_calls"): | |
| md += f"- Tools used: {', '.join(r['tool_calls'])}\n" | |
| if r.get("apis_missing"): | |
| md += f"- Missing APIs: {', '.join(r['apis_missing'])}\n" | |
| md += "\n" | |
| if r.get("error"): | |
| md += f"**Error:** {r['error']}\n\n" | |
| md += "---\n\n" | |
| return md | |
def run_evaluation(api_key, provider, model, task_ids, test_suites):
    """Run CUGA SDK evaluation, yielding live progress to the UI.

    Generator used as a Gradio event handler: each ``yield`` is a
    ``(markdown, raw_json)`` pair updating the two output components.

    Args:
        api_key: Provider API key (required; empty aborts immediately).
        provider: Provider name ("Groq"/"OpenAI"; compared case-insensitively).
        model: Model name; empty string selects the provider default.
        task_ids: Space/comma-separated ids, or "all"/empty for every task.
        test_suites: Selected TEST_SUITES labels; falsy defaults to core.
    """
    if not api_key:
        yield "Please provide an API key", ""
        return
    # Load tasks from selected suites
    if not test_suites:
        test_suites = ["Core (26 tasks)"]
    all_tasks = load_tasks(test_suites)
    if not all_tasks:
        yield "No tasks loaded. Check that task files exist in the data directory.", ""
        return
    # Parse task IDs to filter within loaded tasks
    task_ids_str = task_ids.strip()
    if task_ids_str.lower() == "all" or not task_ids_str:
        tasks_to_run = all_tasks
    else:
        try:
            # Accept commas and/or whitespace as separators; ids may be bare
            # numbers ("27") or full names ("task_27").
            ids = [s.strip() for s in task_ids_str.replace(",", " ").split()]
            tasks_to_run = []
            for task in all_tasks:
                task_name = task.get("name", "")
                task_num = task_name.replace("task_", "") if task_name.startswith("task_") else task_name
                if task_name in ids or task_num in ids:
                    tasks_to_run.append(task)
        except Exception as e:
            yield f"Error parsing task IDs: {e}", ""
            return
    if not tasks_to_run:
        yield "No matching tasks found.", ""
        return
    total = len(tasks_to_run)
    yield f"**Initializing CUGA agent...** (0/{total} tasks)", ""
    # Gradio handlers run in a worker thread without an event loop, so a
    # private loop is created here for all async agent calls and closed in
    # the finally block.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        agent = loop.run_until_complete(_setup_agent(api_key, provider, model))
        logger.info("CUGA agent initialized successfully")
        langfuse = LangfuseTracker()
        langfuse.start_trace(
            name="bpo_evaluation",
            metadata={
                "provider": provider,
                "model": model or get_default_model(provider),
                "num_tasks": total,
            },
        )
        # Initialize LLM judge (only for Groq provider currently)
        llm_judge = None
        llm_judge_requested = False
        if provider.lower() == "groq":
            try:
                llm_judge = get_llm_judge(api_key=api_key, provider="groq")
                llm_judge_requested = True
                logger.info("LLM judge initialized")
            except Exception as e:
                logger.warning(f"Failed to initialize LLM judge: {e}")
        # Run tasks, yielding a progress update before each one
        results = []
        for i, task in enumerate(tasks_to_run):
            task_name = task.get("name", f"task_{i+1}")
            logger.info(f"Evaluating task {i+1}/{total}: {task_name}")
            yield f"**Running {task_name}...** ({i}/{total} complete)", ""
            result = loop.run_until_complete(
                _run_single_task(agent, task, i, llm_judge, llm_judge_requested, langfuse)
            )
            results.append(result)
            # Small delay between tasks (skipped after the last one)
            if len(results) < total:
                loop.run_until_complete(asyncio.sleep(0.5))
        # Clean up.
        # NOTE(review): agent.close() is skipped when an exception occurs
        # above — consider moving it into the finally block. Presumably
        # close() is synchronous; confirm against the CUGA SDK.
        agent.close()
        md = _build_results_markdown(results, langfuse)
        yield md, json.dumps(results, indent=2)
    except Exception as e:
        logger.exception("Evaluation failed")
        yield f"Evaluation failed: {e}", ""
    finally:
        loop.close()
def get_task_list():
    """Return a markdown listing of available tasks, grouped by suite."""
    if not ALL_TASKS_CACHE:
        return "No tasks loaded"
    sections = []
    for suite_label in AVAILABLE_SUITES:
        suite_tasks = ALL_TASKS_CACHE.get(suite_label, [])
        if not suite_tasks:
            continue
        sections.append(f"### {suite_label}\n")
        for task in suite_tasks:
            name = task.get("name", "unknown")
            diff = task.get("difficulty", "unknown")
            # Keep each bullet readable by truncating long intents.
            intent = task.get("intent", "")
            if len(intent) > 60:
                intent = intent[:60] + "..."
            sections.append(f"- **{name}** ({diff}): {intent}")
        sections.append("")
    return "\n".join(sections)
def update_model_choices(provider: str):
    """Refresh the model dropdown for the newly selected provider."""
    return gr.update(
        choices=get_provider_models(provider),
        value=get_default_model(provider),
    )
def update_api_key_placeholder(provider: str):
    """Swap the API-key textbox placeholder to match the selected provider."""
    return gr.update(placeholder=get_provider_placeholder(provider))
# Gradio Interface: left column holds configuration, right column results.
with gr.Blocks(title="BPO Benchmark") as demo:
    gr.Markdown("# BPO Benchmark Evaluation")
    gr.Markdown(
        "Evaluate **CUGA SDK** on BPO recruiting analytics tasks with 32 tool APIs. "
        "Enter your API key, select tasks, and run the evaluation."
    )
    with gr.Row():
        # Configuration column
        with gr.Column(scale=1):
            provider = gr.Dropdown(
                choices=["Groq", "OpenAI"],
                value="Groq",
                label="LLM Provider"
            )
            api_key = gr.Textbox(
                label="API Key",
                type="password",
                placeholder="gsk_... (Groq)"
            )
            # allow_custom_value lets users type models not in the list.
            model = gr.Dropdown(
                choices=get_provider_models("groq"),
                value=get_default_model("groq"),
                label="Model",
                allow_custom_value=True,
            )
            test_suites = gr.CheckboxGroup(
                choices=AVAILABLE_SUITES,
                value=["Core (26 tasks)"],
                label="Test Suites",
                info=f"{total_available} tasks across {len(AVAILABLE_SUITES)} suites",
            )
            task_ids = gr.Textbox(
                label="Task IDs (optional filter)",
                placeholder="1 2 3 or task_27 task_28 (leave empty for all in selected suites)",
                info="Filter within selected suites by ID"
            )
            run_btn = gr.Button("Run Evaluation", variant="primary", size="lg")
            with gr.Accordion("Available Tasks", open=False):
                gr.Markdown(get_task_list())
            with gr.Accordion("Environment Info", open=False):
                # Langfuse status is computed once at UI build time, not per run.
                langfuse_status = "Configured" if is_langfuse_configured() else "Not configured"
                public_key_set = "Yes" if os.environ.get("LANGFUSE_PUBLIC_KEY") else "No"
                secret_key_set = "Yes" if os.environ.get("LANGFUSE_SECRET_KEY") else "No"
                langfuse_host = get_langfuse_host()
                gr.Markdown(f"""
**Langfuse Tracking:** {langfuse_status}
- LANGFUSE_PUBLIC_KEY set: {public_key_set}
- LANGFUSE_SECRET_KEY set: {secret_key_set}
- Host: {langfuse_host}

To enable Langfuse tracking in HuggingFace:
1. Go to Space Settings > Variables and secrets
2. Add **Secrets** (not variables):
   - `LANGFUSE_PUBLIC_KEY`
   - `LANGFUSE_SECRET_KEY`
   - `LANGFUSE_HOST` (e.g., `https://us.cloud.langfuse.com`)
3. Restart the Space for changes to take effect

*Connection will be tested when you run an evaluation*
""")
        # Results column
        with gr.Column(scale=2):
            output = gr.Markdown(label="Results")
            with gr.Accordion("Raw JSON Results", open=False):
                raw_json = gr.Code(label="Raw JSON", language="json")
    # Event handlers: provider change updates both the model list and the
    # API-key placeholder; the run button streams from the generator handler.
    provider.change(
        fn=update_model_choices,
        inputs=[provider],
        outputs=[model]
    )
    provider.change(
        fn=update_api_key_placeholder,
        inputs=[provider],
        outputs=[api_key]
    )
    run_btn.click(
        fn=run_evaluation,
        inputs=[api_key, provider, model, task_ids, test_suites],
        outputs=[output, raw_json]
    )
    gr.Markdown("""
---
**Agent:** [CUGA SDK](https://pypi.org/project/cuga/)
| **Dataset:** [ibm-research/BPO-Bench](https://huggingface.co/datasets/ibm-research/BPO-Bench)
""")
if __name__ == "__main__":
    # Bind to all interfaces on port 7860 (Hugging Face Spaces convention).
    demo.launch(server_name="0.0.0.0", server_port=7860)