gencard / bot.py
Jack698's picture
Upload folder using huggingface_hub
f644bf3 verified
from __future__ import annotations
import asyncio
import logging
import os
import re
import random
from typing import Optional
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
)
logging.basicConfig(
format="%(asctime)s %(levelname)s %(name)s %(message)s",
level=logging.INFO,
)
logger = logging.getLogger(__name__)
# 信用卡生成相关函数
def luhn_checksum(card_number: str) -> int:
"""计算Luhn校验和"""
def digits_of(n):
return [int(d) for d in str(n)]
digits = digits_of(card_number)
odd_digits = digits[-1::-2] # 从右到左,奇数位
even_digits = digits[-2::-2] # 从右到左,偶数位
checksum = 0
checksum += sum(odd_digits)
for d in even_digits:
checksum += sum(digits_of(d * 2))
return checksum % 10
def is_luhn_valid(card_number: str) -> bool:
"""验证卡号是否符合Luhn算法"""
return luhn_checksum(card_number) == 0
def generate_luhn_valid_number(prefix: str, length: int = 16) -> str:
"""生成符合Luhn算法的卡号"""
# 确保前缀是数字字符串
prefix = str(prefix)
# 如果前缀已经达到指定长度,直接验证并返回
if len(prefix) >= length:
if is_luhn_valid(prefix[:length]):
return prefix[:length]
else:
raise ValueError("前缀长度已达到但不符合Luhn算法")
# 生成随机数字填充到长度-1(最后一位是校验位)
remaining_length = length - len(prefix) - 1
random_digits = ''.join([str(random.randint(0, 9)) for _ in range(remaining_length)])
# 组合前缀、随机数字和临时校验位0
card_number = prefix + random_digits + '0'
# 计算正确的校验位
checksum = luhn_checksum(card_number)
correct_check_digit = (10 - checksum) % 10
# 替换最后一位为正确的校验位
valid_card_number = card_number[:-1] + str(correct_check_digit)
return valid_card_number
def parse_and_generate_card(input_string: str) -> dict:
"""解析输入字符串并生成完整的信用卡信息"""
# 更灵活的正则表达式匹配:前缀|月份|年份|CVV
# 前缀可以包含x,CVV部分可以包含x或省略
pattern = r'^(\d+x*)\|(\d{1,2})\|(\d{2,4})(?:\|(x*))?$'
match = re.match(pattern, input_string)
if not match:
raise ValueError("输入格式不正确,应为:前缀|月份|年份|CVV (CVV可选)")
prefix_part = match.group(1)
month = match.group(2)
year = match.group(3)
cvv_part = match.group(4) if match.group(4) else 'xxx' # 默认CVV长度
# 分离前缀中的数字和x
prefix_match = re.match(r'^(\d+)(x*)$', prefix_part)
if not prefix_match:
raise ValueError("前缀格式不正确,应为数字后跟x")
prefix = prefix_match.group(1)
# 确定卡号总长度(通常是16位,但Amex是15位)
if prefix.startswith('34') or prefix.startswith('37'):
total_length = 15 # American Express
else:
total_length = 16 # Visa, Mastercard, Discover等
# 生成完整的卡号
full_card_number = generate_luhn_valid_number(prefix, total_length)
# 格式化月份和年份
month = month.zfill(2)
if len(year) == 2:
# 假设是20xx年
year = '20' + year
# 生成随机CVV(通常是3位,Amex是4位)
if prefix.startswith('34') or prefix.startswith('37'):
cvv_length = 4
else:
cvv_length = 3
cvv = ''.join([str(random.randint(0, 9)) for _ in range(cvv_length)])
return {
'card_number': full_card_number,
'expiry_month': month,
'expiry_year': year,
'cvv': cvv,
'formatted': f"{full_card_number}|{month}|{year}|{cvv}"
}
def is_credit_card_format(text: str) -> bool:
"""检测文本是否为信用卡格式"""
# 检测格式:数字+x|数字|数字|可选的x
pattern = r'^\d+x*\|\d{1,2}\|\d{2,4}(?:\|x*)?$'
return bool(re.match(pattern, text.strip()))
def generate_multiple_cards(input_string: str, count: int = 10) -> list:
"""根据输入格式生成多张信用卡"""
cards = []
for _ in range(count):
try:
card_info = parse_and_generate_card(input_string)
cards.append(card_info)
except ValueError as e:
logger.warning(f"生成信用卡失败: {e}")
continue
return cards
def _parse_allowed_users(raw_value: str) -> set[int]:
"""解析允许的用户ID列表"""
if not raw_value.strip():
return set()
allowed = set()
for token in raw_value.replace(";", ",").split(","):
token = token.strip()
if not token:
continue
try:
allowed.add(int(token))
except ValueError:
logger.warning("Ignoring invalid user id token: %s", token)
return allowed
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理/start命令"""
if not await _ensure_authorized(update, context):
return
await update.message.reply_text(
"欢迎使用信用卡生成机器人!\n\n"
"请发送信用卡格式,我将为您生成10张随机信用卡。\n"
"格式示例:\n"
"• 440393xxxxx|3|27|xxx\n"
"• 424242|11|28\n"
"• 34xxxxx|6|25|xxxx (American Express)\n\n"
"格式说明:前缀|月份|年份|CVV(CVV可选)"
)
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""处理文本消息"""
assert update.message is not None
if not await _ensure_authorized(update, context):
return
text = update.message.text or ""
# 检查是否为信用卡格式
if is_credit_card_format(text):
await handle_credit_card_request(update, text)
else:
await update.message.reply_text(
"格式不正确!请使用以下格式:\n"
"• 440393xxxxx|3|27|xxx\n"
"• 424242|11|28\n\n"
"格式说明:前缀|月份|年份|CVV(CVV可选)"
)
async def handle_credit_card_request(update: Update, input_format: str) -> None:
"""处理信用卡生成请求"""
try:
# 生成10张信用卡
cards = generate_multiple_cards(input_format, 10)
if not cards:
await update.message.reply_text("生成信用卡失败,请检查输入格式。")
return
# 格式化输出
response_lines = [f"🎉 成功生成 {len(cards)} 张信用卡:\n"]
for i, card in enumerate(cards, 1):
response_lines.append(
f"{i}. {card['formatted']}"
)
# 分批发送消息,避免消息过长
batch_size = 5
for i in range(0, len(response_lines), batch_size * 2): # 每批包含标题+5张卡
batch = response_lines[i:i + batch_size * 2]
await update.message.reply_text("\n".join(batch))
except Exception as e:
logger.exception("处理信用卡请求时出错")
await update.message.reply_text(f"生成信用卡时出错:{str(e)}")
def build_application():
"""构建Telegram应用"""
load_dotenv()
bot_token = os.getenv("BOT_TOKEN")
if not bot_token:
raise RuntimeError("BOT_TOKEN is not set")
# Create the application builder
application_builder = ApplicationBuilder().token(bot_token)
# Check for custom API URL (for proxy)
custom_api_url = os.getenv("TELEGRAM_API_URL", "").strip()
if custom_api_url:
logger.info(f"Using custom Telegram API URL: {custom_api_url}")
api_base = f"{custom_api_url}/bot"
file_base = f"{custom_api_url}/file/bot"
logger.info(f"Setting API base to: {api_base}")
logger.info(f"Setting file base to: {file_base}")
application_builder = application_builder.base_url(api_base).base_file_url(file_base)
else:
logger.info("Using default Telegram API URL")
allowed_ids = _parse_allowed_users(os.getenv("ALLOWED_USER_IDS", ""))
application = application_builder.build()
# Store the custom API URL in the application's bot_data
if custom_api_url:
application.bot_data["custom_api_url"] = custom_api_url
application.bot_data["allowed_ids"] = allowed_ids
application.add_handler(CommandHandler("start", start))
application.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_text))
return application
def main() -> None:
"""主函数"""
application = build_application()
application.run_polling()
if __name__ == "__main__":
main()
async def _ensure_authorized(update: Update, context: ContextTypes.DEFAULT_TYPE) -> bool:
"""确保用户有权限使用机器人"""
allowed_ids: set[int] = context.application.bot_data.get("allowed_ids", set())
if not allowed_ids:
return True
user = update.effective_user
if user and user.id in allowed_ids:
return True
message = update.effective_message
if message:
await message.reply_text("您没有权限使用此机器人。")
logger.info("Unauthorized access attempt by user id %s", user.id if user else "unknown")
return False