# Willxo's picture
# Space deployment
# 5c05bce
"""
Command-line entry point.
Usage
-----
# Redact (Lite or Pro)
python -m anonymisation.cli redact \\
--variant {lite|pro} \\
[--ner spacy|hf|finetuned] \\
[--k-target 5] [--max-iterations 5] \\
[--pseudonymise] [--vault-out PATH] \\
[--json] [--mosaic-haystack tab|empty] \\
FILE
# Round-trip an LLM answer back through the pseudonym vault
python -m anonymisation.cli restore \\
--vault PATH \\
FILE
The defaults pick spaCy as the NER provider so the CLI runs out of the box
on a Phase-1 install. Pass `--ner finetuned --ner-model PATH` once you have a
Phase-2 trained model, e.g. with PATH set to
`phase2_baseline_comparison/checkpoints/roberta-tab/final/`.
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import List, Tuple
from .pipeline import LitePipeline, MosaicScorer, ProPipeline, restore
# -----------------------------------------------------------------------
# NER provider construction (lazy imports — these deps may be Phase-2-only)
# -----------------------------------------------------------------------
def _spacy_predictor(model_name: str = "en_core_web_trf"):
    """Build an NER callable backed by a spaCy pipeline.

    spaCy and the label mapping are imported here rather than at module
    level, so they are only needed when this provider is selected.

    Args:
        model_name: Name or path handed to ``spacy.load``.

    Returns:
        A function ``predict(text)`` producing one
        ``(start_char, end_char, mapped_label, surface_text)`` tuple per
        detected entity whose spaCy label is a key of ``SPACY_TO_TAB``.
        Entities with any other label are dropped.
    """
    import spacy

    from .mapping import SPACY_TO_TAB

    nlp = spacy.load(model_name)

    def predict(text: str) -> List[Tuple[int, int, str, str]]:
        spans: List[Tuple[int, int, str, str]] = []
        for entity in nlp(text).ents:
            # Skip labels that have no counterpart in the mapping table.
            if entity.label_ not in SPACY_TO_TAB:
                continue
            spans.append(
                (
                    entity.start_char,
                    entity.end_char,
                    SPACY_TO_TAB[entity.label_],
                    entity.text,
                )
            )
        return spans

    return predict
def _hf_predictor(model_name: str = "dslim/bert-base-NER"):
    """Build an NER callable from a Hugging Face token-classification model.

    ``transformers`` and the project predictor factory are imported lazily so
    they are only required when this provider is chosen.

    Args:
        model_name: Hub id or local path passed to both ``from_pretrained``
            calls (tokenizer and model).

    Returns:
        Whatever ``make_hf_predictor`` returns for the constructed ``"ner"``
        pipeline.
    """
    from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline

    from .predictors import make_hf_predictor

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    classifier = AutoModelForTokenClassification.from_pretrained(model_name)

    # NOTE(review): device=-1 presumably pins the pipeline to CPU per the
    # transformers convention — confirm against the pipeline docs.
    ner_pipeline = pipeline(
        "ner",
        model=classifier,
        tokenizer=tokenizer,
        aggregation_strategy="simple",
        device=-1,
    )
    return make_hf_predictor(ner_pipeline)
def _finetuned_predictor(model_dir: str):
    """Build an NER callable from a locally fine-tuned checkpoint.

    Args:
        model_dir: Directory passed to both ``from_pretrained`` calls. The
            tokenizer is loaded with ``add_prefix_space=True``.

    Returns:
        Whatever ``make_finetuned_predictor`` returns for the loaded model,
        tokenizer and the device reported by ``best_device()``.
    """
    from transformers import AutoModelForTokenClassification, AutoTokenizer

    from .device import best_device
    from .predictors import make_finetuned_predictor

    tokenizer = AutoTokenizer.from_pretrained(model_dir, add_prefix_space=True)
    classifier = AutoModelForTokenClassification.from_pretrained(model_dir)

    # best_device() returns a pair; only its first element is needed here.
    target_device = best_device()[0]
    return make_finetuned_predictor(classifier, tokenizer, device=target_device)
def build_ner_provider(choice: str, model_path: str | None):
    """Construct the NER callable selected on the command line.

    Args:
        choice: One of ``"spacy"``, ``"hf"`` or ``"finetuned"``.
        model_path: Optional model name/path override. Falls back to
            ``en_core_web_trf`` for spaCy and ``dslim/bert-base-NER`` for
            ``hf``; mandatory for ``finetuned``.

    Returns:
        The predictor built by the matching private factory.

    Raises:
        SystemExit: If ``choice`` is not recognised, or ``finetuned`` is
            requested without a model path.
    """
    if choice == "finetuned" and not model_path:
        raise SystemExit(
            "--ner finetuned requires --ner-model PATH "
            "(e.g. phase2_baseline_comparison/checkpoints/roberta-tab/final)"
        )

    # Lambdas keep construction lazy: only the selected backend is built.
    factories = {
        "spacy": lambda: _spacy_predictor(model_path or "en_core_web_trf"),
        "hf": lambda: _hf_predictor(model_path or "dslim/bert-base-NER"),
        "finetuned": lambda: _finetuned_predictor(model_path),
    }
    factory = factories.get(choice)
    if factory is None:
        raise SystemExit(f"unknown --ner choice: {choice}")
    return factory()
# -----------------------------------------------------------------------
# Mosaic haystack
# -----------------------------------------------------------------------
def build_scorer(choice: str) -> MosaicScorer:
    """Construct the mosaic scorer for the requested comparison corpus.

    Args:
        choice: ``"tab"`` to build the scorer from the ``"test"`` split of
            the dataset returned by ``load_tab()``, or ``"empty"`` for
            ``MosaicScorer.empty()``.

    Returns:
        The constructed ``MosaicScorer``.

    Raises:
        SystemExit: If ``choice`` is neither ``"tab"`` nor ``"empty"``.
    """
    if choice == "empty":
        return MosaicScorer.empty()
    if choice != "tab":
        raise SystemExit(f"unknown --mosaic-haystack: {choice}")

    # Imported lazily: the dataset loader is only needed for this haystack.
    from .data import load_tab

    corpus = load_tab()
    test_split = list(corpus["test"])
    return MosaicScorer.from_tab(test_split)
# -----------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------
def _read_input(path: str) -> str:
    """Return the text to process: stdin when *path* is ``"-"``, else the file's contents."""
    if path == "-":
        return sys.stdin.read()
    return Path(path).read_text()


def _load_vault(path: str) -> dict:
    """Load the pseudonym vault JSON at *path*, exiting cleanly on malformed content."""
    raw = Path(path).read_bytes()
    try:
        payload = raw.decode("utf-8")
    except UnicodeDecodeError:
        # Vaults written before the encoding was pinned used the locale's
        # default encoding; fall back to it so those files stay readable.
        payload = Path(path).read_text()
    try:
        vault = json.loads(payload)
    except json.JSONDecodeError as exc:
        raise SystemExit(f"vault file {path} is not valid JSON: {exc}") from exc
    if not isinstance(vault, dict):
        raise SystemExit(f"vault file {path} did not parse as a JSON object")
    return vault


def cmd_redact(args: argparse.Namespace) -> int:
    """Run the ``redact`` sub-command.

    Reads the input document, builds the Lite or Pro pipeline from the parsed
    options, runs it, and prints either the redacted text or the full result
    as JSON on stdout. Diagnostics (mosaic risk summary, vault location or
    vault contents) go to stderr so stdout stays pipeable.

    Args:
        args: Parsed ``redact`` arguments (``file``, ``variant``, ``ner``,
            ``ner_model``, ``no_regex``, ``no_coref``, ``k_target``,
            ``max_iterations``, ``mosaic_haystack``, ``pseudonymise``,
            ``vault_out``, ``json``).

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: Propagated from ``build_ner_provider`` / ``build_scorer``
            for invalid provider or haystack choices.
    """
    text = _read_input(args.file)

    # Build NER + (optional) scorer
    ner = build_ner_provider(args.ner, args.ner_model)
    if args.variant == "lite":
        pipeline = LitePipeline(
            ner_provider=ner,
            run_regex=not args.no_regex,
            coref_extend=not args.no_coref,
            pseudonymise=args.pseudonymise,
        )
    else:
        scorer = build_scorer(args.mosaic_haystack)
        pipeline = ProPipeline(
            ner_provider=ner,
            scorer=scorer,
            k_target=args.k_target,
            max_iterations=args.max_iterations,
            run_regex=not args.no_regex,
            coref_extend=not args.no_coref,
            pseudonymise=args.pseudonymise,
        )
    result = pipeline(text)

    if args.json:
        print(json.dumps(result.to_dict(), indent=2, ensure_ascii=False))
    else:
        print(result.redacted_text)
        if args.variant == "pro":
            print(
                f"\n# mosaic risk: k_initial={result.mosaic_risk_initial} "
                f"→ k_final={result.mosaic_risk_final} "
                f"(target k≥{args.k_target}, iterations={result.iterations_used}, "
                f"converged={result.converged})",
                file=sys.stderr,
            )

    # Persist the pseudonym vault if asked
    if args.pseudonymise and args.vault_out:
        # The JSON is dumped with ensure_ascii=False, so the file must use an
        # encoding that can represent any character: pin UTF-8 instead of
        # relying on the locale default.
        Path(args.vault_out).write_text(
            json.dumps(result.pseudonym_vault, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
        print(
            f"# pseudonym vault written to {args.vault_out} "
            f"({len(result.pseudonym_vault)} entries)",
            file=sys.stderr,
        )
    elif args.pseudonymise and not args.json:
        # No file specified — emit vault on stderr so stdout stays clean
        print("\n# pseudonym vault:", file=sys.stderr)
        for token, original in result.pseudonym_vault.items():
            print(f"# {token} -> {original!r}", file=sys.stderr)
    return 0


def cmd_restore(args: argparse.Namespace) -> int:
    """Take a pseudonymised text + vault and restore the original surface forms.

    Args:
        args: Parsed ``restore`` arguments: ``file`` (path or ``"-"`` for
            stdin) and ``vault`` (path to the vault JSON).

    Returns:
        ``0`` on success; the restored text is printed to stdout without an
        added trailing newline.

    Raises:
        SystemExit: If the vault file is not valid JSON or is not a JSON
            object.
    """
    text = _read_input(args.file)
    vault = _load_vault(args.vault)
    print(restore(text, vault), end="")
    return 0
def main(argv: list[str] | None = None) -> int:
    """Parse the command line and dispatch to the selected sub-command.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv[1:]``.

    Returns:
        The integer returned by the chosen handler (``cmd_redact`` or
        ``cmd_restore``).
    """
    parser = argparse.ArgumentParser(
        prog="anonymisation",
        description=__doc__,
        # The module docstring carries hand-formatted usage examples; the
        # default formatter would re-wrap them into a single paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    sub = parser.add_subparsers(dest="cmd", required=True)

    p_redact = sub.add_parser("redact", help="Redact a document")
    p_redact.add_argument("file", help="Input file path, or '-' for stdin")
    p_redact.add_argument(
        "--variant", choices=["lite", "pro"], default="lite",
        help="Lite = DIRECT-only; Pro = DIRECT + mosaic-aware QUASI generalization.",
    )
    p_redact.add_argument("--ner", choices=["spacy", "hf", "finetuned"], default="spacy")
    p_redact.add_argument("--ner-model", default=None,
                          help="Model name or path. Defaults to en_core_web_trf for spacy.")
    p_redact.add_argument("--no-regex", action="store_true",
                          help="Disable the regex post-pass.")
    p_redact.add_argument("--no-coref", action="store_true",
                          help="Disable the coreference extension pass (Phase 5).")
    p_redact.add_argument("--k-target", type=int, default=5,
                          help="Pro only: target k-anonymity (default 5).")
    p_redact.add_argument("--max-iterations", type=int, default=5,
                          help="Pro only: max generalization iterations (default 5).")
    p_redact.add_argument("--mosaic-haystack", choices=["tab", "empty"], default="tab",
                          help="Pro only: source of the mosaic comparison corpus.")
    p_redact.add_argument("--pseudonymise", "--pseudonymize", action="store_true",
                          help=("Use referential tokens ([PERSON_A], [PERSON_B], …) "
                                "instead of plain [TYPE] tags. Pair with --vault-out "
                                "to save the mapping for round-trip restore."))
    p_redact.add_argument("--vault-out", default=None,
                          help="Path to write the pseudonym vault as JSON. Implies --pseudonymise.")
    p_redact.add_argument("--json", action="store_true",
                          help="Output the full audit log as JSON instead of just the text.")
    p_redact.set_defaults(func=cmd_redact)

    p_restore = sub.add_parser(
        "restore",
        help="Round-trip a pseudonymised text back to original surface forms.",
    )
    p_restore.add_argument("file", help="Path to redacted text, or '-' for stdin")
    p_restore.add_argument("--vault", required=True,
                           help="Path to the pseudonym vault JSON produced by `redact --pseudonymise`.")
    p_restore.set_defaults(func=cmd_restore)

    args = parser.parse_args(argv)

    # If --vault-out is set, --pseudonymise is implied. Only the `redact`
    # namespace has `vault_out`, hence the getattr default.
    if getattr(args, "vault_out", None) and not args.pseudonymise:
        args.pseudonymise = True
    return args.func(args)
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())