"""Conversation Orchestrator for managing evaluation workflow.""" import time from typing import Any, Dict, List, Optional import streamlit as st from evaluators import create_evaluator class ConversationOrchestrator: """Orchestrates conversation evaluation using multiple evaluators.""" def __init__(self, api_keys: Optional[Dict[str, str]] = None): """Initialize orchestrator with API keys. Args: api_keys: Dict of API keys, e.g., {"openai": "...", "hf": "..."} """ self.api_keys = api_keys or {} def _extract_scores_flat(self, item: Any) -> Dict[str, Any]: """Normalize a per-utterance item into a flat score map. Supports both: - {"metrics": {...}} (older shape) - {...} (already flat map) """ if not isinstance(item, dict): return {} if "metrics" in item and isinstance(item["metrics"], dict): return item["metrics"] return item def evaluate_conversation( self, utterances: List[Dict[str, Any]], selected_metrics: List[str] ) -> List[Dict[str, Any]]: """Evaluate conversation using selected metrics. Returns a list of per-utterance rows. For each selected metric, the row gets a key f"{metric_name}_scores" containing a flat map of scores. """ progress_bar = st.progress(0) status_text = st.empty() all_evaluator_results: Dict[str, Dict[str, Any]] = {} total_evaluators = max(1, len(selected_metrics)) for i, metric_name in enumerate(selected_metrics): status_text.text( f"Running {metric_name} evaluator ({i+1}/{total_evaluators})..." ) progress_bar.progress((i + 1) / total_evaluators) # Create evaluator - pass api_key (singular) from the dict # Most evaluators use HuggingFace models, so try 'hf' first, then 'openai' api_key = self.api_keys.get("hf") or self.api_keys.get("openai") or None evaluator = create_evaluator(metric_name, api_key=api_key) if evaluator is None: st.warning(f"Evaluator for metric '{metric_name}' not found") continue try: # Many evaluators ignore **kwargs; it's fine. result = evaluator.execute(utterances, granularity="utterance") if result is None: st.warning(f"Evaluator '{metric_name}' returned no result") else: all_evaluator_results[metric_name] = result except Exception as e: st.warning(f"Evaluator '{metric_name}' failed: {str(e)}") time.sleep(0.05) # Merge results per utterance results: List[Dict[str, Any]] = [] for idx, utt in enumerate(utterances): row: Dict[str, Any] = { "speaker": utt.get("speaker", ""), "text": utt.get("text", ""), "index": idx, } for eval_name, eval_result in all_evaluator_results.items(): # Determine granularity (default to utterance) granularity = eval_result.get("granularity") if not granularity: # infer by available keys if "per_utterance" in eval_result: granularity = "utterance" elif "per_conversation" in eval_result or "overall" in eval_result: granularity = "conversation" elif "per_segment" in eval_result: granularity = "segment" else: granularity = "utterance" scores: Dict[str, Any] = {} if granularity == "utterance": per_u = eval_result.get("per_utterance") or [] if idx < len(per_u): scores = self._extract_scores_flat(per_u[idx]) else: scores = {} elif granularity == "conversation": # try overall, then per_conversation, normalize to flat metrics overall = eval_result.get("overall") if isinstance(overall, dict): scores = self._extract_scores_flat(overall) else: per_conv = eval_result.get("per_conversation", {}) scores = self._extract_scores_flat(per_conv) elif granularity == "segment": # Attach the first matching segment that covers this utterance seg_scores = {} segments = eval_result.get("per_segment") or [] for seg in segments: try: indices = seg.get("utterance_indices") or [] if idx in indices: seg_scores = self._extract_scores_flat( seg.get("metrics", {}) ) break except Exception: continue scores = seg_scores else: # Unknown granularity; try to be helpful per_u = eval_result.get("per_utterance") or [] if idx < len(per_u): scores = self._extract_scores_flat(per_u[idx]) row[f"{eval_name}_scores"] = scores results.append(row) status_text.text("Evaluation complete!") return results