# KManager/scripts/analyze_kiro_traffic.py
"""
Анализатор трафика Kiro - поиск идентификаторов и fingerprints
Анализирует логи mitmproxy и находит:
- machineId
- IP адреса
- UUID/GUID
- Токены и ключи
- Версии ПО
- Имена пользователей/компьютеров
- Любые другие потенциальные идентификаторы
Использование:
python analyze_kiro_traffic.py [log_file]
Если log_file не указан - берёт последний лог из ~/.kiro-manager-wb/proxy_logs/
"""
import re
import json
import sys
from pathlib import Path
from collections import defaultdict
from datetime import datetime
# Directory containing the mitmproxy capture logs produced by the proxy component.
LOG_DIR = Path.home() / ".kiro-manager-wb" / "proxy_logs"
# Regex patterns used to hunt for identifiers in captured traffic.
# NOTE: "machineId (64 hex)" and "SHA-256 hash" are deliberately the same
# pattern - any 64-char hex blob is reported under both labels.
PATTERNS = {
    "machineId (64 hex)": re.compile(r'\b[a-f0-9]{64}\b', re.IGNORECASE),
    "UUID/GUID": re.compile(r'\b[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}\b', re.IGNORECASE),
    "SHA-256 hash": re.compile(r'\b[a-f0-9]{64}\b', re.IGNORECASE),
    "MD5 hash": re.compile(r'\b[a-f0-9]{32}\b', re.IGNORECASE),
    "Bearer token": re.compile(r'Bearer\s+([A-Za-z0-9_-]+\.?[A-Za-z0-9_-]*\.?[A-Za-z0-9_-]*)', re.IGNORECASE),
    "AWS access key": re.compile(r'\b(AKIA[A-Z0-9]{16})\b'),
    "IP address": re.compile(r'\b(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\b'),
    # Fixed: the TLD class was [A-Z|a-z], which also matched a literal '|'.
    "Email": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
    "Windows path": re.compile(r'[A-Z]:\\[^"\'<>|\r\n]+', re.IGNORECASE),
    "Unix path": re.compile(r'/(?:home|Users)/[^"\'<>|\r\n\s]+'),
    "Version string": re.compile(r'\b\d+\.\d+\.\d+(?:\.\d+)?\b'),
    "Hostname": re.compile(r'"hostname":\s*"([^"]+)"'),
    "Username": re.compile(r'"(?:user|username|userName)":\s*"([^"]+)"', re.IGNORECASE),
    "Session ID": re.compile(r'"(?:session|sessionId)":\s*"([^"]+)"', re.IGNORECASE),
    "Client ID": re.compile(r'"(?:client|clientId)":\s*"([^"]+)"', re.IGNORECASE),
    "Device ID": re.compile(r'"(?:device|deviceId)":\s*"([^"]+)"', re.IGNORECASE),
    "Trace ID": re.compile(r'"traceId":\s*"([^"]+)"'),
    "Span ID": re.compile(r'"spanId":\s*"([^"]+)"'),
}
# Known benign values that must never be reported as identifiers
# (loopback/placeholder addresses plus very common generic version strings).
SAFE_VALUES = {
    "127.0.0.1", "localhost", "0.0.0.0",
    "0.0.1", "0.8.0", "1.0.0", "2.0.0",  # common version strings
}
# JSON keys whose values are worth collecting (matched case-insensitively).
INTERESTING_KEYS = [
    "machineId", "machine_id", "deviceId", "device_id",
    "userId", "user_id", "username", "userName",
    "hostname", "computerName", "platform",
    "sessionId", "session_id", "clientId", "client_id",
    "fingerprint", "hwid", "uuid", "guid",
    "ip", "ipAddress", "ip_address",
    "email", "accountId", "account_id",
]
class TrafficAnalyzer:
    """Scans mitmproxy capture logs for endpoints, identifiers and leaky JSON keys.

    Feed one or more log files through :meth:`analyze_file`; each call prints
    a human-readable report of everything found so far.
    """

    def __init__(self):
        # category -> matched value -> set of context lines the value was seen on
        self.findings = defaultdict(lambda: defaultdict(set))
        # base URL (query string stripped) -> request count
        self.endpoints = defaultdict(int)
        # header name -> observed values (reserved; not populated by the parser yet)
        self.headers_seen = defaultdict(set)
        # interesting JSON key -> set of observed values
        self.json_keys = defaultdict(set)
        # Hoisted out of the per-request path: the original code recompiled
        # these regexes and rebuilt the lowered key list on every call.
        self._json_object_re = re.compile(r'\{[^{}]*\}|\[[^\[\]]*\]')
        self._key_patterns = {
            key: re.compile(rf'"{key}":\s*"([^"]+)"', re.IGNORECASE)
            for key in INTERESTING_KEYS
        }
        self._interesting_lower = frozenset(k.lower() for k in INTERESTING_KEYS)

    def analyze_file(self, filepath: Path):
        """Analyze one log file and print the accumulated report."""
        print(f"\n{'='*60}")
        print(f"Analyzing: {filepath.name}")
        print(f"{'='*60}\n")
        content = filepath.read_text(encoding='utf-8', errors='ignore')
        # The proxy writes one ">>> REQUEST" marker per captured request.
        requests = content.split(">>> REQUEST")
        for req in requests[1:]:  # skip the preamble before the first marker
            self._analyze_request(req)
        self._print_report()

    def _analyze_request(self, request_text: str):
        """Analyze a single request section: URL, identifier patterns, JSON payloads."""
        lines = request_text.strip().split('\n')

        # Count every "URL:" line, grouped by base URL (query params stripped).
        for line in lines:
            if line.strip().startswith("URL:"):
                url = line.split("URL:", 1)[1].strip()
                base_url = url.split('?')[0]
                self.endpoints[base_url] += 1

        # Run every identifier pattern over the raw request text.
        for category, pattern in PATTERNS.items():
            for match in pattern.findall(request_text):
                if isinstance(match, tuple):
                    # findall yields tuples when the pattern has capture groups.
                    match = match[0]
                if match in SAFE_VALUES or len(match) <= 3:
                    continue
                # Record the first line containing the value as its context.
                for line in lines:
                    if match in line:
                        self.findings[category][match].add(line.strip()[:100])
                        break

        self._analyze_json(request_text)

    def _analyze_json(self, text: str):
        """Extract flat JSON objects/arrays from *text* and mine interesting keys."""
        # NOTE: the regex only matches non-nested JSON literals; nested
        # structures are covered by the direct key scan below.
        for candidate in self._json_object_re.finditer(text):
            try:
                self._extract_json_values(json.loads(candidate.group()))
            except ValueError:
                # Not valid JSON after all (json.JSONDecodeError is a ValueError).
                pass

        # Also look for the interesting keys directly, outside parseable JSON.
        for key, pattern in self._key_patterns.items():
            for hit in pattern.finditer(text):
                value = hit.group(1)
                if value and len(value) > 2:
                    self.json_keys[key].add(value)

    def _extract_json_values(self, data, prefix=""):
        """Recursively walk parsed JSON and collect string values of interesting keys."""
        if isinstance(data, dict):
            for key, value in data.items():
                full_key = f"{prefix}.{key}" if prefix else key
                if key.lower() in self._interesting_lower:
                    if isinstance(value, str) and len(value) > 2:
                        self.json_keys[key].add(value)
                self._extract_json_values(value, full_key)
        elif isinstance(data, list):
            for item in data:
                self._extract_json_values(item, prefix)

    def _print_report(self):
        """Print the accumulated findings as a human-readable report."""
        print("\n" + "="*60)
        print("📊 ANALYSIS REPORT")
        print("="*60)

        # Endpoints, most frequently hit first.
        print("\n🌐 ENDPOINTS ACCESSED:")
        print("-"*40)
        for url, count in sorted(self.endpoints.items(), key=lambda x: -x[1]):
            print(f"  [{count:3}x] {url[:80]}")

        # Identifiers grouped by category.
        print("\n\n🔍 POTENTIAL IDENTIFIERS FOUND:")
        print("-"*40)
        for category, values in sorted(self.findings.items()):
            if values:
                print(f"\n  📌 {category}:")
                for value, contexts in sorted(values.items()):
                    # Truncate long values for display.
                    short_val = value[:40] + "..." if len(value) > 40 else value
                    print(f"    • {short_val}")
                    # Show one context line (sorted for deterministic output;
                    # the original picked an arbitrary set element).
                    if contexts:
                        ctx = sorted(contexts)[0][:60]
                        print(f"      └─ {ctx}...")

        # Interesting JSON keys and a sample of their values.
        print("\n\n🔑 INTERESTING JSON KEYS:")
        print("-"*40)
        for key, values in sorted(self.json_keys.items()):
            if values:
                print(f"\n  {key}:")
                for val in sorted(values)[:5]:  # at most 5 values, deterministic order
                    short_val = val[:50] + "..." if len(val) > 50 else val
                    print(f"    • {short_val}")
                if len(values) > 5:
                    print(f"    ... and {len(values)-5} more")

        # Actionable recommendations derived from the findings.
        print("\n\n⚠️  RECOMMENDATIONS:")
        print("-"*40)
        if "machineId (64 hex)" in self.findings:
            print("  ❗ machineId found - needs to be spoofed!")
        if "IP address" in self.findings:
            ips = self.findings["IP address"]
            # Crude private-range filter (note: "172." also excludes some public IPs).
            external_ips = [ip for ip in ips if not ip.startswith(("127.", "192.168.", "10.", "172."))]
            if external_ips:
                print("  ❗ External IP addresses found - consider VPN")
        if "Windows path" in self.findings or "Unix path" in self.findings:
            print("  ❗ File paths with username found - may leak identity")
        if "Email" in self.findings:
            print("  ❗ Email addresses found in traffic")
        print("\n" + "="*60)
def get_latest_log() -> Path:
    """Return the most recent kiro traffic log, exiting with status 1 if none exists."""
    if not LOG_DIR.exists():
        print(f"Log directory not found: {LOG_DIR}")
        sys.exit(1)
    # Timestamped filenames sort lexicographically, so the last one is newest.
    candidates = sorted(LOG_DIR.glob("kiro_traffic_*.log"))
    if not candidates:
        print("No log files found")
        sys.exit(1)
    return candidates[-1]
def main():
    """CLI entry point: analyze the log file given as argv[1], or the newest capture."""
    log_file = Path(sys.argv[1]) if len(sys.argv) > 1 else get_latest_log()
    if not log_file.exists():
        print(f"File not found: {log_file}")
        sys.exit(1)
    TrafficAnalyzer().analyze_file(log_file)


if __name__ == "__main__":
    main()