File size: 8,619 Bytes
c5f9050
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import os, json, random, time, asyncio, logging
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import base64

logger = logging.getLogger(__name__)

class ProxyHealth(Enum):
    """Health state tracked for each proxy by ``SmartProxyManager``."""
    # Default state of a new ProxyInfo; restored by mark_proxy_success.
    HEALTHY = "healthy"
    # Set by mark_proxy_failure for failures other than cloudflare / rate_limit.
    DEGRADED = "degraded"
    # Set by mark_proxy_failure when detection_type is "cloudflare" or "rate_limit".
    BLOCKED = "blocked"
    # Set by mark_proxy_failure once consecutive failures reach the manager's
    # limit; get_best_proxy never returns a proxy in this state.
    FAILED = "failed"

@dataclass
class ProxyInfo:
    """Connection details and running usage statistics for one proxy.

    ``server`` is the only required field. The remaining fields start at
    neutral defaults and are updated by the proxy manager as requests
    succeed or fail.
    """
    server: str
    username: Optional[str] = None
    password: Optional[str] = None
    location: str = "unknown"
    health: ProxyHealth = ProxyHealth.HEALTHY
    success_count: int = 0
    failure_count: int = 0
    last_used: float = 0
    blocked_sites: set = None
    response_time: float = 0
    consecutive_failures: int = 0

    def __post_init__(self):
        # ``None`` stands in for "no set supplied" so that every instance
        # gets its own fresh set instead of sharing one mutable default.
        self.blocked_sites = set() if self.blocked_sites is None else self.blocked_sites

    @property
    def success_rate(self) -> float:
        """Fraction of recorded attempts that succeeded (1.0 when none are recorded)."""
        attempts = self.success_count + self.failure_count
        if attempts > 0:
            return self.success_count / attempts
        return 1.0

    def to_playwright_dict(self) -> Dict:
        """Return a dict with ``server`` plus any non-empty ``username``/``password``."""
        settings = {"server": self.server}
        for credential in ("username", "password"):
            value = getattr(self, credential)
            if value:
                settings[credential] = value
        return settings

class SmartProxyManager:
    def __init__(self, vision_model=None):
        """Create the manager and immediately load the configured proxies.

        Args:
            vision_model: Optional object used by
                ``detect_anti_bot_with_vision``; when falsy, that method
                reports no detection.
        """
        # Thresholds: a proxy is treated as failed once it reaches
        # ``max_consecutive_failures`` failures in a row.
        self.max_consecutive_failures = 3
        self.max_proxy_retries = 5

        self.vision_model = vision_model

        # Pool state; ``proxies`` is filled by ``_load_proxies`` below.
        self.current_proxy_index = 0
        self.proxies: List[ProxyInfo] = []

        self._load_proxies()
    
    def _load_proxies(self):
        """Populate ``self.proxies`` from the ``SCRAPER_PROXIES`` environment variable.

        The variable is expected to hold a JSON array whose items are either
        proxy server strings or objects with a ``server`` key and optional
        ``username`` / ``password`` / ``location`` keys. A single bare string
        or object is accepted as a one-element list.

        An unset or blank variable yields no proxies. Malformed JSON or an
        unsupported top-level value is logged and treated the same way, so a
        bad configuration does not abort construction of the manager.
        Entries without a non-empty string ``server`` are skipped.
        """
        source = os.getenv("SCRAPER_PROXIES", "[]")

        proxy_data = []
        if source.strip():
            try:
                proxy_data = json.loads(source)
            except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
                logger.error(f"Ignoring SCRAPER_PROXIES: invalid JSON ({exc})")

        if isinstance(proxy_data, (str, dict)):
            # Without this, a bare JSON string would be iterated character
            # by character and every character would become a "proxy".
            proxy_data = [proxy_data]
        elif not isinstance(proxy_data, list):
            logger.error(
                f"Ignoring SCRAPER_PROXIES: expected a JSON array, got {type(proxy_data).__name__}"
            )
            proxy_data = []

        for index, proxy in enumerate(proxy_data):
            if isinstance(proxy, str):
                fields = {"server": proxy}
            elif isinstance(proxy, dict):
                fields = proxy
            else:
                # Entries of any other type are ignored.
                continue

            server = fields.get("server")
            if not isinstance(server, str) or not server.strip():
                # Log only the position: the entry itself may hold credentials.
                logger.warning(f"Skipping proxy entry {index}: missing server")
                continue

            self.proxies.append(ProxyInfo(
                server=server,
                username=fields.get("username"),
                password=fields.get("password"),
                location=fields.get("location", "unknown"),
            ))

        logger.info(f"Loaded {len(self.proxies)} proxies for smart rotation")
    
    def get_best_proxy(self, exclude_blocked_for: str = None) -> Optional[ProxyInfo]:
        """Get the best available proxy based on performance metrics"""
        if not self.proxies:
            return None
        
        # Filter out failed and heavily blocked proxies
        available_proxies = [
            p for p in self.proxies 
            if p.health != ProxyHealth.FAILED and 
            p.consecutive_failures < self.max_consecutive_failures and
            (not exclude_blocked_for or exclude_blocked_for not in p.blocked_sites)
        ]
        
        if not available_proxies:
            # Reset consecutive failures and try again
            for proxy in self.proxies:
                proxy.consecutive_failures = 0
            available_proxies = [p for p in self.proxies if p.health != ProxyHealth.FAILED]
        
        if not available_proxies:
            logger.error("No available proxies found!")
            return None
        
        # Sort by success rate and response time
        sorted_proxies = sorted(
            available_proxies,
            key=lambda p: (p.success_rate, -p.response_time, -p.last_used),
            reverse=True
        )
        
        return sorted_proxies[0]
    
    async def detect_anti_bot_with_vision(self, page, goal: str) -> Tuple[bool, str, Optional[str]]:
        """Use vision model to detect anti-bot systems"""
        if not self.vision_model:
            return False, "", None
        
        try:
            # Take screenshot for vision analysis
            screenshot_bytes = await page.screenshot(type='png')
            screenshot_b64 = base64.b64encode(screenshot_bytes).decode('utf-8')
            
            # Get page content for context
            page_title = await page.title()
            page_url = page.url
            
            # Create anti-bot detection prompt
            detection_prompt = f"""
            ANTI-BOT DETECTION TASK:
            
            You are analyzing a webpage screenshot to detect if we've encountered an anti-bot system, CAPTCHA, or access restriction.
            
            Current URL: {page_url}
            Page Title: {page_title}
            Original Goal: {goal}
            
            Look for these indicators:
            1. **Cloudflare protection pages** - "Checking your browser", "Please wait", security checks
            2. **CAPTCHA challenges** - Image puzzles, reCAPTCHA, hCaptcha, text verification
            3. **Access denied pages** - "Access Denied", "Blocked", "Rate Limited"
            4. **Bot detection warnings** - "Automated traffic detected", "Unusual activity"
            5. **Verification pages** - Phone verification, email verification, identity checks
            6. **Error pages** - 403 Forbidden, 429 Rate Limited, 503 Service Unavailable
            7. **Loading/waiting pages** - Indefinite loading, "Please wait while we verify"
            
            Respond with JSON:
            {{
                "is_anti_bot": true/false,
                "detection_type": "cloudflare|captcha|access_denied|rate_limit|verification|error|none",
                "confidence": 0.0-1.0,
                "description": "Brief description of what you see",
                "can_solve": true/false,
                "suggested_action": "rotate_proxy|solve_captcha|wait|retry|abort"
            }}
            """
            
            # Use vision model to analyze
            result = await self.vision_model.analyze_anti_bot_page(
                screenshot_b64, detection_prompt, page_url
            )
            
            if result.get("is_anti_bot", False):
                detection_type = result.get("detection_type", "unknown")
                suggested_action = result.get("suggested_action", "rotate_proxy")
                description = result.get("description", "Anti-bot system detected")
                
                logger.warning(f"🚫 Anti-bot detected: {detection_type} - {description}")
                return True, detection_type, suggested_action
            
            return False, "", None
            
        except Exception as e:
            logger.error(f"Error in vision-based anti-bot detection: {e}")
            return False, "", None
    
    def mark_proxy_success(self, proxy: ProxyInfo, response_time: float = 0):
        """Record a successful request through ``proxy``.

        Increments the success counter, clears the consecutive-failure
        streak, stores ``response_time`` and the current time, and sets the
        proxy's health back to ``HEALTHY``.
        """
        finished_at = time.time()

        # Counters
        proxy.success_count += 1
        proxy.consecutive_failures = 0

        # Latest measurements
        proxy.response_time = response_time
        proxy.last_used = finished_at

        proxy.health = ProxyHealth.HEALTHY
        logger.debug(f"✅ Proxy {proxy.server} marked successful")
    
    def mark_proxy_failure(self, proxy: ProxyInfo, site_url: str = None, detection_type: str = None):
        """Record a failed request through ``proxy`` and downgrade its health.

        Args:
            proxy: The proxy that was used for the failed request.
            site_url: Site the request targeted. It is remembered in
                ``proxy.blocked_sites`` for cloudflare / rate-limit blocks;
                when omitted, nothing is added to that set.
            detection_type: Kind of anti-bot detection, if any.
                ``"cloudflare"`` and ``"rate_limit"`` mark the proxy
                ``BLOCKED``; anything else marks it ``DEGRADED``.

        Once ``consecutive_failures`` reaches ``self.max_consecutive_failures``
        the proxy is marked ``FAILED`` regardless of the detection type.
        """
        proxy.failure_count += 1
        proxy.consecutive_failures += 1

        if detection_type in ("cloudflare", "rate_limit"):
            # Only remember real sites: a missing URL must not put ``None``
            # into the set of blocked sites.
            if site_url:
                proxy.blocked_sites.add(site_url)
            proxy.health = ProxyHealth.BLOCKED
            logger.warning(
                f"🚫 Proxy {proxy.server} blocked by {detection_type} for {site_url or 'unknown site'}"
            )
        else:
            proxy.health = ProxyHealth.DEGRADED

        # Mark as completely failed if too many consecutive failures
        if proxy.consecutive_failures >= self.max_consecutive_failures:
            proxy.health = ProxyHealth.FAILED
            logger.error(f"❌ Proxy {proxy.server} marked as failed after {proxy.consecutive_failures} consecutive failures")
    
    def get_proxy_stats(self) -> Dict:
        """Get comprehensive proxy statistics"""
        if not self.proxies:
            return {"total": 0, "healthy": 0, "blocked": 0, "failed": 0, "available": 0}
        
        stats = {
            "total": len(self.proxies),
            "healthy": len([p for p in self.proxies if p.health == ProxyHealth.HEALTHY]),
            "degraded": len([p for p in self.proxies if p.health == ProxyHealth.DEGRADED]),
            "blocked": len([p for p in self.proxies if p.health == ProxyHealth.BLOCKED]),
            "failed": len([p for p in self.proxies if p.health == ProxyHealth.FAILED]),
            "available": len([p for p in self.proxies if p.health != ProxyHealth.FAILED and p.consecutive_failures < self.max_consecutive_failures])
        }
        return stats