Spaces:
No application file
No application file
#!/usr/bin/env python3
"""
Generic CSV Redaction Script

Usage Examples:
    python redact_generic.py input.csv
    python redact_generic.py input.csv --output out.csv --columns log_message
    python redact_generic.py input.csv --full-mask

Features:
    - Default set of regex rules (URL, IP, email, phone number, ID card, long number, device ID, App name)
    - Supports column-specific redaction (by column name or index), defaults to processing all text columns
    - Provides partial masking (default) or full replacement options
"""
import argparse
import csv
import re
import sys
from pathlib import Path
from typing import List, Pattern
def build_rules(full_mask: bool = False):
    """Build the ordered list of redaction rules.

    Each rule is a ``(name, compiled_pattern, replacement)`` tuple, where
    ``replacement`` is either a literal string or a callable that receives
    the match object (both forms are accepted by ``Pattern.sub``). The rules
    are meant to be applied in list order: mobile and ID-card rules come
    before the generic long-number rule so that they get their more specific
    treatment first.

    Args:
        full_mask: When True, mobile numbers and ID-card numbers are replaced
            by a fixed placeholder. When False (default), they are partially
            masked: the leading 3 digits and the trailing digits are kept and
            the middle is replaced by ``****``.

    Returns:
        A list of ``(name, pattern, replacement)`` tuples.
    """
    rules = [
        # URLs
        ("URL", re.compile(r'https?://\S+'), '[REDACTED_URL]'),
        # IPv4
        ("IP", re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'), '[REDACTED_IP]'),
        ("EMAIL", re.compile(r'\b[\w.-]+@[\w.-]+\.[A-Za-z]{2,}\b'), '[REDACTED_EMAIL]'),
    ]

    # Chinese mobile number (1XXXXXXXXXX)
    if full_mask:
        rules.append(("MOBILE", re.compile(r'\b1[3-9]\d{9}\b'), '[REDACTED_MOBILE]'))
    else:
        rules.append((
            "MOBILE",
            re.compile(r'\b(1[3-9]\d)(\d{4})(\d{4})\b'),
            lambda m: m.group(1) + '****' + m.group(3),
        ))

    # Chinese ID card: exactly 15 digits, or 17 digits plus a digit/X check
    # character. Both modes share this pattern so that partial masking never
    # fires on arbitrary digit runs (which previously left 8-digit numbers
    # almost fully visible and made the LONGNUM rule below unreachable).
    id_card = re.compile(r'\b\d{15}(?:\d{2}[0-9Xx])?\b')
    rules.append(("IDCARD", id_card, '[REDACTED_ID]' if full_mask else _mask_id_card))

    # Long numeric sequences (likely account/serial) - conservative: 10+ digits
    rules.append(("LONGNUM", re.compile(r'\b\d{10,}\b'), '[REDACTED_NUMBER]'))
    # Device serials seen in samples like sb096-251 or sa111-010
    # Pattern: letters+digits-hyphen-digits (conservative)
    rules.append(("DEVICE_ID", re.compile(r'\b[a-zA-Z]{1,5}\d{1,4}-\d{1,4}\b'), '[REDACTED_DEVICE]'))
    # App names ending with App/APP
    rules.append(("APPNAME", re.compile(r'\b[A-Za-z0-9_]{2,}App\b', re.IGNORECASE), '[REDACTED_APP]'))
    # NOTE: a short-code rule (4-6 digits, r'\b\d{4,6}\b' -> '[REDACTED_CODE]')
    # is deliberately not enabled because it is likely to be noisy.
    return rules


def _mask_id_card(match):
    """Partially mask a matched ID number, keeping its first 3 and last characters."""
    value = match.group(0)
    # When the ID ends in an X/x check character, keep it together with the
    # four digits before it; otherwise keep only the last four digits.
    keep = 5 if value[-1] in 'Xx' else 4
    return value[:3] + '****' + value[-keep:]
def redact_text(s: str, rules) -> str:
    """Apply every redaction rule to ``s`` in order and return the result.

    Args:
        s: Text to redact. ``None`` is passed through unchanged.
        rules: Iterable of ``(name, pattern, replacement)`` tuples, where
            ``replacement`` is a string or a callable accepted by
            ``Pattern.sub``.

    Returns:
        The text after all substitutions. A rule whose substitution raises
        ``re.error`` is skipped, leaving the text as it was before that rule.
    """
    if s is None:
        return s

    redacted = s
    for _label, pattern, replacement in rules:
        # Pattern.sub accepts both string and callable replacements, so a
        # single call covers both kinds of rule.
        try:
            redacted = pattern.sub(replacement, redacted)
        except re.error:
            # skip problematic patterns
            pass
    return redacted
def process_csv(input_path: Path, output_path: Path, columns: List[str], full_mask: bool):
    """Copy a CSV file to ``output_path`` with sensitive values redacted.

    The first row is treated as the header and is copied through unchanged.
    Every following row is redacted cell by cell with the rules returned by
    ``build_rules(full_mask=full_mask)``. If the input is empty, an empty
    output file is produced.

    Args:
        input_path: CSV file to read. It is decoded as UTF-8 and undecodable
            bytes are dropped.
        output_path: CSV file to write (UTF-8). Must not be the same file as
            ``input_path``.
        columns: Column selectors. A selector consisting only of decimal
            digits is a 0-based column index; anything else is looked up as a
            header name. An empty list means every column is processed.
            Selectors that match no column are reported on stderr and skipped.
        full_mask: Forwarded to ``build_rules``.

    Raises:
        ValueError: If ``input_path`` and ``output_path`` resolve to the same
            file (opening the output would truncate the input before it is
            read).
    """
    if input_path.resolve() == output_path.resolve():
        raise ValueError(f'Input and output refer to the same file: {input_path}')

    rules = build_rules(full_mask=full_mask)
    with input_path.open('r', encoding='utf-8', errors='ignore', newline='') as fr, \
            output_path.open('w', encoding='utf-8', newline='') as fw:
        reader = csv.reader(fr)
        writer = csv.writer(fw)
        try:
            header = next(reader)
        except StopIteration:
            return
        writer.writerow(header)

        # Map selectors to column indices. None means "process all columns".
        col_indices = None
        if columns:
            col_indices = []
            for c in columns:
                # isdecimal() (unlike isdigit()) only accepts characters that
                # int() can parse, so inputs such as '²' cannot crash here.
                if c.isdecimal():
                    idx = int(c)
                    if idx < len(header):
                        col_indices.append(idx)
                        continue
                elif c in header:
                    col_indices.append(header.index(c))
                    continue
                # A redaction tool must not silently ignore a requested
                # column: the user would get unredacted data without notice.
                print(f'Warning: column {c!r} not found in header; skipping',
                      file=sys.stderr)

        for row in reader:
            if not row:
                writer.writerow(row)
                continue
            new_row = list(row)
            if col_indices is None:
                # Apply to all fields, skipping cells that contain no letter
                # or digit.
                for i, cell in enumerate(row):
                    if cell and any(ch.isalpha() or ch.isdigit() for ch in cell):
                        new_row[i] = redact_text(cell, rules)
            else:
                for i in col_indices:
                    # Rows may be shorter than the header.
                    if i < len(row):
                        new_row[i] = redact_text(row[i], rules)
            writer.writerow(new_row)
def main():
    """Command-line entry point: parse arguments and redact the given CSV file.

    The output path defaults to the input path with its suffix replaced by
    ``.redacted.csv``.

    Raises:
        SystemExit: With a non-zero status and an error message on stderr if
            the input file does not exist (argparse also raises it for
            invalid arguments).
    """
    ap = argparse.ArgumentParser(description='Generic CSV Redaction Tool')
    ap.add_argument('input', help='Input CSV file')
    ap.add_argument('--output', '-o', help='Output CSV file (default in the same directory as input with .redacted.csv suffix)')
    ap.add_argument('--columns', '-c', nargs='+', help='Column names or indices to redact (indices start from 0). If not specified, apply to all columns')
    ap.add_argument('--full-mask', action='store_true', help='Use full replacement instead of partial masking (for phone numbers, ID cards, etc.)')
    args = ap.parse_args()

    inp = Path(args.input)
    if not inp.exists():
        # SystemExit with a string argument prints it to stderr and exits
        # with status 1, so shell callers can detect the failure.
        raise SystemExit(f'Input file does not exist: {inp}')

    out = Path(args.output) if args.output else inp.with_suffix('.redacted.csv')
    process_csv(inp, out, args.columns or [], args.full_mask)
    print('Redacted file written to:', out)
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()