#!/usr/bin/env python3
"""
Generic CSV Redaction Script

Usage Examples:
    python redact_generic.py input.csv
    python redact_generic.py input.csv --output out.csv --columns log_message
    python redact_generic.py input.csv --full-mask

Features:
- Default set of regex rules (URL, IP, email, phone number, ID card, long number, device ID, App name)
- Supports column-specific redaction (by column name or index), defaults to processing all text columns
- Provides partial masking (default) or full replacement options
"""

import argparse
import codecs
import csv
import re
import sys
from pathlib import Path
from typing import Callable, List, Optional, Pattern, Tuple, Union

# One redaction rule: (name, compiled pattern, replacement string or callable).
Rule = Tuple[str, Pattern, Union[str, Callable[..., str]]]


def _mask_mobile(m) -> str:
    """Keep the first 3 and last 4 digits of a mobile number."""
    return m.group(1) + '****' + m.group(3)


def _mask_idcard(m) -> str:
    """Keep the first 3 and last 4 characters of a 15- or 18-character ID."""
    tail = m.group('tail15')
    if tail is None:
        tail = m.group('tail18')
    return m.group('head') + '****' + tail


def build_rules(full_mask: bool = False) -> List[Rule]:
    """Build the ordered list of redaction rules.

    Rules are applied in list order, so earlier rules see the raw text and
    later rules see text already rewritten by the earlier ones.

    Args:
        full_mask: When True, mobile numbers and ID cards are replaced by a
            fixed placeholder; when False (default) they are partially
            masked, keeping the first 3 and last 4 characters.

    Returns:
        A list of ``(name, pattern, repl_or_callable)`` tuples.
    """
    rules: List[Rule] = []

    # URLs
    rules.append(("URL", re.compile(r'https?://\S+'), '[REDACTED_URL]'))

    # IPv4
    rules.append(("IP", re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'), '[REDACTED_IP]'))

    # email
    rules.append(("EMAIL", re.compile(r'\b[\w.-]+@[\w.-]+\.[A-Za-z]{2,}\b'), '[REDACTED_EMAIL]'))

    # Chinese mobile number (1XXXXXXXXXX)
    if full_mask:
        rules.append(("MOBILE", re.compile(r'\b1[3-9]\d{9}\b'), '[REDACTED_MOBILE]'))
    else:
        rules.append(("MOBILE", re.compile(r'\b(1[3-9]\d)(\d{4})(\d{4})\b'), _mask_mobile))

    # Chinese ID card (15 or 18)
    if full_mask:
        rules.append(("IDCARD", re.compile(r'\b\d{15}(?:\d{2}[0-9Xx])?\b'), '[REDACTED_ID]'))
    else:
        # Accept exactly the same shapes as the full-mask pattern (15 digits,
        # or 17 digits plus a digit/X). An unbounded `\d+` here would match
        # every run of 8+ digits and leave the LONGNUM rule below unreachable.
        rules.append((
            "IDCARD",
            re.compile(
                r'\b(?P<head>\d{3})'
                r'(?:\d{8}(?P<tail15>\d{4})|\d{11}(?P<tail18>\d{3}[0-9Xx]))\b'
            ),
            _mask_idcard,
        ))

    # Long numeric sequences (likely account/serial) - conservative: 10+ digits
    rules.append(("LONGNUM", re.compile(r'\b\d{10,}\b'), '[REDACTED_NUMBER]'))

    # Device serials seen in samples like sb096-251 or sa111-010
    # Pattern: letters+digits-hyphen-digits (conservative)
    rules.append(("DEVICE_ID", re.compile(r'\b[a-zA-Z]{1,5}\d{1,4}-\d{1,4}\b'), '[REDACTED_DEVICE]'))

    # App names ending with App/APP
    rules.append(("APPNAME", re.compile(r'\b[A-Za-z0-9_]{2,}App\b', re.IGNORECASE), '[REDACTED_APP]'))

    # Short codes (4-6 digits) - optional and may be noisy; comment out by default
    # rules.append(("CODE", re.compile(r'\b\d{4,6}\b'), '[REDACTED_CODE]'))

    return rules


def redact_text(s: str, rules) -> str:
    """Apply every rule to ``s`` in order and return the redacted text.

    Args:
        s: Text to redact. ``None`` and the empty string are returned as-is.
        rules: Iterable of ``(name, pattern, repl_or_callable)`` tuples as
            produced by ``build_rules``.

    Returns:
        The text after all substitutions.
    """
    if not s:
        return s
    out = s
    for name, patt, repl in rules:
        try:
            # Pattern.sub accepts both a template string and a callable.
            out = patt.sub(repl, out)
        except re.error as exc:
            # Best effort: keep going with the remaining rules, but make the
            # skipped rule visible instead of hiding it.
            print('Warning: redaction rule skipped:', name, exc, file=sys.stderr)
    return out


def _resolve_columns(header: List[str], columns: List[str]) -> List[int]:
    """Translate column names / numeric index strings into header indices.

    A decimal string that is a valid index is treated as an index; anything
    else is looked up as a column name.

    Raises:
        ValueError: If any requested column cannot be resolved. Failing here
            prevents writing a "redacted" file in which nothing was redacted.
    """
    indices = []
    unknown = []
    for c in columns:
        if c.isdecimal() and int(c) < len(header):
            indices.append(int(c))
        elif c in header:
            indices.append(header.index(c))
        else:
            unknown.append(c)
    if unknown:
        raise ValueError(
            'Unknown column(s): %s (header has %d columns: %s)'
            % (', '.join(unknown), len(header), ', '.join(header))
        )
    return sorted(set(indices))


def _redact_row(row: List[str], col_indices: Optional[List[int]], rules) -> List[str]:
    """Return a redacted copy of ``row``.

    With ``col_indices`` set to None every cell is processed; otherwise only
    the listed positions that exist in this row.
    """
    if not row:
        return row
    new_row = list(row)
    if col_indices is None:
        for i, cell in enumerate(row):
            # Cells without any letter or digit cannot match a rule; skip them.
            if cell and any(ch.isalpha() or ch.isdigit() for ch in cell):
                new_row[i] = redact_text(cell, rules)
    else:
        for i in col_indices:
            if i < len(row):  # rows may be shorter than the header
                new_row[i] = redact_text(row[i], rules)
    return new_row


def process_csv(input_path: Path, output_path: Path, columns: List[str], full_mask: bool):
    """Copy a CSV file to ``output_path`` with sensitive values redacted.

    The first row is treated as the header and copied unchanged. An empty
    input produces an empty output file.

    Args:
        input_path: CSV file to read (decoded as UTF-8; undecodable bytes are
            dropped, a leading BOM is tolerated).
        output_path: CSV file to write. Must not be the input file.
        columns: Column names or 0-based indices (as strings) to redact.
            Empty means "all columns".
        full_mask: Passed to ``build_rules``.

    Raises:
        ValueError: If ``output_path`` is the input file, or if a requested
            column does not exist in the header.
    """
    input_path = Path(input_path)
    output_path = Path(output_path)
    # Opening the output with 'w' would truncate the input before it is read.
    if input_path.resolve() == output_path.resolve():
        raise ValueError('Output path must differ from input path: %s' % input_path)

    rules = build_rules(full_mask=full_mask)

    # Read with 'utf-8-sig' so a BOM does not become part of the first header
    # name (which would break name lookup), and re-emit it on output when the
    # input had one.
    with input_path.open('rb') as fb:
        has_bom = fb.read(len(codecs.BOM_UTF8)) == codecs.BOM_UTF8
    out_encoding = 'utf-8-sig' if has_bom else 'utf-8'

    with input_path.open('r', encoding='utf-8-sig', errors='ignore', newline='') as fr:
        reader = csv.reader(fr)
        header = next(reader, None)

        # Resolve columns before the output file is created, so an invalid
        # selection cannot leave a partially written file behind.
        col_indices = None
        if columns and header is not None:
            col_indices = _resolve_columns(header, columns)

        with output_path.open('w', encoding=out_encoding, newline='') as fw:
            if header is None:
                return
            writer = csv.writer(fw)
            writer.writerow(header)
            for row in reader:
                writer.writerow(_redact_row(row, col_indices, rules))


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 on success, 1 on error.
    """
    ap = argparse.ArgumentParser(description='Generic CSV Redaction Tool')
    ap.add_argument('input', help='Input CSV file')
    ap.add_argument('--output', '-o', help='Output CSV file (default in the same directory as input with .redacted.csv suffix)')
    ap.add_argument('--columns', '-c', nargs='+', help='Column names or indices to redact (indices start from 0). If not specified, apply to all columns')
    ap.add_argument('--full-mask', action='store_true', help='Use full replacement instead of partial masking (for phone numbers, ID cards, etc.)')
    args = ap.parse_args(argv)

    inp = Path(args.input)
    if not inp.is_file():
        print('Input file does not exist:', inp, file=sys.stderr)
        return 1
    out = Path(args.output) if args.output else inp.with_suffix('.redacted.csv')

    try:
        process_csv(inp, out, args.columns or [], args.full_mask)
    except ValueError as exc:
        print('Error:', exc, file=sys.stderr)
        return 1
    print('Redacted file written to:', out)
    return 0


if __name__ == '__main__':
    sys.exit(main())