Spaces:
Sleeping
Sleeping
| import requests | |
| import threading | |
| import time | |
| import random | |
| from datetime import datetime, timedelta | |
| from urllib.parse import urlparse | |
class ProxyPool:
    """Thread-safe rotating pool of HTTP proxies fetched from a remote provider.

    Proxies are fetched one at a time from the provider endpoint, validated by
    issuing a real request to ``target_url``, and handed out round-robin via
    :meth:`get_proxy`.  A background :class:`threading.Timer` periodically tops
    the pool back up to ``target_count`` whenever it drops to
    ``min_threshold`` or below.
    """

    def __init__(self, options=None):
        """Create a pool.

        Recognized ``options`` keys (all optional):
            target_count        -- desired pool size (default 10)
            test_timeout        -- reserved; stored but not read by this class
            request_timeout     -- seconds for the validation request (default 5)
            target_url          -- URL used to validate candidate proxies
            min_threshold       -- refill when pool size <= this (default 3)
            check_interval      -- seconds between background checks (default 30)
            max_refill_attempts -- provider fetches per refill cycle (default 10)
            retry_delay         -- seconds to wait after a failed fetch (default 1)
        """
        # BUG FIX: original signature was `options={}` — a mutable default
        # shared across all calls.  Normalize from None instead.
        options = options or {}
        self.target_count = options.get('target_count', 10)  # Reduced since we get one at a time
        self.test_timeout = options.get('test_timeout', 5)
        self.request_timeout = options.get('request_timeout', 5)
        self.target_url = options.get('target_url', 'https://app.freeplay.ai/')
        self.min_threshold = options.get('min_threshold', 3)  # Reduced threshold
        self.check_interval = options.get('check_interval', 30)
        self.max_refill_attempts = options.get('max_refill_attempts', 10)
        self.retry_delay = options.get('retry_delay', 1)
        self.available_proxies = []   # list of proxy dicts; see test_and_add_proxy
        self.current_index = 0        # round-robin cursor into available_proxies
        self.is_initialized = False
        self.is_refilling = False     # guarded by self.lock (see refill_proxies)
        self.check_timer = None       # periodic threading.Timer, or None
        self.lock = threading.Lock()

    def initialize(self):
        """Synchronously fill the pool, then start the periodic refill timer.

        Idempotent: repeat calls after the first are no-ops.
        """
        if self.is_initialized:
            return
        print(f"Initializing proxy pool, target count: {self.target_count}")
        self.refill_proxies()
        self.check_timer = threading.Timer(self.check_interval, self.check_and_refill)
        # Daemon timer so an un-stopped pool cannot keep the process alive.
        self.check_timer.daemon = True
        self.check_timer.start()
        self.is_initialized = True
        print(f"Proxy pool initialized, current available proxies: {len(self.available_proxies)}")

    def stop(self):
        """Cancel the periodic check timer and mark the pool stopped."""
        # BUG FIX: original left is_initialized True, so a check_and_refill
        # callback racing with stop() would reschedule the timer forever.
        self.is_initialized = False
        if self.check_timer:
            self.check_timer.cancel()
            self.check_timer = None
        print("Proxy pool service stopped")

    def check_and_refill(self):
        """Timer callback: spawn an async refill if the pool is low, then reschedule."""
        with self.lock:
            count = len(self.available_proxies)
            needs_refill = count <= self.min_threshold and not self.is_refilling
        if needs_refill:
            print(f"Available proxies ({count}) below threshold ({self.min_threshold}), starting refill")
            # Start refill in a separate thread to avoid blocking the timer callback
            refill_thread = threading.Thread(target=self.refill_proxies)
            refill_thread.daemon = True
            refill_thread.start()
        if self.is_initialized:
            self.check_timer = threading.Timer(self.check_interval, self.check_and_refill)
            self.check_timer.daemon = True
            self.check_timer.start()

    def refill_proxies(self):
        """Fetch, validate and add proxies until target_count or the attempt budget is spent.

        Safe to call from multiple threads: the lock-guarded ``is_refilling``
        flag makes concurrent calls no-ops.
        """
        # BUG FIX: original checked and set is_refilling without the lock,
        # so two threads could both pass the check and refill concurrently.
        with self.lock:
            if self.is_refilling:
                return
            self.is_refilling = True
        print(f"Starting to refill proxies, current count: {len(self.available_proxies)}, target: {self.target_count}")
        attempts = 0
        try:
            while len(self.available_proxies) < self.target_count and attempts < self.max_refill_attempts:
                attempts += 1
                print(f"Refill attempt #{attempts}, current available: {len(self.available_proxies)}/{self.target_count}")
                proxy_url = self.get_proxy_from_provider()
                if not proxy_url:
                    print(f"No proxy received, retrying in {self.retry_delay} seconds...")
                    time.sleep(self.retry_delay)
                    continue
                # BUG FIX: is_proxy_duplicate requires the caller to hold
                # self.lock; the original called it here without the lock.
                with self.lock:
                    duplicate = self.is_proxy_duplicate(proxy_url)
                if duplicate:
                    print(f"Proxy {proxy_url} already exists, getting new one...")
                    continue
                if self.test_and_add_proxy(proxy_url):
                    print(f"Successfully added proxy, current available: {len(self.available_proxies)}/{self.target_count}")
                else:
                    print(f"Proxy {proxy_url} failed test, trying next...")
                if len(self.available_proxies) >= self.target_count:
                    break
                time.sleep(0.5)  # Small delay between requests
        except Exception as e:
            print(f"Error during proxy refill: {e}")
        finally:
            with self.lock:
                self.is_refilling = False
            if len(self.available_proxies) >= self.target_count:
                print(f"Proxy refill complete, current available: {len(self.available_proxies)}/{self.target_count}")
            else:
                print(f"Max refill attempts reached, current available: {len(self.available_proxies)}/{self.target_count}")

    def get_proxy_from_provider(self):
        """Fetch one proxy URL string from the provider, or None on failure."""
        try:
            url = "https://proxy.doudouzi.me/random/us?host=us.proxy302.com"
            print(f"Getting proxy from: {url}")
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                proxy_url = response.text.strip()
                print(f"Successfully got proxy: {proxy_url}")
                return proxy_url
            print(f"Failed to get proxy. Status: {response.status_code}, Response: {response.text}")
            return None
        except Exception as e:
            print(f"Error getting proxy: {e}")
            return None

    def parse_proxy_url(self, proxy_url):
        """Parse a proxy URL like ``http://username:password@host:port``.

        Returns a dict with protocol/username/password/host/port/full keys,
        or None if the URL is malformed or lacks a host or port.
        """
        try:
            parsed = urlparse(proxy_url)
            # BUG FIX: original did str(parsed.port) unconditionally, storing
            # the literal string 'None' when the URL carried no port.
            if parsed.hostname is None or parsed.port is None:
                print(f"Error parsing proxy URL {proxy_url}: missing host or port")
                return None
            return {
                'protocol': parsed.scheme,
                'username': parsed.username,
                'password': parsed.password,
                'host': parsed.hostname,
                'port': str(parsed.port),
                'full': proxy_url
            }
        except Exception as e:
            # urlparse raises ValueError for e.g. an out-of-range port.
            print(f"Error parsing proxy URL {proxy_url}: {e}")
            return None

    def is_proxy_duplicate(self, proxy_url):
        """Check if proxy already exists in the pool"""
        # Note: This method assumes the caller already holds self.lock
        return any(p['full'] == proxy_url for p in self.available_proxies)

    def test_and_add_proxy(self, proxy_url):
        """Validate ``proxy_url`` against target_url and add it to the pool.

        Returns True only if the proxy passed the live test, parsed cleanly,
        and was not already present.
        """
        if self.test_proxy(proxy_url):
            with self.lock:
                # Re-check under the lock: another thread may have added it
                # between the live test and now.
                if not self.is_proxy_duplicate(proxy_url):
                    parsed = self.parse_proxy_url(proxy_url)
                    if parsed:
                        proxy_obj = {
                            'ip': parsed['host'],
                            'port': parsed['port'],
                            'protocol': parsed['protocol'],
                            'username': parsed['username'],
                            'password': parsed['password'],
                            'full': parsed['full'],
                            'added_at': datetime.now().isoformat()
                        }
                        self.available_proxies.append(proxy_obj)
                        print(f"Successfully added proxy: {parsed['host']}:{parsed['port']}")
                        return True
        return False

    def test_proxy(self, proxy_url):
        """Return True if a GET to target_url through the proxy answers 200."""
        try:
            proxies = {'http': proxy_url, 'https': proxy_url}
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            response = requests.get(self.target_url, proxies=proxies, headers=headers, timeout=self.request_timeout, allow_redirects=True)
            is_valid = response.status_code == 200
            if is_valid:
                print(f"Proxy {proxy_url} successfully tested, status: {response.status_code}")
            else:
                print(f"Proxy {proxy_url} failed test, status: {response.status_code}")
            return is_valid
        except Exception as e:
            print(f"Proxy {proxy_url} request error: {e}")
            return False

    def get_proxy(self):
        """Return the next proxy dict round-robin, or None if the pool is empty."""
        with self.lock:
            if not self.available_proxies:
                print("No available proxies")
                return None
            # Ensure current_index is within bounds (the pool may have shrunk)
            if self.current_index >= len(self.available_proxies):
                self.current_index = 0
            proxy = self.available_proxies[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.available_proxies)
            return proxy

    def remove_proxy(self, ip, port):
        """Drop the proxy matching ``ip``/``port`` (port may be int or str).

        Returns True if a proxy was removed.  May kick off an asynchronous
        refill if the pool falls to min_threshold or below.
        """
        with self.lock:
            port_str = str(port)
            initial_length = len(self.available_proxies)
            self.available_proxies = [p for p in self.available_proxies if not (p['ip'] == ip and p['port'] == port_str)]
            if self.current_index >= len(self.available_proxies) and self.available_proxies:
                self.current_index = 0
            removed = initial_length > len(self.available_proxies)
            if removed:
                print(f"Removed proxy {ip}:{port}, current available: {len(self.available_proxies)}")
            else:
                print(f"Could not find proxy to remove {ip}:{port}")
            # Check if we need to refill asynchronously to avoid blocking the caller
            if len(self.available_proxies) <= self.min_threshold and not self.is_refilling:
                print(f"Available proxies ({len(self.available_proxies)}) below threshold ({self.min_threshold}), starting async refill")
                # Start refill in a separate thread to avoid blocking
                refill_thread = threading.Thread(target=self.refill_proxies)
                refill_thread.daemon = True
                refill_thread.start()
            return removed

    def get_all_proxies(self):
        """Return a shallow copy of the current proxy list."""
        with self.lock:
            return list(self.available_proxies)

    def get_count(self):
        """Return the current number of available proxies."""
        with self.lock:
            return len(self.available_proxies)
if __name__ == '__main__':
    # Demo: build a small pool, rotate one proxy out, then shut down cleanly.
    config = {
        'target_count': 10,
        'min_threshold': 3,
        'check_interval': 60,
        'target_url': 'https://app.freeplay.ai/',
        'concurrent_requests': 15,
        'max_refill_attempts': 15,
        'retry_delay': 1,
    }
    pool = ProxyPool(config)
    pool.initialize()

    time.sleep(5)
    picked = pool.get_proxy()
    print(f"Got proxy: {picked}")

    if picked:
        time.sleep(5)
        pool.remove_proxy(picked['ip'], picked['port'])

    remaining = pool.get_all_proxies()
    print(f"Current all proxies ({len(remaining)}): {remaining}")

    time.sleep(5)
    pool.stop()
    print("Proxy pool example finished.")