import requests
import threading
import time
import random
from datetime import datetime, timedelta
from urllib.parse import urlparse
class ProxyPool:
    """Thread-safe pool of HTTP proxies fetched one at a time from a provider.

    Each candidate proxy is validated by issuing a real request against
    ``target_url``; working proxies are stored as dicts and handed out
    round-robin via :meth:`get_proxy`.  A periodic daemon timer refills the
    pool whenever it drops to ``min_threshold`` or below.
    """

    def __init__(self, options=None):
        """Create a pool from an ``options`` dict (all keys optional).

        NOTE: the original signature used a mutable ``{}`` default; a
        ``None`` sentinel is backward-compatible and avoids the shared
        mutable-default pitfall.
        """
        options = options if options is not None else {}
        self.target_count = options.get('target_count', 10)  # Reduced since we get one at a time
        self.test_timeout = options.get('test_timeout', 5)
        self.request_timeout = options.get('request_timeout', 5)
        self.target_url = options.get('target_url', 'https://app.freeplay.ai/')
        self.min_threshold = options.get('min_threshold', 3)  # Reduced threshold
        self.check_interval = options.get('check_interval', 30)
        self.max_refill_attempts = options.get('max_refill_attempts', 10)
        self.retry_delay = options.get('retry_delay', 1)
        self.available_proxies = []   # list of proxy dicts, see test_and_add_proxy
        self.current_index = 0        # round-robin cursor for get_proxy
        self.is_initialized = False
        self.is_refilling = False     # set/cleared under self.lock; prevents concurrent refills
        self.check_timer = None
        self.lock = threading.Lock()  # guards available_proxies / current_index / is_refilling

    def initialize(self):
        """Synchronously fill the pool, then start the periodic check timer."""
        if self.is_initialized:
            return
        print(f"Initializing proxy pool, target count: {self.target_count}")
        self.refill_proxies()
        self._schedule_check()
        self.is_initialized = True
        print(f"Proxy pool initialized, current available proxies: {len(self.available_proxies)}")

    def stop(self):
        """Cancel the periodic timer and mark the pool as stopped.

        Clearing ``is_initialized`` ensures a check that is already firing
        does not re-arm the timer after stop() returns.
        """
        self.is_initialized = False
        if self.check_timer:
            self.check_timer.cancel()
            self.check_timer = None
        print("Proxy pool service stopped")

    def _schedule_check(self):
        """(Re)arm the periodic check as a daemon timer so it never blocks exit."""
        self.check_timer = threading.Timer(self.check_interval, self.check_and_refill)
        self.check_timer.daemon = True
        self.check_timer.start()

    def _start_async_refill(self):
        """Run refill_proxies on a daemon thread so the caller is not blocked."""
        refill_thread = threading.Thread(target=self.refill_proxies)
        refill_thread.daemon = True
        refill_thread.start()

    def check_and_refill(self):
        """Timer callback: trigger an async refill when the pool runs low."""
        with self.lock:
            if len(self.available_proxies) <= self.min_threshold and not self.is_refilling:
                print(f"Available proxies ({len(self.available_proxies)}) below threshold ({self.min_threshold}), starting refill")
                self._start_async_refill()
        if self.is_initialized:
            self._schedule_check()

    def refill_proxies(self):
        """Fetch, test and add proxies until target_count or max attempts is hit."""
        # Check-and-set atomically under the lock; the original tested the
        # flag and then set it in two unsynchronized steps, so two threads
        # could both enter the fill loop.
        with self.lock:
            if self.is_refilling:
                return
            self.is_refilling = True
        print(f"Starting to refill proxies, current count: {len(self.available_proxies)}, target: {self.target_count}")
        attempts = 0
        try:
            while len(self.available_proxies) < self.target_count and attempts < self.max_refill_attempts:
                attempts += 1
                print(f"Refill attempt #{attempts}, current available: {len(self.available_proxies)}/{self.target_count}")
                proxy_url = self.get_proxy_from_provider()
                if not proxy_url:
                    print(f"No proxy received, retrying in {self.retry_delay} seconds...")
                    time.sleep(self.retry_delay)
                    continue
                # Cheap unlocked pre-check; test_and_add_proxy re-checks
                # under the lock before actually inserting.
                if self.is_proxy_duplicate(proxy_url):
                    print(f"Proxy {proxy_url} already exists, getting new one...")
                    continue
                if self.test_and_add_proxy(proxy_url):
                    print(f"Successfully added proxy, current available: {len(self.available_proxies)}/{self.target_count}")
                else:
                    print(f"Proxy {proxy_url} failed test, trying next...")
                if len(self.available_proxies) >= self.target_count:
                    break
                time.sleep(0.5)  # small delay so we do not hammer the provider
        except Exception as e:
            print(f"Error during proxy refill: {e}")
        finally:
            self.is_refilling = False
            if len(self.available_proxies) >= self.target_count:
                print(f"Proxy refill complete, current available: {len(self.available_proxies)}/{self.target_count}")
            else:
                print(f"Max refill attempts reached, current available: {len(self.available_proxies)}/{self.target_count}")

    def get_proxy_from_provider(self):
        """Request a single proxy URL from the provider; None on any failure."""
        try:
            url = "https://proxy.doudouzi.me/random/us?host=us.proxy302.com"
            print(f"Getting proxy from: {url}")
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                proxy_url = response.text.strip()
                print(f"Successfully got proxy: {proxy_url}")
                return proxy_url
            print(f"Failed to get proxy. Status: {response.status_code}, Response: {response.text}")
            return None
        except Exception as e:
            print(f"Error getting proxy: {e}")
            return None

    def parse_proxy_url(self, proxy_url):
        """Parse proxy URL like http://username:password@host:port.

        Returns a dict with protocol/username/password/host/port/full, or
        None when the URL lacks a usable host or port.  (The original code
        would happily store the string 'None' as the port.)
        """
        try:
            parsed = urlparse(proxy_url)
            if not parsed.hostname or not parsed.port:
                print(f"Error parsing proxy URL {proxy_url}: missing host or port")
                return None
            return {
                'protocol': parsed.scheme,
                'username': parsed.username,
                'password': parsed.password,
                'host': parsed.hostname,
                'port': str(parsed.port),
                'full': proxy_url
            }
        except Exception as e:
            # urlparse/.port raise ValueError for malformed ports
            print(f"Error parsing proxy URL {proxy_url}: {e}")
            return None

    def is_proxy_duplicate(self, proxy_url):
        """Check if proxy already exists in the pool.

        Note: for an authoritative answer the caller should hold self.lock;
        unlocked callers (refill loop) only use this as a cheap pre-filter.
        """
        return any(p['full'] == proxy_url for p in self.available_proxies)

    def test_and_add_proxy(self, proxy_url):
        """Validate ``proxy_url`` and, if it works and is new, add it to the pool."""
        if self.test_proxy(proxy_url):
            with self.lock:
                if not self.is_proxy_duplicate(proxy_url):
                    parsed = self.parse_proxy_url(proxy_url)
                    if parsed:
                        proxy_obj = {
                            'ip': parsed['host'],
                            'port': parsed['port'],
                            'protocol': parsed['protocol'],
                            'username': parsed['username'],
                            'password': parsed['password'],
                            'full': parsed['full'],
                            'added_at': datetime.now().isoformat()
                        }
                        self.available_proxies.append(proxy_obj)
                        print(f"Successfully added proxy: {parsed['host']}:{parsed['port']}")
                        return True
        return False

    def test_proxy(self, proxy_url):
        """Return True when a request to target_url via the proxy answers 200."""
        try:
            proxies = {'http': proxy_url, 'https': proxy_url}
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(self.target_url, proxies=proxies, headers=headers, timeout=self.request_timeout, allow_redirects=True)
            is_valid = response.status_code == 200
            if is_valid:
                print(f"Proxy {proxy_url} successfully tested, status: {response.status_code}")
            else:
                print(f"Proxy {proxy_url} failed test, status: {response.status_code}")
            return is_valid
        except Exception as e:
            print(f"Proxy {proxy_url} request error: {e}")
            return False

    def get_proxy(self):
        """Return the next proxy dict round-robin, or None when the pool is empty."""
        with self.lock:
            if not self.available_proxies:
                print("No available proxies")
                return None
            # Removals may have shrunk the list past the cursor
            if self.current_index >= len(self.available_proxies):
                self.current_index = 0
            proxy = self.available_proxies[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.available_proxies)
            return proxy

    def remove_proxy(self, ip, port):
        """Drop the proxy matching (ip, port); returns True if one was removed.

        Triggers an asynchronous refill when the pool falls to the threshold.
        ``port`` may be an int or str; it is compared as a string.
        """
        with self.lock:
            port_str = str(port)
            initial_length = len(self.available_proxies)
            self.available_proxies = [p for p in self.available_proxies if not (p['ip'] == ip and p['port'] == port_str)]
            if self.current_index >= len(self.available_proxies) and self.available_proxies:
                self.current_index = 0
            removed = initial_length > len(self.available_proxies)
            if removed:
                print(f"Removed proxy {ip}:{port}, current available: {len(self.available_proxies)}")
            else:
                print(f"Could not find proxy to remove {ip}:{port}")
            # Refill asynchronously to avoid blocking the caller
            if len(self.available_proxies) <= self.min_threshold and not self.is_refilling:
                print(f"Available proxies ({len(self.available_proxies)}) below threshold ({self.min_threshold}), starting async refill")
                self._start_async_refill()
            return removed

    def get_all_proxies(self):
        """Return a snapshot copy of the current proxy list."""
        with self.lock:
            return list(self.available_proxies)

    def get_count(self):
        """Return the number of currently available proxies."""
        with self.lock:
            return len(self.available_proxies)
if __name__ == '__main__':
    # Demo run: build a pool, wait for it to warm up, borrow one proxy,
    # discard it, then shut everything down.
    pool = ProxyPool({
        'target_count': 10,
        'min_threshold': 3,
        'check_interval': 60,
        'target_url': 'https://app.freeplay.ai/',
        'concurrent_requests': 15,
        'max_refill_attempts': 15,
        'retry_delay': 1,
    })
    pool.initialize()
    time.sleep(5)

    picked = pool.get_proxy()
    print(f"Got proxy: {picked}")
    if picked:
        time.sleep(5)
        pool.remove_proxy(picked['ip'], picked['port'])

    remaining = pool.get_all_proxies()
    print(f"Current all proxies ({len(remaining)}): {remaining}")
    time.sleep(5)

    pool.stop()
    print("Proxy pool example finished.")