"""Evaluate the heuristic cipher classifier on a labelled JSONL dataset and,
optionally, compare it against a Hugging Face Transformer model, writing the
resulting metrics to a JSON report."""

from __future__ import annotations

import argparse
import json
import os
import sys
from collections import Counter
from pathlib import Path

from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, f1_score

# Make the repository root importable so ``core`` resolves when this script is
# run directly from its subdirectory.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
from core import heuristic_classify
|
|
|
|
def load_rows(path: str):
    """Yield one JSON object per non-empty line of a JSONL file."""
    for line in Path(path).read_text(encoding="utf-8").splitlines():
| if line.strip(): |
| yield json.loads(line) |
|
|
|
|
def transformer_predictions(texts, model_id: str):
    """Predict labels for ``texts`` with a Hugging Face text-classification
    pipeline; return ``None`` if ``transformers`` or the model is unavailable."""
| try: |
| from transformers import pipeline |
| except Exception: |
| return None |
| try: |
| pipe = pipeline("text-classification", model=model_id, tokenizer=model_id, top_k=1) |
| except Exception as exc: |
| print(f"[evaluate_baseline] Transformer unavailable: {exc}") |
| return None |
    preds = []
    for t in texts:
        # Truncate to 512 characters (not tokens) as a crude guard against very
        # long inputs before handing the text to the pipeline.
        out = pipe(t[:512])
        # Depending on the transformers version, ``top_k=1`` returns either a
        # list of dicts or a nested list per input; unwrap the nested case.
        if isinstance(out, list) and out and isinstance(out[0], list):
            out = out[0]
        preds.append(str(out[0]["label"]).lower().replace("label_", ""))
| return preds |
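# Illustrative pipeline output (an assumption about the installed ``transformers``
# version, shown for reference rather than executed): a call like
# ``pipe(text[:512])`` with ``top_k=1`` typically returns
# ``[{"label": "LABEL_3", "score": 0.97}]``, which the loop above lowercases and
# strips to ``"3"``; checkpoints that store real label names (e.g. "plaintext")
# pass through unchanged apart from lowercasing.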
|
|
|
|
def evaluate(y_true, y_pred, labels):
    """Compute accuracy, macro-F1, a per-class report, and a confusion matrix."""
    return {
| "accuracy": accuracy_score(y_true, y_pred), |
| "macro_f1": f1_score(y_true, y_pred, labels=labels, average="macro", zero_division=0), |
| "classification_report": classification_report( |
| y_true, y_pred, labels=labels, output_dict=True, zero_division=0 |
| ), |
| "confusion_matrix": confusion_matrix(y_true, y_pred, labels=labels).tolist(), |
| } |
|
|
|
|
| def _length_bucket(n: int) -> str: |
| if n < 50: |
| return "xs (<50)" |
| if n < 100: |
| return "s (50-99)" |
| if n < 200: |
| return "m (100-199)" |
| if n < 400: |
| return "l (200-399)" |
| return "xl (>=400)" |
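# Boundary examples, derived directly from the thresholds above:
#   0-49 -> "xs (<50)", 50-99 -> "s (50-99)", 100-199 -> "m (100-199)",
#   200-399 -> "l (200-399)", 400+ -> "xl (>=400)".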
|
|
|
|
| def bucketed_metrics(rows, y_true, y_pred, labels, key): |
| """Group accuracy + macro-F1 by a row attribute (e.g. difficulty, length bucket).""" |
| buckets: dict[str, list[int]] = {} |
    for i, r in enumerate(rows):
        if key == "length_bucket":
            # Prefer an explicit length field; otherwise fall back to measuring
            # the text itself so rows without length metadata still bucket sensibly.
            n = int(r.get("text_length") or r.get("length") or len(r.get("text", "")))
            bucket = _length_bucket(n)
        else:
            bucket = str(r.get(key, "unknown"))
| buckets.setdefault(bucket, []).append(i) |
| out = {} |
| for bucket, idxs in sorted(buckets.items()): |
| yt = [y_true[i] for i in idxs] |
| yp = [y_pred[i] for i in idxs] |
| out[bucket] = { |
| "n": len(idxs), |
| "accuracy": accuracy_score(yt, yp), |
| "macro_f1": f1_score(yt, yp, labels=labels, average="macro", zero_division=0), |
| } |
| return out |
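# Sketch of the returned mapping (bucket names and numbers are placeholders):
#   {"easy": {"n": 120, "accuracy": 0.91, "macro_f1": 0.88}, "hard": {...}, ...}
# Keys come from the chosen row attribute (e.g. difficulty) or the length buckets.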
|
|
|
|
| def main(): |
| ap = argparse.ArgumentParser(description="Evaluate the heuristic baseline (and optionally a Transformer model).") |
| ap.add_argument("--data", default="data/cipher_examples.jsonl") |
| ap.add_argument("--out", default="reports/baseline_metrics.json") |
| ap.add_argument( |
| "--sample", type=int, default=None, |
| help="Randomly sample this many rows (stratified per label) for a quick evaluation.", |
| ) |
| ap.add_argument("--seed", type=int, default=42) |
| ap.add_argument( |
| "--model", |
| default=os.getenv("CIPHER_MODEL_ID", ""), |
| help="Optional Hugging Face model ID or local path. If set, results are reported alongside the heuristic.", |
| ) |
| args = ap.parse_args() |
|
|
| rows = list(load_rows(args.data)) |
|
|
| if args.sample and args.sample < len(rows): |
| import random as _random |
| _random.seed(args.seed) |
| |
        # Group rows by label so the sample stays (roughly) stratified.
        from collections import defaultdict as _dd
| by_label: dict = _dd(list) |
| for r in rows: |
| by_label[r["label"]].append(r) |
| sampled: list = [] |
| per_label = max(1, args.sample // len(by_label)) |
| for lbl_rows in by_label.values(): |
| k = min(per_label, len(lbl_rows)) |
| sampled.extend(_random.sample(lbl_rows, k)) |
| rows = sampled |
| print(f"Sampled {len(rows)} rows ({per_label} per label, {len(by_label)} labels)") |
|
|
| texts = [r["text"] for r in rows] |
| y_true = [r["label"] for r in rows] |
| labels = sorted(set(y_true)) |
|
|
    y_pred_heur = [heuristic_classify(t).label for t in texts]
    # Map any prediction outside the dataset's label set to "plaintext" so the
    # sklearn metrics below only ever see known labels.
    y_pred_heur = [p if p in labels else "plaintext" for p in y_pred_heur]
|
|
| heuristic_block = evaluate(y_true, y_pred_heur, labels) |
| heuristic_block["by_difficulty"] = bucketed_metrics(rows, y_true, y_pred_heur, labels, "difficulty") |
| heuristic_block["by_length"] = bucketed_metrics(rows, y_true, y_pred_heur, labels, "length_bucket") |
|
|
| report = { |
| "dataset": { |
| "path": args.data, |
| "size": len(rows), |
| "labels": labels, |
| "label_distribution": dict(Counter(y_true)), |
| }, |
| "heuristic": heuristic_block, |
| "note": ( |
| "Heuristic baseline is intentionally transparent and imperfect. " |
| "Use it as a comparison point for the Transformer model. None of these " |
| "metrics imply real-world cryptanalytic capability." |
| ), |
| } |
|
|
| if args.model: |
| ml_preds = transformer_predictions(texts, args.model) |
| if ml_preds is not None: |
| ml_preds = [p if p in labels else "plaintext" for p in ml_preds] |
| ml_block = evaluate(y_true, ml_preds, labels) |
| ml_block["by_difficulty"] = bucketed_metrics(rows, y_true, ml_preds, labels, "difficulty") |
| ml_block["by_length"] = bucketed_metrics(rows, y_true, ml_preds, labels, "length_bucket") |
| report["transformer"] = {"model_id": args.model, **ml_block} |
|
|
| out = Path(args.out) |
| out.parent.mkdir(parents=True, exist_ok=True) |
| out.write_text(json.dumps(report, indent=2), encoding="utf-8") |
| summary = {"out": str(out), "heuristic_accuracy": report["heuristic"]["accuracy"]} |
| if "transformer" in report: |
| summary["transformer_accuracy"] = report["transformer"]["accuracy"] |
| print(json.dumps(summary, indent=2)) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
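# Example invocations (a sketch; assumes this file is saved as evaluate_baseline.py
# one directory below the repository root, as the sys.path insertion above implies,
# and that any --model value points at a local or Hub text-classification checkpoint):
#   python evaluate_baseline.py
#   python evaluate_baseline.py --sample 500 --model ./my-finetuned-model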
|
|