File size: 10,186 Bytes
3a68c3f
 
 
 
 
9426190
3a68c3f
 
 
9426190
3a68c3f
e833b51
3a68c3f
9426190
3a68c3f
9426190
3a68c3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9426190
 
 
 
3a68c3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9426190
 
 
3a68c3f
 
 
9426190
 
3a68c3f
 
9426190
 
 
 
3a68c3f
 
 
9426190
 
 
3a68c3f
 
 
 
 
 
 
 
 
9426190
3a68c3f
9426190
 
3a68c3f
 
9426190
 
 
 
 
3a68c3f
9426190
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3a68c3f
 
 
9426190
3a68c3f
 
 
 
 
 
 
 
9426190
3a68c3f
 
 
 
 
 
 
 
 
 
e2d980c
 
 
 
 
3a68c3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9426190
 
 
 
 
 
 
 
 
3a68c3f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import requests
import threading
import time
import random
from datetime import datetime, timedelta
from urllib.parse import urlparse

class ProxyPool:
    """Thread-safe, self-refilling pool of HTTP(S) proxies.

    Proxies are fetched one at a time from an external provider, validated
    against ``target_url``, and handed out round-robin via :meth:`get_proxy`.
    A background timer periodically tops the pool back up whenever it drops
    to or below ``min_threshold``.
    """

    def __init__(self, options=None):
        """Create a pool from an options dict (all keys optional).

        Note: the default was changed from a mutable ``{}`` to ``None`` to
        avoid the shared-mutable-default-argument pitfall; behavior for all
        callers is unchanged.
        """
        options = options or {}
        self.target_count = options.get('target_count', 10)  # Reduced since we get one at a time
        self.test_timeout = options.get('test_timeout', 5)  # currently unused; kept for interface compatibility
        self.request_timeout = options.get('request_timeout', 5)  # per-request timeout used by test_proxy
        self.target_url = options.get('target_url', 'https://app.freeplay.ai/')
        self.min_threshold = options.get('min_threshold', 3)  # Reduced threshold
        self.check_interval = options.get('check_interval', 30)  # seconds between background checks
        self.max_refill_attempts = options.get('max_refill_attempts', 10)
        self.retry_delay = options.get('retry_delay', 1)  # seconds to wait after a failed provider fetch

        self.available_proxies = []   # list of proxy dicts (see test_and_add_proxy)
        self.current_index = 0        # round-robin cursor into available_proxies
        self.is_initialized = False
        self.is_refilling = False     # guarded by self.lock; ensures a single refill at a time
        self.check_timer = None
        self.lock = threading.Lock()  # protects available_proxies, current_index, is_refilling

    def initialize(self):
        """Perform the initial (blocking) refill and start the periodic check timer.

        Idempotent: calling it a second time is a no-op.
        """
        if self.is_initialized:
            return
        print(f"Initializing proxy pool, target count: {self.target_count}")
        self.refill_proxies()
        self.check_timer = threading.Timer(self.check_interval, self.check_and_refill)
        self.check_timer.start()
        self.is_initialized = True
        print(f"Proxy pool initialized, current available proxies: {len(self.available_proxies)}")

    def stop(self):
        """Stop the background check timer.

        ``is_initialized`` is cleared first so that a ``check_and_refill``
        running concurrently will not re-arm the timer after the cancel.
        """
        self.is_initialized = False
        if self.check_timer:
            self.check_timer.cancel()
            self.check_timer = None
        print("Proxy pool service stopped")

    def check_and_refill(self):
        """Timer callback: trigger an async refill if the pool is low, then re-arm."""
        with self.lock:
            if len(self.available_proxies) <= self.min_threshold and not self.is_refilling:
                print(f"Available proxies ({len(self.available_proxies)}) below threshold ({self.min_threshold}), starting refill")
                # Start refill in a separate thread to avoid blocking
                refill_thread = threading.Thread(target=self.refill_proxies)
                refill_thread.daemon = True
                refill_thread.start()
        if self.is_initialized:
            self.check_timer = threading.Timer(self.check_interval, self.check_and_refill)
            self.check_timer.start()

    def refill_proxies(self):
        """Fetch, test and add proxies until target_count is reached or attempts run out.

        The ``is_refilling`` flag is claimed atomically under the lock so two
        threads can never run a refill concurrently (the original checked and
        set the flag without the lock, which was a race).
        """
        with self.lock:
            if self.is_refilling:
                return
            self.is_refilling = True
        print(f"Starting to refill proxies, current count: {len(self.available_proxies)}, target: {self.target_count}")

        attempts = 0
        try:
            while len(self.available_proxies) < self.target_count and attempts < self.max_refill_attempts:
                attempts += 1
                print(f"Refill attempt #{attempts}, current available: {len(self.available_proxies)}/{self.target_count}")

                proxy_url = self.get_proxy_from_provider()
                if not proxy_url:
                    print(f"No proxy received, retrying in {self.retry_delay} seconds...")
                    time.sleep(self.retry_delay)
                    continue

                # is_proxy_duplicate requires the lock to be held by its caller.
                with self.lock:
                    duplicate = self.is_proxy_duplicate(proxy_url)
                if duplicate:
                    print(f"Proxy {proxy_url} already exists, getting new one...")
                    continue

                if self.test_and_add_proxy(proxy_url):
                    print(f"Successfully added proxy, current available: {len(self.available_proxies)}/{self.target_count}")
                else:
                    print(f"Proxy {proxy_url} failed test, trying next...")

                if len(self.available_proxies) >= self.target_count:
                    break

                time.sleep(0.5)  # Small delay between requests

        except Exception as e:
            print(f"Error during proxy refill: {e}")
        finally:
            with self.lock:
                self.is_refilling = False
            if len(self.available_proxies) >= self.target_count:
                print(f"Proxy refill complete, current available: {len(self.available_proxies)}/{self.target_count}")
            else:
                print(f"Max refill attempts reached, current available: {len(self.available_proxies)}/{self.target_count}")

    def get_proxy_from_provider(self):
        """Fetch one proxy URL string from the provider, or None on any failure."""
        try:
            url = "https://proxy.doudouzi.me/random/us?host=us.proxy302.com"
            print(f"Getting proxy from: {url}")
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                proxy_url = response.text.strip()
                print(f"Successfully got proxy: {proxy_url}")
                return proxy_url
            print(f"Failed to get proxy. Status: {response.status_code}, Response: {response.text}")
            return None
        except Exception as e:
            print(f"Error getting proxy: {e}")
            return None

    def parse_proxy_url(self, proxy_url):
        """Parse a proxy URL like http://username:password@host:port into a dict.

        Returns None when the URL cannot be parsed or lacks a host or port.
        (The original stringified a missing port into the literal 'None',
        silently producing a broken proxy entry.)
        """
        try:
            parsed = urlparse(proxy_url)
            # urlparse raises ValueError on an out-of-range port; a missing
            # host/port means the provider handed us something unusable.
            if not parsed.hostname or parsed.port is None:
                print(f"Error parsing proxy URL {proxy_url}: missing host or port")
                return None
            return {
                'protocol': parsed.scheme,
                'username': parsed.username,
                'password': parsed.password,
                'host': parsed.hostname,
                'port': str(parsed.port),
                'full': proxy_url
            }
        except Exception as e:
            print(f"Error parsing proxy URL {proxy_url}: {e}")
            return None

    def is_proxy_duplicate(self, proxy_url):
        """Check if proxy already exists in the pool"""
        # Note: This method assumes the caller already holds self.lock
        return any(p['full'] == proxy_url for p in self.available_proxies)

    def test_and_add_proxy(self, proxy_url):
        """Test a proxy URL and, if it passes and is not a duplicate, add it to the pool.

        Returns True only when the proxy was actually appended.
        """
        if self.test_proxy(proxy_url):
            with self.lock:
                if not self.is_proxy_duplicate(proxy_url):
                    parsed = self.parse_proxy_url(proxy_url)
                    if parsed:
                        proxy_obj = {
                            'ip': parsed['host'],
                            'port': parsed['port'],
                            'protocol': parsed['protocol'],
                            'username': parsed['username'],
                            'password': parsed['password'],
                            'full': parsed['full'],
                            'added_at': datetime.now().isoformat()
                        }
                        self.available_proxies.append(proxy_obj)
                        print(f"Successfully added proxy: {parsed['host']}:{parsed['port']}")
                        return True
        return False

    def test_proxy(self, proxy_url):
        """Return True if the proxy can fetch target_url with a 200 response."""
        try:
            proxies = {'http': proxy_url, 'https': proxy_url}
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(self.target_url, proxies=proxies, headers=headers, timeout=self.request_timeout, allow_redirects=True)
            is_valid = response.status_code == 200
            if is_valid:
                print(f"Proxy {proxy_url} successfully tested, status: {response.status_code}")
            else:
                print(f"Proxy {proxy_url} failed test, status: {response.status_code}")
            return is_valid
        except Exception as e:
            print(f"Proxy {proxy_url} request error: {e}")
            return False

    def get_proxy(self):
        """Return the next proxy dict in round-robin order, or None if the pool is empty."""
        with self.lock:
            if not self.available_proxies:
                print("No available proxies")
                return None

            # Ensure current_index is within bounds (list may have shrunk)
            if self.current_index >= len(self.available_proxies):
                self.current_index = 0

            proxy = self.available_proxies[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.available_proxies)
            return proxy

    def remove_proxy(self, ip, port):
        """Remove a proxy by ip/port; returns True if one was removed.

        Kicks off an async refill if the pool drops to or below the threshold.
        ``port`` may be an int or str (it is normalized to str for comparison).
        """
        with self.lock:
            port_str = str(port)
            initial_length = len(self.available_proxies)
            self.available_proxies = [p for p in self.available_proxies if not (p['ip'] == ip and p['port'] == port_str)]
            if self.current_index >= len(self.available_proxies) and self.available_proxies:
                self.current_index = 0
            removed = initial_length > len(self.available_proxies)
            if removed:
                print(f"Removed proxy {ip}:{port}, current available: {len(self.available_proxies)}")
            else:
                print(f"Could not find proxy to remove {ip}:{port}")

            # Check if we need to refill asynchronously to avoid blocking the caller
            if len(self.available_proxies) <= self.min_threshold and not self.is_refilling:
                print(f"Available proxies ({len(self.available_proxies)}) below threshold ({self.min_threshold}), starting async refill")
                # Start refill in a separate thread to avoid blocking
                refill_thread = threading.Thread(target=self.refill_proxies)
                refill_thread.daemon = True
                refill_thread.start()

            return removed

    def get_all_proxies(self):
        """Return a shallow copy of the proxy list (safe to iterate without the lock)."""
        with self.lock:
            return list(self.available_proxies)

    def get_count(self):
        """Return the number of proxies currently in the pool."""
        with self.lock:
            return len(self.available_proxies)

if __name__ == '__main__':
    # Demo: build a pool, hand out one proxy, retire it, then shut down.
    config = {
        'target_count': 10,
        'min_threshold': 3,
        'check_interval': 60,
        'target_url': 'https://app.freeplay.ai/',
        'concurrent_requests': 15,
        'max_refill_attempts': 15,
        'retry_delay': 1
    }
    pool = ProxyPool(config)
    pool.initialize()

    time.sleep(5)
    selected = pool.get_proxy()
    print(f"Got proxy: {selected}")

    if selected is not None:
        time.sleep(5)
        pool.remove_proxy(selected['ip'], selected['port'])

    remaining = pool.get_all_proxies()
    print(f"Current all proxies ({len(remaining)}): {remaining}")

    time.sleep(5)
    pool.stop()
    print("Proxy pool example finished.")