import random import threading import time from collections import defaultdict class ProxyEntry: __slots__ = ( 'proxy', 'protocol', 'proxy_url', 'latency', 'proxy_ip', 'verified', 'fail_count', 'total_used', ) def __init__(self, proxy, protocol, proxy_url, latency, proxy_ip, verified): self.proxy = proxy self.protocol = protocol self.proxy_url = proxy_url self.latency = latency self.proxy_ip = proxy_ip self.verified = verified self.fail_count = 0 self.total_used = 0 def to_dict(self): return { "proxy": self.proxy, "protocol": self.protocol, "proxy_url": self.proxy_url, "latency": self.latency, "proxy_ip": self.proxy_ip, "verified": self.verified, "total_used": self.total_used, } class ProxyPool: def __init__(self): self._px = [] self._um = {} self._lk = threading.Lock() self._rr = defaultdict(int) self._lr = 0.0 self._rc = 0 def refresh(self, new_proxies): with self._lk: old = dict(self._um) fresh, um = [], {} for px in new_proxies: e = ProxyEntry( px["proxy"], px["protocol"], px["proxy_url"], px["latency"], px.get("proxy_ip", ""), px.get("verified", False), ) o = old.get(e.proxy_url) if o: e.total_used = o.total_used e.fail_count = max(0, o.fail_count - 1) fresh.append(e) um[e.proxy_url] = e fresh.sort(key=lambda x: x.latency) self._px = fresh self._um = um self._lr = time.time() self._rc += 1 @property def size(self): return len(self._px) def _alive(self, protocol=None, verified=False): r = [p for p in self._px if p.fail_count < 3] if protocol: r = [p for p in r if p.protocol == protocol] if verified: r = [p for p in r if p.verified] return r def get_round_robin(self, protocol=None, verified=False): with self._lk: a = self._alive(protocol, verified) if not a: return None k = f"{protocol}_{verified}" i = self._rr[k] % len(a) self._rr[k] = i + 1 a[i].total_used += 1 return a[i] def get_random(self, protocol=None, verified=False): with self._lk: a = self._alive(protocol, verified) if not a: return None p = random.choice(a) p.total_used += 1 return p def get_fastest(self, protocol=None, verified=False): with self._lk: a = self._alive(protocol, verified) if not a: return None a[0].total_used += 1 return a[0] def get_least_used(self, protocol=None, verified=False): with self._lk: a = self._alive(protocol, verified) if not a: return None p = min(a, key=lambda x: x.total_used) p.total_used += 1 return p def report_failure(self, url): with self._lk: p = self._um.get(url) if p: p.fail_count += 1 def report_success(self, url): with self._lk: p = self._um.get(url) if p: p.fail_count = 0 def get_all(self, protocol=None, verified=False, limit=500): with self._lk: return [ p.to_dict() for p in self._alive(protocol, verified)[:limit] ] def get_stats(self): with self._lk: bp = defaultdict(lambda: {"total": 0, "alive": 0}) for p in self._px: bp[p.protocol]["total"] += 1 if p.fail_count < 3: bp[p.protocol]["alive"] += 1 alive = sum(d["alive"] for d in bp.values()) return { "total": len(self._px), "alive": alive, "by_protocol": dict(bp), "refresh_count": self._rc, "last_refresh_ago": round( time.time() - self._lr, 1 ) if self._lr else None, } def export_json(self): with self._lk: return [p.to_dict() for p in self._px] def import_json(self, data): proxies = [] for p in data: proxies.append({ "proxy": p["proxy"], "protocol": p["protocol"], "proxy_url": p["proxy_url"], "latency": p["latency"], "proxy_ip": p.get("proxy_ip", ""), "verified": p.get("verified", False), }) if proxies: self.refresh(proxies)