Spaces:
Running
Running
| """ | |
| utils.py — Helpers for caching, output formatting, and metrics reporting. | |
| """ | |
| import os | |
| import json | |
| import pickle | |
| import re | |
| import hashlib | |
| import pandas as pd | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Any, Optional | |
# Root folder for on-disk caches, resolved relative to the current working directory.
CACHE_DIR = Path("cache")
# Created at import time so the folder is already present when the module is used.
os.makedirs(CACHE_DIR, exist_ok=True)
| # ─── GENERIC CACHE ──────────────────────────────────────────────────────────── | |
def save_pickle(obj: Any, path: str):
    """Pickle ``obj`` to ``path`` without ever leaving a half-written file behind.

    The object is first written to a scratch file next to the target
    (``<path>.tmp``) and only moved over ``path`` once pickling has succeeded.
    Opening the target directly in ``"wb"`` mode would truncate it immediately,
    so a failure part-way through pickling would destroy a previously valid
    cache file.

    Args:
        obj: Any picklable object.
        path: Destination file path.

    Raises:
        Whatever ``pickle.dump`` or the file operations raise; the existing
        file at ``path`` (if any) is left untouched in that case.
    """
    target = os.fspath(path)
    scratch = f"{target}.tmp"
    try:
        with open(scratch, "wb") as f:
            pickle.dump(obj, f)
        # os.replace overwrites the destination in a single step.
        os.replace(scratch, target)
    except BaseException:
        # Do not leave the scratch file lying around, then re-raise unchanged.
        if os.path.exists(scratch):
            os.remove(scratch)
        raise
def load_pickle(path: str) -> Any:
    """Open ``path`` in binary mode and return the object unpickled from it.

    NOTE(review): unpickling can execute arbitrary code, so this should only
    be pointed at files the project wrote itself — confirm against callers.
    """
    with open(path, mode="rb") as handle:
        restored = pickle.Unpickler(handle).load()
    return restored
def save_json(obj: Any, path: str):
    """Write ``obj`` to ``path`` as JSON indented by two spaces.

    Args:
        obj: A JSON-serialisable object.
        path: Destination file path.

    Raises:
        TypeError / ValueError: If ``obj`` cannot be serialised. The file at
            ``path`` is not touched in that case.
    """
    # Serialise BEFORE opening the file: open(..., "w") truncates right away,
    # so a serialisation error raised afterwards would leave an existing file
    # empty or half-written.
    text = json.dumps(obj, indent=2)
    # Explicit encoding so the result does not depend on the platform locale.
    with open(path, "w", encoding="utf-8") as f:
        f.write(text)
def load_json(path: str) -> Any:
    """Parse the JSON document stored at ``path`` and return the result.

    The file is decoded as UTF-8 explicitly rather than with the platform's
    default encoding, so non-ASCII content is read the same way on every
    machine.

    Args:
        path: Path of the JSON file to read.

    Returns:
        The decoded Python object.
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
| # ─── RESULTS BUILDER ───────────────────────────────────────────────────────── | |
def build_paper_results(
    df: pd.DataFrame,
    labels: np.ndarray,
    cluster_results: dict,
) -> pd.DataFrame:
    """
    Build the per-paper output DataFrame.

    Each row of ``df`` is paired with the entry of ``labels`` at the same
    POSITION (first row with ``labels[0]``, and so on). The DataFrame's index
    labels are deliberately ignored, so a frame that was filtered without
    ``reset_index`` is still matched correctly.

    Args:
        df: Papers; must provide "DOI" and "Title" columns.
        labels: One cluster id per row of ``df``; -1 marks noise.
        cluster_results: Maps cluster id to a dict whose "final_label" entry
            is used as the label text.

    Returns:
        DataFrame with columns DOI, Title, Cluster_ID, Cluster_Label. The
        columns are present even when ``df`` has no rows.
    """
    columns = ["DOI", "Title", "Cluster_ID", "Cluster_Label"]
    rows = []
    # enumerate() supplies the row position; the index value from iterrows()
    # is discarded because it need not be 0..n-1.
    for position, (_, row) in enumerate(df.iterrows()):
        cid = int(labels[position])
        if cid == -1:
            label = "Noise"
        elif cid in cluster_results:
            label = cluster_results[cid]["final_label"]
        else:
            # No labelling result for this cluster: fall back to a generic name.
            label = f"Cluster {cid}"
        rows.append({
            "DOI": row["DOI"],
            "Title": row["Title"],
            "Cluster_ID": cid,
            "Cluster_Label": label,
        })
    return pd.DataFrame(rows, columns=columns)
def build_cluster_summary(
    cluster_results: dict,
    top_papers: dict,
    coherence: dict,
    silhouette: float,
    tccm_results: Optional[dict] = None,
) -> pd.DataFrame:
    """
    Build the per-cluster summary DataFrame, sorted by Cluster_ID.

    Args:
        cluster_results: Maps cluster id to its labelling result. Each result
            must contain "final_label"; "candidates", "winning_approach",
            "n_papers", "label_confidence" and "justification" are optional.
        top_papers: Maps cluster id to a list of dicts with a "title" entry;
            the first three titles are reported.
        coherence: Maps cluster id to a coherence value (0.0 when missing).
        silhouette: Accepted for interface compatibility; not used here.
        tccm_results: Optional map of cluster id to a dict with "theory",
            "context", "characteristics", "methodology" and "keywords".

    Returns:
        One row per cluster. With no clusters the result is an empty frame
        that still carries every column.
    """
    columns = [
        "Cluster_ID", "Final_Label", "Winning_Approach", "N_Papers",
        "Top_Paper_1", "Top_Paper_2", "Top_Paper_3",
        "Candidate_Keyword", "Candidate_Descriptive", "Candidate_Concise",
        "Score_Semantic", "Score_Keyword", "Score_Clarity", "Score_Final",
        "Label_Confidence", "Cluster_Coherence",
        "TCCM_Theory", "TCCM_Context", "TCCM_Characteristics",
        "TCCM_Methodology", "Top_Keywords", "Justification",
    ]
    rows = []
    for cid, result in cluster_results.items():
        papers = top_papers.get(cid, [])
        # Pad with blanks so clusters with fewer than three papers still fill
        # all three title columns.
        top_titles = ([p["title"] for p in papers] + ["", "", ""])[:3]
        coh = coherence.get(cid, 0.0)
        candidates = result.get("candidates") or {}
        winning = result.get("winning_approach", "")
        # Scores of the winning candidate; a candidate without a "scores"
        # entry yields zeros instead of raising KeyError.
        council_scores = {}
        if winning in candidates:
            council_scores = candidates[winning].get("scores") or {}
        # TCCM + keywords (empty when no TCCM data exists for this cluster)
        tccm = (tccm_results or {}).get(cid) or {}
        rows.append({
            "Cluster_ID": cid,
            "Final_Label": result["final_label"],
            "Winning_Approach": winning,
            "N_Papers": result.get("n_papers", 0),
            "Top_Paper_1": top_titles[0],
            "Top_Paper_2": top_titles[1],
            "Top_Paper_3": top_titles[2],
            "Candidate_Keyword": candidates.get("keyword", {}).get("label", ""),
            "Candidate_Descriptive": candidates.get("descriptive", {}).get("label", ""),
            "Candidate_Concise": candidates.get("concise", {}).get("label", ""),
            "Score_Semantic": council_scores.get("semantic", 0),
            "Score_Keyword": council_scores.get("keyword", 0),
            "Score_Clarity": council_scores.get("clarity", 0),
            "Score_Final": council_scores.get("final", 0),
            "Label_Confidence": result.get("label_confidence", 0),
            "Cluster_Coherence": round(coh, 4),
            "TCCM_Theory": tccm.get("theory", ""),
            "TCCM_Context": tccm.get("context", ""),
            "TCCM_Characteristics": tccm.get("characteristics", ""),
            "TCCM_Methodology": tccm.get("methodology", ""),
            "Top_Keywords": ", ".join(str(k) for k in (tccm.get("keywords") or [])),
            "Justification": result.get("justification", ""),
        })
    # Passing `columns` guarantees "Cluster_ID" exists, so sort_values cannot
    # fail with KeyError when `rows` is empty.
    df = pd.DataFrame(rows, columns=columns)
    return df.sort_values("Cluster_ID").reset_index(drop=True)
def print_metrics_report(
    silhouette: float,
    coherence: dict,
    cluster_results: dict,
    labels: np.ndarray,
):
    """Print a formatted research metrics report to stdout.

    Args:
        silhouette: Silhouette score, printed with four decimals.
        coherence: Maps cluster id to a coherence value; the mean is reported
            (0 when the dict is empty).
        cluster_results: Maps cluster id to a result dict; its size is the
            cluster count and the mean of "label_confidence" (missing -> 0)
            is reported.
        labels: Cluster id per item; entries equal to -1 are counted as noise.
            Any array-like is accepted.
    """
    n_clusters = len(cluster_results)
    # np.asarray makes the comparison element-wise even for a plain list; with
    # a list, `labels == -1` is a single False and the count would always be 0.
    n_noise = int(np.sum(np.asarray(labels) == -1))
    avg_coherence = float(np.mean(list(coherence.values()))) if coherence else 0
    avg_confidence = float(np.mean([
        r.get("label_confidence", 0) for r in cluster_results.values()
    ])) if cluster_results else 0
    print("\n" + "=" * 60)
    print("RESEARCH METRICS REPORT")
    print("=" * 60)
    print(f" Total Clusters: {n_clusters}")
    print(f" Noise Points: {n_noise}")
    print(f" Silhouette Score: {silhouette:.4f}")
    print(f" Avg Cluster Coherence: {avg_coherence:.4f}")
    print(f" Avg Label Confidence: {avg_confidence:.4f}")
    print("=" * 60 + "\n")
def load_env():
    """Load ``KEY=VALUE`` pairs from a ``.env`` file in the working directory.

    Does nothing when the file is absent. Blank lines, lines starting with
    ``#`` and lines without ``=`` are skipped. Variables that are already set
    in the environment are never overwritten.

    Parsing details:
      * the line is split at the FIRST ``=``, so values may contain ``=``;
      * an optional leading ``export `` on the key is dropped;
      * one pair of matching single or double quotes around the value is
        removed, so ``KEY="abc"`` yields ``abc`` rather than ``"abc"``;
      * lines whose key is empty (e.g. ``=value``) are ignored.
    """
    env_file = Path(".env")
    if not env_file.exists():
        return
    for raw_line in env_file.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if key.startswith("export "):
            key = key[len("export "):].strip()
        if not key:
            # An empty variable name cannot be set in the environment.
            continue
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            value = value[1:-1]
        # setdefault: the real environment takes precedence over the file.
        os.environ.setdefault(key, value)
| # ─── AI COUNCIL CACHING WITH PROPER KEYS ───────────────────────────────────── | |
# Folder holding one JSON file per cached council score.
COUNCIL_CACHE_DIR = Path("cache") / "council"
# Created (including the parent "cache" folder) when the module is imported.
os.makedirs(COUNCIL_CACHE_DIR, exist_ok=True)
def generate_council_cache_key(cluster_id: int, label_text: str, agent_name: str) -> str:
    """
    Derive the cache key for one (cluster, label, agent) combination.

    The three values are joined with "|" and hashed with MD5; the hex digest
    is returned. Because the exact label text is part of the hashed string,
    two different labels for the same cluster and agent map to different keys.
    """
    parts = (cluster_id, label_text, agent_name)
    joined = "|".join(str(part) for part in parts)
    digest = hashlib.md5()
    digest.update(joined.encode())
    return digest.hexdigest()
def load_cached_score(cache_key: str) -> Optional[dict]:
    """Return the cached score entry for ``cache_key``, or None.

    The entry is read from ``<cache_key>.json`` inside COUNCIL_CACHE_DIR and
    returned only when it contains both 'normalized_score' and 'raw_score'.
    A missing file, an entry lacking either key (treated as an old cache
    format) or any error while reading or parsing yields None; the latter two
    cases also print a warning.
    """
    entry_path = COUNCIL_CACHE_DIR / f"{cache_key}.json"
    if not entry_path.exists():
        return None
    try:
        payload = json.loads(entry_path.read_text())
        # Both fields are required; anything else counts as a stale entry.
        is_current = "normalized_score" in payload and "raw_score" in payload
        if not is_current:
            print(f"[Cache] Warning: Old cache format for {cache_key}, regenerating...")
            return None
        return payload
    except Exception as err:
        print(f"[Cache] Warning: Failed to load cache {cache_key}: {err}")
        return None
def save_cached_score(cache_key: str, normalized_score: float, raw_score: float) -> None:
    """Store the raw and the normalized score under ``cache_key``.

    The two values are written as a JSON object to ``<cache_key>.json`` inside
    COUNCIL_CACHE_DIR. Saving is best-effort: a failure only prints a warning.
    """
    entry_path = COUNCIL_CACHE_DIR / f"{cache_key}.json"
    try:
        serialized = json.dumps(dict(normalized_score=normalized_score, raw_score=raw_score))
        entry_path.write_text(serialized)
    except Exception as err:
        print(f"[Cache] Warning: Failed to save cache {cache_key}: {err}")
| # ─── SCORE PARSING AND NORMALIZATION ───────────────────────────────────────── | |
def extract_numeric_score(text: str) -> float:
    """
    Extract a numeric score from an LLM response.

    The first pattern that matches wins, checked in this order:
      1. a decimal already on the 0-1 scale ("0.85", "1.0");
      2. a percentage ("85%", "72.5 %"), divided by 100;
      3. a number on the 0-10 scale ("7", "8.5/10", "10"), divided by 10.
         Fractional parts are kept, so "8.5/10" gives 0.85 and not 0.8;
      4. the whole text parsed as one float; values above 1 are treated as
         percentages.

    NOTE: a bare "1" is interpreted on the 0-10 scale (giving 0.1), because
    branch 3 runs before branch 4.

    Args:
        text: Raw response text.

    Returns:
        A float clamped to [0.0, 1.0]; 0.5 (neutral) with a printed warning
        when no number can be found.
    """
    text = text.strip()

    def clamp(value: float) -> float:
        return min(max(value, 0.0), 1.0)

    # 1) Decimal on the 0-1 scale (0.xx or 1.0)
    unit_match = re.search(r"\b(0\.\d+|1\.0+)\b", text)
    if unit_match:
        return clamp(float(unit_match.group(1)))

    # 2) Percentage (XX%)
    pct_match = re.search(r"(\d+(?:\.\d+)?)\s*%", text)
    if pct_match:
        return clamp(float(pct_match.group(1)) / 100)

    # 3) 0-10 scale: "10", or a single digit with an optional fractional part
    ten_match = re.search(r"\b(10(?:\.0+)?|\d(?:\.\d+)?)\b", text)
    if ten_match:
        return clamp(float(ten_match.group(1)) / 10)

    # 4) The whole text is a single number
    try:
        val = float(text)
    except ValueError:
        # Not a number: fall through to the neutral fallback below.
        pass
    else:
        if val > 1:
            val = val / 100  # Assume percentage
        return clamp(val)

    print(f"[Score Parser] Warning: Could not extract score from: {text[:100]}")
    return 0.5  # neutral fallback
def normalize_score(raw_score: float, floor: float = 0.6, ceiling: float = 1.0) -> float:
    """
    Linearly re-scale a raw LLM score from [floor, ceiling] into [0.0, 1.0].

    WHY: LLMs rarely give scores below 0.6 — they avoid harsh penalties even
    for mediocre outputs. This causes all three candidate labels to receive
    nearly identical raw scores, making the winner selection meaningless. By
    re-scaling, we amplify the differences between candidates so that the best
    label wins by a meaningful margin.

    FORMULA:
        normalized = (raw_score - floor) / (ceiling - floor)
        clamped to [0.0, 1.0]; with the defaults: (raw_score - 0.6) / 0.4

    EXAMPLES (default range, values approximate up to float rounding):
        raw 0.60 → normalized 0.00 (floor — LLM's minimum realistic score)
        raw 0.70 → normalized 0.25
        raw 0.80 → normalized 0.50
        raw 0.90 → normalized 0.75
        raw 1.00 → normalized 1.00 (ceiling — perfect match)
        raw 0.50 → clamped to 0.00 (below realistic range)

    NOTE: This is a heuristic calibration, not a statistically validated
    mapping. It is monotonic, so the ordering of scores is never reversed.

    Args:
        raw_score: Score as reported by the LLM.
        floor: Raw value mapped to 0.0 (default 0.6).
        ceiling: Raw value mapped to 1.0 (default 1.0).

    Raises:
        ValueError: If ``ceiling`` is not greater than ``floor``.
    """
    span = ceiling - floor
    if span <= 0:
        raise ValueError("ceiling must be greater than floor")
    normalized = (raw_score - floor) / span
    return max(0.0, min(normalized, 1.0))
| # ─── LABEL VALIDATION ──────────────────────────────────────────────────────── | |
def count_words(text: str) -> int:
    """Return how many whitespace-separated words ``text`` contains."""
    return sum(1 for _ in text.split())
def enforce_word_limit(label: str, max_words: int) -> str:
    """
    Cut ``label`` down to its first ``max_words`` words when it is longer.

    A label within the limit is returned unchanged (original spacing
    included). A longer label is rebuilt from its leading words joined by
    single spaces, and a message about the trimming is printed.
    """
    tokens = label.split()
    if len(tokens) > max_words:
        shortened = " ".join(tokens[:max_words])
        print(f"[Label] Trimmed '{label[:50]}...' to {max_words} words")
        return shortened
    return label
def is_valid_label(label: str) -> bool:
    """
    Check if label is a valid single phrase (not a list or bullet points).

    A label is rejected when it
      * spans several lines,
      * contains a bullet character, or
      * has more than one comma per three words, which suggests a
        comma-separated list rather than a phrase.

    Args:
        label: Candidate label text.

    Returns:
        True when none of the rejection rules applies.
    """
    # Written as escapes so the check survives a re-encoding of this file:
    # U+2022 is the bullet "•"; the second entry is the two-character sequence
    # a bullet turns into when it is mis-decoded, which is what the check
    # previously looked for (kept so those inputs are still rejected).
    bullet_markers = ("\u2022", "\u03b2\u2019")
    # Should not contain multiple lines or bullet points
    if "\n" in label or any(marker in label for marker in bullet_markers):
        return False
    # Should not be a comma-separated list (a few commas inside compound
    # terms are tolerated).
    comma_count = label.count(",")
    word_count = len(label.split())
    return comma_count <= (word_count / 3)