""" scanner.py — Secrets Detection Engine ======================================= Pure Python, zero external CLI tools required. Scans directories and ZIP archives for hardcoded secrets using compiled regex. Public API used by app.py and tasks.py: scan_zip(path) → scan an extracted ZIP, return result dict scan_directory(path) → walk a directory, return list of Finding dicts scan_repo(url) → clone GitHub repo, scan, clean up, return result dict validate_repo_url(url)→ raise ValueError on invalid/unsafe URL build_result(findings, source, is_pro) → assemble final response dict """ import os import re import json import zipfile import tempfile import shutil import subprocess import logging from dataclasses import dataclass, asdict from urllib.parse import urlparse logger = logging.getLogger("secretscan.scanner") # ────────────────────────────────────────────────────────────── # LIMITS # ────────────────────────────────────────────────────────────── MAX_FILES = 1_000 # abort ZIP/repo if it contains more files MAX_ZIP_MB = 50 # reject ZIPs larger than this uncompressed MAX_REPO_MB = 50 # reject repos larger than this total MAX_FILE_BYTES = 512_000 # skip individual files larger than 512 KB MAX_LINE_LEN = 2_000 # skip lines longer than this (minified code) FREE_FINDINGS = 5 # max findings shown to free-tier users ALLOWED_HOSTS = {"github.com"} # File extensions worth scanning. Binary, image, and video files are skipped. SCANNABLE_EXTS = { ".py", ".js", ".ts", ".jsx", ".tsx", ".env", ".cfg", ".ini", ".conf", ".config", ".yaml", ".yml", ".toml", ".json", ".xml", ".sh", ".bash", ".zsh", ".fish", ".rb", ".php", ".java", ".go", ".rs", ".cs", ".tf", ".tfvars", ".properties", ".pem", ".key", ".crt", ".txt", ".md", ".html", ".htaccess", "", # files with no extension (Makefile, Dockerfile, etc.) } # Directories that are never worth scanning SKIP_DIRS = { ".git", "node_modules", "__pycache__", ".venv", "venv", "dist", "build", ".next", ".nuxt", "vendor", "target", ".mypy_cache", ".pytest_cache", ".tox", "coverage", } # ────────────────────────────────────────────────────────────── # SECRET PATTERN DEFINITIONS # Each entry: (label, compiled_regex, severity, description, fix) # ────────────────────────────────────────────────────────────── @dataclass class _Pattern: label: str regex: re.Pattern severity: str # HIGH | MEDIUM | LOW description: str fix: str # Patterns are evaluated in order. HIGH severity patterns come first. _PATTERNS: list[_Pattern] = [ # ── HIGH: Cloud & AI service keys ────────────────────────── _Pattern("OpenAI API Key", re.compile(r'\bsk-[A-Za-z0-9]{20,60}\b'), "HIGH", "Hardcoded OpenAI API key. Attackers can run API calls billed to your account.", "Revoke at platform.openai.com and store in OPENAI_API_KEY env var."), _Pattern("OpenAI Project Key", re.compile(r'\bsk-proj-[A-Za-z0-9\-_]{20,80}\b'), "HIGH", "Hardcoded OpenAI project key detected.", "Revoke at platform.openai.com and use os.getenv('OPENAI_API_KEY')."), _Pattern("Anthropic API Key", re.compile(r'\bsk-ant-[A-Za-z0-9\-_]{40,}\b'), "HIGH", "Hardcoded Anthropic (Claude) API key.", "Rotate at console.anthropic.com. Store as ANTHROPIC_API_KEY env var."), _Pattern("HuggingFace Token", re.compile(r'\bhf_[A-Za-z0-9]{34,}\b'), "HIGH", "HuggingFace API token gives access to private models and datasets.", "Revoke at huggingface.co/settings/tokens. Use HUGGINGFACE_TOKEN env var."), _Pattern("Google API Key", re.compile(r'\bAIza[A-Za-z0-9\-_]{35}\b'), "HIGH", "Google API key detected. May allow access to Maps, Cloud, or Firebase.", "Restrict or rotate in Google Cloud Console. Use GOOGLE_API_KEY env var."), _Pattern("Google Service Account", re.compile(r'"type"\s*:\s*"service_account"'), "HIGH", "Google service account JSON found in source code.", "Remove immediately. Use Workload Identity or mount at runtime via Secret Manager."), # ── HIGH: GitHub tokens ──────────────────────────────────── _Pattern("GitHub PAT (Classic)", re.compile(r'\bghp_[A-Za-z0-9]{36}\b'), "HIGH", "GitHub classic Personal Access Token. Grants repo/org access depending on scope.", "Revoke at github.com/settings/tokens. Use GITHUB_TOKEN env var or Actions secrets."), _Pattern("GitHub PAT (Fine-Grained)", re.compile(r'\bgithub_pat_[A-Za-z0-9_]{82}\b'), "HIGH", "GitHub fine-grained PAT detected.", "Revoke at github.com/settings/tokens. Store as a repository secret."), _Pattern("GitHub OAuth Token", re.compile(r'\bgho_[A-Za-z0-9]{36}\b'), "HIGH", "GitHub OAuth access token detected.", "Revoke via the OAuth app settings. Never commit OAuth tokens."), # ── HIGH: AWS ────────────────────────────────────────────── _Pattern("AWS Access Key ID", re.compile(r'\b(AKIA|ASIA|AROA|AIDA)[A-Z0-9]{16}\b'), "HIGH", "AWS Access Key ID. Combined with the secret, this allows full AWS API access.", "Deactivate in IAM console immediately. Use IAM roles or AWS Secrets Manager."), _Pattern("AWS Secret Access Key", re.compile(r'(?i)aws.{0,20}secret.{0,20}[=:]\s*["\']?([A-Za-z0-9/+]{40})["\']?'), "HIGH", "AWS Secret Access Key assignment detected.", "Rotate in IAM. Use environment variables or EC2 instance profiles."), # ── HIGH: Payment & messaging ────────────────────────────── _Pattern("Stripe Secret Key", re.compile(r'\bsk_(live|test)_[A-Za-z0-9]{24,}\b'), "HIGH", "Stripe secret API key. Allows full charge/refund access to your Stripe account.", "Roll the key in the Stripe dashboard. Store as STRIPE_SECRET_KEY env var."), _Pattern("Stripe Webhook Secret", re.compile(r'\bwhsec_[A-Za-z0-9]{32,}\b'), "HIGH", "Stripe webhook signing secret detected.", "Regenerate in Stripe dashboard → Webhooks. Store as STRIPE_WEBHOOK_SECRET env var."), _Pattern("Slack Bot Token", re.compile(r'\bxoxb-[0-9]{10,13}-[0-9]{10,13}-[A-Za-z0-9]{24}\b'), "HIGH", "Slack bot token. Can post messages, read channels, and access workspace data.", "Revoke at api.slack.com/apps. Store as SLACK_BOT_TOKEN env var."), _Pattern("SendGrid API Key", re.compile(r'\bSG\.[A-Za-z0-9\-_]{22,}\.[A-Za-z0-9\-_]{43,}\b'), "HIGH", "SendGrid API key allows sending emails from your account.", "Revoke at app.sendgrid.com/settings/api_keys. Store as SENDGRID_API_KEY env var."), _Pattern("Twilio Auth Token", re.compile(r'(?i)twilio.{0,20}auth_?token.{0,10}[=:]\s*["\']?([a-f0-9]{32})["\']?'), "HIGH", "Twilio auth token allows full API access to your Twilio account.", "Rotate at console.twilio.com. Store as TWILIO_AUTH_TOKEN env var."), _Pattern("PayPal Secret / Client Secret", re.compile(r'(?i)(paypal.{0,20}(secret|client_secret)|PAYPAL_SECRET)\s*[=:]\s*["\']?[A-Za-z0-9\-_]{20,}["\']?'), "HIGH", "PayPal client secret or API credential found in source code.", "Rotate in PayPal Developer Dashboard. Store as PAYPAL_CLIENT_SECRET env var."), # ── HIGH: Infra & cryptography ──────────────────────────── _Pattern("Private Key (PEM Block)", re.compile(r'-----BEGIN (RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----'), "HIGH", "Private key material embedded in source code — critical exposure.", "Remove immediately. Store private keys as files outside the repo or in a secrets manager."), _Pattern("JWT Secret Hardcoded", re.compile( r'(?i)(jwt[_-]?secret|secret[_-]?key)\s*[=:]\s*["\']' r'(?!your|example|changeme|placeholder|secret|xxx)[A-Za-z0-9!@#$%^&*\-_+=]{12,}["\']' ), "HIGH", "JWT signing secret hardcoded in source. Allows forging tokens.", "Generate a strong random secret. Store as SECRET_KEY env var."), _Pattern("Database Connection String", re.compile( r'(?i)(postgres|mysql|mongodb|redis|mssql|sqlite)://' r'[^:@\s]+:[^@\s]+@[^\s\'"]{5,}' ), "HIGH", "Database connection string with embedded credentials found.", "Move to DATABASE_URL env var. Never commit credentials to source control."), _Pattern("Supabase Service Role Key", re.compile( r'(?i)supabase.{0,30}(service_role|anon).{0,10}[=:]\s*["\']eyJ[A-Za-z0-9\-_=.]+["\']' ), "HIGH", "Supabase service role key found. Bypasses Row Level Security — treat as root credential.", "Store as SUPABASE_KEY env var. Never expose the service role key in frontend code."), # ── MEDIUM: Suspicious assignments ──────────────────────── _Pattern("Hardcoded Password", re.compile( r'(?i)(password|passwd|pwd)\s*[=:]\s*["\']' r'(?!your|example|changeme|placeholder|\s*$)[^\s"\']{6,}["\']' ), "MEDIUM", "Hardcoded password string detected.", "Replace with an env var lookup: os.getenv('DB_PASSWORD')."), _Pattern("Generic API Key Assignment", re.compile( r'(?i)(api[_-]?key|apikey|access[_-]?key|auth[_-]?token|bearer[_-]?token)' r'\s*[=:]\s*["\'](?!your|example|test|demo|placeholder|xxx)[A-Za-z0-9\-_.]{16,}["\']' ), "MEDIUM", "Possible API key or token hardcoded in source.", "Move to an environment variable and load at runtime with os.getenv()."), _Pattern("Secret in URL Query String", re.compile( r'https?://[^\s\'"]{0,60}[?&]' r'(key|token|secret|api_key|apikey)=[A-Za-z0-9\-_]{8,}' ), "MEDIUM", "Secret or token embedded in a URL — URLs are often logged by servers and proxies.", "Pass credentials in Authorization headers, not URL query parameters."), # ── LOW: Code quality / hygiene ──────────────────────────── _Pattern(".env File Included", re.compile(r''), # matched on filename, not content — handled separately "MEDIUM", ".env file found in the upload or repository.", "Add .env to .gitignore. Commit only .env.example with placeholder values."), _Pattern("TODO with Credentials", re.compile(r'(?i)#\s*TODO.{0,30}(password|secret|key|token|credential)'), "LOW", "TODO comment referencing credentials — may indicate insecure work in progress.", "Review and ensure credentials are never committed as part of this work."), _Pattern("Weak Hash Algorithm", re.compile(r'(?i)\b(md5|sha1)\s*\('), "LOW", "MD5 or SHA-1 is cryptographically broken for security-sensitive use.", "Use hashlib.sha256() or better. For passwords use bcrypt or argon2."), ] # Separate lookup for filename-based patterns (no content scan needed) _FILENAME_PATTERNS: dict[str, _Pattern] = { p.label: p for p in _PATTERNS if p.label == ".env File Included" } # Content-scan patterns only (skip filename-only entries) _CONTENT_PATTERNS = [p for p in _PATTERNS if p.label != ".env File Included"] # ────────────────────────────────────────────────────────────── # FINDING DATA CLASS # ────────────────────────────────────────────────────────────── @dataclass class Finding: type: str file: str line: int severity: str description: str fix: str match: str = "" # redacted snippet def to_dict(self) -> dict: return asdict(self) # ────────────────────────────────────────────────────────────── # HELPERS # ────────────────────────────────────────────────────────────── def _redact(text: str) -> str: """Show only the first 6 chars of a secret match to prevent leaking it.""" if len(text) <= 6: return "***" return f"{text[:6]}{'*' * min(8, len(text) - 6)} (redacted)" def _scannable(path: str) -> bool: """Return True if a file should be scanned (right extension, right size).""" ext = os.path.splitext(path)[1].lower() if ext not in SCANNABLE_EXTS: return False try: return os.path.getsize(path) <= MAX_FILE_BYTES except OSError: return False # ────────────────────────────────────────────────────────────── # CORE SCAN FUNCTIONS # ────────────────────────────────────────────────────────────── def _scan_file(abs_path: str, rel_path: str) -> list[Finding]: """ Scan a single file for all secret patterns. Returns a list of Finding objects (may be empty). """ findings: list[Finding] = [] seen: set[str] = set() # dedup identical matches within one file # ── Filename-based check ────────────────────────────────── basename = os.path.basename(abs_path) if basename == ".env" or (basename.startswith(".env.") and basename != ".env.example"): pat = _FILENAME_PATTERNS[".env File Included"] findings.append(Finding( type=pat.label, file=rel_path, line=0, severity=pat.severity, description=pat.description, fix=pat.fix, match=basename, )) # ── Content-based scan ──────────────────────────────────── try: with open(abs_path, "r", encoding="utf-8", errors="replace") as fh: lines = fh.readlines() except (OSError, PermissionError) as exc: logger.warning(f"Cannot read {abs_path}: {exc}") return findings for lineno, line in enumerate(lines, start=1): if len(line) > MAX_LINE_LEN: continue # skip minified / generated lines for pat in _CONTENT_PATTERNS: for match in pat.regex.finditer(line): key = f"{pat.label}:{match.group(0)}" if key in seen: continue seen.add(key) findings.append(Finding( type = pat.label, file = rel_path, line = lineno, severity = pat.severity, description = pat.description, fix = pat.fix, match = _redact(match.group(0)), )) return findings def scan_directory(base_dir: str) -> list[Finding]: """ Walk an entire directory tree and scan every eligible file. Returns a list of Finding objects sorted HIGH → MEDIUM → LOW. """ all_findings: list[Finding] = [] order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2} for root, dirs, files in os.walk(base_dir): # Prune ignored dirs so os.walk never descends into them dirs[:] = [d for d in dirs if d not in SKIP_DIRS] for fname in files: abs_path = os.path.join(root, fname) rel_path = os.path.relpath(abs_path, base_dir) if not _scannable(abs_path): continue all_findings.extend(_scan_file(abs_path, rel_path)) all_findings.sort(key=lambda f: (order.get(f.severity, 9), f.file, f.line)) return all_findings # ────────────────────────────────────────────────────────────── # RISK SCORING # ────────────────────────────────────────────────────────────── def _risk_level(findings: list[Finding]) -> str: """Derive a top-level risk label from the findings list.""" if not findings: return "NONE" severities = {f.severity for f in findings} if "HIGH" in severities: return "HIGH" if "MEDIUM" in severities: return "MEDIUM" return "LOW" # ────────────────────────────────────────────────────────────── # RESULT BUILDER # ────────────────────────────────────────────────────────────── def build_result(findings: list[Finding], source: str, is_pro: bool) -> dict: """ Assemble the final API response dict. Free users: see only the first FREE_FINDINGS findings + a truncation notice. Pro users: see everything. Args: findings: All Finding objects from the scan. source: Human-readable label ("zip_upload" or a repo URL). is_pro: Whether the requesting user has a Pro account. Returns a dict matching the documented output schema. """ risk = _risk_level(findings) total = len(findings) counts = {"HIGH": 0, "MEDIUM": 0, "LOW": 0} for f in findings: counts[f.severity] = counts.get(f.severity, 0) + 1 if is_pro: visible = [f.to_dict() for f in findings] truncated = False else: visible = [f.to_dict() for f in findings[:FREE_FINDINGS]] truncated = total > FREE_FINDINGS return { "risk_level": risk, "total_secrets": total, "summary": { "high": counts["HIGH"], "medium": counts["MEDIUM"], "low": counts["LOW"], }, "source": source, "findings": visible, "truncated": truncated, # True when free user has more results hidden "upgrade_message": ( f"Upgrade to Pro to see all {total} findings and download the PDF report." if truncated else "" ), } # ────────────────────────────────────────────────────────────── # ZIP INPUT HANDLER # ────────────────────────────────────────────────────────────── def scan_zip(zip_path: str, is_pro: bool = True) -> dict: """ Extract a ZIP archive to a temp directory, scan it, and clean up. Raises: ValueError: on invalid/unsafe ZIP. Returns: Result dict from build_result(). """ if not zipfile.is_zipfile(zip_path): raise ValueError("Uploaded file is not a valid ZIP archive.") extract_dir = tempfile.mkdtemp(prefix="ss_zip_") try: with zipfile.ZipFile(zip_path, "r") as zf: members = zf.infolist() if len(members) > MAX_FILES: raise ValueError(f"ZIP contains too many files ({len(members):,}). Limit is {MAX_FILES:,}.") total_bytes = sum(m.file_size for m in members) if total_bytes > MAX_ZIP_MB * 1024 * 1024: raise ValueError(f"ZIP uncompressed size exceeds {MAX_ZIP_MB} MB limit.") # ZIP-slip protection: reject path-traversal entries real_extract = os.path.realpath(extract_dir) for member in members: dest = os.path.realpath(os.path.join(extract_dir, member.filename)) if not dest.startswith(real_extract): raise ValueError(f"Unsafe path in ZIP (path traversal): {member.filename}") zf.extractall(extract_dir) findings = scan_directory(extract_dir) return build_result(findings, "zip_upload", is_pro) finally: shutil.rmtree(extract_dir, ignore_errors=True) # ────────────────────────────────────────────────────────────── # REPO INPUT HANDLER # ────────────────────────────────────────────────────────────── def validate_repo_url(url: str) -> None: """ Validate that a URL is a safe GitHub HTTPS URL. Raises ValueError with a user-facing message on failure. """ try: parsed = urlparse(url.strip()) except Exception: raise ValueError("Invalid URL format.") if parsed.scheme != "https": raise ValueError("Only HTTPS repository URLs are accepted.") if parsed.hostname not in ALLOWED_HOSTS: raise ValueError(f"Only GitHub (github.com) repositories are supported.") parts = [p for p in parsed.path.split("/") if p] if len(parts) < 2: raise ValueError( "URL must point to a specific repository: https://github.com/owner/repo" ) def safe_clone(repo_url: str) -> str: """ Shallow-clone a GitHub repository into a fresh temp directory. Returns: Path to the cloned directory. Raises: RuntimeError: if git is missing or the clone fails. """ validate_repo_url(repo_url) clone_dir = tempfile.mkdtemp(prefix="ss_repo_") try: result = subprocess.run( ["git", "clone", "--depth", "1", "--quiet", repo_url, clone_dir], capture_output=True, text=True, timeout=90, ) except subprocess.TimeoutExpired: shutil.rmtree(clone_dir, ignore_errors=True) raise RuntimeError("Repository clone timed out after 90 seconds.") except FileNotFoundError: shutil.rmtree(clone_dir, ignore_errors=True) raise RuntimeError("git is not installed on this server.") if result.returncode != 0: shutil.rmtree(clone_dir, ignore_errors=True) # Return only the last line of stderr to avoid leaking internal paths err = result.stderr.strip().splitlines()[-1][:200] if result.stderr.strip() else "Unknown error" raise RuntimeError(f"Clone failed: {err}") return clone_dir def validate_repo_size(path: str) -> None: """Enforce file count and total size limits on a cloned repo.""" file_count = 0 total_bytes = 0 for root, dirs, files in os.walk(path): dirs[:] = [d for d in dirs if d not in SKIP_DIRS] for fname in files: fp = os.path.join(root, fname) if os.path.islink(fp): continue try: total_bytes += os.path.getsize(fp) file_count += 1 except OSError: continue if file_count > MAX_FILES: raise ValueError(f"Repository exceeds {MAX_FILES:,} file limit.") if total_bytes > MAX_REPO_MB * 1024 * 1024: raise ValueError(f"Repository exceeds {MAX_REPO_MB} MB size limit.") def scan_repo(repo_url: str, is_pro: bool = True) -> dict: """ Clone a GitHub repo, validate size, scan for secrets, clean up, return result. Returns: Result dict from build_result(). """ clone_dir = safe_clone(repo_url) try: validate_repo_size(clone_dir) findings = scan_directory(clone_dir) return build_result(findings, repo_url, is_pro) finally: shutil.rmtree(clone_dir, ignore_errors=True)