"""Data loading and audit helpers for figures_v2.""" from __future__ import annotations import csv import re from pathlib import Path import numpy as np import pandas as pd def path_exists(path: Path) -> bool: """Explicit existence helper used by the audit and loaders.""" return Path(path).exists() def read_table(path: Path, **kwargs) -> pd.DataFrame | None: if not path_exists(path): return None return pd.read_csv(path, **kwargs) def read_edges(path: Path) -> pd.DataFrame | None: if not path_exists(path): return None return pd.read_csv(path, sep=r"\s+", header=None, engine="python") def load_dataset_degrees(root: Path) -> dict[str, np.ndarray] | None: dd = root / "data_and_docs" co = read_edges(dd / "author_file_ann.txt") cite = read_edges(dd / "paper_file_ann.txt") read = read_edges(dd / "bipartite_train_ann.txt") if co is None or cite is None or read is None: return None co_deg = np.bincount(np.concatenate([co[0].to_numpy(), co[1].to_numpy()]), minlength=6611) citation_in = np.bincount(cite[1].to_numpy(), minlength=79937) paper_read = np.bincount(read[1].to_numpy(), minlength=79937) author_read = np.bincount(read[0].to_numpy(), minlength=6611) return { "coauthor_degree": co_deg, "citation_indegree": citation_in, "paper_read_degree": paper_read, "author_read_degree": author_read, } def load_manual_metrics(root: Path) -> pd.DataFrame: path = root / "figures_v2" / "data" / "manual_metrics.csv" if path_exists(path): return pd.read_csv(path) rows = [ (0, "gnn_baseline", "GNN\nbaseline", 0.8850, np.nan, "reports"), (1, "lightgcn_ensemble", "LightGCN\nensemble", 0.938576, 0.93044, "validation_runs/dynamic_summary.csv; README"), (2, "graph_stack", "+ graph/meta-path\nstack", 0.95599, 0.95760, "validation_runs/stack_threshold_summary.csv; README"), (3, "content_bpr", "+ content\n+ BPR-MF", 0.95930, 0.95996, "reports; README"), (4, "deepwalk_node2vec", "+ DeepWalk\n/ Node2Vec", 0.96213, 0.96252, "error_group_calibration anchor; README"), (5, "rw7", "+ 7 RW\nblocks", 0.964947, np.nan, "high_order validation_summary.csv"), (6, "highorder_directed", "+ high-order\ncitation", 0.966874, 0.96626, "high_order validation_summary.csv; README"), ] return pd.DataFrame(rows, columns=["order", "stage", "label", "val_f1", "public_f1", "source"]) def high_order_summary(root: Path) -> pd.DataFrame: path = root / "validation_runs" / "dynamic_seed202" / "high_order_graph_stack" / "validation_summary.csv" df = read_table(path) if df is not None: return df return pd.DataFrame( [ ("base_highorder", 0.9642697338013148, 0.4554775357246399, 0.994052111749616, 0.9653815084086069, 0.9631605169836758, 108), ("rich_rw7", 0.9649474248055991, 0.49044686555862427, 0.9945549026665483, 0.9663869251458722, 0.9635122065590106, 190), ("rich_rw7_highorder", 0.9665557233547776, 0.46933943033218384, 0.9948903494937357, 0.9671087377501321, 0.9660033410509656, 214), ("rich_rw7_highorder_directed", 0.966873736337297, 0.46173080801963806, 0.9949182985645343, 0.9667037764040665, 0.9670437560446645, 259), ], columns=["stage", "validation_f1", "threshold", "auc", "precision", "recall", "n_features"], ) def rw_ensemble_metrics(root: Path) -> tuple[list[int], list[float], list[str]]: rw = root / "validation_runs" / "dynamic_seed202" / "randomwalk_systematic" sources: list[str] = [] single_df = read_table(rw / "small_ablation_table.csv") e5_df = read_table(rw / "ensemble_5_ablation.csv") e7_df = read_table(rw / "ensemble_7_ablation.csv") if single_df is None or e5_df is None or e7_df is None: return [1, 5, 7], [0.96310, 0.96393, 0.96492], ["reported fallback values"] sources = [str(rw / "small_ablation_table.csv"), str(rw / "ensemble_5_ablation.csv"), str(rw / "ensemble_7_ablation.csv")] return [1, 5, 7], [ float(single_df["validation_F1"].max()), float(e5_df["validation_F1"].iloc[0]), float(e7_df["validation_F1"].iloc[0]), ], sources def load_npy(path: Path) -> np.ndarray | None: if not path_exists(path): return None return np.load(path) def pr_curve(y_true: np.ndarray, scores: np.ndarray) -> tuple[np.ndarray, np.ndarray, float]: order = np.argsort(-scores) y = y_true[order].astype(float) tp = np.cumsum(y) fp = np.cumsum(1.0 - y) precision = tp / np.maximum(tp + fp, 1.0) recall = tp / max(float(y.sum()), 1.0) precision = np.r_[1.0, precision] recall = np.r_[0.0, recall] ap = float(np.sum((recall[1:] - recall[:-1]) * precision[1:])) return recall, precision, ap def numeric_bucket_key(bucket: str) -> float: toks = re.findall(r"-?inf|-?\d+(?:\.\d+)?(?:e-?\d+)?", str(bucket)) if not toks: return 0.0 val = toks[0] if val == "-inf": return -1e18 if val == "inf": return 1e18 return float(val) def inventory_files(root: Path, out_csv: Path) -> pd.DataFrame: watched = [ "README.md", "CLAUDE.md", "WORKSPACE_STATUS.md", "reports/final_report.md", "reports/exploration_summary.md", "reports/preliminary_report.md", "notes/experiment_history.md", "data_and_docs", "validation_runs", "cached_scores", "submissions", "code", "figures_paper", ] rows = [] for rel in watched: p = root / rel if not path_exists(p): rows.append({"path": rel, "exists": False, "kind": "missing", "size_bytes": ""}) continue if p.is_file(): rows.append({"path": rel, "exists": True, "kind": "file", "size_bytes": p.stat().st_size}) else: for child in p.rglob("*"): if child.is_file(): rows.append( { "path": child.relative_to(root).as_posix(), "exists": True, "kind": "file", "size_bytes": child.stat().st_size, } ) df = pd.DataFrame(rows) out_csv.parent.mkdir(parents=True, exist_ok=True) df.to_csv(out_csv, index=False, quoting=csv.QUOTE_MINIMAL) return df