""" Request Analyzer - анализ сетевых запросов """ import json from typing import List, Dict, Any from pathlib import Path class RequestAnalyzer: """ Анализирует сетевые запросы для выявления проблем. Проверяет: - Медленные запросы - Ошибки (4xx, 5xx) - Подозрительные паттерны (блокировка, rate limiting) - Fingerprint запросы """ def __init__(self, requests: List[Dict] = None, har_path: str = None): """ Args: requests: Список запросов из DebugSession har_path: Путь к HAR файлу """ if har_path: self.requests = self._load_har(har_path) else: self.requests = requests or [] def _load_har(self, path: str) -> List[Dict]: """Загружает запросы из HAR файла""" with open(path, 'r', encoding='utf-8') as f: har = json.load(f) requests = [] for entry in har.get('log', {}).get('entries', []): req = entry.get('request', {}) resp = entry.get('response', {}) requests.append({ 'url': req.get('url', ''), 'method': req.get('method', 'GET'), 'status': resp.get('status', 0), 'duration': entry.get('time', 0), 'requestBody': req.get('postData', {}).get('text', ''), 'responseBody': resp.get('content', {}).get('text', ''), 'requestHeaders': {h['name']: h['value'] for h in req.get('headers', [])}, 'responseHeaders': {h['name']: h['value'] for h in resp.get('headers', [])}, }) return requests def analyze(self) -> Dict[str, Any]: """Полный анализ запросов""" return { 'summary': self.get_summary(), 'slow_requests': self.find_slow_requests(), 'errors': self.find_errors(), 'fingerprint_requests': self.find_fingerprint_requests(), 'suspicious_patterns': self.find_suspicious_patterns(), 'api_requests': self.find_api_requests(), } def get_summary(self) -> Dict: """Общая статистика""" total = len(self.requests) if not total: return {'total': 0} durations = [r.get('duration', 0) for r in self.requests] statuses = [r.get('status', 0) for r in self.requests] return { 'total': total, 'avg_duration_ms': sum(durations) / total, 'max_duration_ms': max(durations), 'min_duration_ms': min(durations), 'errors_count': len([s for s in statuses if s >= 400]), 'success_count': len([s for s in statuses if 200 <= s < 400]), } def find_slow_requests(self, threshold_ms: int = 2000) -> List[Dict]: """Находит медленные запросы""" slow = [] for req in self.requests: duration = req.get('duration', 0) if duration > threshold_ms: slow.append({ 'url': req.get('url', ''), 'method': req.get('method', ''), 'duration_ms': duration, 'status': req.get('status', 0), }) return sorted(slow, key=lambda x: x['duration_ms'], reverse=True) def find_errors(self) -> List[Dict]: """Находит запросы с ошибками""" errors = [] for req in self.requests: status = req.get('status', 0) if status >= 400 or req.get('error'): errors.append({ 'url': req.get('url', ''), 'method': req.get('method', ''), 'status': status, 'error': req.get('error', ''), 'response_preview': (req.get('responseBody', '') or '')[:200], }) return errors def find_fingerprint_requests(self) -> List[Dict]: """Находит запросы связанные с fingerprint""" keywords = ['fingerprint', 'fwcim', 'metrics', 'telemetry', 'beacon', 'collect'] fp_requests = [] for req in self.requests: url = req.get('url', '').lower() if any(kw in url for kw in keywords): fp_requests.append({ 'url': req.get('url', ''), 'method': req.get('method', ''), 'status': req.get('status', 0), 'request_body_preview': (req.get('requestBody', '') or '')[:500], 'response_preview': (req.get('responseBody', '') or '')[:200], }) return fp_requests def find_suspicious_patterns(self) -> List[Dict]: """Находит подозрительные паттерны""" patterns = [] # Проверяем на rate limiting rate_limit_indicators = ['429', 'rate limit', 'too many requests', 'throttl'] for req in self.requests: status = req.get('status', 0) response = (req.get('responseBody', '') or '').lower() if status == 429 or any(ind in response for ind in rate_limit_indicators): patterns.append({ 'type': 'rate_limiting', 'url': req.get('url', ''), 'status': status, 'evidence': response[:200], }) # Проверяем на блокировку block_indicators = ['blocked', 'forbidden', 'access denied', 'captcha', 'challenge'] for req in self.requests: status = req.get('status', 0) response = (req.get('responseBody', '') or '').lower() if status == 403 or any(ind in response for ind in block_indicators): patterns.append({ 'type': 'blocked', 'url': req.get('url', ''), 'status': status, 'evidence': response[:200], }) # Проверяем на automation detection automation_indicators = ['automation', 'bot', 'selenium', 'webdriver', 'headless'] for req in self.requests: response = (req.get('responseBody', '') or '').lower() if any(ind in response for ind in automation_indicators): patterns.append({ 'type': 'automation_detection', 'url': req.get('url', ''), 'evidence': response[:200], }) return patterns def find_api_requests(self) -> List[Dict]: """Находит API запросы AWS""" api_patterns = [ 'signin.aws', 'profile.aws', 'oidc.', 'awsapps.com', '/api/', 'send-otp', 'verify', 'login', 'signup', ] api_requests = [] for req in self.requests: url = req.get('url', '').lower() if any(p in url for p in api_patterns): api_requests.append({ 'url': req.get('url', ''), 'method': req.get('method', ''), 'status': req.get('status', 0), 'duration_ms': req.get('duration', 0), }) return api_requests def print_report(self): """Выводит отчёт в консоль""" analysis = self.analyze() print("\n" + "="*60) print("REQUEST ANALYSIS REPORT") print("="*60) summary = analysis['summary'] print(f"\nSUMMARY:") print(f" Total requests: {summary.get('total', 0)}") print(f" Avg duration: {summary.get('avg_duration_ms', 0):.0f}ms") print(f" Max duration: {summary.get('max_duration_ms', 0):.0f}ms") print(f" Errors: {summary.get('errors_count', 0)}") slow = analysis['slow_requests'][:10] if slow: print(f"\nSLOW REQUESTS (>{2000}ms):") for req in slow: print(f" [{req['duration_ms']:5}ms] {req['method']:4} {req['status']:3} {req['url'][:60]}...") errors = analysis['errors'] if errors: print(f"\nERRORS:") for err in errors[:10]: print(f" [{err['status']}] {err['method']} {err['url'][:60]}...") if err.get('response_preview'): print(f" {err['response_preview'][:80]}...") suspicious = analysis['suspicious_patterns'] if suspicious: print(f"\n⚠️ SUSPICIOUS PATTERNS:") for pat in suspicious: print(f" [{pat['type']}] {pat['url'][:60]}...") fp = analysis['fingerprint_requests'] if fp: print(f"\nFINGERPRINT REQUESTS:") for req in fp[:5]: print(f" {req['method']} {req['url'][:60]}... -> {req['status']}") print("\n" + "="*60)