File size: 3,718 Bytes
56995fc 9d59302 56995fc 9d59302 56995fc 9d59302 56995fc 9d59302 56995fc 9d59302 56995fc 9d59302 56995fc 9d59302 56995fc bb61b8c 56995fc 9d59302 bb61b8c 9d59302 56995fc 9d59302 56995fc 9d59302 56995fc bb61b8c 9d59302 bb61b8c 9d59302 56995fc bb61b8c 56995fc 9d59302 56995fc 9d59302 56995fc | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 | import socket
import urllib.request
import json
import time
import logging
import threading # 导入线程库
class DnsManager:
_instance = None
_lock = threading.Lock() # 类级别的锁,用于保护单例创建
def __new__(cls, *args, **kwargs):
# 双重检查锁定,确保单例创建时的线程安全
if not cls._instance:
with cls._lock:
if not cls._instance:
cls._instance = super(DnsManager, cls).__new__(cls)
return cls._instance
def __init__(self, cache_ttl=600):
if hasattr(self, '_initialized'):
return
self.cache_ttl = cache_ttl
self.cache = {}
self.instance_lock = threading.Lock() # 实例锁:保护 cache 字典的读写
self.logger = logging.getLogger("DnsManager")
self._initialized = True
def _fetch_ip_from_http(self, hostname):
"""通过 HTTPS 请求获取 IP (DoH)"""
try:
url = f"https://dns.google/resolve?name={hostname}&type=A"
req = urllib.request.Request(url, headers={"Accept": "application/dns-json"})
with urllib.request.urlopen(req, timeout=5) as response:
data = json.loads(response.read().decode())
if "Answer" in data:
for answer in data['Answer']:
if answer['type'] == 1:
return answer['data']
except Exception as e:
self.logger.warning(f"DoH 解析失败 {hostname}: {e}")
return None
def get_ip(self, hostname):
"""线程安全的 IP 获取逻辑"""
if hostname.replace('.', '').isdigit():
return hostname
# 使用锁保护缓存读取和更新过程
with self.instance_lock:
current_time = time.time()
# 1. 检查缓存
if hostname in self.cache:
if current_time < self.cache[hostname]['expiry']:
return self.cache[hostname]['ip']
# 2. 缓存失效或不存在,进行解析
# 注意:在锁内部执行网络请求会阻塞其他线程,但在 0.1vCPU
# 下这能防止多个线程同时发起重复的 DoH 请求,保护算力。
new_ip = self._fetch_ip_from_http(hostname)
if new_ip:
self.cache[hostname] = {
"ip": new_ip,
"expiry": current_time + self.cache_ttl
}
print(f"🔄 [DNS库] 线程 {threading.current_thread().name} 解析成功: {hostname} -> {new_ip}")
return new_ip
# 3. 失败保底
if hostname in self.cache:
return self.cache[hostname]['ip']
return "162.159.137.232"
def patch_socket(self):
"""注入全局补丁"""
original_getaddrinfo = socket.getaddrinfo
def patched_getaddrinfo(*args, **kwargs):
host = args[0]
port = args[1]
# 排除 google 以防死循环
if host == "dns.google":
return original_getaddrinfo(*args, **kwargs)
try:
# 这里调用 get_ip,内部已经有 instance_lock 保护
latest_ip = self.get_ip(host)
return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (latest_ip, port))]
except Exception:
return original_getaddrinfo(*args, **kwargs)
socket.getaddrinfo = patched_getaddrinfo
print("💉 [DNS库] 安全补丁已加载(多线程就绪)")
# 导出实例
dns_manager = DnsManager()
|