""" utils.py — Helpers for caching, output formatting, and metrics reporting. """ import os import json import pickle import re import hashlib import pandas as pd import numpy as np from pathlib import Path from typing import Any, Optional CACHE_DIR = Path("cache") CACHE_DIR.mkdir(exist_ok=True) # ─── GENERIC CACHE ──────────────────────────────────────────────────────────── def save_pickle(obj: Any, path: str): with open(path, "wb") as f: pickle.dump(obj, f) def load_pickle(path: str) -> Any: with open(path, "rb") as f: return pickle.load(f) def save_json(obj: Any, path: str): with open(path, "w") as f: json.dump(obj, f, indent=2) def load_json(path: str) -> Any: with open(path, "r") as f: return json.load(f) # ─── RESULTS BUILDER ───────────────────────────────────────────────────────── def build_paper_results( df: pd.DataFrame, labels: np.ndarray, cluster_results: dict, ) -> pd.DataFrame: """ Build per-paper output DataFrame: - DOI, Title, Cluster ID, Cluster Label """ rows = [] for i, row in df.iterrows(): cid = int(labels[i]) if cid == -1: label = "Noise" elif cid in cluster_results: label = cluster_results[cid]["final_label"] else: label = f"Cluster {cid}" rows.append({ "DOI": row["DOI"], "Title": row["Title"], "Cluster_ID": cid, "Cluster_Label": label, }) return pd.DataFrame(rows) def build_cluster_summary( cluster_results: dict, top_papers: dict, coherence: dict, silhouette: float, tccm_results: Optional[dict] = None, ) -> pd.DataFrame: """ Build per-cluster summary DataFrame. Optionally includes TCCM classification columns and top keywords if tccm_results provided. """ rows = [] for cid, result in cluster_results.items(): papers = top_papers.get(cid, []) top_titles = [p["title"] for p in papers] coh = coherence.get(cid, 0.0) candidates = result.get("candidates", {}) winning = result.get("winning_approach", "") council_scores = {} if winning in candidates: council_scores = candidates[winning]["scores"] # TCCM + keywords tccm = (tccm_results or {}).get(cid, {}) row = { "Cluster_ID": cid, "Final_Label": result["final_label"], "Winning_Approach": winning, "N_Papers": result.get("n_papers", 0), "Top_Paper_1": top_titles[0] if len(top_titles) > 0 else "", "Top_Paper_2": top_titles[1] if len(top_titles) > 1 else "", "Top_Paper_3": top_titles[2] if len(top_titles) > 2 else "", "Candidate_Keyword": candidates.get("keyword", {}).get("label", ""), "Candidate_Descriptive": candidates.get("descriptive", {}).get("label", ""), "Candidate_Concise": candidates.get("concise", {}).get("label", ""), "Score_Semantic": council_scores.get("semantic", 0), "Score_Keyword": council_scores.get("keyword", 0), "Score_Clarity": council_scores.get("clarity", 0), "Score_Final": council_scores.get("final", 0), "Label_Confidence": result.get("label_confidence", 0), "Cluster_Coherence": round(coh, 4), "TCCM_Theory": tccm.get("theory", ""), "TCCM_Context": tccm.get("context", ""), "TCCM_Characteristics": tccm.get("characteristics", ""), "TCCM_Methodology": tccm.get("methodology", ""), "Top_Keywords": ", ".join(tccm.get("keywords", [])), "Justification": result.get("justification", ""), } rows.append(row) df = pd.DataFrame(rows).sort_values("Cluster_ID").reset_index(drop=True) return df def print_metrics_report( silhouette: float, coherence: dict, cluster_results: dict, labels: np.ndarray, ): """Print a formatted research metrics report.""" n_clusters = len(cluster_results) n_noise = int(np.sum(labels == -1)) avg_coherence = float(np.mean(list(coherence.values()))) if coherence else 0 avg_confidence = float(np.mean([ r.get("label_confidence", 0) for r in cluster_results.values() ])) if cluster_results else 0 print("\n" + "=" * 60) print("RESEARCH METRICS REPORT") print("=" * 60) print(f" Total Clusters: {n_clusters}") print(f" Noise Points: {n_noise}") print(f" Silhouette Score: {silhouette:.4f}") print(f" Avg Cluster Coherence: {avg_coherence:.4f}") print(f" Avg Label Confidence: {avg_confidence:.4f}") print("=" * 60 + "\n") def load_env(): """Load .env file if it exists.""" env_file = Path(".env") if env_file.exists(): for line in env_file.read_text().splitlines(): line = line.strip() if line and not line.startswith("#") and "=" in line: key, _, value = line.partition("=") os.environ.setdefault(key.strip(), value.strip()) # ─── AI COUNCIL CACHING WITH PROPER KEYS ───────────────────────────────────── COUNCIL_CACHE_DIR = Path("cache/council") COUNCIL_CACHE_DIR.mkdir(parents=True, exist_ok=True) def generate_council_cache_key(cluster_id: int, label_text: str, agent_name: str) -> str: """ Generate cache key including cluster_id, exact label_text, and agent_name. This ensures different labels get different cache entries. """ content = f"{cluster_id}|{label_text}|{agent_name}" return hashlib.md5(content.encode()).hexdigest() def load_cached_score(cache_key: str) -> Optional[dict]: """Load cached score if it exists. Returns dict with 'normalized_score' and 'raw_score'.""" cache_file = COUNCIL_CACHE_DIR / f"{cache_key}.json" if cache_file.exists(): try: data = json.loads(cache_file.read_text()) # Validate structure - must have both normalized_score and raw_score if "normalized_score" in data and "raw_score" in data: return data else: # Old cache format - invalidate it print(f"[Cache] Warning: Old cache format for {cache_key}, regenerating...") return None except Exception as e: print(f"[Cache] Warning: Failed to load cache {cache_key}: {e}") return None return None def save_cached_score(cache_key: str, normalized_score: float, raw_score: float) -> None: """Save both raw and normalized scores to cache.""" cache_file = COUNCIL_CACHE_DIR / f"{cache_key}.json" data = { "normalized_score": normalized_score, "raw_score": raw_score, } try: cache_file.write_text(json.dumps(data)) except Exception as e: print(f"[Cache] Warning: Failed to save cache {cache_key}: {e}") # ─── SCORE PARSING AND NORMALIZATION ──────────────────────────────────────── def extract_numeric_score(text: str) -> float: """ Extract a numeric score from LLM response. Looks for: decimal (0.xx), percentage (xx%), or integer (0-10). Returns float in [0.0, 1.0]. """ text = text.strip() # Try decimal (0.xx or 1.0) matches = re.findall(r"\b(0\.\d+|1\.0+|1\.00+)\b", text) if matches: return min(max(float(matches[0]), 0.0), 1.0) # Try percentage (XX%) pct_matches = re.findall(r"(\d+(?:\.\d+)?)\s*%", text) if pct_matches: return min(max(float(pct_matches[0]) / 100, 0.0), 1.0) # Try integer 0-10 int_matches = re.findall(r"\b([0-9]|10)\b", text) if int_matches: return min(max(float(int_matches[0]) / 10, 0.0), 1.0) # If only a single float, try parsing it try: val = float(text) if val > 1: val = val / 100 # Assume percentage return min(max(val, 0.0), 1.0) except: pass print(f"[Score Parser] Warning: Could not extract score from: {text[:100]}") return 0.5 # neutral fallback def normalize_score(raw_score: float) -> float: """ Linearly re-scale a raw LLM score from the observed [0.6, 1.0] range into [0.0, 1.0]. WHY: LLMs rarely give scores below 0.6 — they avoid harsh penalties even for mediocre outputs. This causes all three candidate labels to receive nearly identical raw scores, making the winner selection meaningless. By re-scaling, we amplify the differences between candidates so that the best label wins by a meaningful margin. FORMULA: normalized = (raw_score - 0.6) / 0.4 → clamped to [0.0, 1.0] EXAMPLES: raw 0.60 → normalized 0.00 (floor — LLM's minimum realistic score) raw 0.70 → normalized 0.25 raw 0.80 → normalized 0.50 raw 0.90 → normalized 0.75 raw 1.00 → normalized 1.00 (ceiling — perfect match) raw 0.50 → clamped to 0.00 (below realistic range) NOTE: This is a heuristic calibration, not a statistically validated mapping. It is applied consistently to all three criteria, so relative rankings are preserved. """ normalized = (raw_score - 0.6) / 0.4 return max(0.0, min(normalized, 1.0)) # ─── LABEL VALIDATION ──────────────────────────────────────────────────────── def count_words(text: str) -> int: """Count words in a label.""" return len(text.split()) def enforce_word_limit(label: str, max_words: int) -> str: """ Trim label to max_words if needed. Keeps the first N words to preserve meaning. """ words = label.split() if len(words) <= max_words: return label trimmed = " ".join(words[:max_words]) print(f"[Label] Trimmed '{label[:50]}...' to {max_words} words") return trimmed def is_valid_label(label: str) -> bool: """ Check if label is a valid single phrase (not a list or bullet points). """ # Should not contain multiple lines or bullet points if "\n" in label or "•" in label: return False # Should not be comma-separated list (unless commas are for compound terms) comma_count = label.count(",") word_count = count_words(label) # More than 1 comma per 3 words suggests list-like structure if comma_count > (word_count / 3): return False return True