File size: 10,084 Bytes
66770f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8816beb
66770f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
310c6d3
66770f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
310c6d3
66770f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4903048
66770f6
 
 
 
 
4903048
8816beb
 
4903048
 
 
8816beb
 
fa66df6
 
 
 
 
 
 
 
 
8816beb
 
66770f6
 
 
 
 
 
 
 
8816beb
66770f6
4903048
 
 
 
 
66770f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8816beb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
from __future__ import annotations

import asyncio
import logging
import os
import re
from dataclasses import dataclass
from typing import Iterable, Optional

import httpx
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)
from telegram.request import HTTPXRequest

# Configure root logging at import time: timestamp, level, logger name, message.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)


@dataclass
class ProxyRecord:
    """Credentials and endpoint of one authenticated proxy."""

    username: str
    password: str
    host: str
    port: int

    @property
    def normalized(self) -> str:
        """Return the proxy rendered as ``username:password@host:port``."""
        credentials = f"{self.username}:{self.password}"
        endpoint = f"{self.host}:{self.port}"
        return f"{credentials}@{endpoint}"


@dataclass
class ProxyTestResult:
    """Outcome of a single connectivity check for one proxy."""

    # The proxy that was exercised.
    proxy: ProxyRecord
    # Whether the check succeeded.
    ok: bool
    # Human-readable detail: response excerpt on success, error text on failure.
    detail: str


CANDIDATE_SEPARATORS = (" ", "|", "#", ",", ";", "\t")

# Optional URL scheme prefix (e.g. "socks5://") that users often paste with a proxy.
_SCHEME_PREFIX = re.compile(r"^[A-Za-z][A-Za-z0-9+.-]*://")


def parse_proxy_line(raw_line: str) -> Optional[ProxyRecord]:
    """Parse one line of user input into a ``ProxyRecord``.

    Two layouts are recognised, each optionally preceded by a URL scheme
    such as ``socks5://``:

    * ``user:pass@host:port``
    * ``host:port:user:pass`` -- when the line contains no ``@``, any of
      ``CANDIDATE_SEPARATORS`` may be used instead of the colons, and runs of
      separators count as a single one.

    Full-width space, colon and at-sign characters are converted to their
    ASCII equivalents before parsing.

    Args:
        raw_line: A single line of text; surrounding whitespace is ignored.

    Returns:
        The parsed record, or ``None`` when the line is blank, starts with
        ``#``, matches neither layout, or fails field validation.
    """
    cleaned = raw_line.strip()
    if not cleaned or cleaned.startswith("#"):
        return None

    cleaned = cleaned.replace("\u3000", " ")  # full-width space
    cleaned = cleaned.replace("\uff1a", ":")  # full-width colon
    cleaned = cleaned.replace("\uff20", "@")  # full-width at sign

    # Drop a leading scheme; otherwise "socks5://u:p@h:1" would be read as
    # username "socks5" with password "//u:p".
    candidate = _SCHEME_PREFIX.sub("", cleaned, count=1).strip()
    if not candidate:
        return None

    # Separators are only rewritten for the "@"-less layout; replacing them
    # can never introduce an "@", so the check is done once up front.
    if "@" not in candidate:
        for sep in CANDIDATE_SEPARATORS:
            candidate = candidate.replace(sep, ":")

    by_at = candidate.split("@")
    if len(by_at) == 2:
        credentials, address = by_at
        user_part = credentials.split(":", 1)
        host_part = address.split(":", 1)
        if len(user_part) == 2 and len(host_part) == 2:
            username, password = user_part
            host, port_str = host_part
            return _make_record(username, password, host, port_str)

    # Empty pieces come from adjacent separators (e.g. "host:port | user:pass");
    # skipping them lets such lines parse instead of being rejected.
    pieces = [piece for piece in candidate.split(":") if piece]
    if len(pieces) >= 4:
        host, port_str, username, password = pieces[:4]
        return _make_record(username, password, host, port_str)

    return None


def _make_record(username: str, password: str, host: str, port_str: str) -> Optional[ProxyRecord]:
    """Validate the four raw fields and build a ``ProxyRecord``.

    Every field is stripped of surrounding whitespace first. ``None`` is
    returned when any field is empty, when the host is neither made up of
    letters, digits, ``_``, ``.`` and ``-`` nor accepted by ``_is_valid_ip``,
    or when the port is not an integer in the range 1-65535.
    """
    fields = [value.strip() for value in (username, password, host, port_str)]
    if not all(fields):
        return None
    user, secret, hostname, port_text = fields

    looks_like_hostname = re.match(r"^[0-9a-zA-Z_.-]+$", hostname) is not None
    if not (looks_like_hostname or _is_valid_ip(hostname)):
        return None

    try:
        port_number = int(port_text)
    except ValueError:
        return None

    if not 1 <= port_number <= 65535:
        return None

    return ProxyRecord(username=user, password=secret, host=hostname, port=port_number)


def _is_valid_ip(host: str) -> bool:
    """Return True when *host* is a dotted-quad IPv4 address.

    The address must consist of exactly four dot-separated parts, each made
    of ASCII digits only and with a numeric value of at most 255.
    """
    parts = host.split(".")
    if len(parts) != 4:
        return False
    # str.isdigit() alone also accepts non-ASCII digits: some of them (e.g. "²")
    # make int() raise ValueError, others (e.g. Arabic-Indic digits) would be
    # silently accepted. Requiring ASCII closes both holes.
    return all(
        part.isascii() and part.isdigit() and int(part) <= 255
        for part in parts
    )


def _parse_allowed_users(raw_value: str) -> set[int]:
    """Turn a ``,``/``;`` separated list of user ids into a set of ints.

    Blank tokens are skipped; tokens that are not valid integers are logged
    with a warning and ignored. A blank input yields an empty set.
    """
    user_ids: set[int] = set()
    if not raw_value.strip():
        return user_ids

    # Treat ";" as an alternative to "," so one split handles both.
    unified = raw_value.replace(";", ",")
    for piece in unified.split(","):
        token = piece.strip()
        if token:
            try:
                user_ids.add(int(token))
            except ValueError:
                logger.warning("Ignoring invalid user id token: %s", token)
    return user_ids


async def check_proxy(record: ProxyRecord, test_url: str, timeout: float) -> ProxyTestResult:
    """Fetch *test_url* through *record* used as a SOCKS5 proxy.

    Args:
        record: Proxy credentials and endpoint to test.
        test_url: URL requested through the proxy.
        timeout: Timeout passed to the httpx client.

    Returns:
        A ``ProxyTestResult`` with ``ok=True`` and the first 200 characters of
        the response body on success, or ``ok=False`` and the error text (or
        the exception class name when the error has no text) on any failure.
        This function does not raise for ordinary errors.
    """
    # Imported locally so this function brings its own dependency into scope.
    from urllib.parse import quote

    # Percent-encode the credentials so characters such as "@", ":" or "/"
    # inside them cannot corrupt the proxy URL.
    proxy_url = (
        f"socks5://{quote(record.username, safe='')}:{quote(record.password, safe='')}"
        f"@{record.host}:{record.port}"
    )
    try:
        # Mount a proxy transport for every scheme. The previous
        # proxies={"http": ..., "https": ...} form used plain-scheme keys,
        # which httpx rejects, and the `proxies` argument itself has been
        # removed from recent httpx releases.
        transport = httpx.AsyncHTTPTransport(proxy=httpx.Proxy(proxy_url))
        async with httpx.AsyncClient(
            mounts={"all://": transport},
            timeout=timeout,
        ) as client:
            response = await client.get(test_url)
            response.raise_for_status()
            detail = response.text[:200]
            return ProxyTestResult(proxy=record, ok=True, detail=detail)
    except Exception as exc:
        # Deliberate catch-all: any failure just marks this proxy as failed.
        # Some exceptions stringify to "", so fall back to the class name.
        return ProxyTestResult(proxy=record, ok=False, detail=str(exc) or type(exc).__name__)


async def post_to_backend(records: Iterable[ProxyRecord], backend_url: str, token: Optional[str]) -> Optional[str]:
    """POST the normalized proxies to the backend as JSON and return the reply body.

    The request body is ``{"proxies": [...]}`` with one normalized string per
    record. When *token* is truthy it is sent as a bearer token. An HTTP error
    status raises via ``raise_for_status()``.
    """
    request_headers = {"Content-Type": "application/json"}
    if token:
        request_headers["Authorization"] = f"Bearer {token}"

    body = {"proxies": [item.normalized for item in records]}

    async with httpx.AsyncClient(timeout=10.0) as session:
        reply = await session.post(backend_url, headers=request_headers, json=body)
        reply.raise_for_status()
        return reply.text


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to the ``/start`` command with usage instructions.

    Nothing is sent when the sender is not authorized or when the update
    carries no message to reply to.
    """
    if not await _ensure_authorized(update, context):
        return

    # update.message is None for updates such as an edited command, so use
    # effective_message and bail out quietly if there is nothing to reply to.
    message = update.effective_message
    if message is None:
        return

    await message.reply_text(
        "Send SOCKS5 proxies (one per line). I will normalize, forward to the backend, and test them."
    )


# Telegram rejects text messages longer than this many characters.
_MAX_REPLY_CHARS = 4096
# Upper bound on the number of report lines sent back to the user.
_MAX_REPLY_LINES = 50


async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Parse proxies from a text message, forward them, test them and report.

    Steps:
      1. Parse every line of the message with ``parse_proxy_line``.
      2. If a backend URL is configured, POST the parsed proxies to it; a
         failure is logged and reported to the user but does not abort.
      3. Test all proxies concurrently, bounded by the shared semaphore.
      4. Reply with working, failed and rejected entries plus the backend
         response, truncated to fit one Telegram message.

    Updates without a regular ``message`` are ignored.
    """
    message = update.message
    if message is None:
        # Ignore instead of asserting: assert is stripped under -O and would
        # otherwise surface as an unhandled error for message-less updates.
        return
    if not await _ensure_authorized(update, context):
        return

    text = message.text or ""

    records = []
    rejected_lines = []
    for line in text.splitlines():
        record = parse_proxy_line(line)
        if record:
            records.append(record)
            continue
        stripped = line.strip()
        # Blank lines and "#" comments are skipped on purpose by the parser;
        # do not report them back as rejected input.
        if stripped and not stripped.startswith("#"):
            rejected_lines.append(stripped)

    if not records:
        await message.reply_text(
            "Could not find valid proxies in your message. Each line should look like user:pass@host:port."
        )
        return

    backend_url = context.application.bot_data.get("backend_url")
    backend_token = context.application.bot_data.get("backend_token")
    backend_result = None

    if backend_url:
        try:
            backend_result = await post_to_backend(records, backend_url, backend_token)
        except Exception as exc:
            # Best effort: the proxies are still tested even if the backend is down.
            logger.exception("Backend submission failed")
            backend_result = f"Backend request failed: {exc}"

    test_url = context.application.bot_data["test_url"]
    timeout = context.application.bot_data["timeout"]
    semaphore: asyncio.Semaphore = context.application.bot_data["semaphore"]

    async def _bounded_check(record: ProxyRecord) -> ProxyTestResult:
        # The semaphore caps how many proxy tests run at the same time.
        async with semaphore:
            return await check_proxy(record, test_url=test_url, timeout=timeout)

    results = await asyncio.gather(*(_bounded_check(record) for record in records))

    ok_lines = [f"OK {result.proxy.normalized} -> {result.detail}" for result in results if result.ok]
    fail_lines = [f"FAIL {result.proxy.normalized} -> {result.detail}" for result in results if not result.ok]

    response_lines = []
    if ok_lines:
        response_lines.append("Working proxies:")
        response_lines.extend(ok_lines)
    if fail_lines:
        response_lines.append("Failed proxies:")
        response_lines.extend(fail_lines)
    if rejected_lines:
        response_lines.append("Rejected lines:")
        response_lines.extend(rejected_lines)
    if backend_result:
        response_lines.append("Backend response:")
        response_lines.append(backend_result)

    # Make truncation visible instead of silently dropping the tail.
    if len(response_lines) > _MAX_REPLY_LINES:
        omitted = len(response_lines) - _MAX_REPLY_LINES
        response_lines = response_lines[:_MAX_REPLY_LINES]
        response_lines.append(f"... ({omitted} more lines omitted)")

    reply = "\n".join(response_lines)
    if len(reply) > _MAX_REPLY_CHARS:
        # Over-long messages are refused by Telegram; cut and mark the cut.
        reply = reply[: _MAX_REPLY_CHARS - 3] + "..."

    await message.reply_text(reply)


def build_application():
    """Build and configure the Telegram application from environment variables.

    Environment variables read (after ``load_dotenv()``):
      * ``BOT_TOKEN`` -- required.
      * ``TELEGRAM_API_URL`` -- optional custom API root; ``/bot`` and
        ``/file/bot`` are appended to form the API and file base URLs.
      * ``BACKEND_URL`` / ``BACKEND_TOKEN`` -- optional backend endpoint and token.
      * ``PROXY_TEST_URL`` -- defaults to ``https://api.ipify.org?format=json``.
      * ``PROXY_TEST_TIMEOUT`` -- float, defaults to ``10``.
      * ``MAX_CONCURRENT_TESTS`` -- int, defaults to ``5`` (at least 1 is used).
      * ``ALLOWED_USER_IDS`` -- optional list parsed by ``_parse_allowed_users``.

    Returns:
        The built application with settings stored in ``bot_data`` and the
        ``/start`` and plain-text handlers registered.

    Raises:
        RuntimeError: If ``BOT_TOKEN`` is not set.
        ValueError: If ``PROXY_TEST_TIMEOUT`` or ``MAX_CONCURRENT_TESTS`` is
            not a valid number.
    """
    load_dotenv()

    bot_token = os.getenv("BOT_TOKEN")
    if not bot_token:
        raise RuntimeError("BOT_TOKEN is not set")

    application_builder = ApplicationBuilder().token(bot_token)

    # Optional custom API root (e.g. when Telegram is reached through a proxy worker).
    custom_api_url = os.getenv("TELEGRAM_API_URL", "").strip()
    if custom_api_url:
        logger.info("Using custom Telegram API URL: %s", custom_api_url)
        # The base_url expects a format like "https://api.telegram.org/bot";
        # python-telegram-bot appends "<token>/method" to it. A trailing slash
        # on the configured value is dropped so the result has no "//bot".
        api_root = custom_api_url.rstrip("/")
        api_base = f"{api_root}/bot"
        file_base = f"{api_root}/file/bot"
        logger.info("Setting API base to: %s", api_base)
        logger.info("Setting file base to: %s", file_base)
        application_builder = application_builder.base_url(api_base).base_file_url(file_base)
    else:
        logger.info("Using default Telegram API URL")

    backend_url = os.getenv("BACKEND_URL", "").strip()
    backend_token = os.getenv("BACKEND_TOKEN", "").strip() or None
    test_url = os.getenv("PROXY_TEST_URL", "https://api.ipify.org?format=json")
    timeout = float(os.getenv("PROXY_TEST_TIMEOUT", "10"))
    max_concurrent = int(os.getenv("MAX_CONCURRENT_TESTS", "5"))
    allowed_ids = _parse_allowed_users(os.getenv("ALLOWED_USER_IDS", ""))

    application = application_builder.build()

    if custom_api_url:
        # Kept as configured so existing readers of this key see the same value.
        application.bot_data["custom_api_url"] = custom_api_url

    application.bot_data["backend_url"] = backend_url or None
    application.bot_data["backend_token"] = backend_token
    application.bot_data["test_url"] = test_url
    application.bot_data["timeout"] = timeout
    # Never allow fewer than one concurrent test, whatever the configured value.
    application.bot_data["semaphore"] = asyncio.Semaphore(max(1, max_concurrent))
    application.bot_data["allowed_ids"] = allowed_ids

    application.add_handler(CommandHandler("start", start))
    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text))

    return application


async def _ensure_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
    """Return True when the sender of *update* may use the bot.

    An empty ``allowed_ids`` set in ``bot_data`` means everyone is allowed.
    Otherwise the effective user's id must be in the set; if it is not, the
    user is told so (when there is a message to reply to), the attempt is
    logged, and False is returned.
    """
    allowed_ids: set[int] = context.application.bot_data.get("allowed_ids", set())
    if not allowed_ids:
        return True

    user = update.effective_user
    if user and user.id in allowed_ids:
        return True

    message = update.effective_message
    if message:
        await message.reply_text("You are not allowed to use this bot.")

    logger.info("Unauthorized access attempt by user id %s", user.id if user else "unknown")
    return False


def main() -> None:
    """Build the application and run it with long polling until stopped."""
    application = build_application()
    application.run_polling()


# The entry-point guard must stay the last statement of the module: main()
# blocks in run_polling(), so anything defined below it would not exist yet
# when the handlers run.
if __name__ == "__main__":
    main()