File size: 9,144 Bytes
5c05bce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
"""
Command-line entry point.

Usage
-----
    # Redact (Lite or Pro)
    python -m anonymisation.cli redact \\
        --variant {lite|pro} \\
        [--ner spacy|hf|finetuned] \\
        [--k-target 5] [--max-iterations 5] \\
        [--pseudonymise] [--vault-out PATH] \\
        [--json] [--mosaic-haystack tab] \\
        FILE

    # Round-trip an LLM answer back through the pseudonym vault
    python -m anonymisation.cli restore \\
        --vault PATH \\
        FILE

The defaults pick spaCy as the NER provider so the CLI runs out of the box
on a Phase-1 install. Once you have a Phase-2 trained model, pass
`--ner finetuned --ner-model PATH`, e.g. with PATH set to
`phase2_baseline_comparison/checkpoints/roberta-tab/final/`.
"""
from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path
from typing import List, Tuple

from .pipeline import LitePipeline, MosaicScorer, ProPipeline, restore


# -----------------------------------------------------------------------
# NER provider construction (lazy imports — these deps may be Phase-2-only)
# -----------------------------------------------------------------------
def _spacy_predictor(model_name: str = "en_core_web_trf"):
    """Build a predictor callable backed by a spaCy pipeline.

    spaCy and the label mapping are imported lazily so that merely importing
    this module does not require them.

    Args:
        model_name: Name (or path) handed to ``spacy.load``.

    Returns:
        A function mapping a text to a list of
        ``(start_char, end_char, mapped_label, surface_text)`` tuples, one per
        spaCy entity whose label has an entry in ``SPACY_TO_TAB``.
    """
    import spacy
    from .mapping import SPACY_TO_TAB

    nlp = spacy.load(model_name)

    def predict(text: str) -> List[Tuple[int, int, str, str]]:
        spans = []
        for entity in nlp(text).ents:
            label = entity.label_
            # Entities whose label has no mapping are dropped.
            if label not in SPACY_TO_TAB:
                continue
            spans.append(
                (entity.start_char, entity.end_char, SPACY_TO_TAB[label], entity.text)
            )
        return spans

    return predict


def _hf_predictor(model_name: str = "dslim/bert-base-NER"):
    """Build a predictor around a Hugging Face token-classification pipeline.

    Args:
        model_name: Model identifier passed to both ``from_pretrained`` calls.

    Returns:
        Whatever ``make_hf_predictor`` returns for the constructed pipeline.
    """
    from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline
    from .predictors import make_hf_predictor

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    classifier = AutoModelForTokenClassification.from_pretrained(model_name)
    ner_pipeline = pipeline(
        "ner",
        model=classifier,
        tokenizer=tokenizer,
        aggregation_strategy="simple",
        device=-1,  # -1 is the transformers pipeline convention for CPU
    )
    return make_hf_predictor(ner_pipeline)


def _finetuned_predictor(model_dir: str):
    """Build a predictor from a locally fine-tuned token-classification model.

    Args:
        model_dir: Directory (or identifier) passed to both ``from_pretrained``
            calls.

    Returns:
        Whatever ``make_finetuned_predictor`` returns for the loaded model,
        tokenizer and the device chosen by ``best_device``.
    """
    from transformers import AutoModelForTokenClassification, AutoTokenizer
    from .predictors import make_finetuned_predictor
    from .device import best_device

    tokenizer = AutoTokenizer.from_pretrained(model_dir, add_prefix_space=True)
    classifier = AutoModelForTokenClassification.from_pretrained(model_dir)
    # best_device() yields a pair; only its first element is needed here.
    chosen_device, _unused = best_device()
    return make_finetuned_predictor(classifier, tokenizer, device=chosen_device)


def build_ner_provider(choice: str, model_path: str | None):
    """Construct the NER predictor selected on the command line.

    Args:
        choice: One of ``"spacy"``, ``"hf"`` or ``"finetuned"``.
        model_path: Optional model name/path. For ``"spacy"`` and ``"hf"`` a
            built-in default is used when this is empty; ``"finetuned"``
            requires it.

    Returns:
        The predictor produced by the matching ``_*_predictor`` factory.

    Raises:
        SystemExit: If ``choice`` is not recognised, or ``"finetuned"`` was
            requested without a model path.
    """
    if choice == "finetuned":
        # There is no sensible default location for a user-trained checkpoint.
        if model_path:
            return _finetuned_predictor(model_path)
        raise SystemExit(
            "--ner finetuned requires --ner-model PATH "
            "(e.g. phase2_baseline_comparison/checkpoints/roberta-tab/final)"
        )
    if choice == "hf":
        return _hf_predictor(model_path if model_path else "dslim/bert-base-NER")
    if choice == "spacy":
        return _spacy_predictor(model_path if model_path else "en_core_web_trf")
    raise SystemExit(f"unknown --ner choice: {choice}")


# -----------------------------------------------------------------------
# Mosaic haystack
# -----------------------------------------------------------------------
def build_scorer(choice: str) -> MosaicScorer:
    """Construct the mosaic scorer for the requested comparison corpus.

    Args:
        choice: ``"tab"`` to build the scorer from the ``"test"`` split
            returned by ``load_tab``, or ``"empty"`` for ``MosaicScorer.empty()``.

    Returns:
        The constructed ``MosaicScorer``.

    Raises:
        SystemExit: If ``choice`` is neither ``"tab"`` nor ``"empty"``.
    """
    if choice == "empty":
        return MosaicScorer.empty()
    if choice != "tab":
        raise SystemExit(f"unknown --mosaic-haystack: {choice}")

    # Imported lazily: the dataset loader is only needed for the "tab" haystack.
    from .data import load_tab

    test_split = load_tab()["test"]
    return MosaicScorer.from_tab(list(test_split))


# -----------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------
def _read_input_text(source: str) -> str:
    """Return the text to process: stdin when *source* is ``"-"``, else the file's contents."""
    if source == "-":
        return sys.stdin.read()
    # Decoded with the platform default, the same way stdin/stdout are, so
    # text piped or redirected between `redact` and `restore` stays consistent.
    return Path(source).read_text()


def cmd_redact(args: argparse.Namespace) -> int:
    """Run the ``redact`` sub-command.

    Reads the input text, builds the Lite or Pro pipeline from the parsed
    options, runs it, and prints either the redacted text or the JSON form of
    the result to stdout. Diagnostics (mosaic risk summary, vault notice or
    vault dump) go to stderr so stdout stays clean.

    Args:
        args: Parsed namespace from the ``redact`` sub-parser. Reads ``file``,
            ``ner``, ``ner_model``, ``variant``, ``no_regex``, ``no_coref``,
            ``pseudonymise``, ``k_target``, ``max_iterations``,
            ``mosaic_haystack``, ``json`` and ``vault_out``.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: Propagated from ``build_ner_provider`` / ``build_scorer``
            on invalid option combinations.
    """
    text = _read_input_text(args.file)

    # Build NER + (optional) scorer
    ner = build_ner_provider(args.ner, args.ner_model)

    if args.variant == "lite":
        pipeline = LitePipeline(
            ner_provider=ner,
            run_regex=not args.no_regex,
            coref_extend=not args.no_coref,
            pseudonymise=args.pseudonymise,
        )
    else:
        # Only the Pro variant needs the mosaic scorer.
        scorer = build_scorer(args.mosaic_haystack)
        pipeline = ProPipeline(
            ner_provider=ner,
            scorer=scorer,
            k_target=args.k_target,
            max_iterations=args.max_iterations,
            run_regex=not args.no_regex,
            coref_extend=not args.no_coref,
            pseudonymise=args.pseudonymise,
        )

    result = pipeline(text)

    if args.json:
        print(json.dumps(result.to_dict(), indent=2, ensure_ascii=False))
    else:
        print(result.redacted_text)
        if args.variant == "pro":
            print(
                f"\n# mosaic risk: k_initial={result.mosaic_risk_initial} "
                f"→ k_final={result.mosaic_risk_final} "
                f"(target k≥{args.k_target}, iterations={result.iterations_used}, "
                f"converged={result.converged})",
                file=sys.stderr,
            )

    # Persist the pseudonym vault if asked
    if args.pseudonymise and args.vault_out:
        # The JSON is emitted with ensure_ascii=False, so it may contain any
        # Unicode character. Pin the file to UTF-8 (the interchange encoding
        # for JSON) instead of the locale default, which can fail to encode
        # such characters; cmd_restore reads it back with the same encoding.
        Path(args.vault_out).write_text(
            json.dumps(result.pseudonym_vault, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
        print(
            f"# pseudonym vault written to {args.vault_out} "
            f"({len(result.pseudonym_vault)} entries)",
            file=sys.stderr,
        )
    elif args.pseudonymise and not args.json:
        # No file specified — emit vault on stderr so stdout stays clean
        print("\n# pseudonym vault:", file=sys.stderr)
        for token, original in result.pseudonym_vault.items():
            print(f"#   {token} -> {original!r}", file=sys.stderr)
    return 0


def cmd_restore(args: argparse.Namespace) -> int:
    """Take a pseudonymised text + vault and restore the original surface forms.

    Args:
        args: Parsed namespace from the ``restore`` sub-parser. Reads ``file``
            (path or ``"-"`` for stdin) and ``vault`` (path to the vault JSON).

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: If the vault file does not contain a JSON object.
    """
    text = _read_input_text(args.file)
    # Vault files are written as UTF-8 by cmd_redact; read them the same way
    # rather than relying on the locale's default encoding.
    vault = json.loads(Path(args.vault).read_text(encoding="utf-8"))
    if not isinstance(vault, dict):
        raise SystemExit(f"vault file {args.vault} did not parse as a JSON object")
    # end="" so no extra newline is appended to the restored text.
    print(restore(text, vault), end="")
    return 0


def main(argv: list[str] | None = None) -> int:
    """Parse command-line arguments and dispatch to the chosen sub-command.

    Args:
        argv: Argument list to parse; ``None`` lets argparse fall back to
            ``sys.argv[1:]``.

    Returns:
        The integer returned by the selected sub-command handler.
    """
    parser = argparse.ArgumentParser(
        prog="anonymisation",
        description=__doc__,
        # The module docstring contains a pre-formatted usage example; the
        # default formatter would re-wrap it into a single paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    sub = parser.add_subparsers(dest="cmd", required=True)

    p_redact = sub.add_parser("redact", help="Redact a document")
    p_redact.add_argument("file", help="Input file path, or '-' for stdin")
    p_redact.add_argument(
        "--variant", choices=["lite", "pro"], default="lite",
        help="Lite = DIRECT-only; Pro = DIRECT + mosaic-aware QUASI generalization.",
    )
    p_redact.add_argument("--ner", choices=["spacy", "hf", "finetuned"], default="spacy")
    p_redact.add_argument("--ner-model", default=None,
                          help="Model name or path. Defaults to en_core_web_trf for spacy.")
    p_redact.add_argument("--no-regex", action="store_true",
                          help="Disable the regex post-pass.")
    p_redact.add_argument("--no-coref", action="store_true",
                          help="Disable the coreference extension pass (Phase 5).")
    p_redact.add_argument("--k-target", type=int, default=5,
                          help="Pro only: target k-anonymity (default 5).")
    p_redact.add_argument("--max-iterations", type=int, default=5,
                          help="Pro only: max generalization iterations (default 5).")
    p_redact.add_argument("--mosaic-haystack", choices=["tab", "empty"], default="tab",
                          help="Pro only: source of the mosaic comparison corpus.")
    p_redact.add_argument("--pseudonymise", "--pseudonymize", action="store_true",
                          help=("Use referential tokens ([PERSON_A], [PERSON_B], …) "
                                "instead of plain [TYPE] tags. Pair with --vault-out "
                                "to save the mapping for round-trip restore."))
    p_redact.add_argument("--vault-out", default=None,
                          help="Path to write the pseudonym vault as JSON. Implies --pseudonymise.")
    p_redact.add_argument("--json", action="store_true",
                          help="Output the full audit log as JSON instead of just the text.")
    p_redact.set_defaults(func=cmd_redact)

    p_restore = sub.add_parser(
        "restore",
        help="Round-trip a pseudonymised text back to original surface forms.",
    )
    p_restore.add_argument("file", help="Path to redacted text, or '-' for stdin")
    p_restore.add_argument("--vault", required=True,
                           help="Path to the pseudonym vault JSON produced by `redact --pseudonymise`.")
    p_restore.set_defaults(func=cmd_restore)

    args = parser.parse_args(argv)
    # If --vault-out is set, --pseudonymise is implied. Only the `redact`
    # namespace carries `vault_out`, hence the getattr default.
    if getattr(args, "vault_out", None) and not args.pseudonymise:
        args.pseudonymise = True
    return args.func(args)


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())