# Cashy — scripts/sanitize_agent_export.py
# (Pasted from the GitHub web UI: workflow "Deploy to HF Spaces" via GitHub Actions, commit 17a78b5)
#!/usr/bin/env python3
"""
Sanitize Langflow agent exports by removing sensitive credentials.
Usage:
python scripts/sanitize_agent_export.py <input_file> [output_file]
If output_file is not provided, it will create a sanitized version with '_sanitized' suffix.
"""
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Tuple
# Regex patterns that indicate sensitive data.
# NOTE(review): these are applied with re.search + re.IGNORECASE in
# CredentialDetector.scan_value. The final generic pattern flags ANY run of
# 32+ alphanumeric characters, so false positives (hashes, dashless UUIDs,
# base62 IDs) are expected — pattern hits are only reported, never replaced.
SENSITIVE_PATTERNS = [
    r'sk-[a-zA-Z0-9]{20,}',  # OpenAI API keys
    r'sk-proj-[a-zA-Z0-9]{20,}',  # OpenAI project API keys ("proj" segment is too short for the pattern above)
    r'postgresql://[^:]+:[^@]+@',  # PostgreSQL connection strings with embedded password
    r'mongodb://[^:]+:[^@]+@',  # MongoDB connection strings with embedded password
    r'Bearer\s+[a-zA-Z0-9\-._~+/]+=*',  # Bearer tokens
    r'[a-zA-Z0-9]{32,}',  # Generic long alphanumeric strings (likely tokens)
]

# Key-name markers that typically indicate sensitive data.
# Matched case-insensitively as *substrings* of dict keys, so e.g.
# "MY_API_KEY", "db_password" and "Authorization" all match.
SENSITIVE_KEYS = [
    'api_key',
    'apikey',
    'openai_api_key',
    'langsmith_api_key',
    'password',
    'secret',
    'secret_key',
    'token',
    'bearer',
    'credential',
    'auth',
    'authorization',
    'connection_string',
    'database_url',
    'db_password',
]

# Placeholder substituted for each credential type.
# Order matters: sanitize_value() uses the FIRST entry whose name occurs as a
# substring of the lowercased key, so keep more specific names usable before
# broader ones where they overlap (e.g. 'api_key' before 'auth').
REPLACEMENTS = {
    'api_key': '${OPENAI_API_KEY}',
    'apikey': '${API_KEY}',
    'openai_api_key': '${OPENAI_API_KEY}',
    'langsmith_api_key': '${LANGSMITH_API_KEY}',
    'password': '${DB_PASSWORD}',
    'secret': '${SECRET_KEY}',
    'secret_key': '${SECRET_KEY}',
    'token': '${AUTH_TOKEN}',
    'bearer': '${BEARER_TOKEN}',
    'credential': '${CREDENTIAL}',
    'auth': '${AUTH_KEY}',
    'authorization': '${AUTHORIZATION}',
    'connection_string': '${DATABASE_URL}',
    'database_url': '${DATABASE_URL}',
    'db_password': '${DB_PASSWORD}',
}
class CredentialDetector:
    """Detect and report potential credentials in nested data structures.

    Findings accumulate in ``self.findings`` as ``(path, key, value)``
    tuples so callers can report every suspicious entry after a scan.
    Scanning never mutates the input data.
    """

    def __init__(self):
        # (dotted.path[index], key_name, raw_value) for each suspicious entry
        self.findings: List[Tuple[str, str, str]] = []

    def scan_value(self, value: str, path: str = "") -> bool:
        """Return True if *value* matches any SENSITIVE_PATTERNS regex.

        Non-strings and strings shorter than 8 characters are never
        flagged — real secrets (keys, tokens, URLs) are longer than that.
        """
        if not isinstance(value, str) or len(value) < 8:
            return False
        return any(
            re.search(pattern, value, re.IGNORECASE)
            for pattern in SENSITIVE_PATTERNS
        )

    def scan_dict(self, data: Dict[str, Any], path: str = "") -> None:
        """Recursively scan a dictionary for sensitive keys and values."""
        for key, value in data.items():
            current_path = f"{path}.{key}" if path else key
            if isinstance(value, dict):
                # BUG FIX: always recurse into containers, even when the key
                # itself looks sensitive (e.g. an "auth" section holding
                # nested credentials). The previous elif-chain tested the key
                # name first and skipped such nested dicts/lists entirely.
                self.scan_dict(value, current_path)
            elif isinstance(value, list):
                self.scan_list(value, current_path)
            elif isinstance(value, str) and value and any(
                marker in key.lower() for marker in SENSITIVE_KEYS
            ):
                # Key name alone is enough to flag a non-empty string value.
                self.findings.append((current_path, key, value))
            elif isinstance(value, str) and self.scan_value(value, current_path):
                # Innocuous key, but the value itself matches a pattern.
                self.findings.append((current_path, key, value))

    def scan_list(self, data: List[Any], path: str = "") -> None:
        """Recursively scan a list for sensitive values."""
        for i, item in enumerate(data):
            current_path = f"{path}[{i}]"
            if isinstance(item, dict):
                self.scan_dict(item, current_path)
            elif isinstance(item, list):
                self.scan_list(item, current_path)
            elif isinstance(item, str) and self.scan_value(item, current_path):
                self.findings.append((current_path, f"item_{i}", item))
def sanitize_value(key: str, value: str) -> str:
    """Return the placeholder that should replace a sensitive *value*.

    The placeholder is chosen by the FIRST entry of REPLACEMENTS whose name
    occurs as a substring of the lowercased *key*; keys matching no known
    name fall back to the generic "${CREDENTIAL}" marker.  *value* itself is
    never inspected — only the key determines the replacement.
    """
    lowered = key.lower()
    return next(
        (
            placeholder
            for name, placeholder in REPLACEMENTS.items()
            if name in lowered
        ),
        "${CREDENTIAL}",
    )
def sanitize_dict(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *data* with sensitive string values replaced.

    A string value is replaced (via sanitize_value) when its key contains
    any SENSITIVE_KEYS marker.  Nested dicts and lists are ALWAYS recursed
    into — including containers stored under sensitive-looking keys such as
    an "auth" sub-section.  The previous implementation tested the key name
    first, so a dict/list under a sensitive key fell into the final `else`
    and was copied through UNSANITIZED; this version fixes that.

    Args:
        data: Arbitrary JSON-like dictionary (never mutated).

    Returns:
        A new dictionary with the same structure and placeholders in place
        of sensitive string values.
    """
    sanitized: Dict[str, Any] = {}
    for key, value in data.items():
        if isinstance(value, dict):
            sanitized[key] = sanitize_dict(value)
        elif isinstance(value, list):
            sanitized[key] = sanitize_list(value)
        elif (
            isinstance(value, str)
            and value  # empty strings are kept as-is
            and any(marker in key.lower() for marker in SENSITIVE_KEYS)
        ):
            sanitized[key] = sanitize_value(key, value)
        else:
            sanitized[key] = value
    return sanitized
def sanitize_list(data: List[Any]) -> List[Any]:
    """Return a copy of *data* with any nested dicts and lists sanitized."""
    # NOTE(review): plain string items are passed through unchanged even
    # though CredentialDetector.scan_list can flag them — confirm that
    # asymmetry is intended before relying on this for list-held secrets.
    return [
        sanitize_dict(item) if isinstance(item, dict)
        else sanitize_list(item) if isinstance(item, list)
        else item
        for item in data
    ]
def sanitize_agent_export(input_file: Path, output_file: "Path | None" = None) -> bool:
    """
    Sanitize a Langflow agent export by replacing credentials with placeholders.

    Reads *input_file* as JSON, scans it for credential-like keys/values,
    reports each finding (value masked), and writes a sanitized copy.

    Args:
        input_file: Path of the exported JSON file.
        output_file: Destination for the sanitized copy; defaults to
            "<stem>_sanitized<suffix>" alongside the input.

    Returns:
        True if credentials were found and a sanitized file was written,
        False if the file was clean or could not be read/written.
    """
    # Read input file. JSON interchange files are UTF-8; be explicit so the
    # script behaves the same on platforms with a non-UTF-8 default encoding.
    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, json.JSONDecodeError, UnicodeDecodeError) as e:
        print(f"❌ Error reading {input_file}: {e}")
        return False
    # Scan for credentials. A Langflow export is normally a JSON object, but
    # guard against a top-level array so scan_dict() isn't called on a list.
    detector = CredentialDetector()
    if isinstance(data, dict):
        detector.scan_dict(data)
    elif isinstance(data, list):
        detector.scan_list(data)
    if not detector.findings:
        print(f"✅ No credentials detected in {input_file}")
        return False
    # Report findings
    print(f"⚠️ Found {len(detector.findings)} potential credential(s) in {input_file}:")
    for path, key, value in detector.findings:
        # Mask the value for display (only a short prefix is echoed)
        masked = value[:8] + "..." if len(value) > 8 else "***"
        print(f" - {path}: {key} = {masked}")
    # Sanitize data (mirror the dict/list guard used for scanning)
    if isinstance(data, dict):
        sanitized_data = sanitize_dict(data)
    else:
        sanitized_data = sanitize_list(data)
    # Determine output file
    if output_file is None:
        output_file = input_file.parent / f"{input_file.stem}_sanitized{input_file.suffix}"
    # Write sanitized output
    try:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(sanitized_data, f, indent=2)
        print(f"✅ Sanitized version saved to: {output_file}")
        return True
    except OSError as e:
        print(f"❌ Error writing {output_file}: {e}")
        return False
def main() -> None:
    """CLI entry point: parse argv, run sanitization, set the exit code."""
    args = sys.argv[1:]
    if not args:
        print("Usage: python sanitize_agent_export.py <input_file> [output_file]")
        sys.exit(1)
    input_file = Path(args[0])
    output_file = Path(args[1]) if len(args) > 1 else None
    if not input_file.exists():
        print(f"❌ Error: {input_file} does not exist")
        sys.exit(1)
    # Run sanitization; exit non-zero when credentials were found so a
    # pre-commit hook or CI step can block the accidental commit.
    if sanitize_agent_export(input_file, output_file):
        print("\n⚠️ WARNING: Credentials were found and replaced with placeholders.")
        print(" Review the sanitized file before committing to Git.")
        print(" Make sure to use environment variables in Langflow for all credentials.")
        sys.exit(1)
    print("\n✅ File is safe to commit.")
    sys.exit(0)


if __name__ == "__main__":
    main()