#!/usr/bin/env python3
"""
Sanitize Langflow agent exports by removing sensitive credentials.

Usage:
    python scripts/sanitize_agent_export.py <input_file> [output_file]

If output_file is not provided, it will create a sanitized version with
'_sanitized' suffix.
"""

import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Patterns that indicate sensitive data (matched case-insensitively).
SENSITIVE_PATTERNS = [
    r'sk-[a-zA-Z0-9]{20,}',             # OpenAI API keys
    r'sk-proj-[a-zA-Z0-9]{20,}',        # OpenAI project API keys
    r'postgresql://[^:]+:[^@]+@',       # PostgreSQL connection strings with password
    r'mongodb://[^:]+:[^@]+@',          # MongoDB connection strings with password
    r'Bearer\s+[a-zA-Z0-9\-._~+/]+=*',  # Bearer tokens
    r'[a-zA-Z0-9]{32,}',                # Generic long alphanumeric strings (likely tokens)
]

# Compiled once at import time so scanning a large export does not recompile
# every pattern for every value.
_COMPILED_PATTERNS = [re.compile(p, re.IGNORECASE) for p in SENSITIVE_PATTERNS]

# Key-name substrings that typically indicate sensitive data.
SENSITIVE_KEYS = [
    'api_key', 'apikey', 'openai_api_key', 'langsmith_api_key',
    'password', 'secret', 'secret_key', 'token', 'bearer',
    'credential', 'auth', 'authorization', 'connection_string',
    'database_url', 'db_password',
]

# Replacement placeholders for different credential types.
REPLACEMENTS = {
    'api_key': '${OPENAI_API_KEY}',
    'apikey': '${API_KEY}',
    'openai_api_key': '${OPENAI_API_KEY}',
    'langsmith_api_key': '${LANGSMITH_API_KEY}',
    'password': '${DB_PASSWORD}',
    'secret': '${SECRET_KEY}',
    'secret_key': '${SECRET_KEY}',
    'token': '${AUTH_TOKEN}',
    'bearer': '${BEARER_TOKEN}',
    'credential': '${CREDENTIAL}',
    'auth': '${AUTH_KEY}',
    'authorization': '${AUTHORIZATION}',
    'connection_string': '${DATABASE_URL}',
    'database_url': '${DATABASE_URL}',
    'db_password': '${DB_PASSWORD}',
}


def _key_is_sensitive(key: str) -> bool:
    """Return True if the key name contains any sensitive-key substring."""
    key_lower = key.lower()
    return any(sensitive in key_lower for sensitive in SENSITIVE_KEYS)


def _matches_sensitive_pattern(value: Any) -> bool:
    """Return True if *value* is a string matching a sensitive-data pattern.

    Strings shorter than 8 characters are ignored to reduce noise from the
    generic patterns.
    """
    if not isinstance(value, str) or len(value) < 8:
        return False
    return any(pattern.search(value) for pattern in _COMPILED_PATTERNS)


class CredentialDetector:
    """Detect and report potential credentials in JSON-like data structures."""

    def __init__(self):
        # Each finding is (dotted path, key name, raw value).
        self.findings: List[Tuple[str, str, str]] = []

    def scan_value(self, value: str, path: str = "") -> bool:
        """Check whether a single value matches any sensitive pattern.

        ``path`` is accepted for interface compatibility; it is not used in
        the match itself.
        """
        return _matches_sensitive_pattern(value)

    def scan_dict(self, data: Dict[str, Any], path: str = "") -> None:
        """Recursively scan a dictionary for sensitive data.

        Nested dicts/lists are always descended into, even when their own key
        looks sensitive (e.g. ``{"auth": {"password": ...}}``), so credentials
        buried under a sensitive-sounding container key are not missed.
        """
        for key, value in data.items():
            current_path = f"{path}.{key}" if path else key
            if isinstance(value, dict):
                self.scan_dict(value, current_path)
            elif isinstance(value, list):
                self.scan_list(value, current_path)
            elif isinstance(value, str) and value:
                # A non-empty string is a finding if either its key name or
                # its content looks sensitive.
                if _key_is_sensitive(key) or _matches_sensitive_pattern(value):
                    self.findings.append((current_path, key, value))

    def scan_list(self, data: List[Any], path: str = "") -> None:
        """Recursively scan a list for sensitive data."""
        for i, item in enumerate(data):
            current_path = f"{path}[{i}]"
            if isinstance(item, dict):
                self.scan_dict(item, current_path)
            elif isinstance(item, list):
                self.scan_list(item, current_path)
            elif isinstance(item, str) and self.scan_value(item, current_path):
                self.findings.append((current_path, f"item_{i}", item))


def sanitize_value(key: str, value: str) -> str:
    """Replace a sensitive value with the placeholder matching its key name.

    Falls back to ``${CREDENTIAL}`` when no known key substring matches.
    """
    key_lower = key.lower()
    for sensitive_key, replacement in REPLACEMENTS.items():
        if sensitive_key in key_lower:
            return replacement
    return "${CREDENTIAL}"


def sanitize_dict(data: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively sanitize a dictionary by replacing sensitive values.

    Mirrors ``CredentialDetector.scan_dict``: containers are always recursed
    into, and a string is replaced when either its key name or its content
    looks sensitive — so everything the scanner reports actually gets
    sanitized.
    """
    sanitized: Dict[str, Any] = {}
    for key, value in data.items():
        if isinstance(value, dict):
            sanitized[key] = sanitize_dict(value)
        elif isinstance(value, list):
            sanitized[key] = sanitize_list(value)
        elif isinstance(value, str) and value and (
            _key_is_sensitive(key) or _matches_sensitive_pattern(value)
        ):
            sanitized[key] = sanitize_value(key, value)
        else:
            sanitized[key] = value
    return sanitized


def sanitize_list(data: List[Any]) -> List[Any]:
    """Recursively sanitize a list by replacing sensitive values."""
    sanitized: List[Any] = []
    for item in data:
        if isinstance(item, dict):
            sanitized.append(sanitize_dict(item))
        elif isinstance(item, list):
            sanitized.append(sanitize_list(item))
        elif isinstance(item, str) and _matches_sensitive_pattern(item):
            # Bare list items have no key name; use the generic placeholder.
            sanitized.append("${CREDENTIAL}")
        else:
            sanitized.append(item)
    return sanitized


def sanitize_agent_export(input_file: Path, output_file: Optional[Path] = None) -> bool:
    """
    Sanitize a Langflow agent export by removing credentials.

    Returns True if credentials were found and sanitized, False otherwise.
    Exits the process with status 1 if the file cannot be read or written —
    failing closed, so an unreadable export is never reported as "safe to
    commit".
    """
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        print(f"❌ Error reading {input_file}: {e}")
        # Fail closed: we could not verify the file, so do not let the caller
        # treat it as credential-free.
        sys.exit(1)

    detector = CredentialDetector()
    detector.scan_dict(data)

    if not detector.findings:
        print(f"✅ No credentials detected in {input_file}")
        return False

    print(f"⚠️ Found {len(detector.findings)} potential credential(s) in {input_file}:")
    for path, key, value in detector.findings:
        # Mask the value for display so the report itself does not leak it.
        masked = value[:8] + "..." if len(value) > 8 else "***"
        print(f" - {path}: {key} = {masked}")

    sanitized_data = sanitize_dict(data)

    if output_file is None:
        output_file = input_file.parent / f"{input_file.stem}_sanitized{input_file.suffix}"

    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(sanitized_data, f, indent=2)
        print(f"✅ Sanitized version saved to: {output_file}")
        return True
    except OSError as e:
        print(f"❌ Error writing {output_file}: {e}")
        # Fail closed: credentials were found but not persisted sanitized.
        sys.exit(1)


def main() -> None:
    """CLI entry point: parse arguments and run the sanitizer."""
    if len(sys.argv) < 2:
        # Usage string now shows the required input file argument.
        print("Usage: python sanitize_agent_export.py <input_file> [output_file]")
        sys.exit(1)

    input_file = Path(sys.argv[1])
    output_file = Path(sys.argv[2]) if len(sys.argv) > 2 else None

    if not input_file.exists():
        print(f"❌ Error: {input_file} does not exist")
        sys.exit(1)

    found_credentials = sanitize_agent_export(input_file, output_file)

    if found_credentials:
        print("\n⚠️ WARNING: Credentials were found and replaced with placeholders.")
        print(" Review the sanitized file before committing to Git.")
        print(" Make sure to use environment variables in Langflow for all credentials.")
        sys.exit(1)  # Exit with error code to prevent accidental commits
    else:
        print("\n✅ File is safe to commit.")
        sys.exit(0)


if __name__ == "__main__":
    main()