Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| import os | |
| import sys | |
| import datetime | |
| import json | |
| import sqlite3 | |
| import tarfile | |
| from huggingface_hub import HfApi, login | |
# Force line-buffered stdout/stderr so output appears immediately
# (important when the script's output is captured in cron logs).
for _stream in (sys.stdout, sys.stderr):
    _stream.reconfigure(line_buffering=True)
# ===== Configuration =====
DB_PATH = "/app/server/data/freeapi.db"  # SQLite database file to back up
DATASET_REPO = "lydgs/freellm-backup"  # Hugging Face dataset repo receiving the uploads
BACKUP_PREFIX = "freeapi_backup"  # file-name prefix of binary database backups
CONFIG_PREFIX = "config_export"  # file-name prefix of JSON config export archives
RETENTION_DAYS = 30  # remote backups older than this many days are deleted
# =========================
def _json_default(value):
    """Serialize values that ``json`` cannot encode natively.

    SQLite BLOB columns are returned as ``bytes``; they are written as a
    hexadecimal string so a single binary column does not abort the export.
    Any other unsupported type still raises ``TypeError``.
    """
    if isinstance(value, (bytes, bytearray, memoryview)):
        return bytes(value).hex()
    raise TypeError(
        f"Object of type {type(value).__name__} is not JSON serializable"
    )


def export_db_to_json(db_path, temp_dir):
    """Export every user table of a SQLite database to JSON files.

    Each table is written to ``<temp_dir>/<table_name>.json`` as a list of
    ``{column: value}`` dicts (UTF-8, indented, non-ASCII kept as-is).
    BLOB values are written as hexadecimal strings.

    Args:
        db_path: Path to the SQLite database file.
        temp_dir: Existing directory that receives the JSON files.

    Returns:
        The list of JSON file paths written, or an empty list when
        ``db_path`` does not exist.

    Raises:
        sqlite3.Error: If the database cannot be read.
        OSError: If a JSON file cannot be written.
    """
    if not os.path.exists(db_path):
        print(f"⚠️ 数据库文件不存在: {db_path}")
        return []

    exported_files = []
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        # List all user tables, excluding SQLite's internal ones.
        cursor.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='table' AND name NOT LIKE 'sqlite_%'"
        )
        table_names = [name for (name,) in cursor.fetchall()]

        for table_name in table_names:
            # Identifiers cannot be bound as SQL parameters, so quote the
            # name explicitly; this keeps tables named with keywords,
            # spaces or quotes (e.g. "order") from breaking the statement.
            quoted_name = '"' + table_name.replace('"', '""') + '"'
            cursor.execute(f"SELECT * FROM {quoted_name}")
            col_names = [desc[0] for desc in cursor.description]
            data = [dict(zip(col_names, row)) for row in cursor.fetchall()]

            json_path = os.path.join(temp_dir, f"{table_name}.json")
            with open(json_path, "w", encoding="utf-8") as f:
                json.dump(
                    data,
                    f,
                    indent=2,
                    ensure_ascii=False,
                    default=_json_default,
                )
            exported_files.append(json_path)
            print(f"📄 导出表 {table_name} 到 {json_path} ({len(data)} 行)")
    finally:
        # Close the connection even when a query or file write fails.
        conn.close()
    return exported_files
def _snapshot_database(src_path, dest_path):
    """Copy the SQLite database at src_path to dest_path via the backup API.

    Unlike copying the file, the online backup API yields a consistent
    snapshot even while another process is writing to the database.
    """
    src = sqlite3.connect(src_path)
    try:
        dest = sqlite3.connect(dest_path)
        try:
            src.backup(dest)
        finally:
            dest.close()
    finally:
        src.close()


def _upload_config_export(api, db_path, work_dir, archive_name):
    """Export all tables of db_path to JSON, tar.gz them and upload the archive."""
    export_dir = os.path.join(work_dir, "config_export")
    os.makedirs(export_dir, exist_ok=True)
    export_db_to_json(db_path, export_dir)

    tar_path = os.path.join(work_dir, archive_name)
    with tarfile.open(tar_path, "w:gz") as tar:
        for file_name in sorted(os.listdir(export_dir)):
            tar.add(os.path.join(export_dir, file_name), arcname=file_name)

    api.upload_file(
        path_or_fileobj=tar_path,
        path_in_repo=archive_name,
        repo_id=DATASET_REPO,
        repo_type="dataset",
    )


def _backup_timestamp(file_name):
    """Return the datetime embedded in a backup file name, or None if absent."""
    for prefix, suffix in ((BACKUP_PREFIX, ".db"), (CONFIG_PREFIX, ".tar.gz")):
        head = prefix + "_"
        if file_name.startswith(head) and file_name.endswith(suffix):
            stamp = file_name[len(head):-len(suffix)]
            try:
                return datetime.datetime.strptime(stamp, "%Y%m%d_%H%M%S")
            except ValueError:
                # Name has the right prefix/suffix but no valid timestamp.
                return None
    return None


def _prune_old_backups(api):
    """Delete remote backups older than RETENTION_DAYS; return the number deleted."""
    files = api.list_repo_files(repo_id=DATASET_REPO, repo_type="dataset")
    now = datetime.datetime.now()
    deleted = 0
    for name in files:
        file_time = _backup_timestamp(name)
        if file_time is None or (now - file_time).days <= RETENTION_DAYS:
            continue
        try:
            api.delete_file(
                path_in_repo=name, repo_id=DATASET_REPO, repo_type="dataset"
            )
        except Exception as e:
            # Report the failure instead of hiding it, then keep pruning.
            print(f"⚠️ 删除旧备份失败: {name}: {e}")
            continue
        print(f"🗑️ 已删除旧备份: {name}")
        deleted += 1
    return deleted


def backup_database():
    """Back up the SQLite database to the Hugging Face dataset repository.

    Steps:
      1. Upload a binary copy of the database (a consistent snapshot made
         with the SQLite backup API; the raw file is used as a fallback).
      2. Export every table to JSON, pack the files into a tar.gz archive
         and upload it.
      3. Delete remote backups older than ``RETENTION_DAYS`` days. This step
         is skipped when step 1 failed, so that a run of failed uploads can
         never prune the last good backups.

    Reads the ``HF_TOKEN`` environment variable for authentication.

    Returns:
        True only if both the binary backup and the config export were
        uploaded; False otherwise (including a missing database file, a
        missing token or a failed login). A failure while pruning old
        backups is only reported as a warning and does not affect the result.
    """
    # Local import, as the block previously did with `shutil`.
    import tempfile

    # Debug information about the environment.
    print(f"[DEBUG] HF_TOKEN present: {bool(os.getenv('HF_TOKEN'))}")
    print(f"[DEBUG] BAILIAN_API_KEY present: {bool(os.getenv('BAILIAN_API_KEY'))}")
    print(f"[DEBUG] Database file exists: {os.path.exists(DB_PATH)}")
    print(f"[DEBUG] Database file size: {os.path.getsize(DB_PATH) if os.path.exists(DB_PATH) else 'N/A'}")

    if not os.path.exists(DB_PATH):
        print(f"❌ 数据库文件不存在: {DB_PATH}")
        return False

    token = os.getenv("HF_TOKEN")
    if not token:
        print("❌ 环境变量 HF_TOKEN 未设置")
        return False

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_name = f"{BACKUP_PREFIX}_{timestamp}.db"
    config_tar_name = f"{CONFIG_PREFIX}_{timestamp}.tar.gz"

    try:
        login(token=token)
        print("[DEBUG] Hugging Face login successful")
    except Exception as e:
        print(f"❌ 登录 Hugging Face 失败: {e}")
        return False

    api = HfApi()
    backup_ok = False
    config_ok = False

    # A private temporary directory replaces the predictable /tmp paths and
    # is removed even when a step fails.
    with tempfile.TemporaryDirectory(prefix="freeapi_backup_") as work_dir:
        # Prefer a consistent snapshot; fall back to the live file so a
        # snapshot problem does not prevent the backup altogether.
        upload_source = DB_PATH
        snapshot_path = os.path.join(work_dir, backup_name)
        try:
            _snapshot_database(DB_PATH, snapshot_path)
            upload_source = snapshot_path
        except (sqlite3.Error, OSError) as e:
            print(f"⚠️ 创建数据库快照失败,改为直接上传原文件: {e}")

        # 1. Upload the binary backup.
        try:
            api.upload_file(
                path_or_fileobj=upload_source,
                path_in_repo=backup_name,
                repo_id=DATASET_REPO,
                repo_type="dataset",
            )
            print(f"✅ 二进制备份成功: {backup_name}")
            backup_ok = True
        except Exception as e:
            print(f"❌ 二进制上传失败: {e}")

        # 2. Export the tables to JSON, pack and upload them.
        try:
            _upload_config_export(api, upload_source, work_dir, config_tar_name)
            print(f"✅ 配置导出包上传成功: {config_tar_name}")
            config_ok = True
        except Exception as e:
            print(f"❌ 配置导出失败: {e}")

    # 3. Prune old backups, but only once a fresh backup is safely stored.
    if backup_ok:
        try:
            deleted = _prune_old_backups(api)
            if deleted:
                print(f"✅ 清理完成,共删除 {deleted} 个旧备份")
            else:
                print("✅ 没有需要清理的旧备份")
        except Exception as e:
            print(f"⚠️ 清理旧备份时出错: {e}")
    else:
        print("⚠️ 本次备份未成功,跳过清理旧备份")

    return backup_ok and config_ok
if __name__ == "__main__":
    # Script entry point: run the backup once, log the outcome and expose
    # it through the process exit code (0 = success, 1 = failure).
    started_at = datetime.datetime.now()
    print(f"[{started_at}] 开始备份及配置导出...")
    ok = backup_database()
    print(f"[DEBUG] backup_database returned: {ok}")
    print("备份任务完成" if ok else "备份任务失败")
    sys.exit(0 if ok else 1)