Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import os | |
| import re | |
| import random | |
| from typing import Optional | |
| from dotenv import load_dotenv | |
| from telegram import Update | |
| from telegram.ext import ( | |
| ApplicationBuilder, | |
| CommandHandler, | |
| ContextTypes, | |
| MessageHandler, | |
| filters, | |
| ) | |
| logging.basicConfig( | |
| format="%(asctime)s %(levelname)s %(name)s %(message)s", | |
| level=logging.INFO, | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # 信用卡生成相关函数 | |
| def luhn_checksum(card_number: str) -> int: | |
| """计算Luhn校验和""" | |
| def digits_of(n): | |
| return [int(d) for d in str(n)] | |
| digits = digits_of(card_number) | |
| odd_digits = digits[-1::-2] # 从右到左,奇数位 | |
| even_digits = digits[-2::-2] # 从右到左,偶数位 | |
| checksum = 0 | |
| checksum += sum(odd_digits) | |
| for d in even_digits: | |
| checksum += sum(digits_of(d * 2)) | |
| return checksum % 10 | |
| def is_luhn_valid(card_number: str) -> bool: | |
| """验证卡号是否符合Luhn算法""" | |
| return luhn_checksum(card_number) == 0 | |
| def generate_luhn_valid_number(prefix: str, length: int = 16) -> str: | |
| """生成符合Luhn算法的卡号""" | |
| # 确保前缀是数字字符串 | |
| prefix = str(prefix) | |
| # 如果前缀已经达到指定长度,直接验证并返回 | |
| if len(prefix) >= length: | |
| if is_luhn_valid(prefix[:length]): | |
| return prefix[:length] | |
| else: | |
| raise ValueError("前缀长度已达到但不符合Luhn算法") | |
| # 生成随机数字填充到长度-1(最后一位是校验位) | |
| remaining_length = length - len(prefix) - 1 | |
| random_digits = ''.join([str(random.randint(0, 9)) for _ in range(remaining_length)]) | |
| # 组合前缀、随机数字和临时校验位0 | |
| card_number = prefix + random_digits + '0' | |
| # 计算正确的校验位 | |
| checksum = luhn_checksum(card_number) | |
| correct_check_digit = (10 - checksum) % 10 | |
| # 替换最后一位为正确的校验位 | |
| valid_card_number = card_number[:-1] + str(correct_check_digit) | |
| return valid_card_number | |
| def parse_and_generate_card(input_string: str) -> dict: | |
| """解析输入字符串并生成完整的信用卡信息""" | |
| # 更灵活的正则表达式匹配:前缀|月份|年份|CVV | |
| # 前缀可以包含x,CVV部分可以包含x或省略 | |
| pattern = r'^(\d+x*)\|(\d{1,2})\|(\d{2,4})(?:\|(x*))?$' | |
| match = re.match(pattern, input_string) | |
| if not match: | |
| raise ValueError("输入格式不正确,应为:前缀|月份|年份|CVV (CVV可选)") | |
| prefix_part = match.group(1) | |
| month = match.group(2) | |
| year = match.group(3) | |
| cvv_part = match.group(4) if match.group(4) else 'xxx' # 默认CVV长度 | |
| # 分离前缀中的数字和x | |
| prefix_match = re.match(r'^(\d+)(x*)$', prefix_part) | |
| if not prefix_match: | |
| raise ValueError("前缀格式不正确,应为数字后跟x") | |
| prefix = prefix_match.group(1) | |
| # 确定卡号总长度(通常是16位,但Amex是15位) | |
| if prefix.startswith('34') or prefix.startswith('37'): | |
| total_length = 15 # American Express | |
| else: | |
| total_length = 16 # Visa, Mastercard, Discover等 | |
| # 生成完整的卡号 | |
| full_card_number = generate_luhn_valid_number(prefix, total_length) | |
| # 格式化月份和年份 | |
| month = month.zfill(2) | |
| if len(year) == 2: | |
| # 假设是20xx年 | |
| year = '20' + year | |
| # 生成随机CVV(通常是3位,Amex是4位) | |
| if prefix.startswith('34') or prefix.startswith('37'): | |
| cvv_length = 4 | |
| else: | |
| cvv_length = 3 | |
| cvv = ''.join([str(random.randint(0, 9)) for _ in range(cvv_length)]) | |
| return { | |
| 'card_number': full_card_number, | |
| 'expiry_month': month, | |
| 'expiry_year': year, | |
| 'cvv': cvv, | |
| 'formatted': f"{full_card_number}|{month}|{year}|{cvv}" | |
| } | |
| def is_credit_card_format(text: str) -> bool: | |
| """检测文本是否为信用卡格式""" | |
| # 检测格式:数字+x|数字|数字|可选的x | |
| pattern = r'^\d+x*\|\d{1,2}\|\d{2,4}(?:\|x*)?$' | |
| return bool(re.match(pattern, text.strip())) | |
| def generate_multiple_cards(input_string: str, count: int = 10) -> list: | |
| """根据输入格式生成多张信用卡""" | |
| cards = [] | |
| for _ in range(count): | |
| try: | |
| card_info = parse_and_generate_card(input_string) | |
| cards.append(card_info) | |
| except ValueError as e: | |
| logger.warning(f"生成信用卡失败: {e}") | |
| continue | |
| return cards | |
| def _parse_allowed_users(raw_value: str) -> set[int]: | |
| """解析允许的用户ID列表""" | |
| if not raw_value.strip(): | |
| return set() | |
| allowed = set() | |
| for token in raw_value.replace(";", ",").split(","): | |
| token = token.strip() | |
| if not token: | |
| continue | |
| try: | |
| allowed.add(int(token)) | |
| except ValueError: | |
| logger.warning("Ignoring invalid user id token: %s", token) | |
| return allowed | |
| async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | |
| """处理/start命令""" | |
| if not await _ensure_authorized(update, context): | |
| return | |
| await update.message.reply_text( | |
| "欢迎使用信用卡生成机器人!\n\n" | |
| "请发送信用卡格式,我将为您生成10张随机信用卡。\n" | |
| "格式示例:\n" | |
| "• 440393xxxxx|3|27|xxx\n" | |
| "• 424242|11|28\n" | |
| "• 34xxxxx|6|25|xxxx (American Express)\n\n" | |
| "格式说明:前缀|月份|年份|CVV(CVV可选)" | |
| ) | |
| async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: | |
| """处理文本消息""" | |
| assert update.message is not None | |
| if not await _ensure_authorized(update, context): | |
| return | |
| text = update.message.text or "" | |
| # 检查是否为信用卡格式 | |
| if is_credit_card_format(text): | |
| await handle_credit_card_request(update, text) | |
| else: | |
| await update.message.reply_text( | |
| "格式不正确!请使用以下格式:\n" | |
| "• 440393xxxxx|3|27|xxx\n" | |
| "• 424242|11|28\n\n" | |
| "格式说明:前缀|月份|年份|CVV(CVV可选)" | |
| ) | |
| async def handle_credit_card_request(update: Update, input_format: str) -> None: | |
| """处理信用卡生成请求""" | |
| try: | |
| # 生成10张信用卡 | |
| cards = generate_multiple_cards(input_format, 10) | |
| if not cards: | |
| await update.message.reply_text("生成信用卡失败,请检查输入格式。") | |
| return | |
| # 格式化输出 | |
| response_lines = [f"🎉 成功生成 {len(cards)} 张信用卡:\n"] | |
| for i, card in enumerate(cards, 1): | |
| response_lines.append( | |
| f"{i}. {card['formatted']}" | |
| ) | |
| # 分批发送消息,避免消息过长 | |
| batch_size = 5 | |
| for i in range(0, len(response_lines), batch_size * 2): # 每批包含标题+5张卡 | |
| batch = response_lines[i:i + batch_size * 2] | |
| await update.message.reply_text("\n".join(batch)) | |
| except Exception as e: | |
| logger.exception("处理信用卡请求时出错") | |
| await update.message.reply_text(f"生成信用卡时出错:{str(e)}") | |
| def build_application(): | |
| """构建Telegram应用""" | |
| load_dotenv() | |
| bot_token = os.getenv("BOT_TOKEN") | |
| if not bot_token: | |
| raise RuntimeError("BOT_TOKEN is not set") | |
| # Create the application builder | |
| application_builder = ApplicationBuilder().token(bot_token) | |
| # Check for custom API URL (for proxy) | |
| custom_api_url = os.getenv("TELEGRAM_API_URL", "").strip() | |
| # custom_api_url = "https://broken-flower-fcad.myteleproxy.workers.dev/telegram-api" | |
| if custom_api_url: | |
| logger.info(f"Using custom Telegram API URL: {custom_api_url}") | |
| api_base = f"{custom_api_url}/bot" | |
| file_base = f"{custom_api_url}/file/bot" | |
| logger.info(f"Setting API base to: {api_base}") | |
| logger.info(f"Setting file base to: {file_base}") | |
| application_builder = application_builder.base_url(api_base).base_file_url(file_base) | |
| else: | |
| logger.info("Using default Telegram API URL") | |
| allowed_ids = _parse_allowed_users(os.getenv("ALLOWED_USER_IDS", "")) | |
| application = application_builder.build() | |
| # Store the custom API URL in the application's bot_data | |
| if custom_api_url: | |
| application.bot_data["custom_api_url"] = custom_api_url | |
| application.bot_data["allowed_ids"] = allowed_ids | |
| application.add_handler(CommandHandler("start", start)) | |
| application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text)) | |
| return application | |
| def main() -> None: | |
| """主函数""" | |
| application = build_application() | |
| application.run_polling() | |
| if __name__ == "__main__": | |
| main() | |
| async def _ensure_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool: | |
| """确保用户有权限使用机器人""" | |
| allowed_ids: set[int] = context.application.bot_data.get("allowed_ids", set()) | |
| if not allowed_ids: | |
| return True | |
| user = update.effective_user | |
| if user and user.id in allowed_ids: | |
| return True | |
| message = update.effective_message | |
| if message: | |
| await message.reply_text("您没有权限使用此机器人。") | |
| logger.info("Unauthorized access attempt by user id %s", user.id if user else "unknown") | |
| return False |