| |
| """End-to-end pipeline: CL-native macro discovery + expansion + classification. |
| |
| 1. For each library, runs SBCL with generate.lisp to produce verified JSONL |
| 2. Merges with macro definitions from Phase 1 extractions |
| 3. Classifies and builds train/val/test splits |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import subprocess |
| import sys |
| from pathlib import Path |
|
|
| sys.path.insert(0, str(Path(__file__).parent.parent / "src")) |
|
|
| from cl_macros.classifier import classify_all, quality_report, detect_techniques, detect_category, assess_complexity, assess_capture_risk |
| from cl_macros.dataset import build_dataset, save_dataset, build_training_record |
| from cl_macros.ext.library_index import LibraryIndex |
| from cl_macros.knowledge_base import ALL_EXAMPLES |
| from cl_macros.schema import ( |
| TransformationExample, |
| Complexity, |
| Source, |
| ) |
|
|
| SBCL = "sbcl" |
| GENERATE_LISP = Path(__file__).parent.parent / "src" / "cl_macros" / "verif" / "generate.lisp" |
| OUTPUT_DIR = Path("data/generated") |
|
|
|
|
def run_sbcl(lib_name: str, output_path: Path) -> bool:
    """Run SBCL with generate.lisp to produce verified JSONL for one library.

    Args:
        lib_name: Library name handed to the Lisp entry point ``lol-gen:main``.
        output_path: Path where the Lisp side writes its JSONL records.

    Returns:
        True when SBCL exits with status 0; False on a non-zero exit,
        a timeout (300 s), or when the ``sbcl`` binary is not installed.
        Previously a timeout or missing binary raised and aborted the whole
        pipeline loop in main(); a per-library failure should not do that.
    """
    try:
        result = subprocess.run(
            [
                SBCL, "--noinform", "--non-interactive",
                "--load", str(GENERATE_LISP),
                # NOTE(review): lib_name is interpolated into a Lisp form;
                # assumes library names never contain '"' — confirm upstream.
                "--eval", f'(lol-gen:main "{lib_name}" "{output_path}")',
            ],
            capture_output=True, text=True, timeout=300,
        )
    except subprocess.TimeoutExpired:
        # Hung SBCL: treat like any other failed run for this library.
        return False
    except FileNotFoundError:
        # SBCL binary not on PATH.
        return False
    return result.returncode == 0
|
|
|
|
def load_macro_defs(lib_name: str) -> dict[str, str]:
    """Load macro name -> full definition from a Phase 1 extraction file.

    Reads ``data/extractions/<lib_name>_extractions.jsonl`` (one JSON object
    per line with ``macro_name`` and ``macro_definition`` fields).

    Args:
        lib_name: Library whose extractions to load.

    Returns:
        Mapping of upper-cased macro name to its full definition text.
        Empty dict when the extraction file does not exist.
    """
    ext_path = Path("data/extractions") / f"{lib_name}_extractions.jsonl"
    defs: dict[str, str] = {}
    if not ext_path.exists():
        return defs
    with open(ext_path) as f:
        for line in f:
            line = line.strip()
            if not line:
                # Blank lines would crash json.loads; skip them, matching
                # how main() reads the generated JSONL files.
                continue
            rec = json.loads(line)
            name = rec.get("macro_name", "")
            full = rec.get("macro_definition", "")
            if name and full:
                # Upper-cased keys: CL symbol names are case-insensitive
                # and the SBCL output reports them upper-case.
                defs[name.upper()] = full
    return defs
|
|
|
|
def source_for_lib(lib_name: str) -> Source:
    """Resolve a library name to its Source enum member.

    Any name not in the known table falls back to Source.OTHER_LIBRARY.
    """
    known_sources = {
        "alexandria": Source.ALEXANDRIA,
        "serapeum": Source.SERAPEUM,
        "anaphora": Source.ANAPHORA,
        "iterate": Source.ITERATE,
        "trivia": Source.TRIVIA,
        "arrow-macros": Source.ARROW_MACROS,
        "modf": Source.MODF,
        "access": Source.ACCESS,
        "for": Source.FOR,
        "cl-interpol": Source.CL_INTERPOL,
        "screamer": Source.SCREAMER,
        "coalton": Source.COALTON,
        "nhooks": Source.NHOOKS,
        "generic-cl": Source.GENERIC_CL,
    }
    try:
        return known_sources[lib_name]
    except KeyError:
        return Source.OTHER_LIBRARY
|
|
|
|
def build_examples(records: list[dict], lib_name: str) -> list[TransformationExample]:
    """Turn raw SBCL expansion records into TransformationExample objects.

    Keeps only records whose status is "verified", drops duplicate
    (macro, expansion) pairs, and drops no-op expansions where the call
    form equals its own expansion.
    """
    definitions = load_macro_defs(lib_name)
    lib_source = source_for_lib(lib_name)
    out: list[TransformationExample] = []
    seen_keys: set[tuple[str, str]] = set()

    for record in records:
        if record.get("status") != "verified":
            continue

        name = record["macro_name"]
        call = record["call_form"]
        expansion = record.get("expanded", "")

        # Deduplicate on (macro, expansion): the same expansion reached via
        # different call forms adds no new signal.
        dedupe_key = (name, expansion)
        if dedupe_key in seen_keys:
            continue
        seen_keys.add(dedupe_key)

        # A call that expands to itself teaches nothing about the macro.
        if call.strip() == expansion.strip():
            continue

        out.append(
            TransformationExample(
                id=f"{lib_name}-{name}-{len(out)}",
                before_code=call,
                problem_pattern=f"Macro call that should be transformed by {name}",
                # Fall back to a stub definition when Phase 1 has no
                # extraction for this macro.
                macro_definition=definitions.get(name.upper(), f"(defmacro {name} ...)"),
                after_expansion=expansion,
                macro_category=None,
                technique=[],
                source=lib_source,
                complexity=Complexity.BASIC,
                library_name=lib_name,
                macro_name=name,
                is_verified=True,
                macroexpand_1_result=expansion,
                formulation="macro-from-usage",
            )
        )

    return out
|
|
|
|
def main():
    """CLI entry point: generate, classify, and split the macro dataset.

    Flow:
      1. Select libraries (one via --library, or a whole tier via --tier).
      2. For each library, run SBCL unless its JSONL output already exists,
         then convert verified records into TransformationExamples.
      3. Classify all examples (seed knowledge base + generated), build
         train/val/test splits, and write generation stats to disk.
    """
    import argparse
    ap = argparse.ArgumentParser()
    # --tier picks a group from the library index; ignored when --library
    # names a single library explicitly.
    ap.add_argument("--tier", default="tier1")
    ap.add_argument("--library", default=None)
    args = ap.parse_args()

    idx = LibraryIndex()
    if args.library:
        # NOTE(review): assumes idx.get() returns a library object (never
        # None) for a user-supplied name — confirm LibraryIndex's contract.
        libs = [idx.get(args.library)]
    else:
        libs = idx.list_libraries(args.tier)

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    # Start from the hand-curated seed examples, then append generated ones.
    all_examples = list(ALL_EXAMPLES)
    stats = {}  # lib name -> number of valid examples produced

    for lib in libs:
        print(f"\n--- {lib.name} ({lib.tier}) ---")
        output_path = OUTPUT_DIR / f"{lib.name}_generated.jsonl"

        # Existing output acts as a cache: delete the file to force a re-run.
        if not output_path.exists():
            print(f" Running SBCL...")
            if not run_sbcl(lib.name, output_path):
                print(f" SBCL FAILED for {lib.name}")
                continue

        # One JSON object per non-blank line.
        with open(output_path) as f:
            records = [json.loads(line) for line in f if line.strip()]

        verified = sum(1 for r in records if r.get("status") == "verified")
        errors = sum(1 for r in records if r.get("status") != "verified")
        print(f" Records: {len(records)} ({verified} verified, {errors} errors)")

        examples = build_examples(records, lib.name)
        print(f" Valid examples: {len(examples)}")
        all_examples.extend(examples)
        stats[lib.name] = len(examples)

    print(f"\n=== Classification ===")
    print(f"Total examples: {len(all_examples)}")
    classified = classify_all(all_examples)
    report = quality_report(classified)
    print(f"Mean score: {report['mean_score']:.3f}")
    print(f"Categories: {report['category_distribution']}")

    # Examples scoring below min_quality are excluded from the splits.
    print(f"\n=== Building splits ===")
    train, val, test = build_dataset(classified, min_quality=0.3)
    print(f"Train: {len(train)}, Val: {len(val)}, Test: {len(test)}")

    split_dir = Path("data/splits")
    paths = save_dataset(train, val, test, split_dir)
    for name, p in paths.items():
        print(f" {name}: {p}")

    # Persist per-library counts plus the quality report for later inspection.
    stat_path = OUTPUT_DIR / "generation_stats.json"
    with open(stat_path, "w") as f:
        json.dump({"stats": stats, "report": report}, f, indent=2)
    print(f"\nStats: {stat_path}")


if __name__ == "__main__":
    main()
|
|