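# NOTE: the following lines appear to be repository web-page residue captured
# together with the file; they are kept as comments so the module parses.
# webscraper/tools/proxy_manager.py
# bluedragonDC
# 🚀 Deploy: Sniper MCP Forensic Scraper + Gradio
# 5ddaa4f
# Raw
# History Blame Contribute Delete
# 5.45 kB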
import os
import random
import json
import time
from pathlib import Path
from typing import Dict, Optional, List
# Hardcoded default proxy pool.
# ProxyManager._build_pool uses these entries when the pool method is
# "only_pool" or "pool_plus_mine", and as a safety fallback when proxying is
# enabled but no other proxy ended up in the pool.
# Entries use the PROTOCOL://USER:PASS@IP:PORT form.
# SECURITY NOTE(review): these URLs embed proxy credentials directly in source
# control; consider loading them from configuration or the environment and
# rotating the exposed credentials.
DEFAULT_POOL = [
"http://Z9M25R:0QHP8X@206.168.90.60:50100",
"http://Z9M25R:0QHP8X@206.168.88.98:50100"
]
class ProxyManager:
    """
    Manages proxy rotation and configuration for the V16 Pipeline.
    Reads from version16/pipeline_settings.json for configuration.

    Settings are read from the "proxy" object of pipeline_settings.json:

    * ``use_proxy`` (default ``True``): master switch; when false no proxy is
      ever returned.
    * ``pool_method`` (default ``"only_pool"``): ``"only_pool"`` uses
      DEFAULT_POOL, ``"only_mine"`` uses ``custom_proxies``, and
      ``"pool_plus_mine"`` uses both.
    * ``rotation_strategy`` (default ``"sequential"``): ``"random"`` picks a
      random pool entry; any other value walks the pool round-robin.
    * ``custom_proxies`` (default ``[]``): user supplied proxy URLs.

    The ``WCS_FORCE_PROXY`` environment variable, when set to a non-empty
    value, replaces the whole pool with that single proxy.
    """

    # Number of inject_env() calls made in this process. inject_env() builds a
    # fresh manager every time, so the sequential position has to live here.
    _inject_calls = 0

    def __init__(self):
        """Load the settings, build the pool and apply the env override."""
        self.settings = self._load_settings()
        proxy_config = self.settings.get("proxy", {})
        # A "proxy" entry that is not a JSON object (e.g. null) would make the
        # .get() calls below fail; fall back to the defaults instead.
        self.proxy_config = proxy_config if isinstance(proxy_config, dict) else {}
        self.enabled = self.proxy_config.get("use_proxy", True)
        self.method = self.proxy_config.get("pool_method", "only_pool")
        self.rotation = self.proxy_config.get("rotation_strategy", "sequential")
        self.custom_proxies = self.proxy_config.get("custom_proxies", [])
        self._pool = self._build_pool()
        self._current_index = 0
        # Check environment override (Highest Priority)
        # NOTE(review): get_proxy() still returns None when use_proxy is
        # false, so the override has no effect in that case - confirm that
        # this is the intended meaning of "force".
        forced_proxy = os.environ.get("WCS_FORCE_PROXY")
        if forced_proxy:
            self._pool = [forced_proxy]

    def _load_settings(self) -> dict:
        """
        Attempts to find and load pipeline_settings.json.

        Candidates are checked in priority order and the first file that
        exists is used: the version16 sibling directory, then the legacy
        version15/14/13 sibling directories, then the current working
        directory. Returns an empty dict when no file exists, when reading or
        parsing fails, or when the file does not contain a JSON object.
        """
        try:
            root = Path(__file__).parent.parent
            candidates = [
                root / version / "pipeline_settings.json"
                for version in ("version16", "version15", "version14", "version13")
            ]
            candidates.append(Path("pipeline_settings.json"))
            for candidate in candidates:
                if candidate.exists():
                    with open(candidate, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                    # Callers use dict methods on the result, so anything
                    # other than a JSON object counts as "no settings".
                    return data if isinstance(data, dict) else {}
            return {}
        except Exception:
            # Deliberate best effort: a broken settings file must not stop
            # the pipeline, the built-in defaults are used instead.
            return {}

    def _build_pool(self) -> List[str]:
        """
        Constructs the active proxy pool based on settings.

        Returns an empty list when proxying is disabled. Otherwise the pool
        holds DEFAULT_POOL and/or the custom proxies depending on
        ``self.method``; if that leaves the pool empty, DEFAULT_POOL is used
        as a safety fallback.
        """
        if not self.enabled:
            return []
        pool = []
        # 1. Add Default Pool
        if self.method in ("only_pool", "pool_plus_mine"):
            pool.extend(DEFAULT_POOL)
        # 2. Add Custom Proxies
        if self.method in ("only_mine", "pool_plus_mine"):
            custom = self.custom_proxies
            if isinstance(custom, str):
                # A single proxy given as a plain string; extending with the
                # string itself would add one entry per character.
                custom = [custom]
            if custom:
                pool.extend(custom)
        # Safety fallback: proxying is enabled (checked above) but nothing
        # was selected, so use the default pool.
        if not pool:
            pool.extend(DEFAULT_POOL)
        return pool

    def get_proxy(self) -> Optional[str]:
        """
        Get the current or next proxy string.

        Returns None when proxying is disabled or the pool is empty. With the
        "random" rotation a random pool entry is returned; otherwise entries
        are returned round-robin and the internal index advances by one.
        """
        if not self.enabled or not self._pool:
            return None
        if self.rotation == "random":
            return random.choice(self._pool)
        # Sequential
        proxy = self._pool[self._current_index % len(self._pool)]
        self._current_index += 1
        return proxy

    def get_playwright_proxy(self) -> Optional[Dict]:
        """
        Returns Playwright proxy dictionary or None.

        A proxy of the form PROTOCOL://USER:PASS@IP:PORT becomes
        ``{"server": "PROTOCOL://IP:PORT", "username": USER, "password": PASS}``
        (the protocol defaults to "http" when missing). A proxy without
        credentials, or whose credentials have no "USER:PASS" separator, is
        returned unchanged as ``{"server": proxy}``. Returns None when
        get_proxy() yields no proxy.
        """
        proxy_str = self.get_proxy()
        if not proxy_str:
            return None
        if "@" not in proxy_str:
            return {"server": proxy_str}
        # Split on the LAST "@": the address part cannot contain "@", but a
        # password can.
        protocol_auth, _, address = proxy_str.rpartition("@")
        protocol, scheme_sep, auth = protocol_auth.partition("://")
        if not scheme_sep:
            protocol = "http"  # Default
            auth = protocol_auth
        # Split on the FIRST ":": everything after it is the password, which
        # may itself contain ":".
        username, auth_sep, password = auth.partition(":")
        if not auth_sep:
            # Parse fail fallback
            return {"server": proxy_str}
        return {
            "server": f"{protocol}://{address}",
            "username": username,
            "password": password,
        }

    @staticmethod
    def inject_env():
        """
        Injects proxy settings into WCS environment variables.
        Called by Pipeline S0.

        Builds a fresh manager (re-reading the settings), picks a proxy and
        stores it in ``WCS_TARGET_PROXY``. Returns the proxy string, or the
        literal ``"DIRECT"`` after removing ``WCS_TARGET_PROXY`` when no proxy
        is available. Only the part after the credentials is printed.
        """
        manager = ProxyManager()
        # A fresh manager always starts at index 0, which would make the
        # sequential rotation return the first proxy on every call. Seed the
        # index from the process-wide call counter so successive calls walk
        # through the pool.
        manager._current_index = ProxyManager._inject_calls
        ProxyManager._inject_calls += 1
        proxy = manager.get_proxy()
        # V16: Track failure count if needed (future)
        if proxy:
            # Never print credentials: keep only what follows the last "@".
            safe_ip = proxy.rpartition('@')[2]
            print(f"🔒 Proxy Manager: Active Proxy -> {safe_ip}")
            print(f" [Method: {manager.method} | Rotation: {manager.rotation}]")
            os.environ["WCS_TARGET_PROXY"] = proxy
            return proxy
        print("🔓 Proxy Manager: DIRECT MODE (No Proxy)")
        os.environ.pop("WCS_TARGET_PROXY", None)
        return "DIRECT"