from __future__ import annotations import asyncio import logging import os import re import random from typing import Optional from dotenv import load_dotenv from telegram import Update from telegram.ext import ( ApplicationBuilder, CommandHandler, ContextTypes, MessageHandler, filters, ) logging.basicConfig( format="%(asctime)s %(levelname)s %(name)s %(message)s", level=logging.INFO, ) logger = logging.getLogger(__name__) # 信用卡生成相关函数 def luhn_checksum(card_number: str) -> int: """计算Luhn校验和""" def digits_of(n): return [int(d) for d in str(n)] digits = digits_of(card_number) odd_digits = digits[-1::-2] # 从右到左,奇数位 even_digits = digits[-2::-2] # 从右到左,偶数位 checksum = 0 checksum += sum(odd_digits) for d in even_digits: checksum += sum(digits_of(d * 2)) return checksum % 10 def is_luhn_valid(card_number: str) -> bool: """验证卡号是否符合Luhn算法""" return luhn_checksum(card_number) == 0 def generate_luhn_valid_number(prefix: str, length: int = 16) -> str: """生成符合Luhn算法的卡号""" # 确保前缀是数字字符串 prefix = str(prefix) # 如果前缀已经达到指定长度,直接验证并返回 if len(prefix) >= length: if is_luhn_valid(prefix[:length]): return prefix[:length] else: raise ValueError("前缀长度已达到但不符合Luhn算法") # 生成随机数字填充到长度-1(最后一位是校验位) remaining_length = length - len(prefix) - 1 random_digits = ''.join([str(random.randint(0, 9)) for _ in range(remaining_length)]) # 组合前缀、随机数字和临时校验位0 card_number = prefix + random_digits + '0' # 计算正确的校验位 checksum = luhn_checksum(card_number) correct_check_digit = (10 - checksum) % 10 # 替换最后一位为正确的校验位 valid_card_number = card_number[:-1] + str(correct_check_digit) return valid_card_number def parse_and_generate_card(input_string: str) -> dict: """解析输入字符串并生成完整的信用卡信息""" # 更灵活的正则表达式匹配:前缀|月份|年份|CVV # 前缀可以包含x,CVV部分可以包含x或省略 pattern = r'^(\d+x*)\|(\d{1,2})\|(\d{2,4})(?:\|(x*))?$' match = re.match(pattern, input_string) if not match: raise ValueError("输入格式不正确,应为:前缀|月份|年份|CVV (CVV可选)") prefix_part = match.group(1) month = match.group(2) year = match.group(3) cvv_part = match.group(4) if match.group(4) else 'xxx' # 默认CVV长度 # 分离前缀中的数字和x prefix_match = re.match(r'^(\d+)(x*)$', prefix_part) if not prefix_match: raise ValueError("前缀格式不正确,应为数字后跟x") prefix = prefix_match.group(1) # 确定卡号总长度(通常是16位,但Amex是15位) if prefix.startswith('34') or prefix.startswith('37'): total_length = 15 # American Express else: total_length = 16 # Visa, Mastercard, Discover等 # 生成完整的卡号 full_card_number = generate_luhn_valid_number(prefix, total_length) # 格式化月份和年份 month = month.zfill(2) if len(year) == 2: # 假设是20xx年 year = '20' + year # 生成随机CVV(通常是3位,Amex是4位) if prefix.startswith('34') or prefix.startswith('37'): cvv_length = 4 else: cvv_length = 3 cvv = ''.join([str(random.randint(0, 9)) for _ in range(cvv_length)]) return { 'card_number': full_card_number, 'expiry_month': month, 'expiry_year': year, 'cvv': cvv, 'formatted': f"{full_card_number}|{month}|{year}|{cvv}" } def is_credit_card_format(text: str) -> bool: """检测文本是否为信用卡格式""" # 检测格式:数字+x|数字|数字|可选的x pattern = r'^\d+x*\|\d{1,2}\|\d{2,4}(?:\|x*)?$' return bool(re.match(pattern, text.strip())) def generate_multiple_cards(input_string: str, count: int = 10) -> list: """根据输入格式生成多张信用卡""" cards = [] for _ in range(count): try: card_info = parse_and_generate_card(input_string) cards.append(card_info) except ValueError as e: logger.warning(f"生成信用卡失败: {e}") continue return cards def _parse_allowed_users(raw_value: str) -> set[int]: """解析允许的用户ID列表""" if not raw_value.strip(): return set() allowed = set() for token in raw_value.replace(";", ",").split(","): token = token.strip() if not token: continue try: allowed.add(int(token)) except ValueError: logger.warning("Ignoring invalid user id token: %s", token) return allowed async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """处理/start命令""" if not await _ensure_authorized(update, context): return await update.message.reply_text( "欢迎使用信用卡生成机器人!\n\n" "请发送信用卡格式,我将为您生成10张随机信用卡。\n" "格式示例:\n" "• 440393xxxxx|3|27|xxx\n" "• 424242|11|28\n" "• 34xxxxx|6|25|xxxx (American Express)\n\n" "格式说明:前缀|月份|年份|CVV(CVV可选)" ) async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """处理文本消息""" assert update.message is not None if not await _ensure_authorized(update, context): return text = update.message.text or "" # 检查是否为信用卡格式 if is_credit_card_format(text): await handle_credit_card_request(update, text) else: await update.message.reply_text( "格式不正确!请使用以下格式:\n" "• 440393xxxxx|3|27|xxx\n" "• 424242|11|28\n\n" "格式说明:前缀|月份|年份|CVV(CVV可选)" ) async def handle_credit_card_request(update: Update, input_format: str) -> None: """处理信用卡生成请求""" try: # 生成10张信用卡 cards = generate_multiple_cards(input_format, 10) if not cards: await update.message.reply_text("生成信用卡失败,请检查输入格式。") return # 格式化输出 response_lines = [f"🎉 成功生成 {len(cards)} 张信用卡:\n"] for i, card in enumerate(cards, 1): response_lines.append( f"{i}. {card['formatted']}" ) # 分批发送消息,避免消息过长 batch_size = 5 for i in range(0, len(response_lines), batch_size * 2): # 每批包含标题+5张卡 batch = response_lines[i:i + batch_size * 2] await update.message.reply_text("\n".join(batch)) except Exception as e: logger.exception("处理信用卡请求时出错") await update.message.reply_text(f"生成信用卡时出错:{str(e)}") def build_application(): """构建Telegram应用""" load_dotenv() bot_token = os.getenv("BOT_TOKEN") if not bot_token: raise RuntimeError("BOT_TOKEN is not set") # Create the application builder application_builder = ApplicationBuilder().token(bot_token) # Check for custom API URL (for proxy) custom_api_url = os.getenv("TELEGRAM_API_URL", "").strip() if custom_api_url: logger.info(f"Using custom Telegram API URL: {custom_api_url}") api_base = f"{custom_api_url}/bot" file_base = f"{custom_api_url}/file/bot" logger.info(f"Setting API base to: {api_base}") logger.info(f"Setting file base to: {file_base}") application_builder = application_builder.base_url(api_base).base_file_url(file_base) else: logger.info("Using default Telegram API URL") allowed_ids = _parse_allowed_users(os.getenv("ALLOWED_USER_IDS", "")) application = application_builder.build() # Store the custom API URL in the application's bot_data if custom_api_url: application.bot_data["custom_api_url"] = custom_api_url application.bot_data["allowed_ids"] = allowed_ids application.add_handler(CommandHandler("start", start)) application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text)) return application def main() -> None: """主函数""" application = build_application() application.run_polling() if __name__ == "__main__": main() async def _ensure_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool: """确保用户有权限使用机器人""" allowed_ids: set[int] = context.application.bot_data.get("allowed_ids", set()) if not allowed_ids: return True user = update.effective_user if user and user.id in allowed_ids: return True message = update.effective_message if message: await message.reply_text("您没有权限使用此机器人。") logger.info("Unauthorized access attempt by user id %s", user.id if user else "unknown") return False