| """ |
| Evaluation Pipeline for Contextual Similarity Engine |
| |
| Provides metrics and benchmarks to assess the quality of contextual |
| keyword matching: |
| - Cosine similarity distributions |
| - Precision@K and Recall@K for retrieval |
| - Normalized Mutual Information (NMI) for clustering quality |
| - Mean Reciprocal Rank (MRR) for ranking quality |
| - Keyword disambiguation accuracy against ground truth |
| - Full evaluation reports with summary statistics |
| """ |

import json
import logging
import time
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Optional

import numpy as np
from sklearn.metrics import (
    normalized_mutual_info_score,
    adjusted_rand_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
)

from contextual_similarity import ContextualSimilarityEngine, KeywordAnalysis

logger = logging.getLogger(__name__)


@dataclass
class GroundTruthEntry:
    """A single labeled keyword occurrence for evaluation."""

    keyword: str
    text: str
    true_meaning: str
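
# Example of the labeled data consumed by the evaluators below (hypothetical
# entries; the "pizza" keyword and its "food" / "school" meanings mirror the
# example in evaluate_disambiguation's docstring):
#
#     ground_truth = [
#         GroundTruthEntry("pizza", "We split a margherita pizza after the game.", "food"),
#         GroundTruthEntry("pizza", "Friday's pizza night raises money for the school.", "school"),
#     ]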


@dataclass
class RetrievalMetrics:
    """Metrics for a single retrieval query."""

    query: str
    precision_at_k: dict[int, float] = field(default_factory=dict)
    recall_at_k: dict[int, float] = field(default_factory=dict)
    mrr: float = 0.0
    ndcg_at_k: dict[int, float] = field(default_factory=dict)
    avg_similarity: float = 0.0
    top_score: float = 0.0


@dataclass
class ClusteringMetrics:
    """Metrics for clustering quality against ground truth."""

    keyword: str
    nmi: float = 0.0
    ari: float = 0.0
    num_predicted_clusters: int = 0
    num_true_clusters: int = 0
    cluster_sizes: list[int] = field(default_factory=list)


@dataclass
class DisambiguationMetrics:
    """Metrics for keyword meaning disambiguation."""

    keyword: str
    accuracy: float = 0.0
    weighted_f1: float = 0.0
    per_meaning_precision: dict[str, float] = field(default_factory=dict)
    per_meaning_recall: dict[str, float] = field(default_factory=dict)
    per_meaning_f1: dict[str, float] = field(default_factory=dict)
    confusion: Optional[list] = None
    total_samples: int = 0


@dataclass
class EvaluationReport:
    """Complete evaluation report."""

    timestamp: str = ""
    model_name: str = ""
    corpus_stats: dict = field(default_factory=dict)
    retrieval_metrics: list[RetrievalMetrics] = field(default_factory=list)
    clustering_metrics: list[ClusteringMetrics] = field(default_factory=list)
    disambiguation_metrics: list[DisambiguationMetrics] = field(default_factory=list)
    similarity_distribution: dict = field(default_factory=dict)
    timing: dict = field(default_factory=dict)

    def summary(self) -> dict:
        """Return a concise summary of the evaluation."""
        summary = {
            "model": self.model_name,
            "corpus": self.corpus_stats,
            "timing": self.timing,
        }

        if self.retrieval_metrics:
            avg_mrr = float(np.mean([m.mrr for m in self.retrieval_metrics]))
            avg_p5 = float(np.mean([m.precision_at_k.get(5, 0) for m in self.retrieval_metrics]))
            avg_p10 = float(np.mean([m.precision_at_k.get(10, 0) for m in self.retrieval_metrics]))
            summary["retrieval"] = {
                "mean_mrr": round(avg_mrr, 4),
                "mean_precision_at_5": round(avg_p5, 4),
                "mean_precision_at_10": round(avg_p10, 4),
                "num_queries": len(self.retrieval_metrics),
            }

        if self.clustering_metrics:
            avg_nmi = float(np.mean([m.nmi for m in self.clustering_metrics]))
            avg_ari = float(np.mean([m.ari for m in self.clustering_metrics]))
            summary["clustering"] = {
                "mean_nmi": round(avg_nmi, 4),
                "mean_ari": round(avg_ari, 4),
                "num_keywords": len(self.clustering_metrics),
            }

        if self.disambiguation_metrics:
            avg_acc = float(np.mean([m.accuracy for m in self.disambiguation_metrics]))
            avg_f1 = float(np.mean([m.weighted_f1 for m in self.disambiguation_metrics]))
            summary["disambiguation"] = {
                "mean_accuracy": round(avg_acc, 4),
                "mean_weighted_f1": round(avg_f1, 4),
                "num_keywords": len(self.disambiguation_metrics),
            }

        if self.similarity_distribution:
            summary["similarity_distribution"] = self.similarity_distribution

        return summary

    def to_json(self, indent: int = 2) -> str:
        """Serialize the full report to JSON."""
        return json.dumps(asdict(self), indent=indent, default=str)

    def save(self, path: str) -> None:
        """Save the report to a JSON file."""
        Path(path).write_text(self.to_json())
        logger.info(f"Evaluation report saved to {path}")


class Evaluator:
    """
    Evaluation pipeline for the ContextualSimilarityEngine.

    Usage:
        engine = ContextualSimilarityEngine()
        engine.add_document("doc1", text)
        engine.build_index()

        evaluator = Evaluator(engine)

        # Evaluate retrieval quality
        evaluator.evaluate_retrieval(queries_with_relevance)

        # Evaluate keyword disambiguation
        evaluator.evaluate_disambiguation(ground_truth, candidate_meanings)

        # Evaluate clustering
        evaluator.evaluate_clustering(ground_truth)

        # Get full report
        report = evaluator.get_report()
    """

    def __init__(self, engine: ContextualSimilarityEngine):
        self.engine = engine
        self._report = EvaluationReport(
            timestamp=time.strftime("%Y-%m-%d %H:%M:%S"),
            model_name=engine._model_name,
            corpus_stats=engine.get_stats(),
        )

    def evaluate_retrieval(
        self,
        queries: list[dict],
        k_values: Optional[list[int]] = None,
    ) -> list[RetrievalMetrics]:
        """
        Evaluate retrieval quality given labeled queries.

        Args:
            queries: List of dicts with keys:
                - "query": str, the query text
                - "relevant_doc_ids": list[str], doc IDs that are relevant
                OR
                - "relevant_texts": list[str], text snippets considered relevant
            k_values: List of K values for P@K, R@K, NDCG@K.

        Returns:
            List of RetrievalMetrics, one per query.
        """
        if k_values is None:
            k_values = [1, 3, 5, 10]

        t0 = time.time()
        all_metrics = []

        for q in queries:
            query_text = q["query"]
            max_k = max(k_values)
            results = self.engine.query(query_text, top_k=max_k)

            # Decide which retrieved results count as relevant for this query.
            relevant_doc_ids = set(q.get("relevant_doc_ids", []))
            relevant_texts = set(q.get("relevant_texts", []))

            def is_relevant(result):
                if relevant_doc_ids and result.chunk.doc_id in relevant_doc_ids:
                    return True
                if relevant_texts:
                    return any(rt.lower() in result.chunk.text.lower() for rt in relevant_texts)
                return False

            relevance = [is_relevant(r) for r in results]
            scores = [r.score for r in results]

            metrics = RetrievalMetrics(query=query_text)

            # Precision@K, Recall@K, and NDCG@K. Recall is computed against the
            # relevant results found within the retrieved top-max(K), since the
            # total number of relevant chunks in the corpus is not known here.
            total_relevant = sum(relevance)
            for k in k_values:
                top_k_rel = relevance[:k]
                metrics.precision_at_k[k] = sum(top_k_rel) / k if k > 0 else 0
                metrics.recall_at_k[k] = (
                    sum(top_k_rel) / total_relevant if total_relevant > 0 else 0
                )
                metrics.ndcg_at_k[k] = self._compute_ndcg(relevance[:k], k)

            # MRR: reciprocal rank of the first relevant result.
            for i, rel in enumerate(relevance):
                if rel:
                    metrics.mrr = 1.0 / (i + 1)
                    break

            metrics.avg_similarity = float(np.mean(scores)) if scores else 0.0
            metrics.top_score = float(scores[0]) if scores else 0.0

            all_metrics.append(metrics)

        elapsed = time.time() - t0
        self._report.retrieval_metrics = all_metrics
        self._report.timing["retrieval_eval_seconds"] = round(elapsed, 3)
        return all_metrics
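
    # Example of the "queries" format (hypothetical doc IDs and snippets; use
    # whatever identifiers the indexed corpus actually contains):
    #
    #     queries = [
    #         {"query": "wood-fired ovens", "relevant_doc_ids": ["doc1", "doc4"]},
    #         {"query": "school fundraiser", "relevant_texts": ["pizza night"]},
    #     ]
    #     evaluator.evaluate_retrieval(queries, k_values=[1, 5, 10])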

    @staticmethod
    def _compute_ndcg(relevance: list[bool], k: int) -> float:
        """Compute NDCG@K for binary relevance."""
        dcg = sum(
            (1 if rel else 0) / np.log2(i + 2)
            for i, rel in enumerate(relevance[:k])
        )
        # Ideal DCG: the same relevant items ranked at the top.
        ideal = sorted(relevance[:k], reverse=True)
        idcg = sum(
            (1 if rel else 0) / np.log2(i + 2)
            for i, rel in enumerate(ideal)
        )
        return dcg / idcg if idcg > 0 else 0.0
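
    # Worked example for the binary-relevance NDCG above: relevance [True, False, True]
    # at K=3 gives DCG = 1/log2(2) + 0 + 1/log2(4) = 1.5 and
    # IDCG = 1/log2(2) + 1/log2(3) ≈ 1.631, so NDCG@3 ≈ 0.92.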

    def evaluate_clustering(
        self,
        ground_truth: list[GroundTruthEntry],
        cluster_threshold: float = 0.35,
    ) -> list[ClusteringMetrics]:
        """
        Evaluate clustering quality by comparing the engine's auto-clusters
        against ground truth meaning labels.

        Args:
            ground_truth: Labeled entries with keyword, text, and true_meaning.
            cluster_threshold: Threshold for agglomerative clustering.

        Returns:
            List of ClusteringMetrics, one per keyword.
        """
        t0 = time.time()

        # Group the labeled entries by keyword.
        by_keyword: dict[str, list[GroundTruthEntry]] = {}
        for entry in ground_truth:
            by_keyword.setdefault(entry.keyword, []).append(entry)

        all_metrics = []
        for keyword, entries in by_keyword.items():
            analysis = self.engine.analyze_keyword(
                keyword, cluster_threshold=cluster_threshold
            )

            if not analysis.meaning_clusters:
                all_metrics.append(ClusteringMetrics(keyword=keyword))
                continue

            # Build parallel true/predicted label sequences for this keyword.
            true_labels = []
            pred_labels = []
            meaning_to_id = {}

            for entry in entries:
                # Map each distinct true meaning to an integer label.
                if entry.true_meaning not in meaning_to_id:
                    meaning_to_id[entry.true_meaning] = len(meaning_to_id)
                true_labels.append(meaning_to_id[entry.true_meaning])

                # Predict a cluster by assigning the entry to the cluster whose
                # contexts contain its nearest neighbor (cosine similarity).
                best_cluster = -1
                best_sim = -np.inf
                entry_vec = self.engine.model.encode(
                    [entry.text], normalize_embeddings=True, convert_to_numpy=True
                )
                for cluster in analysis.meaning_clusters:
                    for ctx in cluster["contexts"]:
                        idx = self.engine.chunks.index(ctx.chunk)
                        sim = float(np.dot(entry_vec[0], self.engine.embeddings[idx]))
                        if sim > best_sim:
                            best_sim = sim
                            best_cluster = cluster["cluster_id"]
                pred_labels.append(best_cluster)

            metrics = ClusteringMetrics(
                keyword=keyword,
                nmi=float(normalized_mutual_info_score(true_labels, pred_labels)),
                ari=float(adjusted_rand_score(true_labels, pred_labels)),
                num_predicted_clusters=len(analysis.meaning_clusters),
                num_true_clusters=len(meaning_to_id),
                cluster_sizes=[c["size"] for c in analysis.meaning_clusters],
            )
            all_metrics.append(metrics)

        elapsed = time.time() - t0
        self._report.clustering_metrics = all_metrics
        self._report.timing["clustering_eval_seconds"] = round(elapsed, 3)
        return all_metrics
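
    # Interpretation note: NMI and ARI both equal 1.0 when the predicted clusters
    # reproduce the labeled meanings exactly; ARI hovers around 0.0 (and can go
    # negative) for essentially random assignments, which makes it a useful
    # sanity check alongside NMI.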

    def evaluate_disambiguation(
        self,
        ground_truth: list[GroundTruthEntry],
        candidate_meanings: dict[str, list[str]],
    ) -> list[DisambiguationMetrics]:
        """
        Evaluate keyword meaning disambiguation accuracy.

        For each ground truth entry, predicts the best-matching candidate meaning
        (cosine similarity between the entry's context and each candidate, as in
        match_keyword_to_meaning()) and compares the prediction against the true label.

        Args:
            ground_truth: Labeled entries with keyword, text, and true_meaning.
            candidate_meanings: Dict mapping keyword -> list of candidate meaning strings.
                Each candidate should be a descriptive phrase, e.g. {"pizza": ["food", "school"]}.
                The true_meaning labels in ground_truth must use the same strings,
                since predictions are compared against them verbatim.

        Returns:
            List of DisambiguationMetrics, one per keyword.
        """
        # Imported once here instead of inside the per-entry loop.
        from sentence_transformers import util as st_util

        t0 = time.time()

        by_keyword: dict[str, list[GroundTruthEntry]] = {}
        for entry in ground_truth:
            by_keyword.setdefault(entry.keyword, []).append(entry)

        all_metrics = []
        for keyword, entries in by_keyword.items():
            candidates = candidate_meanings.get(keyword, [])
            if not candidates:
                logger.warning(f"No candidate meanings for '{keyword}', skipping.")
                continue

            # The candidate set is fixed per keyword, so encode it once.
            cand_vecs = self.engine.model.encode(
                candidates, normalize_embeddings=True, convert_to_tensor=True
            )

            true_labels = []
            pred_labels = []

            for entry in entries:
                # Predict the candidate meaning whose embedding is closest to
                # the embedding of the entry's context.
                entry_vec = self.engine.model.encode(
                    [entry.text], normalize_embeddings=True, convert_to_tensor=True
                )
                scores = st_util.pytorch_cos_sim(entry_vec, cand_vecs)[0]
                best_idx = int(scores.argmax())
                predicted = candidates[best_idx]

                true_labels.append(entry.true_meaning)
                pred_labels.append(predicted)

            # Overall accuracy across this keyword's labeled entries.
            unique_labels = sorted(set(true_labels + pred_labels))
            accuracy = sum(t == p for t, p in zip(true_labels, pred_labels)) / len(true_labels)

            # Per-meaning precision / recall / F1 (one-vs-rest).
            per_meaning_p = {}
            per_meaning_r = {}
            per_meaning_f = {}
            for label in unique_labels:
                t_binary = [1 if t == label else 0 for t in true_labels]
                p_binary = [1 if p == label else 0 for p in pred_labels]
                p_val = precision_score(t_binary, p_binary, zero_division=0)
                r_val = recall_score(t_binary, p_binary, zero_division=0)
                f_val = f1_score(t_binary, p_binary, zero_division=0)
                per_meaning_p[label] = round(float(p_val), 4)
                per_meaning_r[label] = round(float(r_val), 4)
                per_meaning_f[label] = round(float(f_val), 4)

            weighted_f = f1_score(
                true_labels, pred_labels, average="weighted", zero_division=0
            )

            cm = confusion_matrix(true_labels, pred_labels, labels=unique_labels)

            metrics = DisambiguationMetrics(
                keyword=keyword,
                accuracy=round(accuracy, 4),
                weighted_f1=round(float(weighted_f), 4),
                per_meaning_precision=per_meaning_p,
                per_meaning_recall=per_meaning_r,
                per_meaning_f1=per_meaning_f,
                confusion=cm.tolist(),
                total_samples=len(entries),
            )
            all_metrics.append(metrics)

        elapsed = time.time() - t0
        self._report.disambiguation_metrics = all_metrics
        self._report.timing["disambiguation_eval_seconds"] = round(elapsed, 3)
        return all_metrics
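
    # Example of the candidate_meanings format from the docstring above. The
    # candidate strings double as labels, so they must match the true_meaning
    # values used in the ground truth (hypothetical values shown):
    #
    #     candidate_meanings = {"pizza": ["food", "school"]}
    #     evaluator.evaluate_disambiguation(ground_truth, candidate_meanings)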

    def analyze_similarity_distribution(
        self, sample_size: int = 1000, seed: int = 42
    ) -> dict:
        """
        Analyze the distribution of pairwise similarities in the corpus.
        Useful for calibrating thresholds and understanding the embedding space.

        Returns:
            Dict with mean, std, percentiles, and histogram data.
        """
        self.engine._ensure_index()
        n = len(self.engine.chunks)
        if n < 2:
            logger.warning("Need at least two chunks to sample pairwise similarities.")
            return {}
        rng = np.random.RandomState(seed)

        # Sample random index pairs rather than computing the full O(n^2) matrix.
        actual_sample = min(sample_size, n * (n - 1) // 2)
        pairs_i = rng.randint(0, n, size=actual_sample)
        pairs_j = rng.randint(0, n, size=actual_sample)
        # Drop self-pairs (their similarity is trivially 1.0).
        mask = pairs_i != pairs_j
        pairs_i, pairs_j = pairs_i[mask], pairs_j[mask]
        if pairs_i.size == 0:
            logger.warning("No non-identical pairs sampled; skipping distribution analysis.")
            return {}

        # The engine's embeddings are assumed to be L2-normalized, so the dot
        # product of each pair is its cosine similarity.
        sims = np.sum(
            self.engine.embeddings[pairs_i] * self.engine.embeddings[pairs_j], axis=1
        )

        percentiles = {
            str(p): round(float(np.percentile(sims, p)), 4)
            for p in [5, 10, 25, 50, 75, 90, 95]
        }

        # Histogram over the full cosine range [-1, 1].
        hist, bin_edges = np.histogram(sims, bins=20, range=(-1, 1))
        histogram = [
            {
                "bin_start": round(float(bin_edges[i]), 3),
                "bin_end": round(float(bin_edges[i + 1]), 3),
                "count": int(hist[i]),
            }
            for i in range(len(hist))
        ]

        dist_info = {
            "sample_size": int(len(sims)),
            "mean": round(float(np.mean(sims)), 4),
            "std": round(float(np.std(sims)), 4),
            "min": round(float(np.min(sims)), 4),
            "max": round(float(np.max(sims)), 4),
            "percentiles": percentiles,
            "histogram": histogram,
        }

        self._report.similarity_distribution = dist_info
        return dist_info
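
    # A rough way to use this output (illustrative numbers, not measurements): if
    # the 90th percentile of random-pair similarity sits around 0.3, then match or
    # cluster thresholds above that level mostly exclude chance-level overlap.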

    def run_full_evaluation(
        self,
        ground_truth: Optional[list[GroundTruthEntry]] = None,
        candidate_meanings: Optional[dict[str, list[str]]] = None,
        retrieval_queries: Optional[list[dict]] = None,
        cluster_threshold: float = 0.35,
    ) -> EvaluationReport:
        """
        Run the complete evaluation pipeline.

        Args:
            ground_truth: Labeled data for clustering and disambiguation eval.
            candidate_meanings: Keyword -> candidate meanings for disambiguation.
            retrieval_queries: Labeled queries for retrieval eval.
            cluster_threshold: Clustering distance threshold.

        Returns:
            Full EvaluationReport.
        """
        logger.info("Running full evaluation pipeline...")
        t0 = time.time()

        # The similarity distribution needs no labels, so always compute it.
        self.analyze_similarity_distribution()

        if retrieval_queries:
            self.evaluate_retrieval(retrieval_queries)

        if ground_truth:
            self.evaluate_clustering(ground_truth, cluster_threshold)
            if candidate_meanings:
                self.evaluate_disambiguation(ground_truth, candidate_meanings)

        self._report.timing["total_eval_seconds"] = round(time.time() - t0, 3)
        logger.info("Evaluation complete.")
        return self._report

    def get_report(self) -> EvaluationReport:
        """Return the current evaluation report."""
        return self._report
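

# A minimal smoke test, assuming the engine API shown in the Evaluator docstring
# (add_document / build_index); the documents, query, and labels below are
# illustrative placeholders rather than a real benchmark.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    engine = ContextualSimilarityEngine()
    engine.add_document("doc1", "We ordered a wood-fired pizza with basil and mozzarella.")
    engine.add_document("doc2", "The school's pizza night fundraiser supports the library.")
    engine.build_index()

    evaluator = Evaluator(engine)
    report = evaluator.run_full_evaluation(
        ground_truth=[
            GroundTruthEntry("pizza", "A thin-crust pizza fresh out of the oven.", "food"),
            GroundTruthEntry("pizza", "Parents ran the pizza night fundraiser.", "school"),
        ],
        candidate_meanings={"pizza": ["food", "school"]},
        retrieval_queries=[{"query": "italian food", "relevant_doc_ids": ["doc1"]}],
    )
    print(json.dumps(report.summary(), indent=2, default=str))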