"""Self-refilling, thread-safe pool of outbound HTTP proxies.

Proxies are fetched one at a time from an external provider, validated
against a target URL, and handed out round-robin. A background timer
keeps the pool topped up whenever it drops below a threshold.
"""

import requests
import threading
import time
import random
from datetime import datetime, timedelta
from urllib.parse import urlparse


class ProxyPool:
    """Round-robin pool of tested proxies with background refill.

    All access to ``available_proxies``, ``current_index`` and
    ``is_refilling`` is guarded by ``self.lock`` (a non-reentrant
    ``threading.Lock`` — helpers documented as "caller holds lock" must
    not re-acquire it).
    """

    def __init__(self, options=None):
        """Create a pool; ``options`` is an optional dict of settings.

        NOTE(fix): the default was a mutable ``{}`` literal — mutable
        default arguments are shared across calls, so it is now ``None``.
        """
        options = options or {}
        self.target_count = options.get('target_count', 10)  # Reduced since we get one at a time
        self.test_timeout = options.get('test_timeout', 5)
        self.request_timeout = options.get('request_timeout', 5)
        self.target_url = options.get('target_url', 'https://app.freeplay.ai/')
        self.min_threshold = options.get('min_threshold', 3)  # Reduced threshold
        self.check_interval = options.get('check_interval', 30)
        self.max_refill_attempts = options.get('max_refill_attempts', 10)
        self.retry_delay = options.get('retry_delay', 1)
        self.available_proxies = []  # list of proxy dicts, see test_and_add_proxy
        self.current_index = 0       # round-robin cursor into available_proxies
        self.is_initialized = False
        self.is_refilling = False    # guarded by self.lock
        self.check_timer = None
        self.lock = threading.Lock()

    def initialize(self):
        """Fill the pool synchronously and start the periodic refill timer.

        Idempotent: a second call is a no-op.
        """
        if self.is_initialized:
            return
        print(f"Initializing proxy pool, target count: {self.target_count}")
        self.refill_proxies()
        self.check_timer = threading.Timer(self.check_interval, self.check_and_refill)
        # Daemonize so a forgotten stop() cannot keep the process alive.
        self.check_timer.daemon = True
        self.check_timer.start()
        self.is_initialized = True
        print(f"Proxy pool initialized, current available proxies: {len(self.available_proxies)}")

    def stop(self):
        """Cancel the refill timer and prevent further rescheduling."""
        # Clear the flag first so a check_and_refill racing with stop()
        # does not schedule a fresh timer after we cancel this one.
        self.is_initialized = False
        if self.check_timer:
            self.check_timer.cancel()
            self.check_timer = None
        print("Proxy pool service stopped")

    def check_and_refill(self):
        """Timer callback: kick off an async refill if below threshold, then reschedule."""
        with self.lock:
            if len(self.available_proxies) <= self.min_threshold and not self.is_refilling:
                print(f"Available proxies ({len(self.available_proxies)}) below threshold ({self.min_threshold}), starting refill")
                # Start refill in a separate thread to avoid blocking
                refill_thread = threading.Thread(target=self.refill_proxies)
                refill_thread.daemon = True
                refill_thread.start()
        if self.is_initialized:
            self.check_timer = threading.Timer(self.check_interval, self.check_and_refill)
            self.check_timer.daemon = True
            self.check_timer.start()

    def refill_proxies(self):
        """Fetch/test proxies until ``target_count`` is reached or attempts run out.

        Runs synchronously in the calling thread; concurrent callers are
        rejected via the ``is_refilling`` flag.
        """
        # Atomic check-and-set under the lock: the original unguarded
        # read-then-write allowed two threads to start refilling at once.
        with self.lock:
            if self.is_refilling:
                return
            self.is_refilling = True
        print(f"Starting to refill proxies, current count: {len(self.available_proxies)}, target: {self.target_count}")
        attempts = 0
        try:
            while len(self.available_proxies) < self.target_count and attempts < self.max_refill_attempts:
                attempts += 1
                print(f"Refill attempt #{attempts}, current available: {len(self.available_proxies)}/{self.target_count}")
                proxy_url = self.get_proxy_from_provider()
                if not proxy_url:
                    print(f"No proxy received, retrying in {self.retry_delay} seconds...")
                    time.sleep(self.retry_delay)
                    continue
                # is_proxy_duplicate assumes the caller holds the lock.
                with self.lock:
                    duplicate = self.is_proxy_duplicate(proxy_url)
                if duplicate:
                    print(f"Proxy {proxy_url} already exists, getting new one...")
                    continue
                if self.test_and_add_proxy(proxy_url):
                    print(f"Successfully added proxy, current available: {len(self.available_proxies)}/{self.target_count}")
                else:
                    print(f"Proxy {proxy_url} failed test, trying next...")
                if len(self.available_proxies) >= self.target_count:
                    break
                time.sleep(0.5)  # Small delay between requests
        except Exception as e:
            print(f"Error during proxy refill: {e}")
        finally:
            self.is_refilling = False
        if len(self.available_proxies) >= self.target_count:
            print(f"Proxy refill complete, current available: {len(self.available_proxies)}/{self.target_count}")
        else:
            print(f"Max refill attempts reached, current available: {len(self.available_proxies)}/{self.target_count}")

    def get_proxy_from_provider(self):
        """Request a single proxy URL string from the provider; None on failure."""
        try:
            url = "https://proxy.doudouzi.me/random/us?host=us.proxy302.com"
            print(f"Getting proxy from: {url}")
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                proxy_url = response.text.strip()
                print(f"Successfully got proxy: {proxy_url}")
                return proxy_url
            print(f"Failed to get proxy. Status: {response.status_code}, Response: {response.text}")
            return None
        except Exception as e:
            print(f"Error getting proxy: {e}")
            return None

    def parse_proxy_url(self, proxy_url):
        """Parse proxy URL like http://username:password@host:port

        Returns a dict of components, or None when the URL is malformed
        or missing host/port (previously a port-less URL produced the
        literal strings 'None', poisoning the pool).
        """
        try:
            parsed = urlparse(proxy_url)
            if not parsed.hostname or parsed.port is None:
                print(f"Error parsing proxy URL {proxy_url}: missing host or port")
                return None
            return {
                'protocol': parsed.scheme,
                'username': parsed.username,
                'password': parsed.password,
                'host': parsed.hostname,
                'port': str(parsed.port),
                'full': proxy_url
            }
        except Exception as e:
            print(f"Error parsing proxy URL {proxy_url}: {e}")
            return None

    def is_proxy_duplicate(self, proxy_url):
        """Check if proxy already exists in the pool"""
        # Note: This method assumes the caller already holds self.lock
        return any(p['full'] == proxy_url for p in self.available_proxies)

    def test_and_add_proxy(self, proxy_url):
        """Test ``proxy_url`` and, if it works and is new, append it to the pool.

        Returns True only when the proxy was actually added.
        """
        if self.test_proxy(proxy_url):
            with self.lock:
                if not self.is_proxy_duplicate(proxy_url):
                    parsed = self.parse_proxy_url(proxy_url)
                    if parsed:
                        proxy_obj = {
                            'ip': parsed['host'],
                            'port': parsed['port'],
                            'protocol': parsed['protocol'],
                            'username': parsed['username'],
                            'password': parsed['password'],
                            'full': parsed['full'],
                            'added_at': datetime.now().isoformat()
                        }
                        self.available_proxies.append(proxy_obj)
                        print(f"Successfully added proxy: {parsed['host']}:{parsed['port']}")
                        return True
        return False

    def test_proxy(self, proxy_url):
        """Return True when ``target_url`` answers 200 through the proxy."""
        try:
            proxies = {'http': proxy_url, 'https': proxy_url}
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(self.target_url, proxies=proxies, headers=headers,
                                    timeout=self.request_timeout, allow_redirects=True)
            is_valid = response.status_code == 200
            if is_valid:
                print(f"Proxy {proxy_url} successfully tested, status: {response.status_code}")
            else:
                print(f"Proxy {proxy_url} failed test, status: {response.status_code}")
            return is_valid
        except Exception as e:
            print(f"Proxy {proxy_url} request error: {e}")
            return False

    def get_proxy(self):
        """Return the next proxy dict round-robin, or None when the pool is empty."""
        with self.lock:
            if not self.available_proxies:
                print("No available proxies")
                return None
            # Ensure current_index is within bounds
            if self.current_index >= len(self.available_proxies):
                self.current_index = 0
            proxy = self.available_proxies[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.available_proxies)
            return proxy

    def remove_proxy(self, ip, port):
        """Remove the proxy matching ``ip``/``port``; returns True if one was removed.

        May start a background refill when the pool falls below threshold.
        """
        with self.lock:
            port_str = str(port)
            initial_length = len(self.available_proxies)
            self.available_proxies = [p for p in self.available_proxies
                                      if not (p['ip'] == ip and p['port'] == port_str)]
            if self.current_index >= len(self.available_proxies) and self.available_proxies:
                self.current_index = 0
            removed = initial_length > len(self.available_proxies)
            if removed:
                print(f"Removed proxy {ip}:{port}, current available: {len(self.available_proxies)}")
            else:
                print(f"Could not find proxy to remove {ip}:{port}")
            # Check if we need to refill asynchronously to avoid blocking the caller
            if len(self.available_proxies) <= self.min_threshold and not self.is_refilling:
                print(f"Available proxies ({len(self.available_proxies)}) below threshold ({self.min_threshold}), starting async refill")
                # Start refill in a separate thread to avoid blocking
                refill_thread = threading.Thread(target=self.refill_proxies)
                refill_thread.daemon = True
                refill_thread.start()
            return removed

    def get_all_proxies(self):
        """Return a snapshot copy of the current proxy list."""
        with self.lock:
            return list(self.available_proxies)

    def get_count(self):
        """Return the number of currently available proxies."""
        with self.lock:
            return len(self.available_proxies)


if __name__ == '__main__':
    proxy_pool = ProxyPool({
        'target_count': 10,
        'min_threshold': 3,
        'check_interval': 60,
        'target_url': 'https://app.freeplay.ai/',
        'concurrent_requests': 15,
        'max_refill_attempts': 15,
        'retry_delay': 1
    })
    proxy_pool.initialize()
    time.sleep(5)
    proxy = proxy_pool.get_proxy()
    print(f"Got proxy: {proxy}")
    if proxy:
        time.sleep(5)
        proxy_pool.remove_proxy(proxy['ip'], proxy['port'])
    all_proxies = proxy_pool.get_all_proxies()
    print(f"Current all proxies ({len(all_proxies)}): {all_proxies}")
    time.sleep(5)
    proxy_pool.stop()
    print("Proxy pool example finished.")