TonyD365 commited on
Commit
56995fc
·
verified ·
1 Parent(s): de4ad5e

Create dns_resolver.py

Browse files
Files changed (1) hide show
  1. dns_resolver.py +95 -0
dns_resolver.py ADDED
@@ -0,0 +1,95 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import socket
2
+ import urllib.request
3
+ import json
4
+ import time
5
+ import logging
6
+
7
+ class DnsManager:
8
+ _instance = None
9
+
10
+ def __new__(cls, *args, **kwargs):
11
+ if not cls._instance:
12
+ cls._instance = super(DnsManager, cls).__new__(cls)
13
+ return cls._instance
14
+
15
+ def __init__(self, cache_ttl=600):
16
+ # 避免单例模式下重复初始化
17
+ if hasattr(self, '_initialized'):
18
+ return
19
+ self.cache_ttl = cache_ttl
20
+ self.cache = {} # 格式: {"hostname": {"ip": "xxx", "expiry": 0}}
21
+ # 覆盖 Discord 机器人运行所需的所有关键域名
22
+ self.target_hosts = [
23
+ 'discord.com',
24
+ 'gateway.discord.gg',
25
+ 'cdn.discordapp.com',
26
+ 'discordapp.com'
27
+ ]
28
+ self.logger = logging.getLogger("DnsManager")
29
+ self._initialized = True
30
+
31
+ def _fetch_ip_from_http(self, hostname):
32
+ """通过 HTTPS (TCP 443) 绕过系统 UDP DNS 限制获取 IP"""
33
+ try:
34
+ # 使用 Google DNS API
35
+ url = f"https://dns.google/resolve?name={hostname}&type=A"
36
+ req = urllib.request.Request(url, headers={"Accept": "application/dns-json"})
37
+ with urllib.request.urlopen(req, timeout=5) as response:
38
+ data = json.loads(response.read().decode())
39
+ if "Answer" in data:
40
+ # 找到第一个 A 记录 (IPv4)
41
+ for answer in data['Answer']:
42
+ if answer['type'] == 1: # A record
43
+ return answer['data']
44
+ except Exception as e:
45
+ self.logger.warning(f"无法通过 HTTP 解析 {hostname}: {e}")
46
+ return None
47
+
48
+ def get_ip(self, hostname):
49
+ """获取 IP,优先使用缓存,过期则自动更新"""
50
+ current_time = time.time()
51
+
52
+ if hostname in self.cache:
53
+ if current_time < self.cache[hostname]['expiry']:
54
+ return self.cache[hostname]['ip']
55
+
56
+ # 尝试更新 IP
57
+ new_ip = self._fetch_ip_from_http(hostname)
58
+
59
+ if new_ip:
60
+ self.cache[hostname] = {
61
+ "ip": new_ip,
62
+ "expiry": current_time + self.cache_ttl
63
+ }
64
+ print(f"🔄 [DNS库] 动态解析成功: {hostname} -> {new_ip}")
65
+ return new_ip
66
+
67
+ # 如果获取失败,且缓存里有旧的,则延长旧缓存寿命并继续使用
68
+ if hostname in self.cache:
69
+ self.logger.warning(f"解析失败,正在使用 {hostname} 的过期缓存")
70
+ return self.cache[hostname]['ip']
71
+
72
+ # 最后的保底 hardcode IP (Discord 常用 IP)
73
+ return "162.159.137.232"
74
+
75
+ def patch_socket(self):
76
+ """核心方法:将补丁注入到 Python 全局 socket 层"""
77
+ original_getaddrinfo = socket.getaddrinfo
78
+
79
+ def patched_getaddrinfo(*args, **kwargs):
80
+ host = args[0]
81
+ # 只有在目标域名列表里且不是正在请求 DNS 接口本身时才拦截
82
+ if host in self.target_hosts:
83
+ latest_ip = self.get_ip(host)
84
+ port = args[1]
85
+ # 强制返回 IPv4 地址族 (AF_INET),避免 IPv6 解析错误
86
+ return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', (latest_ip, port))]
87
+
88
+ # 其余请求(如访问 MongoDB 或 Google DNS 接口)走原始解析
89
+ return original_getaddrinfo(*args, **kwargs)
90
+
91
+ socket.getaddrinfo = patched_getaddrinfo
92
+ print("💉 [DNS库] 全局 Socket 补丁已加载,已绕过系统 DNS")
93
+
94
+ # 导出实例
95
+ dns_manager = DnsManager()