#!/usr/bin/env python
"""数据库备份脚本
支持手动备份和定期自动备份。
可以配置备份频率、保留天数、压缩等选项。
"""
import argparse
import gzip
import shutil
import sys
from datetime import datetime, timedelta
from pathlib import Path
import logging
# Put the project root (the parent of this script's directory) at the front
# of sys.path so that the qa_annotate import below resolves when this file is
# run directly as a script.
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from qa_annotate.database.base import DB_PATH, DB_DIR  # noqa: E402  (must follow the sys.path change)
# Logging configuration
def setup_logging(log_file=None, log_level=logging.INFO):
    """Configure the root logger for this script.

    Records always go to standard output; when ``log_file`` is given they are
    additionally written to that file using UTF-8.

    Like every ``logging.basicConfig`` call, this does nothing if the root
    logger already has handlers attached.

    Args:
        log_file: Optional path of a log file.
        log_level: Threshold level for the root logger.
    """
    console_handler = logging.StreamHandler(sys.stdout)
    file_handlers = (
        [logging.FileHandler(log_file, encoding="utf-8")] if log_file else []
    )
    logging.basicConfig(
        handlers=[console_handler, *file_handlers],
        level=log_level,
        datefmt="%Y-%m-%d %H:%M:%S",
        format="%(asctime)s - %(levelname)s - %(message)s",
    )
def backup_database(
    backup_dir=None, compress=True, max_backups=30, backup_prefix="annotations_backup"
):
    """Create a timestamped copy of the database file and prune old backups.

    The copy is written to a temporary ``<name>.part`` file first and only
    renamed to its final name once it is complete, so an interrupted or
    failed run never leaves a truncated file that looks like a valid backup.

    Args:
        backup_dir: Directory to store backups in. ``None`` selects
            ``DB_DIR / "backups"``. The directory is created if missing.
        compress: When true the backup is gzip-compressed (``.db.gz``);
            otherwise it is a plain file copy (``.db``).
        max_backups: Forwarded to ``cleanup_old_backups`` as the number of
            backups to retain.
        backup_prefix: File-name prefix; the full name is
            ``<prefix>_<YYYYMMDD_HHMMSS>.db[.gz]``.

    Returns:
        Path: Location of the new backup, or ``None`` if the database file
        does not exist or any step raised an exception (the error is logged).
    """
    try:
        # Nothing to do if the source database is missing.
        if not DB_PATH.exists():
            logging.error(f"数据库文件不存在: {DB_PATH}")
            return None

        backup_dir = DB_DIR / "backups" if backup_dir is None else Path(backup_dir)
        backup_dir.mkdir(parents=True, exist_ok=True)

        # The timestamp in the file name is what identifies the backup's age.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        extension = ".db.gz" if compress else ".db"
        backup_path = backup_dir / f"{backup_prefix}_{timestamp}{extension}"
        # The ".part" suffix keeps the in-progress file out of the
        # "<prefix>_*.db" / "<prefix>_*.db.gz" globs used by the cleanup code.
        partial_path = backup_path.with_name(backup_path.name + ".part")

        logging.info(f"开始备份数据库: {DB_PATH} -> {backup_path}")

        # NOTE(review): this is a raw file-level copy. If DB_PATH is a live
        # SQLite database that can be written to during the copy, confirm
        # this is safe (sqlite3's online backup API is the usual alternative).
        try:
            if compress:
                with open(DB_PATH, "rb") as f_in, gzip.open(
                    partial_path, "wb"
                ) as f_out:
                    shutil.copyfileobj(f_in, f_out)
            else:
                shutil.copy2(DB_PATH, partial_path)
                # copy2 carries over the source's modification time; reset it
                # to "now" so the file's mtime reflects when the backup was
                # taken (the count-based retention orders files by mtime).
                partial_path.touch()
            partial_path.replace(backup_path)
        finally:
            # Only still present if something above failed before the rename.
            if partial_path.exists():
                partial_path.unlink()

        if compress:
            logging.info(f"数据库已压缩备份到: {backup_path}")
        else:
            logging.info(f"数据库已备份到: {backup_path}")

        size_mb = backup_path.stat().st_size / (1024 * 1024)
        logging.info(f"备份文件大小: {size_mb:.2f} MB")

        # Enforce the retention limit now that a new backup exists.
        cleanup_old_backups(backup_dir, max_backups, backup_prefix, compress)
        return backup_path
    except Exception as e:
        logging.error(f"备份失败: {str(e)}", exc_info=True)
        return None
def _backup_sort_key(file_path, backup_prefix):
    """Return a POSIX timestamp describing when a backup file was taken.

    The time is parsed from the ``<prefix>_YYYYMMDD_HHMMSS.db[.gz]`` file
    name. If the name does not follow that layout, the file's modification
    time is used instead.
    """
    stem = file_path.name
    for extension in (".gz", ".db"):
        if stem.endswith(extension):
            stem = stem[: -len(extension)]
    lead = f"{backup_prefix}_"
    if stem.startswith(lead):
        stem = stem[len(lead):]
    try:
        return datetime.strptime(stem, "%Y%m%d_%H%M%S").timestamp()
    except (ValueError, OverflowError, OSError):
        return file_path.stat().st_mtime


def cleanup_old_backups(backup_dir, max_backups, backup_prefix, compress):
    """Delete the oldest backups so that at most ``max_backups`` remain.

    Backups are ordered by the timestamp embedded in their file name rather
    than by file mtime: an uncompressed backup made with ``shutil.copy2``
    carries the database's modification time, so mtime does not reliably say
    which backup is newest.

    Errors are logged as warnings and never propagated.

    Args:
        backup_dir: Directory containing the backups (``str`` or ``Path``).
        max_backups: Number of most recent backups to keep. A negative value
            is rejected with a warning and nothing is deleted.
        backup_prefix: File-name prefix of the backups to consider.
        compress: Selects which files are considered: ``<prefix>_*.db.gz``
            when true, ``<prefix>_*.db`` otherwise.
    """
    try:
        if max_backups < 0:
            logging.warning(f"max_backups 不能为负数: {max_backups}, 跳过清理")
            return

        backup_dir = Path(backup_dir)
        pattern = f"{backup_prefix}_*.db.gz" if compress else f"{backup_prefix}_*.db"
        # Newest first, so everything past index ``max_backups`` is surplus.
        backup_files = sorted(
            backup_dir.glob(pattern),
            key=lambda p: _backup_sort_key(p, backup_prefix),
            reverse=True,
        )

        files_to_delete = backup_files[max_backups:]
        if not files_to_delete:
            return

        deleted_count = 0
        total_size = 0
        for file_path in files_to_delete:
            try:
                file_size = file_path.stat().st_size
                file_path.unlink()
            except OSError as e:
                # One undeletable file should not stop the rest of the cleanup.
                logging.warning(f"删除旧备份失败: {file_path.name}: {e}")
                continue
            total_size += file_size
            deleted_count += 1
            logging.info(f"已删除旧备份: {file_path.name}")

        size_mb = total_size / (1024 * 1024)
        logging.info(
            f"已清理 {deleted_count} 个旧备份,释放空间: {size_mb:.2f} MB"
        )
    except Exception as e:
        logging.warning(f"清理旧备份时出错: {str(e)}")
def cleanup_backups_by_age(backup_dir, days_to_keep, backup_prefix, compress):
    """Delete backups whose file-name timestamp is older than a cutoff.

    Both compressed (``<prefix>_*.db.gz``) and uncompressed
    (``<prefix>_*.db``) backups are examined. A file's age is taken from the
    ``YYYYMMDD_HHMMSS`` timestamp in its name, not from file metadata; files
    whose name cannot be parsed are skipped with a warning.

    Errors are logged as warnings and never propagated.

    Args:
        backup_dir: Directory containing the backups (``str`` or ``Path``).
        days_to_keep: Backups older than this many days are deleted. A
            negative value is rejected with a warning, because it would put
            the cutoff in the future and delete every backup.
        backup_prefix: File-name prefix of the backups to consider.
        compress: Unused; both compressed and uncompressed files are always
            processed. Kept for signature compatibility.
    """
    try:
        if days_to_keep < 0:
            logging.warning(f"保留天数不能为负数: {days_to_keep}, 跳过清理")
            return

        # Callers may pass a plain string (e.g. straight from argparse).
        backup_dir = Path(backup_dir)
        cutoff_date = datetime.now() - timedelta(days=days_to_keep)
        name_lead = f"{backup_prefix}_"

        backup_files = [
            found
            for pattern in (f"{backup_prefix}_*.db.gz", f"{backup_prefix}_*.db")
            for found in backup_dir.glob(pattern)
        ]

        deleted_count = 0
        total_size = 0
        for file_path in backup_files:
            # Name layout: <backup_prefix>_YYYYMMDD_HHMMSS.db[.gz]
            name = file_path.name
            if name.endswith(".gz"):
                name = name[:-3]
            if name.endswith(".db"):
                name = name[:-3]
            # Strip only the leading prefix; a blanket str.replace would also
            # remove matching text inside the timestamp itself.
            if name.startswith(name_lead):
                name = name[len(name_lead):]

            try:
                file_date = datetime.strptime(name, "%Y%m%d_%H%M%S")
            except ValueError:
                logging.warning(f"无法解析备份文件名: {file_path.name}, 跳过")
                continue

            if file_date >= cutoff_date:
                continue

            try:
                file_size = file_path.stat().st_size
                file_path.unlink()
            except OSError as e:
                # Keep going: one undeletable file should not block the rest.
                logging.warning(f"删除过期备份失败: {file_path.name}: {e}")
                continue

            total_size += file_size
            deleted_count += 1
            logging.info(
                f"已删除过期备份: {file_path.name} (创建于 {file_date.strftime('%Y-%m-%d %H:%M:%S')})"
            )

        if deleted_count > 0:
            size_mb = total_size / (1024 * 1024)
            logging.info(
                f"已清理 {deleted_count} 个过期备份,释放空间: {size_mb:.2f} MB"
            )
    except Exception as e:
        logging.warning(f"按日期清理备份时出错: {str(e)}")
def run_scheduled_backup(
    interval_hours=24,
    backup_dir=None,
    compress=True,
    max_backups=30,
    backup_prefix="annotations_backup",
    log_file=None,
):
    """Run backups forever at a fixed interval until interrupted.

    One backup is taken immediately; afterwards ``backup_database`` is
    registered with the ``schedule`` library and pending jobs are polled once
    a minute. Ctrl-C (``KeyboardInterrupt``) stops the loop. If ``schedule``
    is not installed, an error is logged and the function returns at once.

    Args:
        interval_hours: Hours between two backups.
        backup_dir: Backup directory (``None`` means ``DB_DIR / "backups"``).
        compress: Whether backups are gzip-compressed.
        max_backups: Maximum number of backups to retain.
        backup_prefix: File-name prefix for the backups.
        log_file: Optional log file path handed to ``setup_logging``.
    """
    # ``schedule`` is an optional third-party dependency, needed only here.
    try:
        import schedule
        import time
    except ImportError:
        for hint in (
            "需要安装 schedule 库才能使用定期备份功能",
            "请运行: pip install schedule",
        ):
            logging.error(hint)
        return

    setup_logging(log_file=log_file)

    separator = "=" * 60
    target_dir = backup_dir or (DB_DIR / 'backups')
    logging.info(separator)
    logging.info("数据库定期备份服务启动")
    logging.info(f"备份间隔: 每 {interval_hours} 小时")
    logging.info(f"备份目录: {target_dir}")
    logging.info(f"压缩备份: {compress}")
    logging.info(f"最大备份数: {max_backups}")
    logging.info(separator)

    job_options = {
        "backup_dir": backup_dir,
        "compress": compress,
        "max_backups": max_backups,
        "backup_prefix": backup_prefix,
    }

    # Take one backup right away, then hand the same job to the scheduler.
    backup_database(**job_options)
    schedule.every(interval_hours).hours.do(backup_database, **job_options)

    try:
        while True:
            schedule.run_pending()
            time.sleep(60)  # poll for due jobs once a minute
    except KeyboardInterrupt:
        logging.info("\n定期备份服务已停止")
def main():
    """Command-line entry point: parse options and run the requested backup.

    With ``--schedule`` the periodic backup service is started and runs until
    interrupted. Otherwise a single backup is taken, optionally followed by
    age-based cleanup (``--cleanup-days``), and the process exits with status
    0 on success or 1 on failure.

    Age-based cleanup runs only after a successful backup and only for a
    positive number of days, so a failed backup or a mistyped value can never
    cause existing backups to be deleted.
    """
    parser = argparse.ArgumentParser(
        description="数据库备份工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
# 手动执行一次备份
python scripts/backup_db.py
# 指定备份目录
python scripts/backup_db.py --backup-dir /path/to/backups
# 不压缩备份
python scripts/backup_db.py --no-compress
# 设置保留备份数量
python scripts/backup_db.py --max-backups 10
# 启动定期备份服务(每24小时备份一次)
python scripts/backup_db.py --schedule --interval 24
# 启动定期备份服务(每12小时备份一次)
python scripts/backup_db.py --schedule --interval 12
""",
    )
    parser.add_argument(
        "--backup-dir",
        "-d",
        type=str,
        default=None,
        help="备份目录路径(默认: data/backups)",
    )
    parser.add_argument("--no-compress", action="store_true", help="不压缩备份文件")
    parser.add_argument(
        "--max-backups",
        "-n",
        type=int,
        default=30,
        help="保留的最大备份数量(默认: 30)",
    )
    parser.add_argument(
        "--backup-prefix",
        type=str,
        default="annotations_backup",
        help="备份文件前缀(默认: annotations_backup)",
    )
    parser.add_argument(
        "--schedule", "-s", action="store_true", help="启动定期备份服务"
    )
    parser.add_argument(
        "--interval", "-i", type=int, default=24, help="定期备份间隔(小时,默认: 24)"
    )
    parser.add_argument(
        "--log-file", type=str, default=None, help="日志文件路径(默认: 输出到控制台)"
    )
    parser.add_argument(
        "--log-level",
        type=str,
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        default="INFO",
        help="日志级别(默认: INFO)",
    )
    parser.add_argument(
        "--cleanup-days",
        type=int,
        default=None,
        help="清理指定天数之前的备份(可选,与 --max-backups 配合使用)",
    )
    args = parser.parse_args()

    # Configure logging before anything else emits records.
    log_level = getattr(logging, args.log_level)
    setup_logging(log_file=args.log_file, log_level=log_level)

    compress = not args.no_compress

    if args.schedule:
        # Long-running service mode; returns only when interrupted or when
        # the scheduler dependency is missing.
        run_scheduled_backup(
            interval_hours=args.interval,
            backup_dir=args.backup_dir,
            compress=compress,
            max_backups=args.max_backups,
            backup_prefix=args.backup_prefix,
            log_file=args.log_file,
        )
    else:
        # One-off backup.
        backup_path = backup_database(
            backup_dir=args.backup_dir,
            compress=compress,
            max_backups=args.max_backups,
            backup_prefix=args.backup_prefix,
        )

        if args.cleanup_days is not None:
            if args.cleanup_days <= 0:
                # Zero or negative days would put the cutoff at or after
                # "now" and could delete every backup.
                logging.warning(
                    f"--cleanup-days 必须大于 0 (当前: {args.cleanup_days}), 已跳过按日期清理"
                )
            elif backup_path:
                # Never prune old backups when the new one failed. The
                # directory is wrapped in Path because --backup-dir arrives
                # from argparse as a plain string.
                cleanup_dir = (
                    Path(args.backup_dir) if args.backup_dir else DB_DIR / "backups"
                )
                cleanup_backups_by_age(
                    backup_dir=cleanup_dir,
                    days_to_keep=args.cleanup_days,
                    backup_prefix=args.backup_prefix,
                    compress=compress,
                )

        if backup_path:
            logging.info("备份完成!")
            sys.exit(0)
        else:
            logging.error("备份失败!")
            sys.exit(1)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|