web / scrub.py
monsimas's picture
TruffleHog backstop: soft-warn (flag for review) instead of hard-reject; drop FP Twilio regex
63d6042 verified
Raw
History Blame Contribute Delete
10.7 kB
#!/usr/bin/env python3
"""
scrub.py — deterministic anonymization pass for Trace Commons donations.
Removes the high-confidence, crisply-patterned leaks from a coding-agent
session before it is reviewed and donated:
- home-directory paths and the username embedded in them
- common secret formats (API keys, tokens, PEM blocks, JWTs, env assignments)
- email addresses
This is intentionally NOT the whole anonymization story, and the secret list
here is a fast first pass, not the authoritative one. Three layers back it up:
the ingestion server re-runs TruffleHog (hundreds of maintained, updated secret
detectors) and rejects anything it flags; the skill performs an LLM/human review
pass for fuzzy things a regex can't recognize (personal names in prose, company
names, internal codenames); and the contributor reviews the exact diff before
anything is uploaded. The split is deliberate: code handles the patterns that
have signatures, a dedicated scanner handles breadth, and a human handles meaning.
The script walks the parsed JSON of each session line and rewrites string
values in place, so it works regardless of where in the structure a string
sits. It writes a cleaned file plus a JSON report of every redaction.
Usage:
python scrub.py --in session.jsonl --harness claude_code \
--out cleaned.jsonl --report report.json
"""
import argparse
import json
import re
import sys
from collections import Counter
# --- redaction patterns -----------------------------------------------------
# Order matters: more specific patterns run before more general ones.

# Unix/macOS home directories. Group 1 is the "/Users" or "/home" prefix and
# group 2 is the username segment. Each slash may be preceded by a backslash so
# that JSON-escaped slashes ("\/") are recognised as well.
HOME_PATH = re.compile(r'(\\?/(?:Users|home))\\?/([^/\\\s"\']+)')

# Dash-encoded home paths. Coding agents (e.g. Claude Code) name their project
# directories by replacing the slashes of an absolute path with dashes, so
# /Users/<name>/proj becomes the slug .claude/projects/-Users-<name>-proj. The
# slash-based HOME_PATH never sees these, so the username leaks. Anchored on the
# leading "/-Users-" / "/-home-" of the slug to avoid mangling hyphenated prose.
HOME_PATH_DASH = re.compile(r'(/-(?:Users|home))-([^-\s"\'\\/]+)')

# Windows user paths too. Group 1 is the drive + "\Users\" prefix (kept as-is),
# group 2 is the username. Each separator accepts one OR MORE backslashes:
# a path that has been escaped again (JSON embedded in a JSON string, or a raw
# non-JSON line) reads C:\\Users\\<name>, and a single-backslash pattern would
# let that username through.
WIN_PATH = re.compile(r'([A-Za-z]:\\+Users\\+)([^\\\s"\']+)', re.IGNORECASE)

EMAIL = re.compile(r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b')

# RFC1918 private / internal IPv4 addresses. Not a secret, but it leaks internal
# network topology (DB hosts, service IPs), so it is redacted like home paths —
# without causing the server backstop to reject the whole donation. The four-octet
# shape with a fixed private prefix avoids mangling version numbers like 1.2.3.4.
# The last alternative (169.254.x.x) is the link-local range.
PRIVATE_IP = re.compile(
    r'\b(?:10\.\d{1,3}\.\d{1,3}\.\d{1,3}'
    r'|192\.168\.\d{1,3}\.\d{1,3}'
    r'|172\.(?:1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3}'
    r'|169\.254\.\d{1,3}\.\d{1,3})\b'
)
# Secrets — each tuple is (name, compiled regex). Keep these conservative
# enough to avoid mangling ordinary prose but broad enough to catch real keys.
# The list is applied top to bottom, so a pattern whose prefix is a special
# case of another one (e.g. "sk-ant-" vs "sk-") must be listed first, otherwise
# the general pattern consumes the text and the specific name is never reported.
SECRET_PATTERNS = [
    ("aws_access_key", re.compile(r'\bAKIA[0-9A-Z]{16}\b')),
    ("aws_secret", re.compile(r'\b(?i:aws_secret_access_key)\s*[=:]\s*["\']?[A-Za-z0-9/+=]{40}["\']?')),
    ("github_token", re.compile(r'\bgh[pousr]_[A-Za-z0-9]{36,}\b')),
    ("hf_token", re.compile(r'\bhf_[A-Za-z0-9]{30,}\b')),
    # anthropic_key must precede openai_key: every "sk-ant-..." key also
    # matches the generic "sk-..." shape.
    ("anthropic_key", re.compile(r'\bsk-ant-[A-Za-z0-9_\-]{20,}\b')),
    ("openai_key", re.compile(r'\bsk-[A-Za-z0-9_\-]{20,}\b')),
    ("slack_token", re.compile(r'\bxox[baprs]-[A-Za-z0-9\-]{10,}\b')),
    ("google_api_key", re.compile(r'\bAIza[0-9A-Za-z_\-]{35}\b')),
    ("jwt", re.compile(r'\beyJ[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\.[A-Za-z0-9_\-]+\b')),
    # PEM / ASCII-armored private keys. Covers the plain and algorithm-tagged
    # "... PRIVATE KEY" labels, PKCS#8 "ENCRYPTED PRIVATE KEY", and OpenPGP
    # armor, whose label is "PGP PRIVATE KEY BLOCK" (hence the optional
    # " BLOCK" suffix). DOTALL lets the body span newlines.
    ("private_key_block", re.compile(
        r'-----BEGIN (?:RSA |EC |OPENSSH |DSA |PGP |ENCRYPTED )?PRIVATE KEY(?: BLOCK)?-----'
        r'.*?'
        r'-----END (?:RSA |EC |OPENSSH |DSA |PGP |ENCRYPTED )?PRIVATE KEY(?: BLOCK)?-----',
        re.DOTALL,
    )),
    ("bearer_token", re.compile(r'\b(?i:bearer)\s+[A-Za-z0-9_\-\.=]{20,}')),
    ("connection_string", re.compile(r'\b(?:postgres|postgresql|mysql|mongodb(?:\+srv)?|redis|amqp)://[^\s"\'<>]+:[^\s"\'<>@]+@[^\s"\'<>]+')),
    # More vendor-prefixed tokens. This list is necessarily incomplete — it is a
    # fast first pass, NOT the authoritative check. The ingestion server runs
    # TruffleHog (hundreds of maintained detectors) as the real backstop and
    # rejects anything it flags. Keep these prefix-anchored to avoid false hits.
    ("github_fine_grained_pat", re.compile(r'\bgithub_pat_[0-9A-Za-z_]{22,}\b')),
    ("gitlab_pat", re.compile(r'\bglpat-[0-9A-Za-z_\-]{20,}\b')),
    ("gcp_oauth_token", re.compile(r'\bya29\.[0-9A-Za-z_\-]{20,}\b')),
    ("stripe_key", re.compile(r'\b(?:sk|rk)_(?:live|test)_[0-9A-Za-z]{20,}\b')),
    ("sendgrid_key", re.compile(r'\bSG\.[A-Za-z0-9_\-]{16,32}\.[A-Za-z0-9_\-]{16,64}\b')),
    ("npm_token", re.compile(r'\bnpm_[0-9A-Za-z]{36}\b')),
    ("pypi_token", re.compile(r'\bpypi-[A-Za-z0-9_\-]{16,}\b')),
    # Twilio (SK + 32 hex) is deliberately NOT regexed here: the shape collides
    # with ordinary hashes/IDs and would cause false redactions. TruffleHog's
    # validated Twilio detector handles it on the server backstop instead.
    ("azure_storage_key", re.compile(r'\bAccountKey=[A-Za-z0-9+/=]{40,}')),
    ("slack_webhook", re.compile(r'https://hooks\.slack\.com/services/[A-Za-z0-9/_\-]+')),
    ("discord_webhook", re.compile(r'https://(?:canary\.|ptb\.)?discord(?:app)?\.com/api/webhooks/[0-9]+/[A-Za-z0-9_\-]+')),
    # generic KEY=secret env assignments where the value looks secret-ish.
    # Group 1 is the variable name (kept), group 2 is the value (redacted).
    # Listed last so the vendor-specific patterns above get first claim.
    ("env_secret", re.compile(r'\b([A-Z][A-Z0-9_]*(?:KEY|TOKEN|SECRET|PASSWORD|PASSWD|PWD|CREDENTIAL|API)[A-Z0-9_]*)\s*=\s*["\']?([^\s"\']{8,})["\']?')),
]
def redact_string(s, counts):
    """Apply all redactions to a single string, tallying what was changed.

    Secrets are replaced first, then home-directory usernames, then email
    addresses, then private IP addresses.

    Text that is already in redacted form is left untouched and is not
    counted: an env assignment whose value is exactly a redaction placeholder,
    and a home path whose username segment is already ``USER``. Without this,
    one secret caught by a vendor pattern would be counted a second time by
    the generic ``env_secret`` pattern, and running the scrubber again over
    cleaned output would report redactions that did not happen.

    Args:
        s: The value to scrub. Anything that is not a non-empty ``str`` is
            returned unchanged.
        counts: Mutable mapping from redaction name to hit count, updated in
            place with ``counts[name] += 1``; it must therefore default
            missing keys to 0 (e.g. ``collections.Counter``).

    Returns:
        The scrubbed string, or ``s`` itself when it is not a non-empty string.
    """
    if not isinstance(s, str) or not s:
        return s

    # Markers this function itself emits; a value equal to one of these has
    # already been redacted and must not be counted again.
    placeholders = ("[REDACTED_SECRET]", "[REDACTED_EMAIL]", "[REDACTED_IP]")

    # Secrets first (before paths/emails, since some secrets contain those shapes)
    for name, pat in SECRET_PATTERNS:
        # _name is bound as a default argument so each callback keeps its own
        # pattern name rather than the loop variable's final value.
        def _sub(m, _name=name):
            if _name == "env_secret":
                if m.group(2) in placeholders:
                    # Value was already redacted by an earlier, more specific
                    # pattern (or a previous run): keep the text verbatim.
                    return m.group(0)
                counts[_name] += 1
                # keep the key name, redact the value
                return f"{m.group(1)}=[REDACTED_SECRET]"
            counts[_name] += 1
            return "[REDACTED_SECRET]"
        s = pat.sub(_sub, s)

    # Home paths -> normalize the username segment
    def _home(m):
        if m.group(2) == "USER":
            return m.group(0)  # already normalized; nothing was redacted
        counts["home_path"] += 1
        return f"{m.group(1)}/USER"
    s = HOME_PATH.sub(_home, s)

    def _home_dash(m):
        if m.group(2) == "USER":
            return m.group(0)
        counts["home_path"] += 1
        return f"{m.group(1)}-USER"
    s = HOME_PATH_DASH.sub(_home_dash, s)

    def _win(m):
        if m.group(2) == "USER":
            return m.group(0)
        counts["home_path"] += 1
        return f"{m.group(1)}USER"
    s = WIN_PATH.sub(_win, s)

    # Emails
    def _email(m):
        counts["email"] += 1
        return "[REDACTED_EMAIL]"
    s = EMAIL.sub(_email, s)

    # Private/internal IPs (redact-only, not treated as a rejectable secret)
    def _ip(m):
        counts["private_ip"] += 1
        return "[REDACTED_IP]"
    s = PRIVATE_IP.sub(_ip, s)

    return s
def walk(obj, counts):
    """Return a copy of a parsed JSON value with every string scrubbed.

    Lists and dicts are rebuilt recursively; strings are passed through
    ``redact_string``; every other value (numbers, booleans, ``None``) is
    returned as-is. ``counts`` is forwarded to ``redact_string`` so all hits
    land in the same tally.
    """
    if isinstance(obj, dict):
        # Keys can carry leaks too — some agents key objects by absolute file
        # path (e.g. {"/Users/<name>/proj/file": ...}), so scrub keys as well.
        scrubbed = {}
        for key, value in obj.items():
            clean_key = redact_string(key, counts) if isinstance(key, str) else key
            scrubbed[clean_key] = walk(value, counts)
        return scrubbed

    if isinstance(obj, list):
        return [walk(item, counts) for item in obj]

    if isinstance(obj, str):
        return redact_string(obj, counts)

    return obj
def scrub_text(raw, harness):
    """Scrub a raw session string and return ``(cleaned_text, report_dict)``.

    Importable so the server can run the exact same detection as the skill,
    as a backstop; the file-based ``main()`` is a thin wrapper around it.

    Input starting with ``{`` that spans several lines and does not look like
    JSONL is first tried as one JSON document. Otherwise, or if that parse
    fails, each non-empty line is handled on its own: lines that parse as JSON
    are walked and re-serialized, and lines that do not are scrubbed as plain
    text. The cleaned text always ends with a newline.

    The report holds the harness name, the number of lines processed (1 for a
    single document), the per-category redaction counts and their total.
    """
    tally = Counter()
    processed = 0
    cleaned_lines = []

    body = raw.strip()
    treat_as_document = (
        body.startswith("{")
        and "\n" in body
        and not _looks_like_jsonl(body)
    )

    if treat_as_document:
        try:
            document = json.loads(body)
        except json.JSONDecodeError:
            # Not one well-formed document after all; fall back to line mode.
            treat_as_document = False
        else:
            cleaned_lines.append(json.dumps(walk(document, tally), ensure_ascii=False))
            processed = 1

    if not treat_as_document:
        for candidate in (entry.strip() for entry in raw.splitlines()):
            if not candidate:
                continue
            processed += 1
            try:
                record = json.loads(candidate)
            except json.JSONDecodeError:
                # Plain text line: scrub it directly.
                cleaned_lines.append(redact_string(candidate, tally))
            else:
                cleaned_lines.append(json.dumps(walk(record, tally), ensure_ascii=False))

    summary = {
        "harness": harness,
        "lines_processed": processed,
        "redactions": dict(tally),
        "total_redactions": sum(tally.values()),
    }
    return "\n".join(cleaned_lines) + "\n", summary
def main(argv=None):
    """Command-line entry point: scrub one session file and write the results.

    Reads ``--in``, runs ``scrub_text`` on its contents, writes the cleaned
    text to ``--out`` and the JSON report to ``--report``, then prints a
    human-readable summary to stdout.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``, which is the behavior when
            the script is run directly.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--in", dest="inp", required=True)
    ap.add_argument("--harness", required=True)
    ap.add_argument("--out", required=True)
    ap.add_argument("--report", required=True)
    args = ap.parse_args(argv)

    # Undecodable bytes in the input become U+FFFD instead of aborting the run.
    with open(args.inp, "r", encoding="utf-8", errors="replace") as f:
        raw = f.read()

    cleaned_text, report = scrub_text(raw, args.harness)
    counts = Counter(report["redactions"])
    lines_in = report["lines_processed"]

    # A JSON escape such as "\ud83d" with no partner decodes to a lone
    # surrogate, which the cleaned text (serialized with ensure_ascii=False)
    # carries through and which strict UTF-8 encoding refuses to write.
    # backslashreplace emits it as \udXXX again; since such characters only
    # occur inside JSON strings here, the output stays valid JSON and the run
    # does not crash after the scrubbing work is done.
    with open(args.out, "w", encoding="utf-8", errors="backslashreplace") as f:
        f.write(cleaned_text)
    with open(args.report, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2)

    # Human-readable summary to stdout for the skill to relay
    print(f"Scrubbed {lines_in} lines from {args.harness} session.")
    if counts:
        for k, v in counts.most_common():
            print(f" {v}× {k}")
    else:
        print(" No high-confidence secrets or paths found by the automated pass.")
    print(f"\nCleaned file: {args.out}")
    print(f"Report: {args.report}")
    print("\nThis is the automated pass only. Now do the review pass for names,")
    print("company names, and internal references before showing the user.")
def _looks_like_jsonl(text):
"""Heuristic: if the first two non-empty lines each parse as JSON, it's JSONL."""
parsed = 0
for line in text.splitlines():
line = line.strip()
if not line:
continue
try:
json.loads(line)
parsed += 1
except json.JSONDecodeError:
return False
if parsed >= 2:
return True
return False
# Run the command-line entry point only when this file is executed as a script,
# so that importing the module does not trigger it.
if __name__ == "__main__":
    main()