Spaces:
Sleeping
Sleeping
| """ | |
| Анализатор трафика Kiro - поиск идентификаторов и fingerprints | |
| Анализирует логи mitmproxy и находит: | |
| - machineId | |
| - IP адреса | |
| - UUID/GUID | |
| - Токены и ключи | |
| - Версии ПО | |
| - Имена пользователей/компьютеров | |
| - Любые другие потенциальные идентификаторы | |
| Использование: | |
| python analyze_kiro_traffic.py [log_file] | |
| Если log_file не указан - берёт последний лог из ~/.kiro-manager-wb/proxy_logs/ | |
| """ | |
| import re | |
| import json | |
| import sys | |
| from pathlib import Path | |
| from collections import defaultdict | |
| from datetime import datetime | |
| # Директория с логами | |
| LOG_DIR = Path.home() / ".kiro-manager-wb" / "proxy_logs" | |
| # Паттерны для поиска идентификаторов | |
| PATTERNS = { | |
| "machineId (64 hex)": re.compile(r'\b[a-f0-9]{64}\b', re.IGNORECASE), | |
| "UUID/GUID": re.compile(r'\b[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}\b', re.IGNORECASE), | |
| "SHA-256 hash": re.compile(r'\b[a-f0-9]{64}\b', re.IGNORECASE), | |
| "MD5 hash": re.compile(r'\b[a-f0-9]{32}\b', re.IGNORECASE), | |
| "Bearer token": re.compile(r'Bearer\s+([A-Za-z0-9_-]+\.?[A-Za-z0-9_-]*\.?[A-Za-z0-9_-]*)', re.IGNORECASE), | |
| "AWS access key": re.compile(r'\b(AKIA[A-Z0-9]{16})\b'), | |
| "IP address": re.compile(r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b'), | |
| "Email": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'), | |
| "Windows path": re.compile(r'[A-Z]:\\[^"\'<>|\r\n]+', re.IGNORECASE), | |
| "Unix path": re.compile(r'/(?:home|Users)/[^"\'<>|\r\n\s]+'), | |
| "Version string": re.compile(r'\b\d+\.\d+\.\d+(?:\.\d+)?\b'), | |
| "Hostname": re.compile(r'"hostname":\s*"([^"]+)"'), | |
| "Username": re.compile(r'"(?:user|username|userName)":\s*"([^"]+)"', re.IGNORECASE), | |
| "Session ID": re.compile(r'"(?:session|sessionId)":\s*"([^"]+)"', re.IGNORECASE), | |
| "Client ID": re.compile(r'"(?:client|clientId)":\s*"([^"]+)"', re.IGNORECASE), | |
| "Device ID": re.compile(r'"(?:device|deviceId)":\s*"([^"]+)"', re.IGNORECASE), | |
| "Trace ID": re.compile(r'"traceId":\s*"([^"]+)"'), | |
| "Span ID": re.compile(r'"spanId":\s*"([^"]+)"'), | |
| } | |
| # Известные безопасные значения (не идентификаторы) | |
| SAFE_VALUES = { | |
| "127.0.0.1", "localhost", "0.0.0.0", | |
| "0.0.1", "0.8.0", "1.0.0", "2.0.0", # версии | |
| } | |
| # Интересные ключи в JSON | |
| INTERESTING_KEYS = [ | |
| "machineId", "machine_id", "deviceId", "device_id", | |
| "userId", "user_id", "username", "userName", | |
| "hostname", "computerName", "platform", | |
| "sessionId", "session_id", "clientId", "client_id", | |
| "fingerprint", "hwid", "uuid", "guid", | |
| "ip", "ipAddress", "ip_address", | |
| "email", "accountId", "account_id", | |
| ] | |
| class TrafficAnalyzer: | |
| def __init__(self): | |
| self.findings = defaultdict(lambda: defaultdict(set)) # category -> value -> contexts | |
| self.endpoints = defaultdict(int) # URL -> count | |
| self.headers_seen = defaultdict(set) # header_name -> values | |
| self.json_keys = defaultdict(set) # key -> values | |
| def analyze_file(self, filepath: Path): | |
| """Анализирует лог файл""" | |
| print(f"\n{'='*60}") | |
| print(f"Analyzing: {filepath.name}") | |
| print(f"{'='*60}\n") | |
| content = filepath.read_text(encoding='utf-8', errors='ignore') | |
| # Разбиваем на запросы | |
| requests = content.split(">>> REQUEST") | |
| for req in requests[1:]: # пропускаем первый пустой | |
| self._analyze_request(req) | |
| self._print_report() | |
| def _analyze_request(self, request_text: str): | |
| """Анализирует один запрос""" | |
| lines = request_text.strip().split('\n') | |
| # Извлекаем URL | |
| for line in lines: | |
| if line.strip().startswith("URL:"): | |
| url = line.split("URL:", 1)[1].strip() | |
| # Убираем query params для группировки | |
| base_url = url.split('?')[0] | |
| self.endpoints[base_url] += 1 | |
| # Ищем паттерны | |
| for category, pattern in PATTERNS.items(): | |
| matches = pattern.findall(request_text) | |
| for match in matches: | |
| if isinstance(match, tuple): | |
| match = match[0] | |
| if match not in SAFE_VALUES and len(match) > 3: | |
| # Находим контекст (строку где найдено) | |
| for line in lines: | |
| if match in line: | |
| context = line.strip()[:100] | |
| self.findings[category][match].add(context) | |
| break | |
| # Ищем JSON и анализируем ключи | |
| self._analyze_json(request_text) | |
| def _analyze_json(self, text: str): | |
| """Извлекает и анализирует JSON из текста""" | |
| # Ищем JSON объекты | |
| json_pattern = re.compile(r'\{[^{}]*\}|\[[^\[\]]*\]') | |
| for match in json_pattern.finditer(text): | |
| try: | |
| data = json.loads(match.group()) | |
| self._extract_json_values(data) | |
| except: | |
| pass | |
| # Также ищем ключи напрямую | |
| for key in INTERESTING_KEYS: | |
| pattern = re.compile(rf'"{key}":\s*"([^"]+)"', re.IGNORECASE) | |
| for match in pattern.finditer(text): | |
| value = match.group(1) | |
| if value and len(value) > 2: | |
| self.json_keys[key].add(value) | |
| def _extract_json_values(self, data, prefix=""): | |
| """Рекурсивно извлекает значения из JSON""" | |
| if isinstance(data, dict): | |
| for key, value in data.items(): | |
| full_key = f"{prefix}.{key}" if prefix else key | |
| if key.lower() in [k.lower() for k in INTERESTING_KEYS]: | |
| if isinstance(value, str) and len(value) > 2: | |
| self.json_keys[key].add(value) | |
| self._extract_json_values(value, full_key) | |
| elif isinstance(data, list): | |
| for item in data: | |
| self._extract_json_values(item, prefix) | |
| def _print_report(self): | |
| """Выводит отчёт""" | |
| print("\n" + "="*60) | |
| print("📊 ANALYSIS REPORT") | |
| print("="*60) | |
| # Endpoints | |
| print("\n🌐 ENDPOINTS ACCESSED:") | |
| print("-"*40) | |
| for url, count in sorted(self.endpoints.items(), key=lambda x: -x[1]): | |
| print(f" [{count:3}x] {url[:80]}") | |
| # Идентификаторы по категориям | |
| print("\n\n🔍 POTENTIAL IDENTIFIERS FOUND:") | |
| print("-"*40) | |
| for category, values in sorted(self.findings.items()): | |
| if values: | |
| print(f"\n 📌 {category}:") | |
| for value, contexts in sorted(values.items()): | |
| # Показываем укороченное значение | |
| short_val = value[:40] + "..." if len(value) > 40 else value | |
| print(f" • {short_val}") | |
| # Показываем один контекст | |
| if contexts: | |
| ctx = list(contexts)[0][:60] | |
| print(f" └─ {ctx}...") | |
| # JSON ключи | |
| print("\n\n🔑 INTERESTING JSON KEYS:") | |
| print("-"*40) | |
| for key, values in sorted(self.json_keys.items()): | |
| if values: | |
| print(f"\n {key}:") | |
| for val in list(values)[:5]: # макс 5 значений | |
| short_val = val[:50] + "..." if len(val) > 50 else val | |
| print(f" • {short_val}") | |
| if len(values) > 5: | |
| print(f" ... and {len(values)-5} more") | |
| # Рекомендации | |
| print("\n\n⚠️ RECOMMENDATIONS:") | |
| print("-"*40) | |
| if "machineId (64 hex)" in self.findings: | |
| print(" ❗ machineId found - needs to be spoofed!") | |
| if "IP address" in self.findings: | |
| ips = self.findings["IP address"] | |
| external_ips = [ip for ip in ips if not ip.startswith(("127.", "192.168.", "10.", "172."))] | |
| if external_ips: | |
| print(" ❗ External IP addresses found - consider VPN") | |
| if "Windows path" in self.findings or "Unix path" in self.findings: | |
| print(" ❗ File paths with username found - may leak identity") | |
| if "Email" in self.findings: | |
| print(" ❗ Email addresses found in traffic") | |
| print("\n" + "="*60) | |
| def get_latest_log() -> Path: | |
| """Получает последний лог файл""" | |
| if not LOG_DIR.exists(): | |
| print(f"Log directory not found: {LOG_DIR}") | |
| sys.exit(1) | |
| logs = sorted(LOG_DIR.glob("kiro_traffic_*.log"), reverse=True) | |
| if not logs: | |
| print("No log files found") | |
| sys.exit(1) | |
| return logs[0] | |
| def main(): | |
| if len(sys.argv) > 1: | |
| log_file = Path(sys.argv[1]) | |
| else: | |
| log_file = get_latest_log() | |
| if not log_file.exists(): | |
| print(f"File not found: {log_file}") | |
| sys.exit(1) | |
| analyzer = TrafficAnalyzer() | |
| analyzer.analyze_file(log_file) | |
| if __name__ == "__main__": | |
| main() | |