| from __future__ import annotations |
|
|
| import asyncio |
| import logging |
| import os |
| import re |
| from dataclasses import dataclass |
| from typing import Iterable, Optional |
|
|
| import httpx |
| from dotenv import load_dotenv |
| from telegram import Update |
| from telegram.ext import ( |
| ApplicationBuilder, |
| CommandHandler, |
| ContextTypes, |
| MessageHandler, |
| filters, |
| ) |
| from telegram.request import HTTPXRequest |
|
|
| logging.basicConfig( |
| format="%(asctime)s %(levelname)s %(name)s %(message)s", |
| level=logging.INFO, |
| ) |
| logger = logging.getLogger(__name__) |
|
|
|
|
| @dataclass |
| class ProxyRecord: |
| username: str |
| password: str |
| host: str |
| port: int |
|
|
| @property |
| def normalized(self) -> str: |
| return f"{self.username}:{self.password}@{self.host}:{self.port}" |
|
|
|
|
| @dataclass |
| class ProxyTestResult: |
| proxy: ProxyRecord |
| ok: bool |
| detail: str |
|
|
|
|
| CANDIDATE_SEPARATORS = (" ", "|", "#", ",", ";", "\t") |
|
|
|
|
| def parse_proxy_line(raw_line: str) -> Optional[ProxyRecord]: |
| cleaned = raw_line.strip() |
| if not cleaned or cleaned.startswith("#"): |
| return None |
|
|
| cleaned = cleaned.replace("\u3000", " ") |
| cleaned = cleaned.replace("\uff1a", ":") |
| cleaned = cleaned.replace("\uff20", "@") |
|
|
| candidate = cleaned |
| for sep in CANDIDATE_SEPARATORS: |
| if sep in candidate and "@" not in candidate: |
| candidate = candidate.replace(sep, ":") |
|
|
| by_at = candidate.split("@") |
| if len(by_at) == 2: |
| credentials, address = by_at |
| user_part = credentials.split(":", 1) |
| host_part = address.split(":", 1) |
| if len(user_part) == 2 and len(host_part) == 2: |
| username, password = user_part |
| host, port_str = host_part |
| return _make_record(username, password, host, port_str) |
|
|
| pieces = candidate.split(":") |
| if len(pieces) >= 4: |
| host, port_str, username, password = pieces[:4] |
| return _make_record(username, password, host, port_str) |
|
|
| return None |
|
|
|
|
| def _make_record(username: str, password: str, host: str, port_str: str) -> Optional[ProxyRecord]: |
| username = username.strip() |
| password = password.strip() |
| host = host.strip() |
| port_str = port_str.strip() |
|
|
| if not username or not password or not host or not port_str: |
| return None |
|
|
| if not re.match(r"^[0-9a-zA-Z_.-]+$", host) and not _is_valid_ip(host): |
| return None |
|
|
| try: |
| port = int(port_str) |
| except ValueError: |
| return None |
|
|
| if port <= 0 or port > 65535: |
| return None |
|
|
| return ProxyRecord(username=username, password=password, host=host, port=port) |
|
|
|
|
| def _is_valid_ip(host: str) -> bool: |
| parts = host.split(".") |
| if len(parts) != 4: |
| return False |
| for part in parts: |
| if not part.isdigit(): |
| return False |
| value = int(part) |
| if value < 0 or value > 255: |
| return False |
| return True |
|
|
|
|
| def _parse_allowed_users(raw_value: str) -> set[int]: |
| if not raw_value.strip(): |
| return set() |
| allowed = set() |
| for token in raw_value.replace(";", ",").split(","): |
| token = token.strip() |
| if not token: |
| continue |
| try: |
| allowed.add(int(token)) |
| except ValueError: |
| logger.warning("Ignoring invalid user id token: %s", token) |
| return allowed |
|
|
|
|
| async def check_proxy(record: ProxyRecord, test_url: str, timeout: float) -> ProxyTestResult: |
| proxy_url = f"socks5://{record.normalized}" |
| try: |
| async with httpx.AsyncClient( |
| proxies={"http": proxy_url, "https": proxy_url}, |
| timeout=timeout, |
| ) as client: |
| response = await client.get(test_url) |
| response.raise_for_status() |
| detail = response.text[:200] |
| return ProxyTestResult(proxy=record, ok=True, detail=detail) |
| except Exception as exc: |
| return ProxyTestResult(proxy=record, ok=False, detail=str(exc)) |
|
|
|
|
| async def post_to_backend(records: Iterable[ProxyRecord], backend_url: str, token: Optional[str]) -> Optional[str]: |
| payload = {"proxies": [record.normalized for record in records]} |
| headers = {"Content-Type": "application/json"} |
| if token: |
| headers["Authorization"] = f"Bearer {token}" |
|
|
| async with httpx.AsyncClient(timeout=10.0) as client: |
| response = await client.post(backend_url, headers=headers, json=payload) |
| response.raise_for_status() |
| return response.text |
|
|
|
|
| async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: |
| if not await _ensure_authorized(update, context): |
| return |
|
|
| await update.message.reply_text( |
| "Send SOCKS5 proxies (one per line). I will normalize, forward to the backend, and test them." |
| ) |
|
|
|
|
| async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: |
| assert update.message is not None |
| if not await _ensure_authorized(update, context): |
| return |
|
|
| text = update.message.text or "" |
|
|
| records = [] |
| rejected_lines = [] |
| for line in text.splitlines(): |
| record = parse_proxy_line(line) |
| if record: |
| records.append(record) |
| else: |
| rejected_lines.append(line.strip()) |
|
|
| if not records: |
| await update.message.reply_text( |
| "Could not find valid proxies in your message. Each line should look like user:pass@host:port." |
| ) |
| return |
|
|
| backend_url = context.application.bot_data.get("backend_url") |
| backend_token = context.application.bot_data.get("backend_token") |
| backend_result = None |
|
|
| if backend_url: |
| try: |
| backend_result = await post_to_backend(records, backend_url, backend_token) |
| except Exception as exc: |
| logger.exception("Backend submission failed") |
| backend_result = f"Backend request failed: {exc}" |
|
|
| test_url = context.application.bot_data["test_url"] |
| timeout = context.application.bot_data["timeout"] |
| semaphore: asyncio.Semaphore = context.application.bot_data["semaphore"] |
|
|
| async def _bounded_check(record: ProxyRecord) -> ProxyTestResult: |
| async with semaphore: |
| return await check_proxy(record, test_url=test_url, timeout=timeout) |
|
|
| tasks = [_bounded_check(record) for record in records] |
| results = await asyncio.gather(*tasks) |
|
|
| ok_lines = [f"OK {result.proxy.normalized} -> {result.detail}" for result in results if result.ok] |
| fail_lines = [f"FAIL {result.proxy.normalized} -> {result.detail}" for result in results if not result.ok] |
|
|
| response_lines = [] |
| if ok_lines: |
| response_lines.append("Working proxies:") |
| response_lines.extend(ok_lines) |
| if fail_lines: |
| response_lines.append("Failed proxies:") |
| response_lines.extend(fail_lines) |
| if rejected_lines: |
| response_lines.append("Rejected lines:") |
| response_lines.extend(rejected_lines) |
| if backend_result: |
| response_lines.append("Backend response:") |
| response_lines.append(backend_result) |
|
|
| await update.message.reply_text("\n".join(response_lines[:50])) |
|
|
|
|
| def build_application(): |
| load_dotenv() |
|
|
| bot_token = os.getenv("BOT_TOKEN") |
| if not bot_token: |
| raise RuntimeError("BOT_TOKEN is not set") |
|
|
| |
| application_builder = ApplicationBuilder().token(bot_token) |
|
|
| |
| custom_api_url = os.getenv("TELEGRAM_API_URL", "").strip() |
| if custom_api_url: |
| logger.info(f"Using custom Telegram API URL: {custom_api_url}") |
| |
| |
| |
| |
| api_base = f"{custom_api_url}/bot" |
| file_base = f"{custom_api_url}/file/bot" |
| logger.info(f"Setting API base to: {api_base}") |
| logger.info(f"Setting file base to: {file_base}") |
| application_builder = application_builder.base_url(api_base).base_file_url(file_base) |
| else: |
| logger.info("Using default Telegram API URL") |
|
|
| backend_url = os.getenv("BACKEND_URL", "").strip() |
| backend_token = os.getenv("BACKEND_TOKEN", "").strip() or None |
| test_url = os.getenv("PROXY_TEST_URL", "https://api.ipify.org?format=json") |
| timeout = float(os.getenv("PROXY_TEST_TIMEOUT", "10")) |
| max_concurrent = int(os.getenv("MAX_CONCURRENT_TESTS", "5")) |
| allowed_ids = _parse_allowed_users(os.getenv("ALLOWED_USER_IDS", "")) |
|
|
| application = application_builder.build() |
|
|
| |
| if custom_api_url: |
| |
| application.bot_data["custom_api_url"] = custom_api_url |
|
|
| application.bot_data["backend_url"] = backend_url or None |
| application.bot_data["backend_token"] = backend_token |
| application.bot_data["test_url"] = test_url |
| application.bot_data["timeout"] = timeout |
| application.bot_data["semaphore"] = asyncio.Semaphore(max(1, max_concurrent)) |
| application.bot_data["allowed_ids"] = allowed_ids |
|
|
| application.add_handler(CommandHandler("start", start)) |
| application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text)) |
|
|
| return application |
|
|
|
|
| def main() -> None: |
| application = build_application() |
| application.run_polling() |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|
|
|
| async def _ensure_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool: |
| allowed_ids: set[int] = context.application.bot_data.get("allowed_ids", set()) |
| if not allowed_ids: |
| return True |
|
|
| user = update.effective_user |
| if user and user.id in allowed_ids: |
| return True |
|
|
| message = update.effective_message |
| if message: |
| await message.reply_text("You are not allowed to use this bot.") |
|
|
| logger.info("Unauthorized access attempt by user id %s", user.id if user else "unknown") |
| return False |