rahull30's picture
Clean commit: preprocessing, clustering, embedding fixes
a0c55ac
"""
utils.py — Helpers for caching, output formatting, and metrics reporting.
"""
import os
import json
import pickle
import re
import hashlib
import pandas as pd
import numpy as np
from pathlib import Path
from typing import Any, Optional
# Relative "cache" directory (resolved against the current working directory).
# It is created as a side effect of importing this module; an existing
# directory is left untouched.
CACHE_DIR = Path("cache")
CACHE_DIR.mkdir(exist_ok=True)
# ─── GENERIC CACHE ────────────────────────────────────────────────────────────
def save_pickle(obj: Any, path: str):
    """Serialize ``obj`` with pickle and write it to ``path``.

    The target is opened in binary write mode, so any existing file at
    ``path`` is overwritten.
    """
    with open(path, "wb") as sink:
        pickle.dump(obj, sink)
def load_pickle(path: str) -> Any:
    """Open ``path`` in binary mode and return the object unpickled from it.

    NOTE: unpickling can execute arbitrary code, so this should only be
    pointed at files the project wrote itself (e.g. its own cache).
    """
    with open(path, "rb") as source:
        restored = pickle.load(source)
    return restored
def save_json(obj: Any, path: str):
    """Write ``obj`` to ``path`` as JSON, indented by two spaces.

    The file is opened in text write mode, replacing any existing content.
    """
    with open(path, "w") as sink:
        json.dump(obj, sink, indent=2)
def load_json(path: str) -> Any:
    """Read the file at ``path`` and return the parsed JSON value.

    The file is decoded as UTF-8 explicitly rather than with the platform's
    default encoding, so JSON containing non-ASCII text loads the same way
    on every operating system.

    Raises whatever ``open`` raises for an unreadable path and
    ``json.JSONDecodeError`` for malformed content.
    """
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)
# ─── RESULTS BUILDER ─────────────────────────────────────────────────────────
def build_paper_results(
    df: pd.DataFrame,
    labels: np.ndarray,
    cluster_results: dict,
) -> pd.DataFrame:
    """
    Build the per-paper output DataFrame.

    Args:
        df: Papers, one per row; must provide "DOI" and "Title" columns.
        labels: Cluster id for each row of ``df``, aligned by row position.
            The value -1 marks a noise point.
        cluster_results: Mapping of cluster id to a result dict whose
            "final_label" entry is used as the cluster's label.

    Returns:
        DataFrame with columns DOI, Title, Cluster_ID, Cluster_Label, one row
        per paper in the order of ``df``. Noise points are labelled "Noise";
        clusters missing from ``cluster_results`` fall back to "Cluster <id>".
        An empty ``df`` yields an empty frame that still has these columns.

    Raises:
        IndexError: if ``labels`` has fewer entries than ``df`` has rows.
    """
    columns = ["DOI", "Title", "Cluster_ID", "Cluster_Label"]
    rows = []
    # Pair rows with labels by position. The first item yielded by iterrows()
    # is the row's index *label*, which only equals its position for a default
    # RangeIndex, so it must not be used to index into `labels`.
    for pos, (_, row) in enumerate(df.iterrows()):
        cid = int(labels[pos])
        if cid == -1:
            label = "Noise"
        elif cid in cluster_results:
            label = cluster_results[cid]["final_label"]
        else:
            label = f"Cluster {cid}"
        rows.append({
            "DOI": row["DOI"],
            "Title": row["Title"],
            "Cluster_ID": cid,
            "Cluster_Label": label,
        })
    # Explicit columns keep the schema stable even when there are no rows.
    return pd.DataFrame(rows, columns=columns)
def build_cluster_summary(
    cluster_results: dict,
    top_papers: dict,
    coherence: dict,
    silhouette: float,
    tccm_results: Optional[dict] = None,
) -> pd.DataFrame:
    """
    Build the per-cluster summary DataFrame.

    Args:
        cluster_results: Mapping of cluster id to a result dict. "final_label"
            is required; "candidates", "winning_approach", "n_papers",
            "label_confidence" and "justification" are optional.
        top_papers: Mapping of cluster id to a list of dicts with a "title"
            key; the first three titles are reported.
        coherence: Mapping of cluster id to a coherence value (default 0.0).
        silhouette: Accepted for interface compatibility; not used here.
        tccm_results: Optional mapping of cluster id to a dict with "theory",
            "context", "characteristics", "methodology" and "keywords".

    Returns:
        One row per cluster, sorted by Cluster_ID with a fresh 0..n-1 index.
        The score columns come from the winning candidate's "scores" dict and
        default to 0 when unavailable. An empty ``cluster_results`` yields an
        empty frame that still carries every column.
    """
    columns = [
        "Cluster_ID",
        "Final_Label",
        "Winning_Approach",
        "N_Papers",
        "Top_Paper_1",
        "Top_Paper_2",
        "Top_Paper_3",
        "Candidate_Keyword",
        "Candidate_Descriptive",
        "Candidate_Concise",
        "Score_Semantic",
        "Score_Keyword",
        "Score_Clarity",
        "Score_Final",
        "Label_Confidence",
        "Cluster_Coherence",
        "TCCM_Theory",
        "TCCM_Context",
        "TCCM_Characteristics",
        "TCCM_Methodology",
        "Top_Keywords",
        "Justification",
    ]
    tccm_by_cluster = tccm_results or {}
    rows = []
    for cid, result in cluster_results.items():
        top_titles = [p["title"] for p in top_papers.get(cid, [])]
        # Pad to three entries so the Top_Paper_* columns can be read directly.
        top_titles += [""] * (3 - len(top_titles))
        coh = coherence.get(cid, 0.0)
        candidates = result.get("candidates", {})
        winning = result.get("winning_approach", "")
        # Scores of the winning candidate; empty when there is no winner or
        # the winner carries no "scores" entry.
        council_scores = candidates.get(winning, {}).get("scores") or {}
        # TCCM + keywords
        tccm = tccm_by_cluster.get(cid, {})
        keywords = tccm.get("keywords") or []
        rows.append({
            "Cluster_ID": cid,
            "Final_Label": result["final_label"],
            "Winning_Approach": winning,
            "N_Papers": result.get("n_papers", 0),
            "Top_Paper_1": top_titles[0],
            "Top_Paper_2": top_titles[1],
            "Top_Paper_3": top_titles[2],
            "Candidate_Keyword": candidates.get("keyword", {}).get("label", ""),
            "Candidate_Descriptive": candidates.get("descriptive", {}).get("label", ""),
            "Candidate_Concise": candidates.get("concise", {}).get("label", ""),
            "Score_Semantic": council_scores.get("semantic", 0),
            "Score_Keyword": council_scores.get("keyword", 0),
            "Score_Clarity": council_scores.get("clarity", 0),
            "Score_Final": council_scores.get("final", 0),
            "Label_Confidence": result.get("label_confidence", 0),
            "Cluster_Coherence": round(coh, 4),
            "TCCM_Theory": tccm.get("theory", ""),
            "TCCM_Context": tccm.get("context", ""),
            "TCCM_Characteristics": tccm.get("characteristics", ""),
            "TCCM_Methodology": tccm.get("methodology", ""),
            "Top_Keywords": ", ".join(str(k) for k in keywords),
            "Justification": result.get("justification", ""),
        })
    # Passing the column list explicitly keeps "Cluster_ID" present when there
    # are no rows; otherwise sort_values would raise KeyError on an empty frame.
    df = pd.DataFrame(rows, columns=columns)
    return df.sort_values("Cluster_ID").reset_index(drop=True)
def print_metrics_report(
    silhouette: float,
    coherence: dict,
    cluster_results: dict,
    labels: np.ndarray,
):
    """Write the research metrics summary to stdout.

    Reports the number of clusters, the count of labels equal to -1 (noise),
    the silhouette score, the mean of the ``coherence`` values and the mean
    "label_confidence" over ``cluster_results`` (missing entries count as 0).
    Both means are reported as 0 when their source mapping is empty.
    """
    if coherence:
        avg_coherence = float(np.mean(list(coherence.values())))
    else:
        avg_coherence = 0

    if cluster_results:
        confidences = [r.get("label_confidence", 0) for r in cluster_results.values()]
        avg_confidence = float(np.mean(confidences))
    else:
        avg_confidence = 0

    n_clusters = len(cluster_results)
    n_noise = int(np.sum(labels == -1))

    rule = "=" * 60
    report_lines = [
        "\n" + rule,
        "RESEARCH METRICS REPORT",
        rule,
        f" Total Clusters: {n_clusters}",
        f" Noise Points: {n_noise}",
        f" Silhouette Score: {silhouette:.4f}",
        f" Avg Cluster Coherence: {avg_coherence:.4f}",
        f" Avg Label Confidence: {avg_confidence:.4f}",
        rule + "\n",
    ]
    # One print of the joined lines emits the same text as one print per line.
    print("\n".join(report_lines))
def load_env():
    """Load KEY=VALUE pairs from a ``.env`` file in the working directory.

    Does nothing when ``.env`` is absent. Blank lines, lines starting with
    ``#`` and lines without ``=`` are ignored. For every remaining line:

    - the key is the text before the first ``=``; a leading ``export `` is
      dropped so shell-style files work;
    - the value is the text after the first ``=``; one pair of matching
      surrounding single or double quotes is removed;
    - lines whose key is empty are skipped.

    Variables already present in ``os.environ`` are never overwritten.
    """
    env_file = Path(".env")
    if not env_file.is_file():
        return
    # utf-8-sig reads plain UTF-8 and also drops a BOM if an editor added one,
    # which would otherwise become part of the first key.
    for raw_line in env_file.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        if key.startswith("export "):
            key = key[len("export "):].strip()
        if not key:
            # "=value" has no usable name; setting an empty name would fail.
            continue
        value = value.strip()
        # KEY="abc" should yield abc, not the quoted text.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
            value = value[1:-1]
        os.environ.setdefault(key, value)
# ─── AI COUNCIL CACHING WITH PROPER KEYS ─────────────────────────────────────
# Relative "cache/council" directory used for per-key council score files.
# Created (including missing parents) as a side effect of importing this
# module; an existing directory is left untouched.
COUNCIL_CACHE_DIR = Path("cache/council")
COUNCIL_CACHE_DIR.mkdir(parents=True, exist_ok=True)
def generate_council_cache_key(cluster_id: int, label_text: str, agent_name: str) -> str:
    """
    Return the cache key for one (cluster, label, agent) combination.

    The three parts are joined with "|" and hashed with MD5; the hex digest
    is the key. Because the exact label text is part of the hashed content,
    different labels map to different cache entries.
    """
    raw_key = "{}|{}|{}".format(cluster_id, label_text, agent_name)
    digest = hashlib.md5(raw_key.encode())
    return digest.hexdigest()
def load_cached_score(cache_key: str) -> Optional[dict]:
    """Return the cached score entry for ``cache_key``, or None.

    The entry is read from ``<cache_key>.json`` inside COUNCIL_CACHE_DIR and
    is only returned when it contains both "normalized_score" and
    "raw_score". A missing file, an entry lacking either field, or any error
    while reading/parsing results in None; the latter two also print a
    warning.
    """
    cache_file = COUNCIL_CACHE_DIR / f"{cache_key}.json"
    if not cache_file.exists():
        return None
    try:
        data = json.loads(cache_file.read_text())
        # Entries written before both fields were stored are treated as stale.
        required_fields = ("normalized_score", "raw_score")
        if all(field in data for field in required_fields):
            return data
        print(f"[Cache] Warning: Old cache format for {cache_key}, regenerating...")
        return None
    except Exception as e:
        print(f"[Cache] Warning: Failed to load cache {cache_key}: {e}")
        return None
def save_cached_score(cache_key: str, normalized_score: float, raw_score: float) -> None:
    """Store the normalized and raw score for ``cache_key``.

    The pair is written as a JSON object to ``<cache_key>.json`` inside
    COUNCIL_CACHE_DIR. Failures are reported with a printed warning and are
    not raised to the caller.
    """
    cache_file = COUNCIL_CACHE_DIR / f"{cache_key}.json"
    payload = dict(normalized_score=normalized_score, raw_score=raw_score)
    try:
        serialized = json.dumps(payload)
        cache_file.write_text(serialized)
    except Exception as e:
        print(f"[Cache] Warning: Failed to save cache {cache_key}: {e}")
# ─── SCORE PARSING AND NORMALIZATION ────────────────────────────────────────
def extract_numeric_score(text: str) -> float:
    """
    Extract a numeric score from an LLM response.

    The first pattern that matches wins, tried in this order:

    1. a decimal in the 0-1 range (``0.xx`` or ``1.0``), used as is;
    2. a percentage (``xx%``), divided by 100;
    3. a standalone integer 0-10, divided by 10;
    4. the whole text parsed as a number; values above 1 are treated as a
       percentage and divided by 100.

    Args:
        text: The response text. ``None`` is treated as empty text and other
            non-string values are converted with ``str``.

    Returns:
        A float clamped to [0.0, 1.0]. When nothing usable is found
        (including text that parses to NaN) a warning is printed and the
        neutral fallback 0.5 is returned.
    """
    import math  # local import: the module's import block does not provide math

    if text is None:
        text = ""
    elif not isinstance(text, str):
        text = str(text)
    text = text.strip()

    def clamp(value: float) -> float:
        return min(max(value, 0.0), 1.0)

    # Try decimal (0.xx or 1.0)
    decimal = re.search(r"\b(0\.\d+|1\.0+)\b", text)
    if decimal:
        return clamp(float(decimal.group(1)))

    # Try percentage (XX%)
    percent = re.search(r"(\d+(?:\.\d+)?)\s*%", text)
    if percent:
        return clamp(float(percent.group(1)) / 100)

    # Try integer 0-10
    integer = re.search(r"\b([0-9]|10)\b", text)
    if integer:
        return clamp(float(integer.group(1)) / 10)

    # Last resort: the whole response is a single number (e.g. "85").
    try:
        val = float(text)
    except ValueError:
        val = math.nan
    # float() accepts "nan"; clamping cannot bring NaN into range, so it is
    # handled like an unparseable response.
    if not math.isnan(val):
        if val > 1:
            val = val / 100  # Assume percentage
        return clamp(val)

    print(f"[Score Parser] Warning: Could not extract score from: {text[:100]}")
    return 0.5  # neutral fallback
def normalize_score(raw_score: float) -> float:
    """
    Stretch a raw LLM score from the range [0.6, 1.0] onto [0.0, 1.0].

    Rationale: LLM judges seldom score below 0.6, so candidate labels end up
    with nearly identical raw scores. Re-scaling the upper part of the range
    widens the gaps between candidates so the winner stands out.

    Formula:
        normalized = (raw_score - 0.6) / 0.4, clamped to [0.0, 1.0]

    Approximate examples:
        raw 0.60 -> 0.00 (floor)
        raw 0.70 -> 0.25
        raw 0.80 -> 0.50
        raw 0.90 -> 0.75
        raw 1.00 -> 1.00 (ceiling)
        raw 0.50 -> 0.00 (clamped, below the floor)

    Note: this is a heuristic calibration, not a statistically validated
    mapping. The mapping is monotonic, so applying it uniformly does not
    reorder scores.
    """
    floor = 0.6
    span = 0.4
    stretched = (raw_score - floor) / span
    capped = min(stretched, 1.0)
    return max(0.0, capped)
# ─── LABEL VALIDATION ────────────────────────────────────────────────────────
def count_words(text: str) -> int:
    """Return the number of whitespace-separated tokens in ``text``."""
    tokens = text.split()
    return len(tokens)
def enforce_word_limit(label: str, max_words: int) -> str:
    """
    Trim ``label`` to at most ``max_words`` whitespace-separated words.

    Args:
        label: The label text.
        max_words: Maximum number of words to keep. Negative values are
            treated as 0 instead of being used as a negative slice bound,
            which would silently drop words from the end.

    Returns:
        ``label`` unchanged when it is already within the limit; otherwise
        the first ``max_words`` words joined by single spaces (a message is
        printed when trimming happens).
    """
    limit = max(max_words, 0)
    words = label.split()
    if len(words) <= limit:
        return label
    trimmed = " ".join(words[:limit])
    print(f"[Label] Trimmed '{label[:50]}...' to {limit} words")
    return trimmed
def is_valid_label(label: str) -> bool:
    """
    Check whether ``label`` is a single phrase rather than a list.

    A label is rejected when it:

    - spans multiple lines,
    - contains a bullet character, or
    - has more than one comma per three words, which suggests a
      comma-separated list rather than a compound term.

    Returns:
        True when none of the rejection rules apply. An empty label has no
        commas and therefore passes.
    """
    # "\u2022" is the real bullet character. The second entry is the
    # mis-encoded sequence the check used to compare against; it is kept so
    # that text rejected before is still rejected.
    list_markers = ("\n", "\u2022", "β€’")
    if any(marker in label for marker in list_markers):
        return False

    comma_count = label.count(",")
    word_count = len(label.split())
    # More than 1 comma per 3 words suggests list-like structure
    return comma_count <= (word_count / 3)