Spaces:
Runtime error
Runtime error
| import asyncio | |
| import json | |
| from discord.ext import commands | |
| import discord | |
| import os | |
| import ssl | |
| import sys | |
| import traceback | |
| import aiohttp | |
| import logging | |
| import threading | |
| import webbrowser | |
| import time | |
| import requests | |
| from flask import Flask, render_template, request, jsonify, redirect, url_for, send_from_directory, session | |
| from werkzeug.utils import secure_filename | |
| from PIL import Image, ImageDraw, ImageFont | |
| import io | |
| import shutil | |
| from utils.config import config | |
| from functools import wraps | |
| # 配置Flask应用 | |
| app = Flask(__name__, | |
| static_folder='static', | |
| template_folder='templates') | |
| app.secret_key = os.urandom(24) # 设置session密钥 | |
| # 登录验证装饰器 | |
| def login_required(f): | |
| def decorated_function(*args, **kwargs): | |
| if not session.get('logged_in'): | |
| return redirect(url_for('login')) | |
| return f(*args, **kwargs) | |
| return decorated_function | |
| # 配置日志级别,过滤掉 google_genai.models 的 INFO 级别日志 | |
| logging.getLogger('google_genai.models').setLevel(logging.WARNING) | |
| # Disable SSL verification (only for debugging) | |
| ssl_context = ssl.create_default_context() | |
| ssl_context.check_hostname = False | |
| ssl_context.verify_mode = ssl.CERT_NONE | |
| intents = discord.Intents.default() | |
| intents.message_content = True | |
| intents.members = True | |
| bot = commands.Bot(command_prefix=".", intents=intents) | |
| # 创建默认头像 | |
| def create_default_avatar(): | |
| avatar_path = os.path.join('static', 'img', 'default-avatar.png') | |
| # 如果已存在,则不需要创建 | |
| if os.path.exists(avatar_path): | |
| return | |
| # 确保目录存在 | |
| os.makedirs(os.path.dirname(avatar_path), exist_ok=True) | |
| # 创建简单的默认头像 | |
| img = Image.new('RGB', (200, 200), color=(53, 102, 220)) | |
| d = ImageDraw.Draw(img) | |
| # 添加文字 | |
| try: | |
| # 尝试使用系统字体 | |
| font = ImageFont.truetype("arial.ttf", 60) | |
| except: | |
| # 如果没有系统字体,使用默认字体 | |
| font = ImageFont.load_default() | |
| d.text((70, 70), "Bot", fill=(255, 255, 255), font=font) | |
| # 保存图片 | |
| img.save(avatar_path) | |
| # Flask路由 | |
| def login(): | |
| if session.get('logged_in'): | |
| return redirect(url_for('index')) | |
| return render_template('login.html') | |
| def api_login(): | |
| data = request.json | |
| password = data.get('password') | |
| # 直接从config.json读取最新密码 | |
| try: | |
| with open('config.json', 'r', encoding='utf-8') as f: | |
| config_data = json.load(f) | |
| config_password = config_data.get('password') | |
| except: | |
| config_password = None | |
| if not config_password: | |
| # 如果配置中没有密码,设置默认密码为 "admin" | |
| config_password = "admin" | |
| config.write("password", config_password) | |
| if password == config_password: | |
| session['logged_in'] = True | |
| return jsonify({"status": "success"}) | |
| return jsonify({"status": "error", "message": "密码错误"}) | |
| def logout(): | |
| session.pop('logged_in', None) | |
| return redirect(url_for('login')) | |
| def index(): | |
| return render_template('index.html') | |
| def config_page(): | |
| return render_template('config.html') | |
| def presets_page(): | |
| presets = os.listdir('agent/presets') | |
| return render_template('presets.html', presets=presets) | |
| def get_config(): | |
| with open('config.json', 'r', encoding='utf-8') as f: | |
| config_data = json.load(f) | |
| return jsonify(config_data) | |
| def update_config(): | |
| config_data = request.json | |
| # 检查密码是否被修改 | |
| current_password = config.get('password') | |
| password_changed = current_password != config_data.get('password') | |
| # 保存配置 | |
| with open('config.json', 'w', encoding='utf-8') as f: | |
| json.dump(config_data, f, indent=4, ensure_ascii=False) | |
| # 如果密码被修改,返回特殊状态码 | |
| if password_changed: | |
| return jsonify({"status": "password_changed"}) | |
| return jsonify({"status": "success"}) | |
| def get_presets(): | |
| presets = os.listdir('agent/presets') | |
| presets_data = {} | |
| for preset_name in presets: | |
| preset_path = os.path.join('agent/presets', preset_name) | |
| if os.path.isdir(preset_path): | |
| presets_data[preset_name] = {} | |
| for file in os.listdir(preset_path): | |
| if file.endswith('.json'): | |
| with open(os.path.join(preset_path, file), 'r', encoding='utf-8') as f: | |
| presets_data[preset_name][file] = json.load(f) | |
| return jsonify(presets_data) | |
| def get_preset(preset_name): | |
| preset_path = os.path.join('agent/presets', preset_name) | |
| if not os.path.exists(preset_path): | |
| return jsonify({"status": "error", "message": "Preset not found"}), 404 | |
| preset_data = {} | |
| for file in os.listdir(preset_path): | |
| if file.endswith('.json'): | |
| with open(os.path.join(preset_path, file), 'r', encoding='utf-8') as f: | |
| preset_data[file] = json.load(f) | |
| # 同时尝试加载 role_preset.json | |
| role_preset_file = os.path.join(preset_path, 'role_preset.json') | |
| if os.path.exists(role_preset_file): | |
| with open(role_preset_file, 'r', encoding='utf-8') as f: | |
| try: | |
| preset_data['role_preset.json'] = json.load(f) | |
| except json.JSONDecodeError: | |
| preset_data['role_preset.json'] = {"bot_role": ""} # 如果文件为空或无效,提供默认值 | |
| else: | |
| preset_data['role_preset.json'] = {"bot_role": ""} # 如果文件不存在,提供默认值 | |
| return jsonify(preset_data) | |
| def update_preset(preset_name): | |
| preset_data = request.json | |
| preset_path = os.path.join('agent/presets', preset_name) | |
| if not os.path.exists(preset_path): | |
| return jsonify({"status": "error", "message": "Preset not found"}), 404 | |
| for file_name, data in preset_data.items(): | |
| # 确保只写入以.json结尾的文件 | |
| if file_name.endswith('.json'): | |
| with open(os.path.join(preset_path, file_name), 'w', encoding='utf-8') as f: | |
| json.dump(data, f, indent=4, ensure_ascii=False) | |
| else: | |
| # 可以选择记录一个警告或忽略非json文件 | |
| print(f"Warning: Skipped writing non-JSON file '{file_name}' during preset update.") | |
| # 单独处理 role_preset.json 的保存 | |
| role_data = preset_data.get('role_preset.json') | |
| if role_data is not None: # 检查是否存在,即使内容为空也要保存 | |
| with open(os.path.join(preset_path, 'role_preset.json'), 'w', encoding='utf-8') as f: | |
| json.dump(role_data, f, indent=4, ensure_ascii=False) | |
| return jsonify({"status": "success"}) | |
| def delete_preset(preset_name): | |
| preset_path = os.path.join('agent/presets', preset_name) | |
| if not os.path.exists(preset_path): | |
| return jsonify({"status": "error", "message": "Preset not found"}), 404 | |
| shutil.rmtree(preset_path) | |
| return jsonify({"status": "success"}) | |
| def create_preset(): | |
| data = request.json | |
| preset_name = data.get('name') | |
| template_preset = data.get('template', 'default') | |
| if not preset_name: | |
| return jsonify({"status": "error", "message": "Preset name is required"}), 400 | |
| new_preset_path = os.path.join('agent/presets', preset_name) | |
| if os.path.exists(new_preset_path): | |
| return jsonify({"status": "error", "message": "Preset already exists"}), 400 | |
| # 复制模板预设 | |
| template_path = os.path.join('agent/presets', template_preset) | |
| if not os.path.exists(template_path): | |
| return jsonify({"status": "error", "message": "Template preset not found"}), 404 | |
| shutil.copytree(template_path, new_preset_path) | |
| # 确保也复制了 role_preset.json (如果模板中有的话) | |
| template_role_preset = os.path.join(template_path, 'role_preset.json') | |
| new_role_preset = os.path.join(new_preset_path, 'role_preset.json') | |
| if os.path.exists(template_role_preset): | |
| shutil.copy2(template_role_preset, new_role_preset) | |
| else: | |
| # 如果模板没有,创建一个空的 role_preset.json | |
| with open(new_role_preset, 'w', encoding='utf-8') as f: | |
| json.dump({"bot_role": ""}, f, indent=4, ensure_ascii=False) | |
| return jsonify({"status": "success"}) | |
| def upload_avatar(preset_name): | |
| preset_path = os.path.join('agent/presets', preset_name) | |
| if not os.path.exists(preset_path): | |
| return jsonify({"status": "error", "message": "Preset not found"}), 404 | |
| if 'avatar' not in request.files: | |
| return jsonify({"status": "error", "message": "No file part"}), 400 | |
| file = request.files['avatar'] | |
| if file.filename == '': | |
| return jsonify({"status": "error", "message": "No selected file"}), 400 | |
| # 处理图片 | |
| try: | |
| img = Image.open(file) | |
| # 调整为正方形 | |
| width, height = img.size | |
| size = min(width, height) | |
| left = (width - size) // 2 | |
| top = (height - size) // 2 | |
| right = left + size | |
| bottom = top + size | |
| img = img.crop((left, top, right, bottom)) | |
| # 保存图片 | |
| for old_avatar in os.listdir(preset_path): | |
| if old_avatar.startswith('avatar.'): | |
| os.remove(os.path.join(preset_path, old_avatar)) | |
| # 获取文件扩展名 | |
| ext = os.path.splitext(file.filename)[1].lower() | |
| if ext not in ['.jpg', '.jpeg', '.png', '.gif']: | |
| ext = '.jpg' # 默认使用jpg | |
| img.save(os.path.join(preset_path, f'avatar{ext}')) | |
| return jsonify({"status": "success"}) | |
| except Exception as e: | |
| return jsonify({"status": "error", "message": str(e)}), 500 | |
| def preset_files(filename): | |
| return send_from_directory('agent/presets', filename) | |
| async def on_ready(): | |
| print(f'Bot logged in as {bot.user.name} (ID: {bot.user.id})') | |
| print(f'Bot invite link: https://discord.com/oauth2/authorize?client_id={bot.user.id}&permissions=8&scope=bot') | |
| # Auto sync slash commands | |
| print("Syncing slash commands...") | |
| try: | |
| synced = await bot.tree.sync() | |
| print(f"Synced {len(synced)} command(s).") | |
| except Exception as e: | |
| print(f"Failed to sync commands: {e}") | |
| # 确保所有cog都更新频道配置 | |
| try: | |
| agent_manager = bot.get_cog("AgentManager") | |
| if agent_manager: | |
| print("正在更新所有cog的频道配置...") | |
| agent_manager.reload_chat_channels() # 重新加载配置 | |
| await agent_manager.update_all_cogs_channels() # 更新所有cog的频道配置 | |
| print("所有cog的频道配置已更新") | |
| except Exception as e: | |
| print(f"Failed to update channel configs: {e}") | |
| traceback.print_exc() | |
| print('-' * 50) | |
| # 添加一个帮助调试的命令,用于检查已加载的cog | |
| async def list_cogs(ctx): | |
| loaded_cogs = sorted(list(bot.cogs.keys())) | |
| all_cogs = sorted([f.replace('.py', '') for f in os.listdir('cogs') if f.endswith('.py')]) | |
| not_loaded = [cog for cog in all_cogs if cog not in [c.lower() for c in loaded_cogs]] | |
| await ctx.send(f"**已加载的Cogs ({len(loaded_cogs)}):**\n" | |
| f"{', '.join(loaded_cogs)}\n\n" | |
| f"**未加载的Cogs ({len(not_loaded)}):**\n" | |
| f"{', '.join(not_loaded)}") | |
| # 保持Space活跃的函数 | |
| def keep_alive(): | |
| """定期自我请求以保持Space活跃""" | |
| print("启动保活线程...") | |
| while True: | |
| try: | |
| # 获取SPACE_HOST环境变量,这是HF Space自动提供的 | |
| space_host = os.environ.get('SPACE_HOST') | |
| # 如果没有SPACE_HOST环境变量,说明不是在HF Space上运行 | |
| if not space_host: | |
| # 本地调试环境 | |
| url = "http://127.0.0.1:5000" | |
| else: | |
| # HF Space环境 | |
| url = f"https://{space_host}" | |
| requests.get(url, timeout=10) | |
| print(f"[KeepAlive] 自我请求成功: {time.strftime('%Y-%m-%d %H:%M:%S')}") | |
| except Exception as e: | |
| print(f"[KeepAlive] 自我请求失败: {e}") | |
| # 每30分钟请求一次,避免Space超时 | |
| time.sleep(1800) | |
| def start_flask(): | |
| # 创建默认头像 | |
| create_default_avatar() | |
| # 启动保活线程 | |
| keep_alive_thread = threading.Thread(target=keep_alive) | |
| keep_alive_thread.daemon = True | |
| keep_alive_thread.start() | |
| # 获取PORT环境变量,默认7860(HF Space默认端口) | |
| # 如果环境变量不存在,就使用5000端口(本地开发环境) | |
| port = int(os.environ.get('PORT', 5000)) | |
| host = '0.0.0.0' if os.environ.get('SPACE_HOST') else '127.0.0.1' | |
| app.run(host=host, port=port) | |
| async def main(): | |
| token = config.get("token") | |
| try: | |
| print("Loading cogs...") | |
| # 先加载AgentManager cog | |
| try: | |
| await bot.load_extension("cogs.agent_manager") | |
| except Exception as e: | |
| print(f"Failed to load AgentManager: {e}") | |
| traceback.print_exc() | |
| # 然后加载其他cog | |
| for file in os.listdir("cogs"): | |
| if file.endswith(".py") and file != "agent_manager.py" and file != "gemini_backup.py": | |
| try: | |
| await bot.load_extension(f"cogs.{file[:-3]}") | |
| except Exception as e: | |
| print(f"Failed to load extension {file}: {e}") | |
| traceback.print_exc() | |
| print("Starting bot...") | |
| # Create custom aiohttp session | |
| connector = aiohttp.TCPConnector(ssl=ssl_context) | |
| session = aiohttp.ClientSession(connector=connector) | |
| # Use custom session | |
| bot.http.session = session | |
| # 启动Flask应用 | |
| flask_thread = threading.Thread(target=start_flask) | |
| flask_thread.daemon = True # 主程序退出时,Flask线程也会退出 | |
| flask_thread.start() | |
| # 是否在Hugging Face Space环境下,不打开浏览器 | |
| if not os.environ.get('SPACE_HOST'): | |
| webbrowser.open('http://127.0.0.1:5000') | |
| await bot.start(token) | |
| except Exception as e: | |
| print(f"An error occurred: {e}") | |
| traceback.print_exc() | |
| finally: | |
| await bot.close() | |
| if __name__ == "__main__": | |
| try: | |
| asyncio.run(main()) | |
| except KeyboardInterrupt: | |
| print("Bot has been shut down.") | |
| except Exception as e: | |
| print(f"Fatal error: {e}") | |
| traceback.print_exc() |