import socket import urllib.request import json import time import logging import threading # 导入线程库 class DnsManager: _instance = None _lock = threading.Lock() # 类级别的锁,用于保护单例创建 def __new__(cls, *args, **kwargs): # 双重检查锁定,确保单例创建时的线程安全 if not cls._instance: with cls._lock: if not cls._instance: cls._instance = super(DnsManager, cls).__new__(cls) return cls._instance def __init__(self, cache_ttl=600): if hasattr(self, '_initialized'): return self.cache_ttl = cache_ttl self.cache = {} self.instance_lock = threading.Lock() # 实例锁:保护 cache 字典的读写 self.logger = logging.getLogger("DnsManager") self._initialized = True def _fetch_ip_from_http(self, hostname): """通过 HTTPS 请求获取 IP (DoH)""" try: url = f"https://dns.google/resolve?name={hostname}&type=A" req = urllib.request.Request(url, headers={"Accept": "application/dns-json"}) with urllib.request.urlopen(req, timeout=5) as response: data = json.loads(response.read().decode()) if "Answer" in data: for answer in data['Answer']: if answer['type'] == 1: return answer['data'] except Exception as e: self.logger.warning(f"DoH 解析失败 {hostname}: {e}") return None def get_ip(self, hostname): """线程安全的 IP 获取逻辑""" if hostname.replace('.', '').isdigit(): return hostname # 使用锁保护缓存读取和更新过程 with self.instance_lock: current_time = time.time() # 1. 检查缓存 if hostname in self.cache: if current_time < self.cache[hostname]['expiry']: return self.cache[hostname]['ip'] # 2. 缓存失效或不存在,进行解析 # 注意:在锁内部执行网络请求会阻塞其他线程,但在 0.1vCPU # 下这能防止多个线程同时发起重复的 DoH 请求,保护算力。 new_ip = self._fetch_ip_from_http(hostname) if new_ip: self.cache[hostname] = { "ip": new_ip, "expiry": current_time + self.cache_ttl } print(f"🔄 [DNS库] 线程 {threading.current_thread().name} 解析成功: {hostname} -> {new_ip}") return new_ip # 3. 失败保底 if hostname in self.cache: return self.cache[hostname]['ip'] return "162.159.137.232" def patch_socket(self): """注入全局补丁""" original_getaddrinfo = socket.getaddrinfo def patched_getaddrinfo(*args, **kwargs): host = args[0] port = args[1] # 排除 google 以防死循环 if host == "dns.google": return original_getaddrinfo(*args, **kwargs) try: # 这里调用 get_ip,内部已经有 instance_lock 保护 latest_ip = self.get_ip(host) return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (latest_ip, port))] except Exception: return original_getaddrinfo(*args, **kwargs) socket.getaddrinfo = patched_getaddrinfo print("💉 [DNS库] 安全补丁已加载(多线程就绪)") # 导出实例 dns_manager = DnsManager()