"""Telegram bot that normalizes SOCKS5 proxy lines, forwards them, and tests them.

Each incoming text message is split into lines; every line is parsed into a
``ProxyRecord`` (``user:pass@host:port`` or ``host:port:user:pass`` with a few
tolerated separators).  Valid records are optionally POSTed to a backend and
then checked concurrently by requesting a test URL through the proxy.

Configuration is read from environment variables (optionally via ``.env``):
``BOT_TOKEN``, ``TELEGRAM_API_URL``, ``BACKEND_URL``, ``BACKEND_TOKEN``,
``PROXY_TEST_URL``, ``PROXY_TEST_TIMEOUT``, ``MAX_CONCURRENT_TESTS`` and
``ALLOWED_USER_IDS``.
"""
from __future__ import annotations

import asyncio
import logging
import os
import re
from dataclasses import dataclass
from typing import Iterable, Optional
from urllib.parse import quote

import httpx
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)
from telegram.request import HTTPXRequest  # noqa: F401  (kept: may be used elsewhere)

logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Defaults used when the corresponding environment variable is unset or empty.
DEFAULT_TEST_URL = "https://api.ipify.org?format=json"
DEFAULT_TEST_TIMEOUT = "10"
DEFAULT_MAX_CONCURRENT = "5"

# Reply limits: at most this many report lines, and never more characters than
# Telegram accepts in a single text message.
MAX_REPORT_LINES = 50
TELEGRAM_MESSAGE_LIMIT = 4096


@dataclass
class ProxyRecord:
    """A parsed SOCKS5 proxy with credentials."""

    username: str
    password: str
    host: str
    port: int

    @property
    def normalized(self) -> str:
        """Return the canonical ``user:pass@host:port`` form (not URL-escaped)."""
        return f"{self.username}:{self.password}@{self.host}:{self.port}"


@dataclass
class ProxyTestResult:
    """Outcome of testing one proxy."""

    proxy: ProxyRecord
    ok: bool
    # On success: first 200 characters of the test response body.
    # On failure: the exception message (or its class name when empty).
    detail: str


CANDIDATE_SEPARATORS = (" ", "|", "#", ",", ";", "\t")

# Full-width space / colon / at-sign -> ASCII equivalents.
_FULL_WIDTH_TABLE = str.maketrans({"\u3000": " ", "\uff1a": ":", "\uff20": "@"})
# Every tolerated field separator -> ":".
_SEPARATOR_TABLE = str.maketrans({sep: ":" for sep in CANDIDATE_SEPARATORS})
_HOSTNAME_RE = re.compile(r"^[0-9a-zA-Z_.-]+$")
# Four dot-separated ASCII digit groups, i.e. something meant to be an IPv4 address.
_DOTTED_QUAD_RE = re.compile(r"^[0-9]+(\.[0-9]+){3}$")


def parse_proxy_line(raw_line: str) -> Optional[ProxyRecord]:
    """Parse one line of user input into a ``ProxyRecord``.

    Accepted shapes are ``user:pass@host:port`` and ``host:port:user:pass``.
    In the second shape any of ``CANDIDATE_SEPARATORS`` may stand in for the
    colons (only applied when the line contains no ``@``).  Full-width
    space, colon and at-sign are mapped to their ASCII counterparts first.

    Returns ``None`` for blank lines, lines starting with ``#`` and anything
    that does not yield a valid record.
    """
    cleaned = raw_line.strip()
    if not cleaned or cleaned.startswith("#"):
        return None

    candidate = cleaned.translate(_FULL_WIDTH_TABLE)
    if "@" not in candidate:
        candidate = candidate.translate(_SEPARATOR_TABLE)

    by_at = candidate.split("@")
    if len(by_at) == 2:
        credentials, address = by_at
        user_part = credentials.split(":", 1)
        host_part = address.split(":", 1)
        if len(user_part) == 2 and len(host_part) == 2:
            username, password = user_part
            host, port_str = host_part
            return _make_record(username, password, host, port_str)

    pieces = candidate.split(":")
    if len(pieces) >= 4:
        # Extra trailing fields are ignored.
        host, port_str, username, password = pieces[:4]
        return _make_record(username, password, host, port_str)
    return None


def _make_record(username: str, password: str, host: str, port_str: str) -> Optional[ProxyRecord]:
    """Validate the four fields and build a ``ProxyRecord``, or return ``None``.

    All fields are stripped and must be non-empty.  The host must consist of
    letters, digits, ``_``, ``.`` and ``-``; if it is written as a dotted quad
    it must additionally be a valid IPv4 address.  The port must be plain
    ASCII digits in the range 1-65535.
    """
    username = username.strip()
    password = password.strip()
    host = host.strip()
    port_str = port_str.strip()
    if not (username and password and host and port_str):
        return None

    if not _HOSTNAME_RE.match(host):
        return None
    # The hostname pattern also matches things like "999.1.1.1"; range-check
    # anything that is shaped like an IPv4 address.
    if _DOTTED_QUAD_RE.match(host) and not _is_valid_ip(host):
        return None

    # int() alone would accept "+80", "1_080" or non-ASCII digits.
    if not (port_str.isascii() and port_str.isdigit()):
        return None
    port = int(port_str)
    if port <= 0 or port > 65535:
        return None
    return ProxyRecord(username=username, password=password, host=host, port=port)


def _is_valid_ip(host: str) -> bool:
    """Return True when *host* is four dot-separated decimal octets (0-255)."""
    parts = host.split(".")
    if len(parts) != 4:
        return False
    for part in parts:
        # str.isdigit() is True for characters such as "²" that int() rejects,
        # so require ASCII before converting.
        if not (part.isascii() and part.isdigit()):
            return False
        if int(part) > 255:
            return False
    return True


def _parse_allowed_users(raw_value: str) -> set[int]:
    """Parse a ``,``/``;`` separated list of user ids; invalid tokens are logged and skipped."""
    allowed: set[int] = set()
    for token in raw_value.replace(";", ",").split(","):
        token = token.strip()
        if not token:
            continue
        try:
            allowed.add(int(token))
        except ValueError:
            logger.warning("Ignoring invalid user id token: %s", token)
    return allowed


def _make_proxy_client(proxy_url: str, timeout: float) -> httpx.AsyncClient:
    """Create an ``AsyncClient`` that routes all traffic through *proxy_url*.

    Newer httpx releases take a single ``proxy=`` argument; older ones only
    know the ``proxies=`` mapping, whose keys must be URL patterns such as
    ``"all://"`` (plain ``"http"``/``"https"`` keys are rejected).
    """
    try:
        return httpx.AsyncClient(proxy=proxy_url, timeout=timeout)
    except TypeError:
        return httpx.AsyncClient(proxies={"all://": proxy_url}, timeout=timeout)


async def check_proxy(record: ProxyRecord, test_url: str, timeout: float) -> ProxyTestResult:
    """GET *test_url* through the proxy and report whether it worked.

    Never raises for ordinary failures: any ``Exception`` is turned into a
    failed ``ProxyTestResult`` whose detail is the exception text.
    """
    # Credentials are percent-encoded so characters like "@", "/" or "#" in a
    # password cannot corrupt the proxy URL.
    proxy_url = (
        f"socks5://{quote(record.username, safe='')}:{quote(record.password, safe='')}"
        f"@{record.host}:{record.port}"
    )
    try:
        async with _make_proxy_client(proxy_url, timeout) as client:
            response = await client.get(test_url)
            response.raise_for_status()
            detail = response.text[:200]
            return ProxyTestResult(proxy=record, ok=True, detail=detail)
    except Exception as exc:  # boundary: report every failure to the user
        # Some exceptions (e.g. timeouts) carry an empty message.
        return ProxyTestResult(proxy=record, ok=False, detail=str(exc) or type(exc).__name__)


async def post_to_backend(records: Iterable[ProxyRecord], backend_url: str, token: Optional[str]) -> Optional[str]:
    """POST the normalized proxies as JSON to *backend_url* and return the response body.

    Sends ``{"proxies": [...]}`` with an optional ``Authorization: Bearer``
    header.  Raises ``httpx.HTTPStatusError`` on a non-success status and
    other ``httpx`` errors on transport failure.
    """
    payload = {"proxies": [record.normalized for record in records]}
    headers = {"Content-Type": "application/json"}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.post(backend_url, headers=headers, json=payload)
        response.raise_for_status()
        return response.text


async def _ensure_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
    """Return True when the sender may use the bot; otherwise notify them and return False.

    An empty allow-list (``bot_data["allowed_ids"]``) means everyone is allowed.
    """
    allowed_ids: set[int] = context.application.bot_data.get("allowed_ids", set())
    if not allowed_ids:
        return True
    user = update.effective_user
    if user and user.id in allowed_ids:
        return True
    message = update.effective_message
    if message:
        await message.reply_text("You are not allowed to use this bot.")
    logger.info("Unauthorized access attempt by user id %s", user.id if user else "unknown")
    return False


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/start``: reply with brief usage instructions."""
    # effective_message also covers edited messages, where update.message is None.
    message = update.effective_message
    if message is None:
        return
    if not await _ensure_authorized(update, context):
        return
    await message.reply_text(
        "Send SOCKS5 proxies (one per line). I will normalize, forward to the backend, and test them."
    )


def _collect_records(text: str) -> tuple[list[ProxyRecord], list[str]]:
    """Split *text* into parsed records and the stripped lines that failed to parse."""
    records: list[ProxyRecord] = []
    rejected_lines: list[str] = []
    for line in text.splitlines():
        stripped = line.strip()
        # Blank lines and comments are not proxies; do not report them as rejected.
        if not stripped or stripped.startswith("#"):
            continue
        record = parse_proxy_line(line)
        if record:
            records.append(record)
        else:
            rejected_lines.append(stripped)
    return records, rejected_lines


def _build_report(
    results: Iterable[ProxyTestResult],
    rejected_lines: list[str],
    backend_result: Optional[str],
) -> str:
    """Assemble the reply text, capped in both line count and total length."""
    results = list(results)
    ok_lines = [f"OK {result.proxy.normalized} -> {result.detail}" for result in results if result.ok]
    fail_lines = [f"FAIL {result.proxy.normalized} -> {result.detail}" for result in results if not result.ok]

    response_lines: list[str] = []
    if ok_lines:
        response_lines.append("Working proxies:")
        response_lines.extend(ok_lines)
    if fail_lines:
        response_lines.append("Failed proxies:")
        response_lines.extend(fail_lines)
    if rejected_lines:
        response_lines.append("Rejected lines:")
        response_lines.extend(rejected_lines)
    if backend_result:
        response_lines.append("Backend response:")
        response_lines.append(backend_result)

    shown = response_lines[:MAX_REPORT_LINES]
    omitted = len(response_lines) - len(shown)
    if omitted:
        shown.append(f"... ({omitted} more lines omitted)")
    text = "\n".join(shown)
    # Telegram refuses over-long messages; trim rather than fail the reply.
    if len(text) > TELEGRAM_MESSAGE_LIMIT:
        text = text[: TELEGRAM_MESSAGE_LIMIT - 3] + "..."
    return text


async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a plain text message: parse proxies, forward them, test them, and reply."""
    message = update.effective_message
    if message is None:
        return
    if not await _ensure_authorized(update, context):
        return

    records, rejected_lines = _collect_records(message.text or "")
    if not records:
        await message.reply_text(
            "Could not find valid proxies in your message. Each line should look like user:pass@host:port."
        )
        return

    bot_data = context.application.bot_data
    backend_url = bot_data.get("backend_url")
    backend_token = bot_data.get("backend_token")
    backend_result = None
    if backend_url:
        try:
            backend_result = await post_to_backend(records, backend_url, backend_token)
        except Exception as exc:  # a backend failure must not prevent testing
            logger.exception("Backend submission failed")
            backend_result = f"Backend request failed: {exc}"

    test_url = bot_data["test_url"]
    timeout = bot_data["timeout"]
    semaphore: asyncio.Semaphore = bot_data["semaphore"]

    async def _bounded_check(record: ProxyRecord) -> ProxyTestResult:
        # The shared semaphore limits concurrent proxy tests.
        async with semaphore:
            return await check_proxy(record, test_url=test_url, timeout=timeout)

    results = await asyncio.gather(*(_bounded_check(record) for record in records))
    await message.reply_text(_build_report(results, rejected_lines, backend_result))


def build_application():
    """Build and configure the bot application from environment variables.

    Raises ``RuntimeError`` when ``BOT_TOKEN`` is missing and ``ValueError``
    when ``PROXY_TEST_TIMEOUT`` or ``MAX_CONCURRENT_TESTS`` is not numeric.
    """
    load_dotenv()
    bot_token = os.getenv("BOT_TOKEN")
    if not bot_token:
        raise RuntimeError("BOT_TOKEN is not set")

    application_builder = ApplicationBuilder().token(bot_token)

    # Optional custom Bot API endpoint (e.g. a reverse proxy).  A trailing
    # slash is dropped so the derived URLs do not contain "//bot".
    custom_api_url = os.getenv("TELEGRAM_API_URL", "").strip().rstrip("/")
    if custom_api_url:
        logger.info("Using custom Telegram API URL: %s", custom_api_url)
        # base_url is expected in the form "https://api.telegram.org/bot";
        # the library appends the token and "/method" to it.  So for
        # https://worker.dev/telegram-api we pass https://worker.dev/telegram-api/bot
        api_base = f"{custom_api_url}/bot"
        file_base = f"{custom_api_url}/file/bot"
        logger.info("Setting API base to: %s", api_base)
        logger.info("Setting file base to: %s", file_base)
        application_builder = application_builder.base_url(api_base).base_file_url(file_base)
    else:
        logger.info("Using default Telegram API URL")

    backend_url = os.getenv("BACKEND_URL", "").strip()
    backend_token = os.getenv("BACKEND_TOKEN", "").strip() or None
    # "or default" also covers variables that are set but empty.
    test_url = os.getenv("PROXY_TEST_URL", "").strip() or DEFAULT_TEST_URL
    timeout = float(os.getenv("PROXY_TEST_TIMEOUT", "").strip() or DEFAULT_TEST_TIMEOUT)
    max_concurrent = int(os.getenv("MAX_CONCURRENT_TESTS", "").strip() or DEFAULT_MAX_CONCURRENT)
    allowed_ids = _parse_allowed_users(os.getenv("ALLOWED_USER_IDS", ""))

    application = application_builder.build()

    if custom_api_url:
        # Kept in bot_data so handlers can see which endpoint is in use.
        application.bot_data["custom_api_url"] = custom_api_url

    application.bot_data["backend_url"] = backend_url or None
    application.bot_data["backend_token"] = backend_token
    application.bot_data["test_url"] = test_url
    application.bot_data["timeout"] = timeout
    application.bot_data["semaphore"] = asyncio.Semaphore(max(1, max_concurrent))
    application.bot_data["allowed_ids"] = allowed_ids

    application.add_handler(CommandHandler("start", start))
    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text))
    return application


def main() -> None:
    """Build the application and poll Telegram until interrupted."""
    application = build_application()
    application.run_polling()


# Must stay at the very end: main() blocks in run_polling(), so every name the
# handlers use has to be defined before this point.
if __name__ == "__main__":
    main()