freeplay2api / proxy_pool.py
bibibi12345's picture
changed proxy pool
e2d980c
import requests
import threading
import time
import random
from datetime import datetime, timedelta
from urllib.parse import urlparse
class ProxyPool:
def __init__(self, options={}):
self.target_count = options.get('target_count', 10) # Reduced since we get one at a time
self.test_timeout = options.get('test_timeout', 5)
self.request_timeout = options.get('request_timeout', 5)
self.target_url = options.get('target_url', 'https://app.freeplay.ai/')
self.min_threshold = options.get('min_threshold', 3) # Reduced threshold
self.check_interval = options.get('check_interval', 30)
self.max_refill_attempts = options.get('max_refill_attempts', 10)
self.retry_delay = options.get('retry_delay', 1)
self.available_proxies = []
self.current_index = 0
self.is_initialized = False
self.is_refilling = False
self.check_timer = None
self.lock = threading.Lock()
def initialize(self):
if self.is_initialized:
return
print(f"Initializing proxy pool, target count: {self.target_count}")
self.refill_proxies()
self.check_timer = threading.Timer(self.check_interval, self.check_and_refill)
self.check_timer.start()
self.is_initialized = True
print(f"Proxy pool initialized, current available proxies: {len(self.available_proxies)}")
def stop(self):
if self.check_timer:
self.check_timer.cancel()
self.check_timer = None
print("Proxy pool service stopped")
def check_and_refill(self):
with self.lock:
if len(self.available_proxies) <= self.min_threshold and not self.is_refilling:
print(f"Available proxies ({len(self.available_proxies)}) below threshold ({self.min_threshold}), starting refill")
# Start refill in a separate thread to avoid blocking
refill_thread = threading.Thread(target=self.refill_proxies)
refill_thread.daemon = True
refill_thread.start()
if self.is_initialized:
self.check_timer = threading.Timer(self.check_interval, self.check_and_refill)
self.check_timer.start()
def refill_proxies(self):
if self.is_refilling:
return
self.is_refilling = True
print(f"Starting to refill proxies, current count: {len(self.available_proxies)}, target: {self.target_count}")
attempts = 0
try:
while len(self.available_proxies) < self.target_count and attempts < self.max_refill_attempts:
attempts += 1
print(f"Refill attempt #{attempts}, current available: {len(self.available_proxies)}/{self.target_count}")
proxy_url = self.get_proxy_from_provider()
if not proxy_url:
print(f"No proxy received, retrying in {self.retry_delay} seconds...")
time.sleep(self.retry_delay)
continue
if self.is_proxy_duplicate(proxy_url):
print(f"Proxy {proxy_url} already exists, getting new one...")
continue
if self.test_and_add_proxy(proxy_url):
print(f"Successfully added proxy, current available: {len(self.available_proxies)}/{self.target_count}")
else:
print(f"Proxy {proxy_url} failed test, trying next...")
if len(self.available_proxies) >= self.target_count:
break
time.sleep(0.5) # Small delay between requests
except Exception as e:
print(f"Error during proxy refill: {e}")
finally:
self.is_refilling = False
if len(self.available_proxies) >= self.target_count:
print(f"Proxy refill complete, current available: {len(self.available_proxies)}/{self.target_count}")
else:
print(f"Max refill attempts reached, current available: {len(self.available_proxies)}/{self.target_count}")
def get_proxy_from_provider(self):
try:
url = "https://proxy.doudouzi.me/random/us?host=us.proxy302.com"
print(f"Getting proxy from: {url}")
response = requests.get(url, timeout=10)
if response.status_code == 200:
proxy_url = response.text.strip()
print(f"Successfully got proxy: {proxy_url}")
return proxy_url
print(f"Failed to get proxy. Status: {response.status_code}, Response: {response.text}")
return None
except Exception as e:
print(f"Error getting proxy: {e}")
return None
def parse_proxy_url(self, proxy_url):
"""Parse proxy URL like http://username:password@host:port"""
try:
parsed = urlparse(proxy_url)
return {
'protocol': parsed.scheme,
'username': parsed.username,
'password': parsed.password,
'host': parsed.hostname,
'port': str(parsed.port),
'full': proxy_url
}
except Exception as e:
print(f"Error parsing proxy URL {proxy_url}: {e}")
return None
def is_proxy_duplicate(self, proxy_url):
"""Check if proxy already exists in the pool"""
# Note: This method assumes the caller already holds self.lock
return any(p['full'] == proxy_url for p in self.available_proxies)
def test_and_add_proxy(self, proxy_url):
if self.test_proxy(proxy_url):
with self.lock:
if not self.is_proxy_duplicate(proxy_url):
parsed = self.parse_proxy_url(proxy_url)
if parsed:
proxy_obj = {
'ip': parsed['host'],
'port': parsed['port'],
'protocol': parsed['protocol'],
'username': parsed['username'],
'password': parsed['password'],
'full': parsed['full'],
'added_at': datetime.now().isoformat()
}
self.available_proxies.append(proxy_obj)
print(f"Successfully added proxy: {parsed['host']}:{parsed['port']}")
return True
return False
def test_proxy(self, proxy_url):
try:
proxies = {'http': proxy_url, 'https': proxy_url}
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(self.target_url, proxies=proxies, headers=headers, timeout=self.request_timeout, allow_redirects=True)
is_valid = response.status_code == 200
if is_valid:
print(f"Proxy {proxy_url} successfully tested, status: {response.status_code}")
else:
print(f"Proxy {proxy_url} failed test, status: {response.status_code}")
return is_valid
except Exception as e:
print(f"Proxy {proxy_url} request error: {e}")
return False
def get_proxy(self):
with self.lock:
if not self.available_proxies:
print("No available proxies")
return None
# Ensure current_index is within bounds
if self.current_index >= len(self.available_proxies):
self.current_index = 0
proxy = self.available_proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.available_proxies)
return proxy
def remove_proxy(self, ip, port):
with self.lock:
port_str = str(port)
initial_length = len(self.available_proxies)
self.available_proxies = [p for p in self.available_proxies if not (p['ip'] == ip and p['port'] == port_str)]
if self.current_index >= len(self.available_proxies) and self.available_proxies:
self.current_index = 0
removed = initial_length > len(self.available_proxies)
if removed:
print(f"Removed proxy {ip}:{port}, current available: {len(self.available_proxies)}")
else:
print(f"Could not find proxy to remove {ip}:{port}")
# Check if we need to refill asynchronously to avoid blocking the caller
if len(self.available_proxies) <= self.min_threshold and not self.is_refilling:
print(f"Available proxies ({len(self.available_proxies)}) below threshold ({self.min_threshold}), starting async refill")
# Start refill in a separate thread to avoid blocking
refill_thread = threading.Thread(target=self.refill_proxies)
refill_thread.daemon = True
refill_thread.start()
return removed
def get_all_proxies(self):
with self.lock:
return list(self.available_proxies)
def get_count(self):
with self.lock:
return len(self.available_proxies)
if __name__ == '__main__':
proxy_pool = ProxyPool({
'target_count': 10,
'min_threshold': 3,
'check_interval': 60,
'target_url': 'https://app.freeplay.ai/',
'concurrent_requests': 15,
'max_refill_attempts': 15,
'retry_delay': 1
})
proxy_pool.initialize()
time.sleep(5)
proxy = proxy_pool.get_proxy()
print(f"Got proxy: {proxy}")
if proxy:
time.sleep(5)
proxy_pool.remove_proxy(proxy['ip'], proxy['port'])
all_proxies = proxy_pool.get_all_proxies()
print(f"Current all proxies ({len(all_proxies)}): {all_proxies}")
time.sleep(5)
proxy_pool.stop()
print("Proxy pool example finished.")