Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Sanitize Langflow agent exports by removing sensitive credentials. | |
| Usage: | |
| python scripts/sanitize_agent_export.py <input_file> [output_file] | |
| If output_file is not provided, it will create a sanitized version with '_sanitized' suffix. | |
| """ | |
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Regexes that indicate a string VALUE is itself a credential
# (matched case-insensitively by CredentialDetector.scan_value).
SENSITIVE_PATTERNS = [
    r'sk-[a-zA-Z0-9]{20,}',  # OpenAI API keys
    r'sk-proj-[a-zA-Z0-9]{20,}',  # OpenAI project API keys (the hyphen in "proj-" keeps the pattern above from matching)
    r'postgresql://[^:]+:[^@]+@',  # PostgreSQL connection strings with password
    r'mongodb://[^:]+:[^@]+@',  # MongoDB connection strings with password
    r'Bearer\s+[a-zA-Z0-9\-._~+/]+=*',  # Bearer tokens
    # NOTE(review): very broad — any 32+ char alphanumeric run (UUID-like
    # node ids, hashes) is flagged as a "likely token"; expect false positives.
    r'[a-zA-Z0-9]{32,}',  # Generic long alphanumeric strings (likely tokens)
]

# Key-name substrings that typically hold sensitive data; compared as
# substrings of the lowercased key, so e.g. 'auth' also hits 'authorization'.
SENSITIVE_KEYS = [
    'api_key',
    'apikey',
    'openai_api_key',
    'langsmith_api_key',
    'password',
    'secret',
    'secret_key',
    'token',
    'bearer',
    'credential',
    'auth',
    'authorization',
    'connection_string',
    'database_url',
    'db_password',
]

# Environment-variable placeholder to substitute per matched key substring.
# NOTE(review): sanitize_value scans these with substring matching, so a
# short generic entry (e.g. 'api_key') can shadow a more specific one
# (e.g. 'langsmith_api_key') depending on match order — confirm mapping.
REPLACEMENTS = {
    'api_key': '${OPENAI_API_KEY}',
    'apikey': '${API_KEY}',
    'openai_api_key': '${OPENAI_API_KEY}',
    'langsmith_api_key': '${LANGSMITH_API_KEY}',
    'password': '${DB_PASSWORD}',
    'secret': '${SECRET_KEY}',
    'secret_key': '${SECRET_KEY}',
    'token': '${AUTH_TOKEN}',
    'bearer': '${BEARER_TOKEN}',
    'credential': '${CREDENTIAL}',
    'auth': '${AUTH_KEY}',
    'authorization': '${AUTHORIZATION}',
    'connection_string': '${DATABASE_URL}',
    'database_url': '${DATABASE_URL}',
    'db_password': '${DB_PASSWORD}',
}
class CredentialDetector:
    """Detect and report potential credentials in nested data structures.

    Call ``scan_dict`` (or ``scan_list``) on the parsed export, then inspect
    ``self.findings``, a list of (dotted path, key, raw value) tuples.
    """

    def __init__(self):
        # (path, key, value) for each suspected credential found so far.
        self.findings: List[Tuple[str, str, str]] = []

    def scan_value(self, value: str, path: str = "") -> bool:
        """Return True if *value* matches any sensitive pattern.

        Non-strings and strings shorter than 8 characters are ignored to
        reduce false positives. *path* is accepted for caller symmetry but
        is not used in the decision.
        """
        if not isinstance(value, str) or len(value) < 8:
            return False
        return any(
            re.search(pattern, value, re.IGNORECASE)
            for pattern in SENSITIVE_PATTERNS
        )

    def scan_dict(self, data: Dict[str, Any], path: str = "") -> None:
        """Recursively scan a dictionary for sensitive keys and values."""
        for key, value in data.items():
            current_path = f"{path}.{key}" if path else key
            key_is_sensitive = any(
                sensitive in key.lower() for sensitive in SENSITIVE_KEYS
            )
            if isinstance(value, str):
                # Flag a string if its key name looks sensitive (and it is
                # non-empty) OR its content matches a credential pattern.
                if (key_is_sensitive and value) or self.scan_value(value, current_path):
                    self.findings.append((current_path, key, value))
            elif isinstance(value, dict):
                # Fix: recurse even when the key itself is sensitive —
                # previously a dict under e.g. "auth" was skipped entirely.
                self.scan_dict(value, current_path)
            elif isinstance(value, list):
                self.scan_list(value, current_path)

    def scan_list(self, data: List[Any], path: str = "") -> None:
        """Recursively scan a list for sensitive values."""
        for i, item in enumerate(data):
            current_path = f"{path}[{i}]"
            if isinstance(item, dict):
                self.scan_dict(item, current_path)
            elif isinstance(item, list):
                self.scan_list(item, current_path)
            elif isinstance(item, str) and self.scan_value(item, current_path):
                self.findings.append((current_path, f"item_{i}", item))
def sanitize_value(key: str, value: str) -> str:
    """Return the environment-variable placeholder for a sensitive *key*.

    *value* is accepted for API symmetry with callers but does not affect
    the chosen placeholder.
    """
    key_lower = key.lower()
    # Longest-match-first: without this, a short generic entry such as
    # 'api_key' shadows more specific ones — e.g. 'langsmith_api_key'
    # previously mapped to ${OPENAI_API_KEY} instead of ${LANGSMITH_API_KEY}.
    for sensitive_key in sorted(REPLACEMENTS, key=len, reverse=True):
        if sensitive_key in key_lower:
            return REPLACEMENTS[sensitive_key]
    # Fallback for data flagged by value pattern rather than key name.
    return "${CREDENTIAL}"
def sanitize_dict(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *data* with sensitive string values replaced.

    Containers are always recursed into, even when stored under a
    sensitive-looking key (fix: previously a dict or list under e.g.
    "auth" was returned unsanitized).

    NOTE(review): strings flagged only by value pattern (not key name) are
    reported by CredentialDetector but left unchanged here — confirm this
    asymmetry is intentional (blind replacement could corrupt node ids).
    """
    sanitized: Dict[str, Any] = {}
    for key, value in data.items():
        if isinstance(value, dict):
            sanitized[key] = sanitize_dict(value)
        elif isinstance(value, list):
            sanitized[key] = sanitize_list(value)
        elif (
            isinstance(value, str)
            and value
            and any(sensitive in key.lower() for sensitive in SENSITIVE_KEYS)
        ):
            sanitized[key] = sanitize_value(key, value)
        else:
            sanitized[key] = value
    return sanitized
def sanitize_list(data: List[Any]) -> List[Any]:
    """Return a copy of *data* with nested containers sanitized recursively.

    Scalars (and any non-dict, non-list item) are passed through untouched.
    """

    def _clean(item: Any) -> Any:
        # Dispatch on container type; everything else is returned as-is.
        if isinstance(item, dict):
            return sanitize_dict(item)
        if isinstance(item, list):
            return sanitize_list(item)
        return item

    return [_clean(element) for element in data]
def sanitize_agent_export(input_file: Path, output_file: Optional[Path] = None) -> bool:
    """
    Sanitize a Langflow agent export by replacing credentials with placeholders.

    Reads *input_file* as JSON, reports any suspected credentials (with
    masked values so the console log itself does not leak full secrets),
    and writes a sanitized copy to *output_file* — defaulting to
    '<stem>_sanitized<suffix>' next to the input.

    Returns True if credentials were found and a sanitized file written,
    False otherwise (including read/write errors, which are printed, not
    raised).
    """
    # Read input file. Force UTF-8: JSON exports are UTF-8 regardless of the
    # platform's default locale encoding. JSONDecodeError and
    # UnicodeDecodeError are both subclasses of ValueError.
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        print(f"❌ Error reading {input_file}: {e}")
        return False

    # Scan for credentials before touching anything.
    detector = CredentialDetector()
    detector.scan_dict(data)
    if not detector.findings:
        print(f"✅ No credentials detected in {input_file}")
        return False

    # Report findings with the value masked to at most its first 8 chars.
    print(f"⚠️ Found {len(detector.findings)} potential credential(s) in {input_file}:")
    for path, key, value in detector.findings:
        masked = value[:8] + "..." if len(value) > 8 else "***"
        print(f" - {path}: {key} = {masked}")

    # Replace sensitive values with placeholders.
    sanitized_data = sanitize_dict(data)

    # Determine output file if the caller did not supply one.
    if output_file is None:
        output_file = input_file.parent / f"{input_file.stem}_sanitized{input_file.suffix}"

    # Write sanitized output (TypeError/ValueError cover non-serializable data).
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(sanitized_data, f, indent=2)
        print(f"✅ Sanitized version saved to: {output_file}")
        return True
    except (OSError, TypeError, ValueError) as e:
        print(f"❌ Error writing {output_file}: {e}")
        return False
def main():
    """Command-line entry point: parse args, sanitize, and set exit status."""
    args = sys.argv[1:]
    if not args:
        print("Usage: python sanitize_agent_export.py <input_file> [output_file]")
        sys.exit(1)

    input_file = Path(args[0])
    output_file = Path(args[1]) if len(args) > 1 else None

    if not input_file.exists():
        print(f"❌ Error: {input_file} does not exist")
        sys.exit(1)

    # Run sanitization; True means credentials were found and replaced.
    found_credentials = sanitize_agent_export(input_file, output_file)

    if not found_credentials:
        print("\n✅ File is safe to commit.")
        sys.exit(0)

    print("\n⚠️ WARNING: Credentials were found and replaced with placeholders.")
    print(" Review the sanitized file before committing to Git.")
    print(" Make sure to use environment variables in Langflow for all credentials.")
    # Non-zero exit so pre-commit hooks block the unsanitized file.
    sys.exit(1)


if __name__ == "__main__":
    main()