File size: 7,803 Bytes
17a78b5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
#!/usr/bin/env python3
"""
Sanitize Langflow agent exports by removing sensitive credentials.

Usage:
    python scripts/sanitize_agent_export.py <input_file> [output_file]

If output_file is not provided, it will create a sanitized version with '_sanitized' suffix.
"""

import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple


# Patterns that indicate sensitive data
SENSITIVE_PATTERNS = [
    r'sk-[a-zA-Z0-9]{20,}',  # OpenAI API keys
    r'sk-proj-[a-zA-Z0-9]{20,}',  # OpenAI project API keys
    r'postgresql://[^:]+:[^@]+@',  # PostgreSQL connection strings with password
    r'mongodb://[^:]+:[^@]+@',  # MongoDB connection strings with password
    r'Bearer\s+[a-zA-Z0-9\-._~+/]+=*',  # Bearer tokens
    r'[a-zA-Z0-9]{32,}',  # Generic long alphanumeric strings (likely tokens)
]

# Keys that typically contain sensitive data
SENSITIVE_KEYS = [
    'api_key',
    'apikey',
    'openai_api_key',
    'langsmith_api_key',
    'password',
    'secret',
    'secret_key',
    'token',
    'bearer',
    'credential',
    'auth',
    'authorization',
    'connection_string',
    'database_url',
    'db_password',
]

# Replacement values for different credential types
REPLACEMENTS = {
    'api_key': '${OPENAI_API_KEY}',
    'apikey': '${API_KEY}',
    'openai_api_key': '${OPENAI_API_KEY}',
    'langsmith_api_key': '${LANGSMITH_API_KEY}',
    'password': '${DB_PASSWORD}',
    'secret': '${SECRET_KEY}',
    'secret_key': '${SECRET_KEY}',
    'token': '${AUTH_TOKEN}',
    'bearer': '${BEARER_TOKEN}',
    'credential': '${CREDENTIAL}',
    'auth': '${AUTH_KEY}',
    'authorization': '${AUTHORIZATION}',
    'connection_string': '${DATABASE_URL}',
    'database_url': '${DATABASE_URL}',
    'db_password': '${DB_PASSWORD}',
}


class CredentialDetector:
    """Detect and report potential credentials in data structures."""

    def __init__(self):
        self.findings: List[Tuple[str, str, str]] = []  # (path, key, value)

    def scan_value(self, value: str, path: str = "") -> bool:
        """Check if a value matches sensitive patterns."""
        if not isinstance(value, str) or len(value) < 8:
            return False

        for pattern in SENSITIVE_PATTERNS:
            if re.search(pattern, value, re.IGNORECASE):
                return True
        return False

    def scan_dict(self, data: Dict[str, Any], path: str = "") -> None:
        """Recursively scan dictionary for sensitive data."""
        for key, value in data.items():
            current_path = f"{path}.{key}" if path else key

            # Check if key name suggests sensitive data
            if any(sensitive in key.lower() for sensitive in SENSITIVE_KEYS):
                if isinstance(value, str) and value:
                    self.findings.append((current_path, key, value))

            # Check if value matches sensitive patterns
            elif isinstance(value, str) and self.scan_value(value, current_path):
                self.findings.append((current_path, key, value))

            # Recurse into nested structures
            elif isinstance(value, dict):
                self.scan_dict(value, current_path)
            elif isinstance(value, list):
                self.scan_list(value, current_path)

    def scan_list(self, data: List[Any], path: str = "") -> None:
        """Recursively scan list for sensitive data."""
        for i, item in enumerate(data):
            current_path = f"{path}[{i}]"

            if isinstance(item, dict):
                self.scan_dict(item, current_path)
            elif isinstance(item, list):
                self.scan_list(item, current_path)
            elif isinstance(item, str) and self.scan_value(item, current_path):
                self.findings.append((current_path, f"item_{i}", item))


def sanitize_value(key: str, value: str) -> str:
    """Replace sensitive value with appropriate placeholder."""
    key_lower = key.lower()

    # Use specific replacement if key matches known pattern
    for sensitive_key, replacement in REPLACEMENTS.items():
        if sensitive_key in key_lower:
            return replacement

    # Default replacement for unknown sensitive data
    return "${CREDENTIAL}"


def sanitize_dict(data: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively sanitize dictionary by replacing sensitive values."""
    sanitized = {}

    for key, value in data.items():
        # Check if key suggests sensitive data
        if any(sensitive in key.lower() for sensitive in SENSITIVE_KEYS):
            if isinstance(value, str) and value:
                sanitized[key] = sanitize_value(key, value)
            else:
                sanitized[key] = value

        # Recurse into nested structures
        elif isinstance(value, dict):
            sanitized[key] = sanitize_dict(value)
        elif isinstance(value, list):
            sanitized[key] = sanitize_list(value)
        else:
            sanitized[key] = value

    return sanitized


def sanitize_list(data: List[Any]) -> List[Any]:
    """Recursively sanitize list by replacing sensitive values."""
    sanitized = []

    for item in data:
        if isinstance(item, dict):
            sanitized.append(sanitize_dict(item))
        elif isinstance(item, list):
            sanitized.append(sanitize_list(item))
        else:
            sanitized.append(item)

    return sanitized


def sanitize_agent_export(input_file: Path, output_file: Path = None) -> bool:
    """
    Sanitize Langflow agent export by removing credentials.

    Returns True if credentials were found and sanitized, False otherwise.
    """
    # Read input file
    try:
        with open(input_file, 'r') as f:
            data = json.load(f)
    except Exception as e:
        print(f"❌ Error reading {input_file}: {e}")
        return False

    # Scan for credentials
    detector = CredentialDetector()
    detector.scan_dict(data)

    if not detector.findings:
        print(f"✅ No credentials detected in {input_file}")
        return False

    # Report findings
    print(f"⚠️  Found {len(detector.findings)} potential credential(s) in {input_file}:")
    for path, key, value in detector.findings:
        # Mask the value for display
        masked = value[:8] + "..." if len(value) > 8 else "***"
        print(f"   - {path}: {key} = {masked}")

    # Sanitize data
    sanitized_data = sanitize_dict(data)

    # Determine output file
    if output_file is None:
        output_file = input_file.parent / f"{input_file.stem}_sanitized{input_file.suffix}"

    # Write sanitized output
    try:
        with open(output_file, 'w') as f:
            json.dump(sanitized_data, f, indent=2)
        print(f"✅ Sanitized version saved to: {output_file}")
        return True
    except Exception as e:
        print(f"❌ Error writing {output_file}: {e}")
        return False


def main():
    if len(sys.argv) < 2:
        print("Usage: python sanitize_agent_export.py <input_file> [output_file]")
        sys.exit(1)

    input_file = Path(sys.argv[1])
    output_file = Path(sys.argv[2]) if len(sys.argv) > 2 else None

    if not input_file.exists():
        print(f"❌ Error: {input_file} does not exist")
        sys.exit(1)

    # Run sanitization
    found_credentials = sanitize_agent_export(input_file, output_file)

    if found_credentials:
        print("\n⚠️  WARNING: Credentials were found and replaced with placeholders.")
        print("   Review the sanitized file before committing to Git.")
        print("   Make sure to use environment variables in Langflow for all credentials.")
        sys.exit(1)  # Exit with error code to prevent accidental commits
    else:
        print("\n✅ File is safe to commit.")
        sys.exit(0)


if __name__ == "__main__":
    main()