paijo77 committed on
Commit
15c4063
·
verified ·
1 Parent(s): e0cb987

update app/validator.py

Browse files
Files changed (1) hide show
  1. app/validator.py +273 -0
app/validator.py ADDED
@@ -0,0 +1,273 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import aiohttp
2
+ import asyncio
3
+ import time
4
+ import re
5
+ from typing import Optional, Dict, List
6
+ from pydantic import BaseModel
7
+
8
+ # Matches the IP:PORT shape only; octet and port ranges are checked separately by is_valid_ip() and is_valid_port().
9
+ IP_REGEX = re.compile(r"(\d{1,3}\.){3}\d{1,3}:\d{1,5}")
10
+
11
+
12
def is_valid_ip(ip: str) -> bool:
    """Return True if *ip* is a dotted-quad IPv4 address with octets in 0-255.

    Each of the four dot-separated parts must consist of one to three ASCII
    digits. Checking the characters explicitly (instead of relying on
    ``int()`` alone) rejects forms that ``int()`` would silently accept,
    such as ``"+4"``, ``" 4"``, ``"1_0"``, ``"-0"`` or non-ASCII digits.

    Args:
        ip: Candidate address, without a port.

    Returns:
        True when the address is well-formed and in range; False otherwise,
        including when *ip* is not a string.
    """
    if not isinstance(ip, str):
        return False

    octets = ip.split(".")
    if len(octets) != 4:
        return False

    # The length cap keeps int() cheap and bounded for arbitrary input.
    return all(
        1 <= len(octet) <= 3
        and octet.isascii()
        and octet.isdigit()
        and int(octet) <= 255
        for octet in octets
    )
21
+
22
+
23
def is_valid_port(port: int) -> bool:
    """Return True if *port* is an integer in the range 1-65535.

    Args:
        port: Candidate port number.

    Returns:
        True for an in-range integer. False for out-of-range values and for
        anything that is not a real ``int`` (``None``, strings, ``bool``),
        so callers get a plain False instead of a ``TypeError``.
    """
    # bool is a subclass of int; True must not pass as port 1.
    if isinstance(port, bool) or not isinstance(port, int):
        return False
    return 1 <= port <= 65535
26
+
27
+
28
class ValidationResult(BaseModel):
    """Outcome of validating a single proxy.

    Only ``success`` is required; every other field is optional and
    defaults to ``None``.
    """

    # Overall pass/fail flag for the validation run.
    success: bool

    # Measured request latency, in milliseconds.
    latency_ms: Optional[int] = None

    # Anonymity label for the proxy.
    anonymity: Optional[str] = None

    # Whether a request to Google through the proxy succeeded.
    can_access_google: Optional[bool] = None

    # Geolocation of the proxy address.
    country_code: Optional[str] = None
    country_name: Optional[str] = None

    # Classification label for the proxy's network.
    proxy_type: Optional[str] = None

    # Aggregate quality score.
    quality_score: Optional[int] = None

    # Failure description; expected to be set when ``success`` is False.
    error_message: Optional[str] = None
38
+
39
+
40
class ProxyValidator:
    """Check proxies for format, reachability, anonymity, and overall quality.

    Every network check opens its own short-lived ``aiohttp.ClientSession``
    using the timeout configured at construction. The semaphore caps how
    many connectivity checks run at once; the follow-up checks (anonymity,
    Google access, geo lookup, type detection) are not gated by it.
    """

    # URL schemes that are stripped before the address itself is checked.
    _SCHEMES = ("http://", "https://", "socks4://", "socks5://")

    # Substrings of an ipinfo.io "org" value that indicate a hosting provider.
    _DATACENTER_KEYWORDS = (
        "amazon",
        "aws",
        "google",
        "microsoft",
        "azure",
        "digitalocean",
        "linode",
        "ovh",
        "hetzner",
        "hosting",
        "datacenter",
        "data center",
        "cloud",
    )

    # (exclusive upper bound in ms, points); the first matching tier wins.
    _LATENCY_TIERS = ((200, 40), (500, 30), (1000, 20), (2000, 10))
    _ANONYMITY_POINTS = {"elite": 30, "anonymous": 20, "transparent": 5}
    _PROXY_TYPE_POINTS = {"residential": 15, "datacenter": 5}
    _GOOGLE_ACCESS_POINTS = 15

    def __init__(self, timeout: int = 10, max_concurrent: int = 20):
        """Configure the validator.

        Args:
            timeout: Total per-request timeout in seconds.
            max_concurrent: Maximum number of simultaneous connectivity
                checks.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def validate_format(self, proxy: str) -> bool:
        """Return True if *proxy* is ``IP:PORT``, optionally scheme-prefixed.

        Args:
            proxy: Proxy string such as ``"1.2.3.4:8080"`` or
                ``"http://1.2.3.4:8080"``.

        Returns:
            True when the address has the right shape and both the octets
            and the port are in range; False otherwise (including for
            non-string input).
        """
        if not isinstance(proxy, str):
            return False

        if proxy.startswith(self._SCHEMES):
            proxy = proxy.split("://", 1)[1]

        # fullmatch (not match) so trailing junk such as "\n" or "_0" after
        # the port is rejected instead of being swallowed by int() below.
        # The ASCII check is needed because \d also matches non-ASCII digits.
        if not proxy.isascii() or not IP_REGEX.fullmatch(proxy):
            return False

        ip, _, port_str = proxy.partition(":")
        return is_valid_ip(ip) and is_valid_port(int(port_str))

    async def validate_connectivity(
        self, proxy_url: str
    ) -> tuple[bool, Optional[int], Optional[str]]:
        """Fetch ``http://httpbin.org/ip`` through the proxy and time it.

        Args:
            proxy_url: Proxy URL passed to aiohttp's ``proxy=`` argument.

        Returns:
            ``(True, latency_ms, None)`` on an HTTP 200 response, otherwise
            ``(False, None, error_message)``.
        """
        async with self.semaphore:
            try:
                # perf_counter is monotonic, so the measured latency cannot
                # be skewed by system clock adjustments.
                started = time.perf_counter()

                async with aiohttp.ClientSession(timeout=self.timeout) as session:
                    async with session.get(
                        "http://httpbin.org/ip", proxy=proxy_url, ssl=False
                    ) as resp:
                        latency_ms = int((time.perf_counter() - started) * 1000)

                        if resp.status == 200:
                            return True, latency_ms, None
                        return False, None, f"HTTP {resp.status}"

            except aiohttp.ClientProxyConnectionError:
                return False, None, "Proxy connection failed"
            except asyncio.TimeoutError:
                return False, None, "Connection timeout"
            except Exception as e:
                # Some exceptions stringify to ""; fall back to the class
                # name so the caller always receives a non-empty reason.
                return False, None, str(e)[:100] or type(e).__name__

    async def check_anonymity(self, proxy_url: str) -> Optional[str]:
        """Classify the proxy by the request headers httpbin echoes back.

        Returns:
            ``"transparent"`` if ``X-Forwarded-For`` or ``Via`` is present,
            ``"anonymous"`` if ``Proxy-Connection`` or ``X-Real-Ip`` is
            present, ``"elite"`` if none are, and ``None`` when the check
            itself fails or the response is not HTTP 200.
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(
                    "http://httpbin.org/headers", proxy=proxy_url, ssl=False
                ) as resp:
                    if resp.status != 200:
                        return None

                    data = await resp.json()
                    # Header names are case-insensitive; compare lowercased.
                    seen = {name.lower() for name in data.get("headers", {})}

                    if seen & {"x-forwarded-for", "via"}:
                        return "transparent"
                    if seen & {"proxy-connection", "x-real-ip"}:
                        return "anonymous"
                    return "elite"

        except Exception:
            # Best-effort probe: any failure means "could not determine".
            return None

    async def test_google_access(self, proxy_url: str) -> bool:
        """Return True if ``https://www.google.com`` answers 200 via the proxy.

        Any exception raised by the request is reported as False.
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(
                    "https://www.google.com", proxy=proxy_url, ssl=False
                ) as resp:
                    return resp.status == 200
        except Exception:
            return False

    async def get_geo_info(self, ip: str) -> Dict[str, Optional[str]]:
        """Look up the location of *ip* via ipapi.co.

        The lookup is made directly, not through the proxy.

        Returns:
            A dict with keys ``country_code``, ``country_name``, ``state``
            and ``city``. All values are ``None`` when the lookup fails or
            the response is not HTTP 200.
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(f"https://ipapi.co/{ip}/json/") as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        return {
                            "country_code": data.get("country_code"),
                            "country_name": data.get("country_name"),
                            "state": data.get("region"),
                            "city": data.get("city"),
                        }
        except Exception:
            # Best-effort lookup: fall through to the all-None result.
            pass

        return {"country_code": None, "country_name": None, "state": None, "city": None}

    async def detect_proxy_type(self, ip: str) -> str:
        """Classify *ip* as datacenter or residential from its ipinfo.io org.

        Returns:
            ``"datacenter"`` if the org string contains a known hosting
            keyword, ``"residential"`` if an org string is present but
            matches none, and ``"unknown"`` when the lookup fails, the
            response is not HTTP 200, or no org value is reported.
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(f"https://ipinfo.io/{ip}/json") as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        org = (data.get("org") or "").lower()

                        # No org information is not evidence of a
                        # residential address; report it as unknown.
                        if not org:
                            return "unknown"

                        if any(word in org for word in self._DATACENTER_KEYWORDS):
                            return "datacenter"
                        return "residential"
        except Exception:
            # Best-effort lookup: fall through to "unknown".
            pass

        return "unknown"

    async def calculate_quality_score(
        self,
        latency_ms: Optional[int],
        anonymity: Optional[str],
        can_access_google: Optional[bool],
        proxy_type: Optional[str],
    ) -> int:
        """Combine the individual check results into a 0-100 score.

        Points: latency up to 40 (<200 ms: 40, <500: 30, <1000: 20,
        <2000: 10), anonymity up to 30 (elite 30, anonymous 20,
        transparent 5), Google access 15, proxy type up to 15
        (residential 15, datacenter 5). Missing or unrecognised values
        contribute nothing.
        """
        score = 0

        if latency_ms is not None:
            score += next(
                (points for limit, points in self._LATENCY_TIERS if latency_ms < limit),
                0,
            )

        score += self._ANONYMITY_POINTS.get(anonymity, 0)

        if can_access_google:
            score += self._GOOGLE_ACCESS_POINTS

        score += self._PROXY_TYPE_POINTS.get(proxy_type, 0)

        return min(score, 100)

    async def validate_comprehensive(self, proxy_url: str, ip: str) -> "ValidationResult":
        """Run the connectivity check, then all secondary checks in parallel.

        Args:
            proxy_url: Proxy URL used for the proxied requests.
            ip: Bare proxy IP used for the geo and type lookups.

        Returns:
            A failed ``ValidationResult`` carrying the error message if the
            proxy is unreachable; otherwise a successful result populated
            from the secondary checks and the computed quality score.
        """
        is_valid, latency_ms, error = await self.validate_connectivity(proxy_url)

        if not is_valid:
            return ValidationResult(success=False, error_message=error)

        anonymity, can_access_google, geo_info, proxy_type = await asyncio.gather(
            self.check_anonymity(proxy_url),
            self.test_google_access(proxy_url),
            self.get_geo_info(ip),
            self.detect_proxy_type(ip),
            return_exceptions=True,
        )

        # BaseException rather than Exception: with return_exceptions=True a
        # cancelled child is returned as CancelledError, which is not an
        # Exception subclass and would otherwise be used as a real value.
        if isinstance(anonymity, BaseException):
            anonymity = None
        if isinstance(can_access_google, BaseException):
            can_access_google = None
        if isinstance(geo_info, BaseException):
            geo_info = {}
        if isinstance(proxy_type, BaseException):
            proxy_type = "unknown"

        quality_score = await self.calculate_quality_score(
            latency_ms, anonymity, can_access_google, proxy_type
        )

        return ValidationResult(
            success=True,
            latency_ms=latency_ms,
            anonymity=anonymity,
            can_access_google=can_access_google,
            country_code=geo_info.get("country_code"),
            country_name=geo_info.get("country_name"),
            proxy_type=proxy_type,
            quality_score=quality_score,
            error_message=None,
        )

    async def validate_batch(
        self, proxies: List[tuple[str, str]]
    ) -> List[tuple[str, "ValidationResult"]]:
        """Validate many proxies concurrently.

        Args:
            proxies: ``(proxy_url, ip)`` pairs.

        Returns:
            ``(proxy_url, ValidationResult)`` pairs in input order. A proxy
            whose validation raised is reported as a failed result carrying
            the (truncated) exception text.
        """
        results = await asyncio.gather(
            *(self.validate_comprehensive(proxy_url, ip) for proxy_url, ip in proxies),
            return_exceptions=True,
        )

        output = []
        for (proxy_url, _ip), result in zip(proxies, results):
            # BaseException so a returned CancelledError is also converted
            # into a failed result instead of leaking into the output.
            if isinstance(result, BaseException):
                message = str(result)[:100] or type(result).__name__
                result = ValidationResult(success=False, error_message=message)
            output.append((proxy_url, result))

        return output
271
+
272
+
273
+ proxy_validator = ProxyValidator()