| from __future__ import annotations |
|
|
| import asyncio |
| import logging |
| import os |
| import re |
| import random |
| from typing import Optional |
|
|
| from dotenv import load_dotenv |
| from telegram import Update |
| from telegram.ext import ( |
| ApplicationBuilder, |
| CommandHandler, |
| ContextTypes, |
| MessageHandler, |
| filters, |
| ) |
|
|
# Configure process-wide logging once at import time; each record shows
# timestamp, level, logger name and message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)

# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
|
|
|
|
| |
def luhn_checksum(card_number: str) -> int:
    """Return the Luhn checksum of ``card_number`` (weighted digit sum mod 10).

    Counting from the rightmost digit, digits in odd positions are added
    unchanged; digits in even positions are doubled and the decimal digits of
    the doubled value are added. A result of 0 means the number passes the
    Luhn check.

    Args:
        card_number: The digits to check. The value is passed through
            ``str()`` first, so an ``int`` works as well.

    Returns:
        The weighted sum modulo 10 (0-9). An empty input yields 0.

    Raises:
        ValueError: If any character cannot be converted with ``int()``.
    """
    total = 0
    # Walk right-to-left so position 0 is the check digit (never doubled).
    for position, char in enumerate(reversed(str(card_number))):
        value = int(char)
        if position % 2 == 1:
            doubled = value * 2
            # For 10..18 the digit sum equals the value minus 9.
            value = doubled - 9 if doubled > 9 else doubled
        total += value
    return total % 10
|
|
|
|
def is_luhn_valid(card_number: str) -> bool:
    """Return True if ``card_number`` is a non-empty digit string passing Luhn.

    Unlike a bare checksum comparison, this predicate never raises for
    malformed input: an empty string or any non-decimal character (spaces,
    dashes, letters, a minus sign) simply yields False.

    Args:
        card_number: Candidate number; converted with ``str()`` before
            checking.

    Returns:
        True when the Luhn checksum is 0, otherwise False.
    """
    text = str(card_number)
    # isdecimal() is False for "" and for anything int() could not convert
    # character by character, so luhn_checksum() cannot raise below.
    if not text.isdecimal():
        return False
    return luhn_checksum(text) == 0
|
|
|
|
def generate_luhn_valid_number(prefix: str, length: int = 16) -> str:
    """Extend ``prefix`` with random digits into a Luhn-valid number.

    Args:
        prefix: Leading digits of the number (converted with ``str()``).
        length: Total number of digits in the result.

    Returns:
        A string of ``length`` digits that starts with ``prefix`` and passes
        the Luhn check. If ``prefix`` already has ``length`` or more digits,
        its first ``length`` digits are returned unchanged when they are
        Luhn-valid.

    Raises:
        ValueError: If ``prefix`` already fills ``length`` digits and those
            digits are not Luhn-valid.
    """
    prefix = str(prefix)

    # Nothing left to randomise: the (truncated) prefix must stand on its own.
    if len(prefix) >= length:
        candidate = prefix[:length]
        if not is_luhn_valid(candidate):
            raise ValueError("前缀长度已达到但不符合Luhn算法")
        return candidate

    # Reserve the final position for the check digit.
    filler_count = length - len(prefix) - 1
    filler = ''.join(str(random.randint(0, 9)) for _ in range(filler_count))
    body = prefix + filler

    # Checksum with a placeholder 0 shows how far the sum is from a multiple
    # of 10; the check digit closes that gap.
    check_digit = (10 - luhn_checksum(body + '0')) % 10
    return body + str(check_digit)
|
|
|
|
def parse_and_generate_card(input_string: str) -> dict:
    """Parse a ``prefix|month|year[|cvv]`` template and build one card record.

    The prefix is a run of digits optionally followed by ``x`` placeholders;
    the optional CVV field may contain only ``x`` characters. The number of
    ``x`` placeholders in either field is not used: the card length and CVV
    length are chosen from the prefix alone (15 digits / 4-digit CVV when the
    prefix starts with 34 or 37, otherwise 16 digits / 3-digit CVV).

    Args:
        input_string: Template such as ``"440393xxxxx|3|27|xxx"``. Leading and
            trailing whitespace is ignored, matching ``is_credit_card_format``.

    Returns:
        A dict with keys ``card_number``, ``expiry_month`` (two digits),
        ``expiry_year`` (four digits), ``cvv`` and ``formatted``
        (``number|month|year|cvv``).

    Raises:
        ValueError: If the template does not match the expected layout, the
            month is outside 1-12, the year is not 2 or 4 digits long, or a
            full-length prefix is not Luhn-valid.
    """
    # Strip first so this parser accepts exactly what the format detector does.
    template = input_string.strip()
    match = re.match(r'^(\d+)x*\|(\d{1,2})\|(\d{2,4})(?:\|x*)?$', template)
    if not match:
        raise ValueError("输入格式不正确,应为:前缀|月份|年份|CVV (CVV可选)")

    prefix, month, year = match.groups()

    # Validate the expiry before doing any random generation.
    if not 1 <= int(month) <= 12:
        raise ValueError("月份必须在1到12之间")
    if len(year) not in (2, 4):
        raise ValueError("年份必须为2位或4位数字")

    # Prefixes 34/37 get a 15-digit number and 4-digit CVV; all others 16/3.
    is_amex = prefix.startswith(('34', '37'))
    total_length = 15 if is_amex else 16
    cvv_length = 4 if is_amex else 3

    full_card_number = generate_luhn_valid_number(prefix, total_length)

    month = month.zfill(2)
    if len(year) == 2:
        year = '20' + year

    cvv = ''.join(str(random.randint(0, 9)) for _ in range(cvv_length))

    return {
        'card_number': full_card_number,
        'expiry_month': month,
        'expiry_year': year,
        'cvv': cvv,
        'formatted': f"{full_card_number}|{month}|{year}|{cvv}",
    }
|
|
|
|
def is_credit_card_format(text: str) -> bool:
    """Tell whether ``text`` looks like a ``prefix|month|year[|cvv]`` template.

    Surrounding whitespace is ignored. The prefix must be digits optionally
    followed by ``x`` characters, the month 1-2 digits, the year 2-4 digits,
    and the optional CVV field may contain only ``x`` characters.

    Args:
        text: Raw message text.

    Returns:
        True if the trimmed text matches the template layout, else False.
    """
    candidate = text.strip()
    template_re = re.compile(r'^\d+x*\|\d{1,2}\|\d{2,4}(?:\|x*)?$')
    return template_re.match(candidate) is not None
|
|
|
|
def generate_multiple_cards(input_string: str, count: int = 10) -> list:
    """Generate up to ``count`` card records from one template.

    Each attempt is independent; an attempt that raises ``ValueError`` is
    logged as a warning and skipped, so the result may hold fewer than
    ``count`` items (or none).

    Args:
        input_string: Template passed to ``parse_and_generate_card``.
        count: Number of generation attempts.

    Returns:
        List of the card dicts that were generated successfully.
    """
    generated = []
    for _attempt in range(count):
        try:
            card = parse_and_generate_card(input_string)
        except ValueError as e:
            logger.warning(f"生成信用卡失败: {e}")
        else:
            generated.append(card)
    return generated
|
|
|
|
def _parse_allowed_users(raw_value: str) -> set[int]:
    """Parse a comma- or semicolon-separated list of user ids.

    Blank entries are skipped. Entries that are not valid integers are
    logged as a warning and ignored.

    Args:
        raw_value: Raw configuration string, e.g. ``"123, 456;789"``.

    Returns:
        The set of parsed integer ids; empty when ``raw_value`` is blank.
    """
    if not raw_value.strip():
        return set()

    user_ids: set[int] = set()
    # Treat ';' as an alias for ',' so both separators work.
    for piece in map(str.strip, raw_value.replace(";", ",").split(",")):
        if piece:
            try:
                user_ids.add(int(piece))
            except ValueError:
                logger.warning("Ignoring invalid user id token: %s", piece)
    return user_ids
|
|
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /start command by replying with usage instructions.

    Does nothing further when the sender is not authorized
    (``_ensure_authorized`` has already answered) or when the update carries
    no message to reply to.

    Args:
        update: Incoming Telegram update.
        context: Handler context; forwarded to the authorization check.
    """
    if not await _ensure_authorized(update, context):
        return

    # update.message is None for updates without a new message (for example
    # an edited /start); effective_message covers those cases and is what
    # the authorization check replies to as well.
    message = update.effective_message
    if message is None:
        return

    await message.reply_text(
        "欢迎使用信用卡生成机器人!\n\n"
        "请发送信用卡格式,我将为您生成10张随机信用卡。\n"
        "格式示例:\n"
        "• 440393xxxxx|3|27|xxx\n"
        "• 424242|11|28\n"
        "• 34xxxxx|6|25|xxxx (American Express)\n\n"
        "格式说明:前缀|月份|年份|CVV(CVV可选)"
    )
|
|
|
|
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a plain text message: generate cards or explain the format.

    If the trimmed text matches the card template layout it is forwarded to
    ``handle_credit_card_request``; otherwise a usage hint is sent back.

    Args:
        update: Incoming Telegram update.
        context: Handler context; forwarded to the authorization check.
    """
    # Explicit guard instead of ``assert``: asserts are removed under -O and
    # would raise for updates whose ``message`` field is empty.
    message = update.effective_message
    if message is None:
        return

    if not await _ensure_authorized(update, context):
        return

    # Trim once so detection and generation see the same string.
    text = (message.text or "").strip()

    if is_credit_card_format(text):
        await handle_credit_card_request(update, text)
        return

    await message.reply_text(
        "格式不正确!请使用以下格式:\n"
        "• 440393xxxxx|3|27|xxx\n"
        "• 424242|11|28\n\n"
        "格式说明:前缀|月份|年份|CVV(CVV可选)"
    )
|
|
|
|
async def handle_credit_card_request(update: Update, input_format: str) -> None:
    """Generate 10 cards from ``input_format`` and send them to the chat.

    The header line is sent together with the first batch of cards and does
    not count towards the batch size, so 10 cards arrive as one message.
    Any exception raised while generating or sending is logged with its
    traceback and reported back to the chat.

    Args:
        update: Incoming Telegram update; the reply goes to its effective
            message. Nothing is done when the update has no message.
        input_format: Card template, e.g. ``"440393xxxxx|3|27|xxx"``.
    """
    message = update.effective_message
    if message is None:
        return

    try:
        cards = generate_multiple_cards(input_format, 10)

        if not cards:
            await message.reply_text("生成信用卡失败,请检查输入格式。")
            return

        header = f"🎉 成功生成 {len(cards)} 张信用卡:\n"
        card_lines = [
            f"{index}. {card['formatted']}"
            for index, card in enumerate(cards, 1)
        ]

        # Batch only the card lines; previously the header consumed one slot
        # and pushed the last card into a message of its own.
        cards_per_message = 10
        for offset in range(0, len(card_lines), cards_per_message):
            chunk = card_lines[offset:offset + cards_per_message]
            if offset == 0:
                chunk = [header, *chunk]
            await message.reply_text("\n".join(chunk))

    except Exception as e:
        logger.exception("处理信用卡请求时出错")
        await message.reply_text(f"生成信用卡时出错:{str(e)}")
|
|
|
|
def build_application():
    """Build and configure the Telegram application from environment settings.

    Environment variables (a ``.env`` file is loaded first):
        BOT_TOKEN: Required bot token.
        TELEGRAM_API_URL: Optional base URL of a custom Bot API server. A
            trailing slash is tolerated.
        ALLOWED_USER_IDS: Optional comma/semicolon separated user ids; stored
            in ``bot_data["allowed_ids"]`` for the authorization check.

    Returns:
        The built application with the /start command handler and the
        plain-text message handler registered.

    Raises:
        RuntimeError: If ``BOT_TOKEN`` is missing or empty.
    """
    load_dotenv()

    bot_token = os.getenv("BOT_TOKEN")
    if not bot_token:
        raise RuntimeError("BOT_TOKEN is not set")

    application_builder = ApplicationBuilder().token(bot_token)

    # Drop trailing slashes so the joined URLs never contain "//bot".
    custom_api_url = os.getenv("TELEGRAM_API_URL", "").strip().rstrip("/")
    if custom_api_url:
        api_base = f"{custom_api_url}/bot"
        file_base = f"{custom_api_url}/file/bot"
        logger.info("Using custom Telegram API URL: %s", custom_api_url)
        logger.info("Setting API base to: %s", api_base)
        logger.info("Setting file base to: %s", file_base)
        application_builder = application_builder.base_url(api_base).base_file_url(file_base)
    else:
        logger.info("Using default Telegram API URL")

    allowed_ids = _parse_allowed_users(os.getenv("ALLOWED_USER_IDS", ""))

    application = application_builder.build()

    # Shared state read by the handlers at runtime.
    if custom_api_url:
        application.bot_data["custom_api_url"] = custom_api_url
    application.bot_data["allowed_ids"] = allowed_ids

    application.add_handler(CommandHandler("start", start))
    application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text))

    return application
|
|
|
|
def main() -> None:
    """Entry point: build the application and start polling for updates."""
    build_application().run_polling()
|
|
|
|
async def _ensure_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
    """Check whether the sender of ``update`` may use the bot.

    The allow-list is read from ``bot_data["allowed_ids"]``. An empty or
    missing allow-list means the bot is open to everyone. A rejected sender
    is told so (when the update has a message to reply to) and the attempt
    is logged.

    Args:
        update: Incoming Telegram update.
        context: Handler context giving access to the application's
            ``bot_data``.

    Returns:
        True if the sender is allowed, False otherwise.
    """
    allowed_ids: set[int] = context.application.bot_data.get("allowed_ids", set())
    if not allowed_ids:
        return True

    user = update.effective_user
    if user and user.id in allowed_ids:
        return True

    message = update.effective_message
    if message:
        await message.reply_text("您没有权限使用此机器人。")

    logger.info("Unauthorized access attempt by user id %s", user.id if user else "unknown")
    return False


# Keep this guard as the last statement of the module. main() does not return
# while the bot is polling, so any function defined below this point would
# not exist yet when a handler tries to call it.
if __name__ == "__main__":
    main()