# tgsocks5 / bot.py
# Author: Jack698
# Uploaded with huggingface_hub
# Commit: fa66df6 (verified)
from __future__ import annotations
import asyncio
import logging
import os
import re
from dataclasses import dataclass
from typing import Iterable, Optional
import httpx
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
from telegram.request import HTTPXRequest
# Root logging configuration: INFO and above, timestamped single-line records.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
@dataclass
class ProxyRecord:
    """A single SOCKS5 proxy endpoint plus the credentials used to log into it."""

    username: str
    password: str
    host: str
    port: int

    @property
    def normalized(self) -> str:
        """Return the canonical ``username:password@host:port`` string for this proxy."""
        credentials = ":".join((self.username, self.password))
        endpoint = ":".join((self.host, str(self.port)))
        return "@".join((credentials, endpoint))
@dataclass
class ProxyTestResult:
    """Outcome of testing one proxy."""

    # The proxy that was tested.
    proxy: ProxyRecord
    # True when the test request through the proxy succeeded.
    ok: bool
    # Response excerpt on success, error description on failure.
    detail: str
# Characters that parse_proxy_line rewrites to ":" when a line contains no "@",
# so "host port user pass", "host|port|user|pass", etc. are accepted too.
CANDIDATE_SEPARATORS = (" ", "|", "#", ",", ";", "\t")
def parse_proxy_line(raw_line: str) -> Optional[ProxyRecord]:
    """Parse one line of user input into a ProxyRecord.

    Full-width space, colon and at-sign are first mapped to their ASCII forms.
    Accepted shapes:

    * ``user:pass@host:port`` -- the *last* ``@`` separates the credentials
      from the address, so usernames/passwords that contain ``@`` (e.g.
      e-mail style logins) are supported; the password may also contain ``:``.
    * ``host:port:user:pass`` -- when the line contains no ``@``, any
      character in ``CANDIDATE_SEPARATORS`` may be used instead of ``:``;
      fields after the fourth are ignored.

    Returns None for blank lines, lines starting with ``#``, and any line whose
    fields ``_make_record`` rejects.
    """
    cleaned = raw_line.strip()
    if not cleaned or cleaned.startswith("#"):
        return None
    cleaned = cleaned.replace("\u3000", " ")  # full-width space
    cleaned = cleaned.replace("\uff1a", ":")  # full-width colon
    cleaned = cleaned.replace("\uff20", "@")  # full-width at sign

    candidate = cleaned
    # Separators are only rewritten for "@"-less lines, so characters inside a
    # user:pass@host:port line are left untouched.
    if "@" not in candidate:
        for sep in CANDIDATE_SEPARATORS:
            candidate = candidate.replace(sep, ":")

    if "@" in candidate:
        # rpartition: the address never contains "@", but credentials may.
        credentials, _, address = candidate.rpartition("@")
        user_part = credentials.split(":", 1)
        host_part = address.split(":", 1)
        if len(user_part) == 2 and len(host_part) == 2:
            username, password = user_part
            host, port_str = host_part
            return _make_record(username, password, host, port_str)

    pieces = candidate.split(":")
    if len(pieces) >= 4:
        host, port_str, username, password = pieces[:4]
        return _make_record(username, password, host, port_str)
    return None
def _make_record(username: str, password: str, host: str, port_str: str) -> Optional[ProxyRecord]:
username = username.strip()
password = password.strip()
host = host.strip()
port_str = port_str.strip()
if not username or not password or not host or not port_str:
return None
if not re.match(r"^[0-9a-zA-Z_.-]+$", host) and not _is_valid_ip(host):
return None
try:
port = int(port_str)
except ValueError:
return None
if port <= 0 or port > 65535:
return None
return ProxyRecord(username=username, password=password, host=host, port=port)
def _is_valid_ip(host: str) -> bool:
parts = host.split(".")
if len(parts) != 4:
return False
for part in parts:
if not part.isdigit():
return False
value = int(part)
if value < 0 or value > 255:
return False
return True
def _parse_allowed_users(raw_value: str) -> set[int]:
if not raw_value.strip():
return set()
allowed = set()
for token in raw_value.replace(";", ",").split(","):
token = token.strip()
if not token:
continue
try:
allowed.add(int(token))
except ValueError:
logger.warning("Ignoring invalid user id token: %s", token)
return allowed
def _open_proxied_client(proxy_url: str, timeout: float) -> httpx.AsyncClient:
    """Create an AsyncClient that routes every request through *proxy_url*.

    httpx >= 0.26 accepts a single ``proxy=`` URL (and 0.28 removed ``proxies=``).
    Older releases only accept ``proxies=``, whose keys must be URL patterns such
    as ``"all://"``; bare scheme keys like ``"http"`` are rejected.
    """
    try:
        return httpx.AsyncClient(proxy=proxy_url, timeout=timeout)
    except TypeError:
        # httpx < 0.26: no ``proxy`` keyword, fall back to a catch-all mount.
        return httpx.AsyncClient(proxies={"all://": proxy_url}, timeout=timeout)


async def check_proxy(record: ProxyRecord, test_url: str, timeout: float) -> ProxyTestResult:
    """Fetch *test_url* through the SOCKS5 proxy described by *record*.

    On success (``raise_for_status`` accepts the status) returns ``ok=True`` with
    the first 200 characters of the response body as detail. Any ``Exception``
    (connection, timeout, HTTP status, client construction) is converted into an
    ``ok=False`` result whose detail is the exception message, or the exception
    class name when the message is empty.
    """
    from urllib.parse import quote

    # Percent-encode the credentials so "@", ":", "/" or "#" inside a username or
    # password cannot corrupt the proxy URL; httpx decodes them for SOCKS auth.
    user = quote(record.username, safe="")
    secret = quote(record.password, safe="")
    proxy_url = f"socks5://{user}:{secret}@{record.host}:{record.port}"
    try:
        async with _open_proxied_client(proxy_url, timeout) as client:
            response = await client.get(test_url)
            response.raise_for_status()
        detail = response.text[:200]
        return ProxyTestResult(proxy=record, ok=True, detail=detail)
    except Exception as exc:
        # Timeouts and some transport errors stringify to "", so fall back to the type name.
        return ProxyTestResult(proxy=record, ok=False, detail=str(exc) or type(exc).__name__)
async def post_to_backend(records: Iterable[ProxyRecord], backend_url: str, token: Optional[str]) -> Optional[str]:
    """POST the normalized proxies to the backend and return the response body text.

    The JSON payload is ``{"proxies": [<record.normalized>, ...]}``. A truthy
    *token* is sent as a Bearer ``Authorization`` header. The request uses a
    10-second timeout, and ``raise_for_status`` propagates HTTP error statuses
    to the caller.
    """
    body = {"proxies": [entry.normalized for entry in records]}
    request_headers = {"Content-Type": "application/json"}
    if token:
        request_headers["Authorization"] = f"Bearer {token}"
    async with httpx.AsyncClient(timeout=10.0) as http:
        reply = await http.post(backend_url, headers=request_headers, json=body)
        reply.raise_for_status()
        return reply.text
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: send usage instructions to authorized users."""
    if not await _ensure_authorized(update, context):
        return
    # effective_message also covers edited commands, for which update.message is None.
    message = update.effective_message
    if message is None:
        return
    await message.reply_text(
        "Send SOCKS5 proxies (one per line). I will normalize, forward to the backend, and test them."
    )
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
assert update.message is not None
if not await _ensure_authorized(update, context):
return
text = update.message.text or ""
records = []
rejected_lines = []
for line in text.splitlines():
record = parse_proxy_line(line)
if record:
records.append(record)
else:
rejected_lines.append(line.strip())
if not records:
await update.message.reply_text(
"Could not find valid proxies in your message. Each line should look like user:pass@host:port."
)
return
backend_url = context.application.bot_data.get("backend_url")
backend_token = context.application.bot_data.get("backend_token")
backend_result = None
if backend_url:
try:
backend_result = await post_to_backend(records, backend_url, backend_token)
except Exception as exc:
logger.exception("Backend submission failed")
backend_result = f"Backend request failed: {exc}"
test_url = context.application.bot_data["test_url"]
timeout = context.application.bot_data["timeout"]
semaphore: asyncio.Semaphore = context.application.bot_data["semaphore"]
async def _bounded_check(record: ProxyRecord) -> ProxyTestResult:
async with semaphore:
return await check_proxy(record, test_url=test_url, timeout=timeout)
tasks = [_bounded_check(record) for record in records]
results = await asyncio.gather(*tasks)
ok_lines = [f"OK {result.proxy.normalized} -> {result.detail}" for result in results if result.ok]
fail_lines = [f"FAIL {result.proxy.normalized} -> {result.detail}" for result in results if not result.ok]
response_lines = []
if ok_lines:
response_lines.append("Working proxies:")
response_lines.extend(ok_lines)
if fail_lines:
response_lines.append("Failed proxies:")
response_lines.extend(fail_lines)
if rejected_lines:
response_lines.append("Rejected lines:")
response_lines.extend(rejected_lines)
if backend_result:
response_lines.append("Backend response:")
response_lines.append(backend_result)
await update.message.reply_text("\n".join(response_lines[:50]))
def build_application():
    """Create and configure the Telegram bot application from environment variables.

    A ``.env`` file is loaded first via ``load_dotenv``. Variables read:

    * ``BOT_TOKEN`` (required) -- bot token; ``RuntimeError`` if missing/empty.
    * ``TELEGRAM_API_URL`` -- optional Bot API proxy base; ``/bot`` and
      ``/file/bot`` are appended for the API and file endpoints.
    * ``BACKEND_URL`` / ``BACKEND_TOKEN`` -- optional backend for parsed proxies.
    * ``PROXY_TEST_URL`` -- URL fetched through each proxy (default ipify).
    * ``PROXY_TEST_TIMEOUT`` -- per-proxy timeout in seconds (default 10).
    * ``MAX_CONCURRENT_TESTS`` -- concurrent proxy checks (default 5, min 1).
    * ``ALLOWED_USER_IDS`` -- comma/semicolon separated user ids.

    Empty values are treated like unset ones. Returns the built application with
    its configuration in ``bot_data`` and the /start and text handlers registered.
    """
    load_dotenv()
    bot_token = os.getenv("BOT_TOKEN")
    if not bot_token:
        raise RuntimeError("BOT_TOKEN is not set")

    application_builder = ApplicationBuilder().token(bot_token)

    # Strip trailing slashes so ".../telegram-api/" does not become ".../telegram-api//bot".
    custom_api_url = os.getenv("TELEGRAM_API_URL", "").strip().rstrip("/")
    if custom_api_url:
        logger.info("Using custom Telegram API URL: %s", custom_api_url)
        # python-telegram-bot appends "<token>/<method>" to base_url (default
        # "https://api.telegram.org/bot"), so the "/bot" suffix belongs here;
        # e.g. https://worker.dev/telegram-api -> https://worker.dev/telegram-api/bot
        api_base = f"{custom_api_url}/bot"
        file_base = f"{custom_api_url}/file/bot"
        logger.info("Setting API base to: %s", api_base)
        logger.info("Setting file base to: %s", file_base)
        application_builder = application_builder.base_url(api_base).base_file_url(file_base)
    else:
        logger.info("Using default Telegram API URL")

    backend_url = os.getenv("BACKEND_URL", "").strip()
    backend_token = os.getenv("BACKEND_TOKEN", "").strip() or None
    # "or default" so a variable set to "" does not yield an empty URL or crash float("").
    test_url = os.getenv("PROXY_TEST_URL", "").strip() or "https://api.ipify.org?format=json"
    timeout = float(os.getenv("PROXY_TEST_TIMEOUT", "").strip() or "10")
    max_concurrent = int(os.getenv("MAX_CONCURRENT_TESTS", "").strip() or "5")
    allowed_ids = _parse_allowed_users(os.getenv("ALLOWED_USER_IDS", ""))

    application = application_builder.build()

    if custom_api_url:
        # Recorded in bot_data for reference; no handler in this file reads it.
        application.bot_data["custom_api_url"] = custom_api_url
    application.bot_data["backend_url"] = backend_url or None
    application.bot_data["backend_token"] = backend_token
    application.bot_data["test_url"] = test_url
    application.bot_data["timeout"] = timeout
    # At least one concurrent check, even if MAX_CONCURRENT_TESTS is 0 or negative.
    application.bot_data["semaphore"] = asyncio.Semaphore(max(1, max_concurrent))
    application.bot_data["allowed_ids"] = allowed_ids

    application.add_handler(CommandHandler("start", start))
    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text))
    return application
def main() -> None:
    """Entry point: build the configured application and run it with long polling."""
    build_application().run_polling()
if __name__ == "__main__":
main()
async def _ensure_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
allowed_ids: set[int] = context.application.bot_data.get("allowed_ids", set())
if not allowed_ids:
return True
user = update.effective_user
if user and user.id in allowed_ids:
return True
message = update.effective_message
if message:
await message.reply_text("You are not allowed to use this bot.")
logger.info("Unauthorized access attempt by user id %s", user.id if user else "unknown")
return False