# hhh-test/services/orchestrator.py
"""Conversation Orchestrator for managing evaluation workflow."""
import time
from typing import Any, Dict, List, Optional
import streamlit as st
from evaluators import create_evaluator
class ConversationOrchestrator:
"""Orchestrates conversation evaluation using multiple evaluators."""
def __init__(self, api_keys: Optional[Dict[str, str]] = None):
"""Initialize orchestrator with API keys.
Args:
api_keys: Dict of API keys, e.g., {"openai": "...", "hf": "..."}
"""
self.api_keys = api_keys or {}
def _extract_scores_flat(self, item: Any) -> Dict[str, Any]:
"""Normalize a per-utterance item into a flat score map.
Supports both:
- {"metrics": {...}} (older shape)
- {...} (already flat map)
"""
if not isinstance(item, dict):
return {}
if "metrics" in item and isinstance(item["metrics"], dict):
return item["metrics"]
return item
def evaluate_conversation(
self, utterances: List[Dict[str, Any]], selected_metrics: List[str]
) -> List[Dict[str, Any]]:
"""Evaluate conversation using selected metrics.
Returns a list of per-utterance rows. For each selected metric, the row
gets a key f"{metric_name}_scores" containing a flat map of scores.
"""
progress_bar = st.progress(0)
status_text = st.empty()
all_evaluator_results: Dict[str, Dict[str, Any]] = {}
total_evaluators = max(1, len(selected_metrics))
for i, metric_name in enumerate(selected_metrics):
status_text.text(
f"Running {metric_name} evaluator ({i+1}/{total_evaluators})..."
)
progress_bar.progress((i + 1) / total_evaluators)
# Create evaluator - pass api_key (singular) from the dict
# Most evaluators use HuggingFace models, so try 'hf' first, then 'openai'
api_key = self.api_keys.get("hf") or self.api_keys.get("openai") or None
evaluator = create_evaluator(metric_name, api_key=api_key)
if evaluator is None:
st.warning(f"Evaluator for metric '{metric_name}' not found")
continue
try:
                # Request utterance-level granularity; evaluators that don't
                # support the kwarg accept and ignore extra keyword arguments.
result = evaluator.execute(utterances, granularity="utterance")
if result is None:
st.warning(f"Evaluator '{metric_name}' returned no result")
else:
all_evaluator_results[metric_name] = result
except Exception as e:
st.warning(f"Evaluator '{metric_name}' failed: {str(e)}")
            time.sleep(0.05)  # brief pause between evaluators (likely to let the progress UI update)
# Merge results per utterance
results: List[Dict[str, Any]] = []
for idx, utt in enumerate(utterances):
row: Dict[str, Any] = {
"speaker": utt.get("speaker", ""),
"text": utt.get("text", ""),
"index": idx,
}
for eval_name, eval_result in all_evaluator_results.items():
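                # Result shapes this merge step understands (sketched from the
                # lookups below; metric contents are evaluator-specific):
                #   utterance:    {"granularity": "utterance", "per_utterance": [{"metrics": {...}} or {...}, ...]}
                #   conversation: {"granularity": "conversation", "overall": {...}} or {"per_conversation": {...}}
                #   segment:      {"granularity": "segment", "per_segment": [{"utterance_indices": [...], "metrics": {...}}, ...]}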
# Determine granularity (default to utterance)
granularity = eval_result.get("granularity")
if not granularity:
# infer by available keys
if "per_utterance" in eval_result:
granularity = "utterance"
elif "per_conversation" in eval_result or "overall" in eval_result:
granularity = "conversation"
elif "per_segment" in eval_result:
granularity = "segment"
else:
granularity = "utterance"
scores: Dict[str, Any] = {}
if granularity == "utterance":
per_u = eval_result.get("per_utterance") or []
if idx < len(per_u):
scores = self._extract_scores_flat(per_u[idx])
else:
scores = {}
elif granularity == "conversation":
# try overall, then per_conversation, normalize to flat metrics
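                    # Conversation-level scores repeat on every utterance row.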
overall = eval_result.get("overall")
if isinstance(overall, dict):
scores = self._extract_scores_flat(overall)
else:
per_conv = eval_result.get("per_conversation", {})
scores = self._extract_scores_flat(per_conv)
elif granularity == "segment":
# Attach the first matching segment that covers this utterance
seg_scores = {}
segments = eval_result.get("per_segment") or []
for seg in segments:
try:
indices = seg.get("utterance_indices") or []
if idx in indices:
seg_scores = self._extract_scores_flat(
seg.get("metrics", {})
)
break
except Exception:
continue
scores = seg_scores
else:
# Unknown granularity; try to be helpful
per_u = eval_result.get("per_utterance") or []
if idx < len(per_u):
scores = self._extract_scores_flat(per_u[idx])
row[f"{eval_name}_scores"] = scores
results.append(row)
status_text.text("Evaluation complete!")
return results
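

# ---------------------------------------------------------------------------
# Minimal usage sketch. Assumptions (not guaranteed by this module): the file
# runs via `streamlit run`, and the metric names "coherence" and "fluency" are
# registered with create_evaluator; unknown names are skipped with a warning
# by evaluate_conversation.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    orchestrator = ConversationOrchestrator(api_keys={"hf": "hf_..."})
    sample_utterances = [
        {"speaker": "user", "text": "Hi, I need help with my order."},
        {"speaker": "agent", "text": "Of course. Could you share the order number?"},
    ]
    rows = orchestrator.evaluate_conversation(sample_utterances, ["coherence", "fluency"])
    st.dataframe(rows)  # render the merged per-utterance rows as a table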