import ipaddress
import json
import logging
import socket
import threading
import time
import urllib.parse
import urllib.request
|
|
class DnsManager:
    """Process-wide singleton that resolves hostnames via DNS-over-HTTPS (DoH).

    A-record lookups are sent to the JSON endpoint at ``DOH_URL`` and cached
    per hostname for ``cache_ttl`` seconds.  ``patch_socket`` can additionally
    route every ``socket.getaddrinfo`` call in the process through this
    resolver.

    Because the class is a singleton, constructor arguments only take effect
    on the first instantiation; later calls return the same, already
    configured object.
    """

    # Host and endpoint of the DoH service.  The host must always be resolved
    # by the system resolver, otherwise resolving it would recurse into DoH.
    DOH_HOST = "dns.google"
    DOH_URL = "https://dns.google/resolve"

    # Last-resort address returned when DoH fails and nothing is cached.
    # NOTE(review): this value is inherited unchanged from the original code;
    # which service it belongs to is not visible here -- confirm with the owner.
    DEFAULT_FALLBACK_IP = "162.159.137.232"

    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, cache_ttl=600, fallback_ip=DEFAULT_FALLBACK_IP):
        """Initialise the resolver state (first instantiation only).

        Args:
            cache_ttl: Seconds a successful lookup stays fresh in the cache.
            fallback_ip: Address ``get_ip`` returns when DoH fails and no
                cached entry exists.  Pass ``None`` to make ``get_ip`` raise
                ``socket.gaierror`` instead, which lets the patched
                ``getaddrinfo`` defer to the system resolver.
        """
        # Guard the whole initialisation so two threads constructing the
        # singleton at the same time cannot both reset cache and lock.
        with self._lock:
            if getattr(self, "_initialized", False):
                return
            self.cache_ttl = cache_ttl
            self.fallback_ip = fallback_ip
            self.cache = {}
            self.instance_lock = threading.Lock()
            self.logger = logging.getLogger("DnsManager")
            # Per-thread flag: set while this thread is inside a DoH request.
            self._doh_state = threading.local()
            self._initialized = True

    @staticmethod
    def _is_ip_literal(hostname):
        """Return True if *hostname* is already an IP address literal."""
        # Original heuristic, kept for backward compatibility: anything made
        # of digits and dots is treated as an IPv4 literal.
        if hostname.replace(".", "").isdigit():
            return True
        # Only IPv6 literals remain, and those always contain a colon.
        if ":" not in hostname:
            return False
        try:
            # Drop an optional zone id ("fe80::1%eth0") before parsing.
            ipaddress.ip_address(hostname.partition("%")[0])
        except ValueError:
            return False
        return True

    def _fetch_ip_from_http(self, hostname):
        """Resolve *hostname* to an IPv4 address with one DoH request.

        Returns:
            The ``data`` field of the first type-1 (A) answer, or ``None``
            when the request fails or the response has no such answer.
            Failures are logged as warnings and never raised.
        """
        was_in_doh = getattr(self._doh_state, "in_doh", False)
        # While the flag is set, the patched getaddrinfo passes lookups made
        # by this thread straight to the system resolver, so connecting to
        # the DoH endpoint (or a proxy in front of it) cannot recurse.
        self._doh_state.in_doh = True
        try:
            # URL-encode the name so unusual characters cannot alter the query.
            query = urllib.parse.urlencode({"name": hostname, "type": "A"})
            req = urllib.request.Request(
                f"{self.DOH_URL}?{query}",
                headers={"Accept": "application/dns-json"},
            )
            with urllib.request.urlopen(req, timeout=5) as response:
                data = json.loads(response.read().decode())
            for answer in data.get("Answer", ()):
                if answer.get("type") == 1:  # 1 == A record
                    return answer["data"]
        except Exception as e:
            # Best effort by design: callers fall back to cache/fallback.
            self.logger.warning("DoH 解析失败 %s: %s", hostname, e)
        finally:
            self._doh_state.in_doh = was_in_doh
        return None

    def get_ip(self, hostname):
        """Return an IP address for *hostname*, using the cache when fresh.

        Lookup order: IP literal (returned unchanged) -> fresh cache entry ->
        DoH request -> stale cache entry -> ``fallback_ip``.

        The cache is only touched under ``instance_lock``; the network
        request itself runs outside the lock so one slow lookup does not
        block cache hits in other threads.  Concurrent misses for the same
        host may therefore each issue their own request.

        Raises:
            socket.gaierror: Only when resolution fails, nothing is cached
                and ``fallback_ip`` is ``None``.
        """
        if self._is_ip_literal(hostname):
            return hostname

        with self.instance_lock:
            entry = self.cache.get(hostname)
            if entry is not None and time.time() < entry["expiry"]:
                return entry["ip"]

        new_ip = self._fetch_ip_from_http(hostname)

        if new_ip:
            with self.instance_lock:
                self.cache[hostname] = {
                    "ip": new_ip,
                    "expiry": time.time() + self.cache_ttl,
                }
            print(f"🔄 [DNS库] 线程 {threading.current_thread().name} 解析成功: {hostname} -> {new_ip}")
            return new_ip

        # Resolution failed: prefer a stale cached answer over the fallback.
        with self.instance_lock:
            stale = self.cache.get(hostname)
        if stale is not None:
            return stale["ip"]
        if self.fallback_ip is None:
            raise socket.gaierror(
                socket.EAI_NONAME, f"DoH resolution failed for {hostname!r}"
            )
        # NOTE(review): with the default fallback, *every* unresolvable name
        # (including e.g. "localhost") maps to the same address.  Construct
        # the manager with fallback_ip=None if that is not intended.
        return self.fallback_ip

    def patch_socket(self):
        """Replace ``socket.getaddrinfo`` with a DoH-backed wrapper.

        The wrapper resolves host names through ``get_ip`` and then lets the
        original ``getaddrinfo`` build the result for the numeric address, so
        the caller's port, family, socket type, protocol and flags are all
        honoured.  Lookups are passed to the original function untouched when
        the host is empty or not a ``str``, is the DoH host itself, is
        already an IP literal, or when the calling thread is in the middle of
        a DoH request.  Any failure also falls back to the original function.

        Calling this method again while the wrapper is installed is a no-op.
        """
        if getattr(socket.getaddrinfo, "_dns_manager_patch", False):
            return  # Already installed; do not stack wrappers.

        original_getaddrinfo = socket.getaddrinfo

        # The signature mirrors socket.getaddrinfo (hence the builtin-shadowing
        # ``type`` parameter) so keyword callers keep working.
        def patched_getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
            request = (port, family, type, proto, flags)
            bypass = (
                not isinstance(host, str)
                or not host
                or host == self.DOH_HOST
                or getattr(self._doh_state, "in_doh", False)
                or self._is_ip_literal(host)
            )
            if not bypass:
                try:
                    latest_ip = self.get_ip(host)
                    return original_getaddrinfo(latest_ip, *request)
                except Exception:
                    # Deliberate best effort: fall through to the system
                    # resolver, but leave a trace for debugging.
                    self.logger.debug(
                        "DoH path failed for %s; using system resolver",
                        host,
                        exc_info=True,
                    )
            return original_getaddrinfo(host, *request)

        patched_getaddrinfo._dns_manager_patch = True
        socket.getaddrinfo = patched_getaddrinfo
        print("💉 [DNS库] 安全补丁已加载(多线程就绪)")
|
|
| |
# Module-level shared resolver; DnsManager is a singleton, so this is the
# same object any later DnsManager() call returns.
dns_manager: DnsManager = DnsManager()
|
|