""" Hugging Face Space 保活管理器 每隔5小时自动向配置的URL发送GET请求,防止空间休眠 """ import os import json import time import asyncio import aiohttp import threading from datetime import datetime, timedelta from flask import Flask, render_template, request, jsonify, redirect, url_for from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger app = Flask(__name__) # 数据存储文件 DATA_FILE = "urls.json" LOG_FILE = "logs.json" # 保活间隔(小时) KEEPALIVE_INTERVAL_HOURS = 5 def load_urls(): """加载已保存的URL列表""" if os.path.exists(DATA_FILE): try: with open(DATA_FILE, 'r', encoding='utf-8') as f: return json.load(f) except: return [] return [] def save_urls(urls): """保存URL列表""" with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump(urls, f, ensure_ascii=False, indent=2) def load_logs(): """加载日志""" if os.path.exists(LOG_FILE): try: with open(LOG_FILE, 'r', encoding='utf-8') as f: logs = json.load(f) # 只保留最近100条日志 return logs[-100:] except: return [] return [] def save_logs(logs): """保存日志""" # 只保留最近100条 logs = logs[-100:] with open(LOG_FILE, 'w', encoding='utf-8') as f: json.dump(logs, f, ensure_ascii=False, indent=2) def add_log(url, status, message): """添加一条日志""" logs = load_logs() logs.append({ "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "url": url, "status": status, "message": message }) save_logs(logs) async def ping_url(session, url_data): """异步发送GET请求到指定URL""" url = url_data.get("url", "") name = url_data.get("name", url) try: async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response: status = response.status if status == 200: add_log(name, "success", f"状态码: {status}") return {"url": url, "name": name, "status": "success", "code": status} else: add_log(name, "warning", f"状态码: {status}") return {"url": url, "name": name, "status": "warning", "code": status} except asyncio.TimeoutError: add_log(name, "error", "请求超时") return {"url": url, "name": name, "status": "error", "message": "请求超时"} except Exception as e: add_log(name, "error", str(e)[:100]) return {"url": url, "name": name, "status": "error", "message": str(e)[:100]} async def ping_all_urls(): """异步ping所有URL""" urls = load_urls() if not urls: add_log("系统", "info", "没有配置需要保活的URL") return [] add_log("系统", "info", f"开始保活任务,共 {len(urls)} 个URL") async with aiohttp.ClientSession() as session: tasks = [ping_url(session, url_data) for url_data in urls] results = await asyncio.gather(*tasks) success_count = sum(1 for r in results if r.get("status") == "success") add_log("系统", "info", f"保活任务完成,成功: {success_count}/{len(urls)}") return results def run_keepalive_job(): """运行保活任务的同步包装器""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: loop.run_until_complete(ping_all_urls()) finally: loop.close() # 初始化调度器 scheduler = BackgroundScheduler() scheduler.add_job( run_keepalive_job, trigger=IntervalTrigger(hours=KEEPALIVE_INTERVAL_HOURS), id='keepalive_job', name='URL保活任务', replace_existing=True ) @app.route('/') def index(): """主页""" urls = load_urls() logs = load_logs() # 获取下次执行时间 job = scheduler.get_job('keepalive_job') next_run = job.next_run_time.strftime("%Y-%m-%d %H:%M:%S") if job and job.next_run_time else "未调度" return render_template('index.html', urls=urls, logs=reversed(logs), # 最新的在前 next_run=next_run, interval=KEEPALIVE_INTERVAL_HOURS) @app.route('/api/urls', methods=['GET']) def get_urls(): """获取所有URL""" return jsonify(load_urls()) @app.route('/api/urls', methods=['POST']) def add_url(): """添加新URL""" data = request.json url = data.get('url', '').strip() name = data.get('name', '').strip() if not url: return jsonify({"error": "URL不能为空"}), 400 # 确保URL有协议前缀 if not url.startswith(('http://', 'https://')): url = 'https://' + url urls = load_urls() # 检查是否已存在 for u in urls: if u.get('url') == url: return jsonify({"error": "URL已存在"}), 400 # 使用URL作为默认名称 if not name: name = url.split('//')[1].split('/')[0] if '//' in url else url urls.append({ "url": url, "name": name, "added_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") }) save_urls(urls) add_log(name, "info", "添加到保活列表") return jsonify({"success": True, "message": "URL添加成功"}) @app.route('/api/urls/', methods=['DELETE']) def delete_url(index): """删除URL""" urls = load_urls() if 0 <= index < len(urls): removed = urls.pop(index) save_urls(urls) add_log(removed.get('name', removed.get('url')), "info", "从保活列表移除") return jsonify({"success": True}) return jsonify({"error": "索引无效"}), 400 @app.route('/api/ping', methods=['POST']) def manual_ping(): """手动触发保活""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: results = loop.run_until_complete(ping_all_urls()) finally: loop.close() return jsonify({"success": True, "results": results}) @app.route('/api/ping/', methods=['POST']) def ping_single(index): """ping单个URL""" urls = load_urls() if 0 <= index < len(urls): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: async def ping_one(): async with aiohttp.ClientSession() as session: return await ping_url(session, urls[index]) result = loop.run_until_complete(ping_one()) finally: loop.close() return jsonify({"success": True, "result": result}) return jsonify({"error": "索引无效"}), 400 @app.route('/api/logs', methods=['GET']) def get_logs(): """获取日志""" return jsonify(load_logs()) @app.route('/api/logs', methods=['DELETE']) def clear_logs(): """清空日志""" save_logs([]) return jsonify({"success": True}) @app.route('/api/status', methods=['GET']) def get_status(): """获取系统状态""" job = scheduler.get_job('keepalive_job') urls = load_urls() return jsonify({ "url_count": len(urls), "interval_hours": KEEPALIVE_INTERVAL_HOURS, "next_run": job.next_run_time.strftime("%Y-%m-%d %H:%M:%S") if job and job.next_run_time else None, "scheduler_running": scheduler.running }) if __name__ == '__main__': # 启动调度器 scheduler.start() add_log("系统", "info", "保活管理器启动") # 运行Flask应用 app.run(host='0.0.0.0', port=7860, debug=False)