from __future__ import annotations
import asyncio
import logging
import os
import re
from dataclasses import dataclass
from typing import Iterable, Optional
import httpx
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from telegram.request import HTTPXRequest
# Configure logging once at import time: INFO and above, with a timestamp,
# level and logger name in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)

# Module-scoped logger shared by the helpers and handlers in this file.
logger = logging.getLogger(__name__)
@dataclass
class ProxyRecord:
    """Credentials and endpoint describing a single proxy."""

    username: str
    password: str
    host: str
    port: int

    @property
    def normalized(self) -> str:
        """Return the record rendered as ``username:password@host:port``."""
        credentials = f"{self.username}:{self.password}"
        endpoint = f"{self.host}:{self.port}"
        return f"{credentials}@{endpoint}"
@dataclass
class ProxyTestResult:
    """Outcome of probing one proxy."""

    # The proxy this result refers to.
    proxy: ProxyRecord
    # True when the probe succeeded, False when it failed.
    ok: bool
    # Free-form text describing the outcome, supplied by whoever builds the result.
    detail: str
CANDIDATE_SEPARATORS = (" ", "|", "#", ",", ";", "\t")

# Full-width space, colon and at sign mapped to their ASCII equivalents.
_FULLWIDTH_TO_ASCII = str.maketrans({"\u3000": " ", "\uff1a": ":", "\uff20": "@"})


def _split_credentials_at_address(candidate: str) -> Optional[tuple[str, str, str, str]]:
    """Split ``user:pass@host:port`` on its LAST "@" into raw string fields.

    Returns ``(username, password, host, port_str)``, or None when the text
    has no "@" or either side of it lacks a ":".
    """
    credentials, at_sign, address = candidate.rpartition("@")
    if not at_sign:
        return None
    username, cred_sep, password = credentials.partition(":")
    host, addr_sep, port_str = address.partition(":")
    if not (cred_sep and addr_sep):
        return None
    return username, password, host, port_str


def parse_proxy_line(raw_line: str) -> Optional[ProxyRecord]:
    """Parse one line of user input into a ProxyRecord.

    Recognised layouts:

    * ``user:pass@host:port``
    * ``host:port:user:pass`` where, if the line contains no "@", any of
      CANDIDATE_SEPARATORS may stand in for the colons; fields beyond the
      fourth are ignored.

    Full-width space, colon and at sign are converted to ASCII first.

    Returns:
        The parsed record, or None for blank lines, lines starting with "#",
        and lines that match no layout or fail field validation.
    """
    cleaned = raw_line.strip()
    if not cleaned or cleaned.startswith("#"):
        return None

    candidate = cleaned.translate(_FULLWIDTH_TO_ASCII)

    # Alternative separators are only normalised when there is no "@":
    # an "@" signals the user:pass@host:port layout, which is left untouched.
    if "@" not in candidate:
        for sep in CANDIDATE_SEPARATORS:
            candidate = candidate.replace(sep, ":")

    at_count = candidate.count("@")
    if at_count == 1:
        fields = _split_credentials_at_address(candidate)
        if fields is not None:
            return _make_record(*fields)

    record = None
    pieces = candidate.split(":")
    if len(pieces) >= 4:
        host, port_str, username, password = pieces[:4]
        record = _make_record(username, password, host, port_str)

    # Several "@" signs usually mean the password itself contains "@".
    # A hostname cannot contain "@", so the last one must be the separator.
    # This is tried last so that every line accepted before still parses to
    # the same record.
    if record is None and at_count > 1:
        fields = _split_credentials_at_address(candidate)
        if fields is not None:
            record = _make_record(*fields)
    return record
def _make_record(username: str, password: str, host: str, port_str: str) -> Optional[ProxyRecord]:
username = username.strip()
password = password.strip()
host = host.strip()
port_str = port_str.strip()
if not username or not password or not host or not port_str:
return None
if not re.match(r"^[0-9a-zA-Z_.-]+$", host) and not _is_valid_ip(host):
return None
try:
port = int(port_str)
except ValueError:
return None
if port <= 0 or port > 65535:
return None
return ProxyRecord(username=username, password=password, host=host, port=port)
def _is_valid_ip(host: str) -> bool:
parts = host.split(".")
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit():
return False
value = int(part)
if value < 0 or value > 255:
return False
return True
def _parse_allowed_users(raw_value: str) -> set[int]:
if not raw_value.strip():
return set()
allowed = set()
for token in raw_value.replace(";", ",").split(","):
token = token.strip()
if not token:
continue
try:
allowed.add(int(token))
except ValueError:
logger.warning("Ignoring invalid user id token: %s", token)
return allowed
async def check_proxy(record: ProxyRecord, test_url: str, timeout: float) -> ProxyTestResult:
    """Fetch *test_url* through *record* used as a SOCKS5 proxy.

    Args:
        record: Proxy host, port and credentials to test.
        test_url: URL requested through the proxy.
        timeout: Timeout in seconds handed to the httpx client.

    Returns:
        A ProxyTestResult. On success ``detail`` holds the first 200
        characters of the response body; on any failure it holds the
        exception text (or the exception class name when the text is empty).
        This function does not raise.
    """
    # NOTE(review): SOCKS support in httpx presumably needs its optional
    # "socks" extra installed — confirm in the deployment environment.
    try:
        # Credentials are passed separately from the URL so that characters
        # such as "@" or ":" inside them cannot corrupt the proxy URL.
        proxy = httpx.Proxy(
            f"socks5://{record.host}:{record.port}",
            auth=(record.username, record.password),
        )
        # httpx routing keys are URL patterns such as "all://"; bare scheme
        # names like "http" are not valid patterns.
        async with httpx.AsyncClient(
            mounts={"all://": httpx.AsyncHTTPTransport(proxy=proxy)},
            timeout=timeout,
        ) as client:
            response = await client.get(test_url)
            response.raise_for_status()
            detail = response.text[:200]
    except Exception as exc:  # deliberate: any failure means "proxy not usable"
        # Some exceptions stringify to "", so fall back to the class name.
        return ProxyTestResult(proxy=record, ok=False, detail=str(exc) or type(exc).__name__)
    return ProxyTestResult(proxy=record, ok=True, detail=detail)
async def post_to_backend(records: Iterable[ProxyRecord], backend_url: str, token: Optional[str]) -> Optional[str]:
    """POST the normalized proxies to the backend and return its reply text.

    The request body is JSON of the form ``{"proxies": [...]}``. When *token*
    is truthy it is sent as a Bearer token in the Authorization header.
    Exceptions from httpx, including the one raised by
    ``raise_for_status()``, propagate to the caller.
    """
    auth_header = {"Authorization": f"Bearer {token}"} if token else {}
    request_headers = {"Content-Type": "application/json", **auth_header}
    body = {"proxies": [item.normalized for item in records]}

    async with httpx.AsyncClient(timeout=10.0) as http:
        reply = await http.post(backend_url, headers=request_headers, json=body)
        reply.raise_for_status()
        return reply.text
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /start command by replying with usage instructions.

    Does nothing further when the sender is not authorized (the
    authorization helper has already answered) or when the update carries
    no message to reply to.
    """
    if not await _ensure_authorized(update, context):
        return
    # update.message is None for updates such as edited messages;
    # effective_message covers those cases too.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "Send SOCKS5 proxies (one per line). I will normalize, forward to the backend, and test them."
    )
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Parse proxies from a text message, forward them, test them, and reply.

    Steps:
      1. Parse each non-blank line with ``parse_proxy_line``.
      2. If a backend URL is configured, submit all parsed proxies to it;
         a failure is reported in the reply rather than raised.
      3. Test every proxy concurrently, bounded by the shared semaphore.
      4. Reply with a report of working, failed and rejected entries plus
         the backend response.

    Reads ``backend_url``, ``backend_token``, ``test_url``, ``timeout`` and
    ``semaphore`` from ``context.application.bot_data``.
    """
    # An explicit guard instead of ``assert``: asserts vanish under ``-O``
    # and update.message is None for updates such as edited messages.
    message = update.effective_message
    if message is None:
        return
    if not await _ensure_authorized(update, context):
        return

    text = message.text or ""
    records: list[ProxyRecord] = []
    rejected_lines: list[str] = []
    for line in text.splitlines():
        stripped = line.strip()
        if not stripped:
            # Blank lines are spacing, not failed proxies.
            continue
        record = parse_proxy_line(stripped)
        if record is not None:
            records.append(record)
        else:
            rejected_lines.append(stripped)

    if not records:
        await message.reply_text(
            "Could not find valid proxies in your message. Each line should look like user:pass@host:port."
        )
        return

    backend_url = context.application.bot_data.get("backend_url")
    backend_token = context.application.bot_data.get("backend_token")
    backend_result = None
    if backend_url:
        try:
            backend_result = await post_to_backend(records, backend_url, backend_token)
        except Exception as exc:  # best effort: testing continues regardless
            logger.exception("Backend submission failed")
            backend_result = f"Backend request failed: {exc}"

    test_url = context.application.bot_data["test_url"]
    timeout = context.application.bot_data["timeout"]
    semaphore: asyncio.Semaphore = context.application.bot_data["semaphore"]

    async def _bounded_check(record: ProxyRecord) -> ProxyTestResult:
        # The semaphore caps how many proxy checks run at the same time.
        async with semaphore:
            return await check_proxy(record, test_url=test_url, timeout=timeout)

    results = await asyncio.gather(*(_bounded_check(record) for record in records))

    ok_lines = [f"OK {result.proxy.normalized} -> {result.detail}" for result in results if result.ok]
    fail_lines = [f"FAIL {result.proxy.normalized} -> {result.detail}" for result in results if not result.ok]

    response_lines: list[str] = []
    if ok_lines:
        response_lines.append("Working proxies:")
        response_lines.extend(ok_lines)
    if fail_lines:
        response_lines.append("Failed proxies:")
        response_lines.extend(fail_lines)
    if rejected_lines:
        response_lines.append("Rejected lines:")
        response_lines.extend(rejected_lines)
    if backend_result:
        response_lines.append("Backend response:")
        response_lines.append(backend_result)

    max_lines = 50
    # Telegram rejects message text longer than 4096 characters.
    max_chars = 4096
    shown = response_lines[:max_lines]
    omitted = len(response_lines) - len(shown)
    if omitted > 0:
        # Tell the user the report was cut instead of dropping lines silently.
        shown.append(f"... {omitted} more line(s) omitted")
    reply = "\n".join(shown)
    if len(reply) > max_chars:
        reply = reply[: max_chars - 3] + "..."
    await message.reply_text(reply)
def build_application():
    """Create and configure the Telegram bot application from the environment.

    Environment variables (loaded via ``load_dotenv()`` first):
        BOT_TOKEN: required bot token.
        TELEGRAM_API_URL: optional custom Bot API root, without ``/bot``.
        BACKEND_URL, BACKEND_TOKEN: optional backend endpoint and token.
        PROXY_TEST_URL: URL fetched through each proxy
            (default ``https://api.ipify.org?format=json``).
        PROXY_TEST_TIMEOUT: per-test timeout in seconds (default 10).
        MAX_CONCURRENT_TESTS: concurrency cap, at least 1 (default 5).
        ALLOWED_USER_IDS: comma/semicolon-separated user ids; empty means
            no restriction.

    Returns:
        The built application with settings stored in ``bot_data`` and the
        /start and text handlers registered.

    Raises:
        RuntimeError: if BOT_TOKEN is not set.
        ValueError: if a numeric setting is not a valid number.
    """
    load_dotenv()
    bot_token = os.getenv("BOT_TOKEN")
    if not bot_token:
        raise RuntimeError("BOT_TOKEN is not set")

    application_builder = ApplicationBuilder().token(bot_token)

    # Optional custom API root (e.g. a relay in front of api.telegram.org).
    # A trailing slash is dropped so the URLs built below do not contain "//".
    custom_api_url = os.getenv("TELEGRAM_API_URL", "").strip().rstrip("/")
    if custom_api_url:
        logger.info("Using custom Telegram API URL: %s", custom_api_url)
        # base_url is expected in the form "https://api.telegram.org/bot";
        # the library appends "<token>/method" to it. So for
        # custom_api_url = https://worker.dev/telegram-api we pass
        # https://worker.dev/telegram-api/bot.
        api_base = f"{custom_api_url}/bot"
        file_base = f"{custom_api_url}/file/bot"
        logger.info("Setting API base to: %s", api_base)
        logger.info("Setting file base to: %s", file_base)
        application_builder = application_builder.base_url(api_base).base_file_url(file_base)
    else:
        logger.info("Using default Telegram API URL")

    backend_url = os.getenv("BACKEND_URL", "").strip()
    backend_token = os.getenv("BACKEND_TOKEN", "").strip() or None
    # "or <default>" also covers variables that are set but empty, which
    # would otherwise make float()/int() raise on "".
    test_url = os.getenv("PROXY_TEST_URL", "").strip() or "https://api.ipify.org?format=json"
    timeout = float(os.getenv("PROXY_TEST_TIMEOUT", "").strip() or "10")
    max_concurrent = int(os.getenv("MAX_CONCURRENT_TESTS", "").strip() or "5")
    allowed_ids = _parse_allowed_users(os.getenv("ALLOWED_USER_IDS", ""))

    application = application_builder.build()

    bot_data = application.bot_data
    if custom_api_url:
        # Keep the custom API root available to handlers.
        bot_data["custom_api_url"] = custom_api_url
    bot_data["backend_url"] = backend_url or None
    bot_data["backend_token"] = backend_token
    bot_data["test_url"] = test_url
    bot_data["timeout"] = timeout
    # max(1, ...) guards against a zero or negative concurrency setting.
    bot_data["semaphore"] = asyncio.Semaphore(max(1, max_concurrent))
    bot_data["allowed_ids"] = allowed_ids

    application.add_handler(CommandHandler("start", start))
    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text))
    return application
def main() -> None:
    """Build the bot application and start polling for updates."""
    build_application().run_polling()
if __name__ == "__main__":
main()
async def _ensure_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
allowed_ids: set[int] = context.application.bot_data.get("allowed_ids", set())
if not allowed_ids:
return True
user = update.effective_user
if user and user.id in allowed_ids:
return True
message = update.effective_message
if message:
await message.reply_text("You are not allowed to use this bot.")
logger.info("Unauthorized access attempt by user id %s", user.id if user else "unknown")
return False |