ECLIPSE / redact_generic.py
BuaaCXF's picture
Update redact_generic.py
d35df54 verified
#!/usr/bin/env python3
"""
Generic CSV Redaction Script
Usage Examples:
python redact_generic.py input.csv
python redact_generic.py input.csv --output out.csv --columns log_message
python redact_generic.py input.csv --full-mask
Features:
- Default set of regex rules (URL, IP, email, phone number, ID card, long number, device ID, App name)
- Supports column-specific redaction (by column name or index), defaults to processing all text columns
- Provides partial masking (default) or full replacement options
"""
import re
import csv
import argparse
from pathlib import Path
from typing import List, Pattern
def build_rules(full_mask: bool = False):
    """Build the ordered list of redaction rules.

    Each rule is a ``(name, compiled_pattern, replacement)`` tuple, where
    ``replacement`` is either a literal string or a callable receiving the
    match object (anything accepted by ``Pattern.sub``).  The rules are meant
    to be applied in the returned order: specific formats (mobile number,
    ID card) come before the generic long-number rule.

    Args:
        full_mask: When true, mobile numbers and ID cards are replaced by a
            fixed placeholder.  Otherwise they are partially masked, keeping
            the first three and the last four characters.

    Returns:
        A list of ``(name, pattern, replacement)`` tuples.
    """
    # ASCII-only token boundaries.  A plain \b is Unicode-aware for str
    # patterns, so CJK characters count as word characters: a number written
    # directly next to Chinese text (no space) has no \b around it and would
    # escape redaction.  These lookarounds accept everything \b accepted and
    # additionally treat any non-ASCII neighbour as a boundary.
    lead = r'(?<![A-Za-z0-9_])'
    trail = r'(?![A-Za-z0-9_])'

    def bounded(core, flags=0):
        # Non-capturing wrapper keeps group numbering of ``core`` intact.
        return re.compile(lead + '(?:' + core + ')' + trail, flags)

    rules = [
        # URLs
        ("URL", re.compile(r'https?://\S+'), '[REDACTED_URL]'),
        # IPv4
        ("IP", bounded(r'(?:\d{1,3}\.){3}\d{1,3}'), '[REDACTED_IP]'),
        # email (left Unicode-aware on purpose: \w may match non-ASCII names)
        ("EMAIL", re.compile(r'\b[\w.-]+@[\w.-]+\.[A-Za-z]{2,}\b'), '[REDACTED_EMAIL]'),
    ]

    # Chinese mobile number (1XXXXXXXXXX)
    if full_mask:
        rules.append(("MOBILE", bounded(r'1[3-9]\d{9}'), '[REDACTED_MOBILE]'))
    else:
        rules.append((
            "MOBILE",
            bounded(r'(1[3-9]\d)(\d{4})(\d{4})'),
            lambda m: m.group(1) + '****' + m.group(3),
        ))

    # Chinese ID card: exactly 15 digits, or 17 digits plus a digit/X check
    # character.  The partial-mask pattern must be just as strict as the
    # full-mask one; otherwise it swallows every 8+ digit number (dates,
    # serials) and the LONGNUM rule below can never fire.
    if full_mask:
        rules.append(("IDCARD", bounded(r'\d{15}(?:\d{2}[0-9Xx])?'), '[REDACTED_ID]'))
    else:
        rules.append((
            "IDCARD",
            bounded(r'(\d{3})(?:\d{8}(\d{4})|\d{11}(\d{3}[0-9Xx]))'),
            # group 2 is the tail of a 15-digit ID, group 3 of an 18-char ID
            lambda m: m.group(1) + '****' + (m.group(2) or m.group(3)),
        ))

    # Long numeric sequences (likely account/serial) - conservative: 10+ digits
    rules.append(("LONGNUM", bounded(r'\d{10,}'), '[REDACTED_NUMBER]'))
    # Device serials seen in samples like sb096-251 or sa111-010
    # Pattern: letters+digits-hyphen-digits (conservative)
    rules.append(("DEVICE_ID", bounded(r'[a-zA-Z]{1,5}\d{1,4}-\d{1,4}'), '[REDACTED_DEVICE]'))
    # App names ending with App/APP
    rules.append(("APPNAME", bounded(r'[A-Za-z0-9_]{2,}App', re.IGNORECASE), '[REDACTED_APP]'))
    # Short codes (4-6 digits) are intentionally disabled: too noisy by default.
    # rules.append(("CODE", bounded(r'\d{4,6}'), '[REDACTED_CODE]'))
    return rules
def redact_text(s: str, rules) -> str:
    """Run every redaction rule over ``s`` and return the resulting text.

    Args:
        s: Text to redact.  ``None`` is passed straight through.
        rules: Iterable of ``(name, pattern, replacement)`` triples, where
            ``pattern`` is a compiled regex and ``replacement`` is a string
            or a callable accepted by ``Pattern.sub``.  Rules are applied in
            order, each one to the output of the previous one.

    Returns:
        The redacted text, or ``None`` when ``s`` is ``None``.
    """
    if s is None:
        return s

    redacted = s
    for _label, pattern, replacement in rules:
        # Pattern.sub handles string and callable replacements alike.
        try:
            redacted = pattern.sub(replacement, redacted)
        except re.error:
            # Best effort: a rule whose replacement template is invalid is
            # skipped and the remaining rules still run.
            continue
    return redacted
def _resolve_column_indices(header, columns):
    """Map requested column names / zero-based indices to header positions."""
    # A UTF-8 BOM is decoded as U+FEFF glued to the first header cell; ignore
    # it for name matching only (the header itself is written back untouched).
    names = [cell.lstrip('\ufeff') if pos == 0 else cell
             for pos, cell in enumerate(header)]
    indices = []
    missing = []
    for col in columns:
        # isdecimal(), unlike isdigit(), only accepts what int() can parse.
        if col.isdecimal() and int(col) < len(header):
            idx = int(col)
        elif col in header:
            idx = header.index(col)
        elif col in names:
            idx = names.index(col)
        else:
            missing.append(col)
            continue
        if idx not in indices:
            indices.append(idx)
    if missing:
        raise ValueError('Column(s) not found in CSV header: ' + ', '.join(missing))
    return indices


def process_csv(input_path: Path, output_path: Path, columns: List[str], full_mask: bool):
    """Redact a CSV file and write the result to ``output_path``.

    The first row is treated as a header and copied through unchanged.  Every
    following row is redacted with the rules from ``build_rules`` and written
    in the same order; empty rows are passed through as-is.

    Args:
        input_path: CSV file to read.  It is decoded as UTF-8 and undecodable
            bytes are dropped.
        output_path: Destination CSV file (UTF-8); overwritten if it exists.
            An empty input still produces an (empty) output file.
        columns: Column names or zero-based indices (as decimal strings) to
            redact.  When empty, every cell containing at least one letter or
            digit is redacted.
        full_mask: Forwarded to ``build_rules``.

    Raises:
        ValueError: If ``output_path`` is the same file as ``input_path``
            (opening it for writing would truncate the input), or if a
            requested column is not present in the header.  Silently skipping
            an unknown column would produce an un-redacted file.
    """
    if input_path.resolve() == output_path.resolve():
        raise ValueError('Output file must differ from input file: %s' % output_path)

    rules = build_rules(full_mask=full_mask)
    with input_path.open('r', encoding='utf-8', errors='ignore', newline='') as fr:
        reader = csv.reader(fr)
        header = next(reader, None)

        # Resolve the selection before the output is opened so that a bad
        # column name does not leave a truncated output file behind.
        col_indices = None
        if columns and header is not None:
            col_indices = _resolve_column_indices(header, columns)

        with output_path.open('w', encoding='utf-8', newline='') as fw:
            if header is None:
                return
            writer = csv.writer(fw)
            writer.writerow(header)
            for row in reader:
                if not row:
                    writer.writerow(row)
                    continue
                new_row = list(row)
                if col_indices is None:
                    # No selection: redact every cell that has any letter or
                    # digit; purely punctuation/blank cells cannot match a rule.
                    for i, cell in enumerate(row):
                        if cell and any(ch.isalpha() or ch.isdigit() for ch in cell):
                            new_row[i] = redact_text(cell, rules)
                else:
                    for i in col_indices:
                        # Short (ragged) rows may lack the selected column.
                        if i < len(row):
                            new_row[i] = redact_text(row[i], rules)
                writer.writerow(new_row)
def main():
    """Command-line entry point.

    Parses the arguments, derives the output path (``<input>.redacted.csv``
    next to the input unless ``--output`` is given) and runs ``process_csv``.

    Returns:
        Process exit status: 0 on success, 1 when the input file is missing,
        the output would overwrite the input, or processing rejects the
        arguments with a ``ValueError``.
    """
    import sys  # local import: only needed here for stderr

    ap = argparse.ArgumentParser(description='Generic CSV Redaction Tool')
    ap.add_argument('input', help='Input CSV file')
    ap.add_argument('--output', '-o', help='Output CSV file (default in the same directory as input with .redacted.csv suffix)')
    ap.add_argument('--columns', '-c', nargs='+', help='Column names or indices to redact (indices start from 0). If not specified, apply to all columns')
    ap.add_argument('--full-mask', action='store_true', help='Use full replacement instead of partial masking (for phone numbers, ID cards, etc.)')
    args = ap.parse_args()

    inp = Path(args.input)
    # is_file() also rejects directories, which exists() would let through.
    if not inp.is_file():
        print('Input file does not exist:', inp, file=sys.stderr)
        return 1

    out = Path(args.output) if args.output else inp.with_suffix('.redacted.csv')
    # Opening the output for writing truncates it; if it is the input file,
    # the data would be destroyed before it is read.
    if out.resolve() == inp.resolve():
        print('Output file must differ from input file:', out, file=sys.stderr)
        return 1

    try:
        process_csv(inp, out, args.columns or [], args.full_mask)
    except ValueError as exc:
        print('Error:', exc, file=sys.stderr)
        return 1

    print('Redacted file written to:', out)
    return 0


if __name__ == '__main__':
    # Propagate main()'s status so failures are visible to the calling shell.
    raise SystemExit(main())