Spaces:
Build error
Build error
File size: 9,144 Bytes
5c05bce | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 | """
Command-line entry point.
Usage
-----
# Redact (Lite or Pro)
python -m anonymisation.cli redact \\
--variant {lite|pro} \\
[--ner spacy|hf|finetuned] [--ner-model NAME_OR_PATH] \\
[--k-target 5] [--max-iterations 5] \\
[--pseudonymise] [--vault-out PATH] \\
[--json] [--mosaic-haystack tab|empty] \\
FILE
# Round-trip an LLM answer back through the pseudonym vault
python -m anonymisation.cli restore \\
--vault PATH \\
FILE
The defaults pick spaCy as the NER provider so the CLI runs out of the box
on a Phase-1 install. Once you have a Phase-2 trained model, pass
`--ner finetuned --ner-model PATH` (the path is required), e.g. with PATH set
to `phase2_baseline_comparison/checkpoints/roberta-tab/final/`.
"""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import List, Tuple
from .pipeline import LitePipeline, MosaicScorer, ProPipeline, restore
# -----------------------------------------------------------------------
# NER provider construction (lazy imports — these deps may be Phase-2-only)
# -----------------------------------------------------------------------
def _spacy_predictor(model_name: str = "en_core_web_trf"):
    """Build an NER callable backed by a spaCy model.

    spaCy and the label mapping are imported here rather than at module
    import time, so the CLI can be loaded without them installed.

    Args:
        model_name: Name (or path) handed to ``spacy.load``.

    Returns:
        A function ``predict(text)`` that returns one
        ``(start_char, end_char, mapped_label, surface_text)`` tuple for
        every entity whose spaCy label has an entry in ``SPACY_TO_TAB``.
        Entities with unmapped labels are dropped.
    """
    import spacy

    from .mapping import SPACY_TO_TAB

    spacy_model = spacy.load(model_name)

    def predict(text: str) -> List[Tuple[int, int, str, str]]:
        spans: List[Tuple[int, int, str, str]] = []
        for entity in spacy_model(text).ents:
            label = entity.label_
            if label not in SPACY_TO_TAB:
                # No mapping for this spaCy label: skip the entity.
                continue
            spans.append(
                (entity.start_char, entity.end_char, SPACY_TO_TAB[label], entity.text)
            )
        return spans

    return predict
def _hf_predictor(model_name: str = "dslim/bert-base-NER"):
    """Build an NER callable backed by a Hugging Face token-classification model.

    The transformers imports are deferred to call time so that the module
    can be imported without that dependency installed.

    Args:
        model_name: Model identifier passed to both ``from_pretrained`` calls.

    Returns:
        Whatever ``make_hf_predictor`` returns for the constructed ``"ner"``
        pipeline (presumably a ``predict(text)`` callable; see
        ``.predictors`` to confirm).
    """
    from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline

    from .predictors import make_hf_predictor

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    classifier = AutoModelForTokenClassification.from_pretrained(model_name)

    pipeline_options = {
        "model": classifier,
        "tokenizer": tokenizer,
        "aggregation_strategy": "simple",
        "device": -1,
    }
    ner_pipeline = pipeline("ner", **pipeline_options)
    return make_hf_predictor(ner_pipeline)
def _finetuned_predictor(model_dir: str):
    """Build an NER callable from a fine-tuned checkpoint directory.

    All heavy imports happen inside the function so the CLI module itself
    stays importable without transformers.

    Args:
        model_dir: Directory passed to both ``from_pretrained`` calls.

    Returns:
        The result of ``make_finetuned_predictor`` for the loaded model and
        tokenizer, placed on the device chosen by ``best_device``.
    """
    from transformers import AutoModelForTokenClassification, AutoTokenizer

    from .predictors import make_finetuned_predictor
    from .device import best_device

    tokenizer = AutoTokenizer.from_pretrained(model_dir, add_prefix_space=True)
    classifier = AutoModelForTokenClassification.from_pretrained(model_dir)

    # best_device() yields a pair; only its first element is needed here.
    chosen_device, _ignored = best_device()
    return make_finetuned_predictor(classifier, tokenizer, device=chosen_device)
def build_ner_provider(choice: str, model_path: str | None):
    """Construct the NER predictor selected on the command line.

    Args:
        choice: One of ``"spacy"``, ``"hf"`` or ``"finetuned"``.
        model_path: Optional model name or path. When falsy, ``"spacy"``
            falls back to ``en_core_web_trf`` and ``"hf"`` to
            ``dslim/bert-base-NER``; ``"finetuned"`` has no fallback.

    Returns:
        The predictor produced by the matching ``_*_predictor`` factory.

    Raises:
        SystemExit: If ``choice`` is ``"finetuned"`` without a model path,
            or if ``choice`` is not a recognised provider.
    """
    # The fine-tuned provider is the only one that cannot default its model.
    if choice == "finetuned":
        if model_path:
            return _finetuned_predictor(model_path)
        raise SystemExit(
            "--ner finetuned requires --ner-model PATH "
            "(e.g. phase2_baseline_comparison/checkpoints/roberta-tab/final)"
        )

    if choice == "spacy":
        spacy_model = model_path if model_path else "en_core_web_trf"
        return _spacy_predictor(spacy_model)

    if choice == "hf":
        hf_model = model_path if model_path else "dslim/bert-base-NER"
        return _hf_predictor(hf_model)

    raise SystemExit(f"unknown --ner choice: {choice}")
# -----------------------------------------------------------------------
# Mosaic haystack
# -----------------------------------------------------------------------
def build_scorer(choice: str) -> MosaicScorer:
    """Construct the mosaic scorer for the requested comparison corpus.

    Args:
        choice: ``"tab"`` to build the scorer from the ``"test"`` split
            returned by ``load_tab``, or ``"empty"`` for
            ``MosaicScorer.empty()``.

    Returns:
        The constructed ``MosaicScorer``.

    Raises:
        SystemExit: If ``choice`` is neither ``"tab"`` nor ``"empty"``.
    """
    if choice == "empty":
        return MosaicScorer.empty()
    if choice != "tab":
        raise SystemExit(f"unknown --mosaic-haystack: {choice}")

    # Import only on the path that needs the dataset loader.
    from .data import load_tab

    test_split = load_tab()["test"]
    return MosaicScorer.from_tab(list(test_split))
# -----------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------
def cmd_redact(args: argparse.Namespace) -> int:
# Read input
if args.file == "-":
text = sys.stdin.read()
else:
text = Path(args.file).read_text()
# Build NER + (optional) scorer
ner = build_ner_provider(args.ner, args.ner_model)
if args.variant == "lite":
pipeline = LitePipeline(
ner_provider=ner,
run_regex=not args.no_regex,
coref_extend=not args.no_coref,
pseudonymise=args.pseudonymise,
)
else:
scorer = build_scorer(args.mosaic_haystack)
pipeline = ProPipeline(
ner_provider=ner,
scorer=scorer,
k_target=args.k_target,
max_iterations=args.max_iterations,
run_regex=not args.no_regex,
coref_extend=not args.no_coref,
pseudonymise=args.pseudonymise,
)
result = pipeline(text)
if args.json:
print(json.dumps(result.to_dict(), indent=2, ensure_ascii=False))
else:
print(result.redacted_text)
if args.variant == "pro":
print(
f"\n# mosaic risk: k_initial={result.mosaic_risk_initial} "
f"→ k_final={result.mosaic_risk_final} "
f"(target k≥{args.k_target}, iterations={result.iterations_used}, "
f"converged={result.converged})",
file=sys.stderr,
)
# Persist the pseudonym vault if asked
if args.pseudonymise and args.vault_out:
Path(args.vault_out).write_text(
json.dumps(result.pseudonym_vault, indent=2, ensure_ascii=False)
)
print(
f"# pseudonym vault written to {args.vault_out} "
f"({len(result.pseudonym_vault)} entries)",
file=sys.stderr,
)
elif args.pseudonymise and not args.json:
# No file specified — emit vault on stderr so stdout stays clean
print("\n# pseudonym vault:", file=sys.stderr)
for token, original in result.pseudonym_vault.items():
print(f"# {token} -> {original!r}", file=sys.stderr)
return 0
def cmd_restore(args: argparse.Namespace) -> int:
"""Take a pseudonymised text + vault and restore the original surface forms."""
text = sys.stdin.read() if args.file == "-" else Path(args.file).read_text()
vault = json.loads(Path(args.vault).read_text())
if not isinstance(vault, dict):
raise SystemExit(f"vault file {args.vault} did not parse as a JSON object")
print(restore(text, vault), end="")
return 0
def main(argv: list[str] | None = None) -> int:
    """Parse command-line arguments and dispatch to the chosen sub-command.

    Args:
        argv: Argument list to parse; ``None`` lets argparse fall back to
            ``sys.argv[1:]``.

    Returns:
        The integer returned by the selected ``cmd_*`` handler.
    """
    parser = argparse.ArgumentParser(
        prog="anonymisation",
        description=__doc__,
        # The module docstring holds a hand-formatted usage example; the
        # default formatter would re-wrap it into a single paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    sub = parser.add_subparsers(dest="cmd", required=True)

    p_redact = sub.add_parser("redact", help="Redact a document")
    p_redact.add_argument("file", help="Input file path, or '-' for stdin")
    p_redact.add_argument(
        "--variant", choices=["lite", "pro"], default="lite",
        help="Lite = DIRECT-only; Pro = DIRECT + mosaic-aware QUASI generalization.",
    )
    p_redact.add_argument("--ner", choices=["spacy", "hf", "finetuned"], default="spacy")
    p_redact.add_argument("--ner-model", default=None,
                          help="Model name or path. Defaults to en_core_web_trf for spacy.")
    p_redact.add_argument("--no-regex", action="store_true",
                          help="Disable the regex post-pass.")
    p_redact.add_argument("--no-coref", action="store_true",
                          help="Disable the coreference extension pass (Phase 5).")
    p_redact.add_argument("--k-target", type=int, default=5,
                          help="Pro only: target k-anonymity (default 5).")
    p_redact.add_argument("--max-iterations", type=int, default=5,
                          help="Pro only: max generalization iterations (default 5).")
    p_redact.add_argument("--mosaic-haystack", choices=["tab", "empty"], default="tab",
                          help="Pro only: source of the mosaic comparison corpus.")
    p_redact.add_argument("--pseudonymise", "--pseudonymize", action="store_true",
                          help=("Use referential tokens ([PERSON_A], [PERSON_B], …) "
                                "instead of plain [TYPE] tags. Pair with --vault-out "
                                "to save the mapping for round-trip restore."))
    p_redact.add_argument("--vault-out", default=None,
                          help="Path to write the pseudonym vault as JSON. Implies --pseudonymise.")
    p_redact.add_argument("--json", action="store_true",
                          help="Output the full audit log as JSON instead of just the text.")
    p_redact.set_defaults(func=cmd_redact)

    p_restore = sub.add_parser(
        "restore",
        help="Round-trip a pseudonymised text back to original surface forms.",
    )
    p_restore.add_argument("file", help="Path to redacted text, or '-' for stdin")
    p_restore.add_argument("--vault", required=True,
                           help="Path to the pseudonym vault JSON produced by `redact --pseudonymise`.")
    p_restore.set_defaults(func=cmd_restore)

    args = parser.parse_args(argv)

    # If --vault-out is set, --pseudonymise is implied. Only the `redact`
    # namespace has `vault_out`, hence the getattr with a default.
    if getattr(args, "vault_out", None):
        args.pseudonymise = True

    return args.func(args)
if __name__ == "__main__":
    # Use main()'s integer return value as the process exit status.
    sys.exit(main())
|