| """Conversation Orchestrator for managing evaluation workflow.""" | |
| import time | |
| from typing import Any, Dict, List, Optional | |
| import streamlit as st | |
| from evaluators import create_evaluator | |
| class ConversationOrchestrator: | |
| """Orchestrates conversation evaluation using multiple evaluators.""" | |
| def __init__(self, api_keys: Optional[Dict[str, str]] = None): | |
| """Initialize orchestrator with API keys. | |
| Args: | |
| api_keys: Dict of API keys, e.g., {"openai": "...", "hf": "..."} | |
| """ | |
| self.api_keys = api_keys or {} | |

    def _extract_scores_flat(self, item: Any) -> Dict[str, Any]:
        """Normalize a per-utterance item into a flat score map.

        Supports both:
        - {"metrics": {...}} (older shape)
        - {...} (already flat map)
        """
        if not isinstance(item, dict):
            return {}
        if "metrics" in item and isinstance(item["metrics"], dict):
            return item["metrics"]
        return item
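
    # Illustration of _extract_scores_flat (hypothetical score names): both
    # supported shapes flatten to the same map, and non-dict input yields {}.
    #   _extract_scores_flat({"metrics": {"fluency": 0.9}})  -> {"fluency": 0.9}
    #   _extract_scores_flat({"fluency": 0.9})                -> {"fluency": 0.9}
    #   _extract_scores_flat(None)                            -> {}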

    def evaluate_conversation(
        self, utterances: List[Dict[str, Any]], selected_metrics: List[str]
    ) -> List[Dict[str, Any]]:
        """Evaluate conversation using selected metrics.

        Returns a list of per-utterance rows. For each selected metric, the row
        gets a key f"{metric_name}_scores" containing a flat map of scores.
        """
        progress_bar = st.progress(0)
        status_text = st.empty()

        all_evaluator_results: Dict[str, Dict[str, Any]] = {}
        total_evaluators = max(1, len(selected_metrics))

        for i, metric_name in enumerate(selected_metrics):
            status_text.text(
                f"Running {metric_name} evaluator ({i+1}/{total_evaluators})..."
            )
            progress_bar.progress((i + 1) / total_evaluators)

            # Create evaluator - pass api_key (singular) from the dict.
            # Most evaluators use HuggingFace models, so try 'hf' first, then 'openai'.
            api_key = self.api_keys.get("hf") or self.api_keys.get("openai") or None
            evaluator = create_evaluator(metric_name, api_key=api_key)
            if evaluator is None:
                st.warning(f"Evaluator for metric '{metric_name}' not found")
                continue

            try:
                # Many evaluators ignore **kwargs; it's fine.
                result = evaluator.execute(utterances, granularity="utterance")
                if result is None:
                    st.warning(f"Evaluator '{metric_name}' returned no result")
                else:
                    all_evaluator_results[metric_name] = result
            except Exception as e:
                st.warning(f"Evaluator '{metric_name}' failed: {str(e)}")

            time.sleep(0.05)
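
        # Evaluator result shapes handled by the merge below (keys as read in
        # this method; concrete score keys are evaluator-specific):
        #   utterance-level:    {"granularity": "utterance", "per_utterance": [{...}, ...]}
        #   conversation-level: {"granularity": "conversation", "overall": {...}}
        #                       or {"per_conversation": {...}}
        #   segment-level:      {"granularity": "segment",
        #                        "per_segment": [{"utterance_indices": [...], "metrics": {...}}, ...]}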

        # Merge results per utterance
        results: List[Dict[str, Any]] = []
        for idx, utt in enumerate(utterances):
            row: Dict[str, Any] = {
                "speaker": utt.get("speaker", ""),
                "text": utt.get("text", ""),
                "index": idx,
            }
            for eval_name, eval_result in all_evaluator_results.items():
                # Determine granularity (default to utterance)
                granularity = eval_result.get("granularity")
                if not granularity:
                    # Infer by available keys
                    if "per_utterance" in eval_result:
                        granularity = "utterance"
                    elif "per_conversation" in eval_result or "overall" in eval_result:
                        granularity = "conversation"
                    elif "per_segment" in eval_result:
                        granularity = "segment"
                    else:
                        granularity = "utterance"

                scores: Dict[str, Any] = {}
                if granularity == "utterance":
                    per_u = eval_result.get("per_utterance") or []
                    if idx < len(per_u):
                        scores = self._extract_scores_flat(per_u[idx])
                    else:
                        scores = {}
                elif granularity == "conversation":
                    # Try overall, then per_conversation; normalize to flat metrics
                    overall = eval_result.get("overall")
                    if isinstance(overall, dict):
                        scores = self._extract_scores_flat(overall)
                    else:
                        per_conv = eval_result.get("per_conversation", {})
                        scores = self._extract_scores_flat(per_conv)
                elif granularity == "segment":
                    # Attach the first matching segment that covers this utterance
                    seg_scores = {}
                    segments = eval_result.get("per_segment") or []
                    for seg in segments:
                        try:
                            indices = seg.get("utterance_indices") or []
                            if idx in indices:
                                seg_scores = self._extract_scores_flat(
                                    seg.get("metrics", {})
                                )
                                break
                        except Exception:
                            continue
                    scores = seg_scores
                else:
                    # Unknown granularity; try to be helpful
                    per_u = eval_result.get("per_utterance") or []
                    if idx < len(per_u):
                        scores = self._extract_scores_flat(per_u[idx])

                row[f"{eval_name}_scores"] = scores
            results.append(row)

        status_text.text("Evaluation complete!")
        return results
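

if __name__ == "__main__":
    # Minimal usage sketch, assuming this file is run as a Streamlit app
    # (`streamlit run <this file>`) and that a "coherence" metric is registered
    # with evaluators.create_evaluator; both are illustrative assumptions.
    demo_utterances = [
        {"speaker": "user", "text": "Hi, I need help with my order."},
        {"speaker": "agent", "text": "Sure, what is the order number?"},
    ]
    orchestrator = ConversationOrchestrator(api_keys={})
    rows = orchestrator.evaluate_conversation(demo_utterances, ["coherence"])
    # Each row carries a "<metric>_scores" map, e.g. rows[0].get("coherence_scores").
    st.write(rows)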