""" Command-line entry point. Usage ----- # Redact (Lite or Pro) python -m anonymisation.cli redact \\ --variant {lite|pro} \\ [--ner spacy|hf|finetuned] \\ [--k-target 5] [--max-iterations 5] \\ [--pseudonymise] [--vault-out PATH] \\ [--json] [--mosaic-haystack tab] \\ FILE # Round-trip an LLM answer back through the pseudonym vault python -m anonymisation.cli restore \\ --vault PATH \\ FILE The defaults pick spaCy as the NER provider so the CLI runs out of the box on a Phase-1 install. Pass `--ner finetuned` once you have a Phase-2 trained model in `phase2_baseline_comparison/checkpoints/roberta-tab/final/`. """ from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import List, Tuple from .pipeline import LitePipeline, MosaicScorer, ProPipeline, restore # ----------------------------------------------------------------------- # NER provider construction (lazy imports — these deps may be Phase-2-only) # ----------------------------------------------------------------------- def _spacy_predictor(model_name: str = "en_core_web_trf"): import spacy from .mapping import SPACY_TO_TAB nlp = spacy.load(model_name) def predict(text: str) -> List[Tuple[int, int, str, str]]: doc = nlp(text) return [ (ent.start_char, ent.end_char, SPACY_TO_TAB[ent.label_], ent.text) for ent in doc.ents if ent.label_ in SPACY_TO_TAB ] return predict def _hf_predictor(model_name: str = "dslim/bert-base-NER"): from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline from .predictors import make_hf_predictor tok = AutoTokenizer.from_pretrained(model_name) model = AutoModelForTokenClassification.from_pretrained(model_name) pipe = pipeline("ner", model=model, tokenizer=tok, aggregation_strategy="simple", device=-1) return make_hf_predictor(pipe) def _finetuned_predictor(model_dir: str): from transformers import AutoModelForTokenClassification, AutoTokenizer from .predictors import make_finetuned_predictor from .device import best_device tok = AutoTokenizer.from_pretrained(model_dir, add_prefix_space=True) model = AutoModelForTokenClassification.from_pretrained(model_dir) device, _ = best_device() return make_finetuned_predictor(model, tok, device=device) def build_ner_provider(choice: str, model_path: str | None): if choice == "spacy": return _spacy_predictor(model_path or "en_core_web_trf") if choice == "hf": return _hf_predictor(model_path or "dslim/bert-base-NER") if choice == "finetuned": if not model_path: raise SystemExit( "--ner finetuned requires --ner-model PATH " "(e.g. phase2_baseline_comparison/checkpoints/roberta-tab/final)" ) return _finetuned_predictor(model_path) raise SystemExit(f"unknown --ner choice: {choice}") # ----------------------------------------------------------------------- # Mosaic haystack # ----------------------------------------------------------------------- def build_scorer(choice: str) -> MosaicScorer: if choice == "tab": from .data import load_tab ds = load_tab() return MosaicScorer.from_tab(list(ds["test"])) if choice == "empty": return MosaicScorer.empty() raise SystemExit(f"unknown --mosaic-haystack: {choice}") # ----------------------------------------------------------------------- # Main # ----------------------------------------------------------------------- def cmd_redact(args: argparse.Namespace) -> int: # Read input if args.file == "-": text = sys.stdin.read() else: text = Path(args.file).read_text() # Build NER + (optional) scorer ner = build_ner_provider(args.ner, args.ner_model) if args.variant == "lite": pipeline = LitePipeline( ner_provider=ner, run_regex=not args.no_regex, coref_extend=not args.no_coref, pseudonymise=args.pseudonymise, ) else: scorer = build_scorer(args.mosaic_haystack) pipeline = ProPipeline( ner_provider=ner, scorer=scorer, k_target=args.k_target, max_iterations=args.max_iterations, run_regex=not args.no_regex, coref_extend=not args.no_coref, pseudonymise=args.pseudonymise, ) result = pipeline(text) if args.json: print(json.dumps(result.to_dict(), indent=2, ensure_ascii=False)) else: print(result.redacted_text) if args.variant == "pro": print( f"\n# mosaic risk: k_initial={result.mosaic_risk_initial} " f"→ k_final={result.mosaic_risk_final} " f"(target k≥{args.k_target}, iterations={result.iterations_used}, " f"converged={result.converged})", file=sys.stderr, ) # Persist the pseudonym vault if asked if args.pseudonymise and args.vault_out: Path(args.vault_out).write_text( json.dumps(result.pseudonym_vault, indent=2, ensure_ascii=False) ) print( f"# pseudonym vault written to {args.vault_out} " f"({len(result.pseudonym_vault)} entries)", file=sys.stderr, ) elif args.pseudonymise and not args.json: # No file specified — emit vault on stderr so stdout stays clean print("\n# pseudonym vault:", file=sys.stderr) for token, original in result.pseudonym_vault.items(): print(f"# {token} -> {original!r}", file=sys.stderr) return 0 def cmd_restore(args: argparse.Namespace) -> int: """Take a pseudonymised text + vault and restore the original surface forms.""" text = sys.stdin.read() if args.file == "-" else Path(args.file).read_text() vault = json.loads(Path(args.vault).read_text()) if not isinstance(vault, dict): raise SystemExit(f"vault file {args.vault} did not parse as a JSON object") print(restore(text, vault), end="") return 0 def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(prog="anonymisation", description=__doc__) sub = parser.add_subparsers(dest="cmd", required=True) p_redact = sub.add_parser("redact", help="Redact a document") p_redact.add_argument("file", help="Input file path, or '-' for stdin") p_redact.add_argument( "--variant", choices=["lite", "pro"], default="lite", help="Lite = DIRECT-only; Pro = DIRECT + mosaic-aware QUASI generalization.", ) p_redact.add_argument("--ner", choices=["spacy", "hf", "finetuned"], default="spacy") p_redact.add_argument("--ner-model", default=None, help="Model name or path. Defaults to en_core_web_trf for spacy.") p_redact.add_argument("--no-regex", action="store_true", help="Disable the regex post-pass.") p_redact.add_argument("--no-coref", action="store_true", help="Disable the coreference extension pass (Phase 5).") p_redact.add_argument("--k-target", type=int, default=5, help="Pro only: target k-anonymity (default 5).") p_redact.add_argument("--max-iterations", type=int, default=5, help="Pro only: max generalization iterations (default 5).") p_redact.add_argument("--mosaic-haystack", choices=["tab", "empty"], default="tab", help="Pro only: source of the mosaic comparison corpus.") p_redact.add_argument("--pseudonymise", "--pseudonymize", action="store_true", help=("Use referential tokens ([PERSON_A], [PERSON_B], …) " "instead of plain [TYPE] tags. Pair with --vault-out " "to save the mapping for round-trip restore.")) p_redact.add_argument("--vault-out", default=None, help="Path to write the pseudonym vault as JSON. Implies --pseudonymise.") p_redact.add_argument("--json", action="store_true", help="Output the full audit log as JSON instead of just the text.") p_redact.set_defaults(func=cmd_redact) p_restore = sub.add_parser( "restore", help="Round-trip a pseudonymised text back to original surface forms.", ) p_restore.add_argument("file", help="Path to redacted text, or '-' for stdin") p_restore.add_argument("--vault", required=True, help="Path to the pseudonym vault JSON produced by `redact --pseudonymise`.") p_restore.set_defaults(func=cmd_restore) args = parser.parse_args(argv) # If --vault-out is set, --pseudonymise is implied if hasattr(args, "vault_out") and args.vault_out and not args.pseudonymise: args.pseudonymise = True return args.func(args) if __name__ == "__main__": raise SystemExit(main())