Spaces:
Running
Running
| """ | |
| Hugging Face Space 保活管理器 | |
| 每隔5小时自动向配置的URL发送GET请求,防止空间休眠 | |
| """ | |
| import os | |
| import json | |
| import time | |
| import asyncio | |
| import aiohttp | |
| import threading | |
| from datetime import datetime, timedelta | |
| from flask import Flask, render_template, request, jsonify, redirect, url_for | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| from apscheduler.triggers.interval import IntervalTrigger | |
| app = Flask(__name__) | |
| # 数据存储文件 | |
| DATA_FILE = "urls.json" | |
| LOG_FILE = "logs.json" | |
| # 保活间隔(小时) | |
| KEEPALIVE_INTERVAL_HOURS = 5 | |
def load_urls():
    """Load the saved URL list from DATA_FILE.

    Returns:
        list: stored URL entries, or [] when the file is missing,
        unreadable, or not valid JSON.
    """
    if not os.path.exists(DATA_FILE):
        return []
    try:
        with open(DATA_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    # Was a bare `except:`, which also swallows SystemExit and
    # KeyboardInterrupt; only I/O and parse errors should degrade to [].
    except (OSError, json.JSONDecodeError):
        return []
def save_urls(urls):
    """Persist the URL list to DATA_FILE as pretty-printed JSON."""
    serialized = json.dumps(urls, ensure_ascii=False, indent=2)
    with open(DATA_FILE, 'w', encoding='utf-8') as fh:
        fh.write(serialized)
def load_logs():
    """Load log entries from LOG_FILE.

    Returns:
        list: at most the 100 most recent log entries; [] when the file
        is missing, unreadable, or not valid JSON.
    """
    if not os.path.exists(LOG_FILE):
        return []
    try:
        with open(LOG_FILE, 'r', encoding='utf-8') as f:
            logs = json.load(f)
        # Keep only the most recent 100 entries. The slice stays inside
        # the try so a non-list payload degrades to [] as before.
        return logs[-100:]
    # Was a bare `except:`, which also swallows SystemExit and
    # KeyboardInterrupt; narrow to the errors this block can produce.
    except (OSError, json.JSONDecodeError, TypeError):
        return []
def save_logs(logs):
    """Write log entries to LOG_FILE, keeping only the last 100."""
    trimmed = logs[-100:]  # cap history before writing
    with open(LOG_FILE, 'w', encoding='utf-8') as fh:
        json.dump(trimmed, fh, ensure_ascii=False, indent=2)
def add_log(url, status, message):
    """Append a single timestamped entry to the persisted log."""
    entry = {
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "url": url,
        "status": status,
        "message": message,
    }
    history = load_logs()
    history.append(entry)
    save_logs(history)
async def ping_url(session, url_data):
    """Send one GET request to a configured URL and log the outcome.

    Returns a result dict: {"url", "name", "status"} plus either
    "code" (HTTP status) on completion or "message" on failure.
    """
    url = url_data.get("url", "")
    name = url_data.get("name", url)
    try:
        timeout = aiohttp.ClientTimeout(total=30)
        async with session.get(url, timeout=timeout) as response:
            code = response.status
    except asyncio.TimeoutError:
        add_log(name, "error", "请求超时")
        return {"url": url, "name": name, "status": "error", "message": "请求超时"}
    except Exception as exc:
        detail = str(exc)[:100]  # truncate long error text for the log
        add_log(name, "error", detail)
        return {"url": url, "name": name, "status": "error", "message": detail}
    # 200 counts as success; any other status is only a warning.
    outcome = "success" if code == 200 else "warning"
    add_log(name, outcome, f"状态码: {code}")
    return {"url": url, "name": name, "status": outcome, "code": code}
async def ping_all_urls():
    """Ping every configured URL concurrently and log a summary.

    Returns the list of per-URL result dicts (empty when nothing
    is configured).
    """
    targets = load_urls()
    if not targets:
        add_log("系统", "info", "没有配置需要保活的URL")
        return []
    add_log("系统", "info", f"开始保活任务,共 {len(targets)} 个URL")
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(ping_url(session, target) for target in targets)
        )
        ok = sum(r.get("status") == "success" for r in results)
        add_log("系统", "info", f"保活任务完成,成功: {ok}/{len(targets)}")
        return results
def run_keepalive_job():
    """Synchronous wrapper used by the scheduler to run the async job.

    asyncio.run creates a fresh event loop, runs ping_all_urls to
    completion, and closes the loop — replacing the manual
    new_event_loop/set_event_loop/close boilerplate. Safe here because
    the scheduler invokes this in a worker thread with no running loop.
    """
    asyncio.run(ping_all_urls())
# Background scheduler: fires run_keepalive_job every
# KEEPALIVE_INTERVAL_HOURS hours in a worker thread. It is created at
# import time but only started inside the __main__ guard.
scheduler = BackgroundScheduler()
scheduler.add_job(
    run_keepalive_job,
    trigger=IntervalTrigger(hours=KEEPALIVE_INTERVAL_HOURS),
    id='keepalive_job',
    name='URL保活任务',
    replace_existing=True
)
# NOTE(review): no @app.route decorator was present in the source as
# received, so this handler was never registered. '/' is the
# conventional path for an index view — confirm against the template.
@app.route('/')
def index():
    """Render the dashboard: URL list, recent logs, next scheduled run."""
    urls = load_urls()
    logs = load_logs()
    # Next scheduled execution of the keep-alive job, if any.
    job = scheduler.get_job('keepalive_job')
    next_run = (job.next_run_time.strftime("%Y-%m-%d %H:%M:%S")
                if job and job.next_run_time else "未调度")
    return render_template('index.html',
                           urls=urls,
                           # Materialize so the template can iterate the
                           # newest-first sequence more than once.
                           logs=list(reversed(logs)),
                           next_run=next_run,
                           interval=KEEPALIVE_INTERVAL_HOURS)
# NOTE(review): route decorator was missing in the received source;
# path chosen by REST convention — confirm against the frontend JS.
@app.route('/api/urls', methods=['GET'])
def get_urls():
    """Return the configured URL list as JSON."""
    return jsonify(load_urls())
# NOTE(review): route decorator was missing in the received source;
# path chosen by REST convention — confirm against the frontend JS.
@app.route('/api/urls', methods=['POST'])
def add_url():
    """Add a new URL to the keep-alive list.

    Expects a JSON body with "url" (required) and "name" (optional).
    Returns 400 for an empty/duplicate URL or a non-JSON body.
    """
    # get_json(silent=True) returns None instead of raising on a
    # missing/invalid JSON body, so bad requests get a clean 400
    # rather than an unhandled exception (request.json could raise).
    data = request.get_json(silent=True) or {}
    url = data.get('url', '').strip()
    name = data.get('name', '').strip()
    if not url:
        return jsonify({"error": "URL不能为空"}), 400
    # Default to https:// when no scheme was supplied.
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url
    urls = load_urls()
    # Reject duplicates.
    if any(u.get('url') == url for u in urls):
        return jsonify({"error": "URL已存在"}), 400
    # Fall back to the host part of the URL as the display name.
    if not name:
        name = url.split('//')[1].split('/')[0] if '//' in url else url
    urls.append({
        "url": url,
        "name": name,
        "added_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    })
    save_urls(urls)
    add_log(name, "info", "添加到保活列表")
    return jsonify({"success": True, "message": "URL添加成功"})
# NOTE(review): route decorator was missing in the received source;
# path chosen by REST convention — confirm against the frontend JS.
@app.route('/api/urls/<int:index>', methods=['DELETE'])
def delete_url(index):
    """Remove the URL at list position *index*; 400 if out of range."""
    urls = load_urls()
    if not (0 <= index < len(urls)):
        return jsonify({"error": "索引无效"}), 400
    removed = urls.pop(index)
    save_urls(urls)
    add_log(removed.get('name', removed.get('url')), "info", "从保活列表移除")
    return jsonify({"success": True})
# NOTE(review): route decorator was missing in the received source;
# path chosen by REST convention — confirm against the frontend JS.
@app.route('/api/ping', methods=['POST'])
def manual_ping():
    """Trigger a keep-alive pass for all URLs immediately.

    Runs in the request thread; the response waits for all pings
    (each capped at 30s) to finish.
    """
    # asyncio.run replaces the manual loop create/set/close boilerplate;
    # each Flask worker thread has no running loop, so this is safe.
    results = asyncio.run(ping_all_urls())
    return jsonify({"success": True, "results": results})
# NOTE(review): route decorator was missing in the received source;
# path chosen by REST convention — confirm against the frontend JS.
@app.route('/api/ping/<int:index>', methods=['POST'])
def ping_single(index):
    """Ping only the URL at list position *index*; 400 if out of range."""
    urls = load_urls()
    # Guard clause keeps the happy path unindented.
    if not (0 <= index < len(urls)):
        return jsonify({"error": "索引无效"}), 400

    async def _ping_one():
        # A short-lived session just for this single request.
        async with aiohttp.ClientSession() as session:
            return await ping_url(session, urls[index])

    # asyncio.run replaces the manual loop create/set/close boilerplate.
    result = asyncio.run(_ping_one())
    return jsonify({"success": True, "result": result})
# NOTE(review): route decorator was missing in the received source;
# path chosen by REST convention — confirm against the frontend JS.
@app.route('/api/logs', methods=['GET'])
def get_logs():
    """Return the most recent log entries (up to 100) as JSON."""
    return jsonify(load_logs())
# NOTE(review): route decorator was missing in the received source;
# path chosen by REST convention — confirm against the frontend JS.
@app.route('/api/logs', methods=['DELETE'])
def clear_logs():
    """Erase all persisted log entries."""
    save_logs([])
    return jsonify({"success": True})
# NOTE(review): route decorator was missing in the received source;
# path chosen by REST convention — confirm against the frontend JS.
@app.route('/api/status', methods=['GET'])
def get_status():
    """Report current configuration and scheduler state as JSON."""
    job = scheduler.get_job('keepalive_job')
    urls = load_urls()
    return jsonify({
        "url_count": len(urls),
        "interval_hours": KEEPALIVE_INTERVAL_HOURS,
        # None (JSON null) when the job is missing or not yet scheduled.
        "next_run": (job.next_run_time.strftime("%Y-%m-%d %H:%M:%S")
                     if job and job.next_run_time else None),
        "scheduler_running": scheduler.running
    })
if __name__ == '__main__':
    # Start the background keep-alive scheduler before serving requests.
    scheduler.start()
    add_log("系统", "info", "保活管理器启动")
    # Run the Flask app. debug=False avoids the reloader spawning a
    # second process (which would double-start the scheduler); 7860 is
    # the port conventionally exposed by Hugging Face Spaces.
    app.run(host='0.0.0.0', port=7860, debug=False)