| """ |
| Free Proxy Fetcher - Automatically fetches and validates free proxies |
| """ |
| import aiohttp |
| import asyncio |
| import logging |
|
|
| logger = logging.getLogger("proxy_fetcher") |
|
|
| class FreeProxyFetcher: |
| def __init__(self): |
| self.proxies = [] |
| self.last_fetch = 0 |
| |
| async def fetch_free_proxies(self): |
| """Fetch free proxies from public APIs""" |
| proxy_sources = [ |
| "https://api.proxyscrape.com/v2/?request=get&protocol=http&timeout=10000&country=all&ssl=all&anonymity=all", |
| "https://www.proxy-list.download/api/v1/get?type=http", |
| ] |
| |
| all_proxies = [] |
| async with aiohttp.ClientSession() as session: |
| for source in proxy_sources: |
| try: |
| async with session.get(source, timeout=10) as resp: |
| if resp.status == 200: |
| text = await resp.text() |
| proxies = [f"http://{line.strip()}" for line in text.split('\n') if line.strip()] |
| all_proxies.extend(proxies[:20]) |
| logger.info(f"Fetched {len(proxies)} proxies from {source}") |
| except Exception as e: |
| logger.error(f"Failed to fetch from {source}: {e}") |
| |
| self.proxies = all_proxies |
| logger.info(f"Total free proxies loaded: {len(self.proxies)}") |
| return self.proxies |
| |
| async def validate_proxy(self, proxy, test_url="https://httpbin.org/ip"): |
| """Test if a proxy works""" |
| try: |
| async with aiohttp.ClientSession() as session: |
| async with session.get(test_url, proxy=proxy, timeout=5) as resp: |
| if resp.status == 200: |
| return True |
| except: |
| pass |
| return False |
| |
| async def get_working_proxies(self, max_count=10): |
| """Get validated working proxies""" |
| if not self.proxies: |
| await self.fetch_free_proxies() |
| |
| working = [] |
| tasks = [self.validate_proxy(p) for p in self.proxies[:30]] |
| results = await asyncio.gather(*tasks, return_exceptions=True) |
| |
| for proxy, is_working in zip(self.proxies[:30], results): |
| if is_working and len(working) < max_count: |
| working.append(proxy) |
| |
| logger.info(f"Validated {len(working)} working proxies") |
| return working |
|
|
| proxy_fetcher = FreeProxyFetcher() |
|
|