# Source: KManager/debugger/analyzers/request_analyzer.py
# Initial commit 494c89b (StarrySkyWorld)
"""
Request Analyzer - анализ сетевых запросов
"""
import json
from pathlib import Path
from typing import Any, Dict, List, Optional
class RequestAnalyzer:
    """Analyze captured network requests to surface problems.

    Each request is a dict with the keys ``url``, ``method``, ``status``,
    ``duration`` (milliseconds), ``requestBody``, ``responseBody`` and,
    optionally, ``error``. Missing or ``None`` values are tolerated: numeric
    fields are treated as ``0`` and text fields as ``''``.

    Checks performed:
    - slow requests
    - errors (4xx, 5xx, or a non-empty ``error`` field)
    - suspicious patterns (blocking, rate limiting, automation detection)
    - fingerprint-related requests
    """

    # Default "slow" cut-off, shared by find_slow_requests/analyze/print_report.
    DEFAULT_SLOW_THRESHOLD_MS = 2000

    def __init__(self, requests: Optional[List[Dict]] = None,
                 har_path: Optional[str] = None):
        """Create an analyzer from in-memory requests or from a HAR file.

        Args:
            requests: List of request dicts (e.g. from a DebugSession).
            har_path: Path to a HAR file. When given, it takes precedence
                over ``requests``.
        """
        if har_path:
            self.requests = self._load_har(har_path)
        else:
            self.requests = requests or []

    @staticmethod
    def _number(req: Dict, key: str) -> Any:
        """Return ``req[key]``, mapping a missing or ``None`` value to 0."""
        return req.get(key) or 0

    @staticmethod
    def _text(req: Dict, key: str) -> Any:
        """Return ``req[key]``, mapping a missing or ``None`` value to ''."""
        return req.get(key) or ''

    @staticmethod
    def _headers_to_dict(headers: Optional[List[Dict]]) -> Dict:
        """Convert a HAR header list into a ``{name: value}`` dict.

        Entries without a ``name`` are skipped instead of raising KeyError.
        """
        return {
            h['name']: h.get('value', '')
            for h in headers or []
            if 'name' in h
        }

    def _load_har(self, path: str) -> List[Dict]:
        """Load requests from a HAR file into the internal dict format.

        Args:
            path: Path to the HAR (JSON) file.

        Returns:
            One dict per HAR entry.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        # utf-8-sig reads plain UTF-8 and also strips a leading BOM, which
        # json.load would otherwise reject.
        with open(path, 'r', encoding='utf-8-sig') as f:
            har = json.load(f)

        loaded = []
        for entry in (har.get('log') or {}).get('entries') or []:
            req = entry.get('request') or {}
            resp = entry.get('response') or {}
            loaded.append({
                'url': req.get('url', ''),
                'method': req.get('method', 'GET'),
                'status': resp.get('status', 0),
                'duration': entry.get('time', 0),
                # postData/content may be absent or explicitly null.
                'requestBody': (req.get('postData') or {}).get('text', ''),
                'responseBody': (resp.get('content') or {}).get('text', ''),
                'requestHeaders': self._headers_to_dict(req.get('headers')),
                'responseHeaders': self._headers_to_dict(resp.get('headers')),
            })
        return loaded

    def analyze(self, slow_threshold_ms: int = DEFAULT_SLOW_THRESHOLD_MS) -> Dict[str, Any]:
        """Run every check and return the combined result.

        Args:
            slow_threshold_ms: Duration above which a request counts as slow.

        Returns:
            Dict with the keys ``summary``, ``slow_requests``, ``errors``,
            ``fingerprint_requests``, ``suspicious_patterns`` and
            ``api_requests``.
        """
        return {
            'summary': self.get_summary(),
            'slow_requests': self.find_slow_requests(slow_threshold_ms),
            'errors': self.find_errors(),
            'fingerprint_requests': self.find_fingerprint_requests(),
            'suspicious_patterns': self.find_suspicious_patterns(),
            'api_requests': self.find_api_requests(),
        }

    def get_summary(self) -> Dict:
        """Return overall statistics.

        Returns:
            ``{'total': 0}`` when there are no requests; otherwise the total
            count, average/max/min duration in ms, and the number of error
            (status >= 400) and success (200 <= status < 400) responses.
        """
        total = len(self.requests)
        if not total:
            return {'total': 0}

        durations = [self._number(r, 'duration') for r in self.requests]
        statuses = [self._number(r, 'status') for r in self.requests]
        return {
            'total': total,
            'avg_duration_ms': sum(durations) / total,
            'max_duration_ms': max(durations),
            'min_duration_ms': min(durations),
            'errors_count': sum(1 for s in statuses if s >= 400),
            'success_count': sum(1 for s in statuses if 200 <= s < 400),
        }

    def find_slow_requests(self, threshold_ms: int = DEFAULT_SLOW_THRESHOLD_MS) -> List[Dict]:
        """Return requests whose duration exceeds ``threshold_ms``.

        Args:
            threshold_ms: Exclusive lower bound on duration, in ms.

        Returns:
            Matching requests, slowest first.
        """
        slow = []
        for req in self.requests:
            duration = self._number(req, 'duration')
            if duration > threshold_ms:
                slow.append({
                    'url': self._text(req, 'url'),
                    'method': self._text(req, 'method'),
                    'duration_ms': duration,
                    'status': self._number(req, 'status'),
                })
        slow.sort(key=lambda item: item['duration_ms'], reverse=True)
        return slow

    def find_errors(self) -> List[Dict]:
        """Return requests with status >= 400 or a non-empty ``error`` field.

        Requests that failed before a response arrived (status missing or
        ``None``) are reported with status 0.
        """
        errors = []
        for req in self.requests:
            status = self._number(req, 'status')
            error = req.get('error')
            if status >= 400 or error:
                errors.append({
                    'url': self._text(req, 'url'),
                    'method': self._text(req, 'method'),
                    'status': status,
                    'error': error or '',
                    'response_preview': self._text(req, 'responseBody')[:200],
                })
        return errors

    def find_fingerprint_requests(self) -> List[Dict]:
        """Return requests whose URL contains a fingerprint-related keyword.

        Matching is a case-insensitive substring test on the URL.
        """
        keywords = ('fingerprint', 'fwcim', 'metrics', 'telemetry', 'beacon', 'collect')
        fp_requests = []
        for req in self.requests:
            url = self._text(req, 'url')
            lowered = url.lower()
            if any(kw in lowered for kw in keywords):
                fp_requests.append({
                    'url': url,
                    'method': self._text(req, 'method'),
                    'status': self._number(req, 'status'),
                    'request_body_preview': self._text(req, 'requestBody')[:500],
                    'response_preview': self._text(req, 'responseBody')[:200],
                })
        return fp_requests

    def find_suspicious_patterns(self) -> List[Dict]:
        """Return signs of rate limiting, blocking and automation detection.

        A request matches a rule when its status equals the rule's trigger
        status (if the rule has one) or its lower-cased response body contains
        one of the rule's indicator substrings. These are plain substring
        heuristics, so false positives are possible (e.g. 'bot' in 'bottom').

        Returns:
            All rate-limiting matches, then all blocking matches, then all
            automation-detection matches. Automation entries carry no
            ``status`` key.
        """
        # (type, trigger status or None, body indicator substrings)
        rules = (
            ('rate_limiting', 429,
             ('429', 'rate limit', 'too many requests', 'throttl')),
            ('blocked', 403,
             ('blocked', 'forbidden', 'access denied', 'captcha', 'challenge')),
            ('automation_detection', None,
             ('automation', 'bot', 'selenium', 'webdriver', 'headless')),
        )

        patterns = []
        for kind, trigger_status, indicators in rules:
            for req in self.requests:
                status = self._number(req, 'status')
                body = self._text(req, 'responseBody').lower()
                status_hit = trigger_status is not None and status == trigger_status
                if not status_hit and not any(ind in body for ind in indicators):
                    continue
                entry = {'type': kind, 'url': self._text(req, 'url')}
                if trigger_status is not None:
                    entry['status'] = status
                entry['evidence'] = body[:200]
                patterns.append(entry)
        return patterns

    def find_api_requests(self) -> List[Dict]:
        """Return requests whose URL looks like an AWS sign-in/API call.

        Matching is a case-insensitive substring test on the URL.
        """
        api_patterns = (
            'signin.aws',
            'profile.aws',
            'oidc.',
            'awsapps.com',
            '/api/',
            'send-otp',
            'verify',
            'login',
            'signup',
        )
        api_requests = []
        for req in self.requests:
            url = self._text(req, 'url')
            lowered = url.lower()
            if any(p in lowered for p in api_patterns):
                api_requests.append({
                    'url': url,
                    'method': self._text(req, 'method'),
                    'status': self._number(req, 'status'),
                    'duration_ms': self._number(req, 'duration'),
                })
        return api_requests

    def print_report(self, slow_threshold_ms: int = DEFAULT_SLOW_THRESHOLD_MS):
        """Print a human-readable report to stdout.

        Args:
            slow_threshold_ms: Duration above which a request is listed as
                slow; the same value is shown in the section heading.
        """
        analysis = self.analyze(slow_threshold_ms)
        rule = "=" * 60

        print("\n" + rule)
        print("REQUEST ANALYSIS REPORT")
        print(rule)

        summary = analysis['summary']
        print("\nSUMMARY:")
        print(f" Total requests: {summary.get('total', 0)}")
        print(f" Avg duration: {summary.get('avg_duration_ms', 0):.0f}ms")
        print(f" Max duration: {summary.get('max_duration_ms', 0):.0f}ms")
        print(f" Errors: {summary.get('errors_count', 0)}")

        slow = analysis['slow_requests'][:10]
        if slow:
            print(f"\nSLOW REQUESTS (>{slow_threshold_ms}ms):")
            for req in slow:
                # .0f keeps float HAR timings as compact as the summary lines.
                print(f" [{req['duration_ms']:5.0f}ms] {req['method']:4} {req['status']:3} {req['url'][:60]}...")

        errors = analysis['errors']
        if errors:
            print("\nERRORS:")
            for err in errors[:10]:
                print(f" [{err['status']}] {err['method']} {err['url'][:60]}...")
                if err.get('response_preview'):
                    print(f" {err['response_preview'][:80]}...")

        suspicious = analysis['suspicious_patterns']
        if suspicious:
            print("\n⚠️ SUSPICIOUS PATTERNS:")
            for pat in suspicious:
                print(f" [{pat['type']}] {pat['url'][:60]}...")

        fp = analysis['fingerprint_requests']
        if fp:
            print("\nFINGERPRINT REQUESTS:")
            for req in fp[:5]:
                print(f" {req['method']} {req['url'][:60]}... -> {req['status']}")

        print("\n" + rule)