picpocket / cleanup_scheduler.py
chawin.chen
fix
6128c41
"""
定时清理图片文件模块
每小时检查一次IMAGES_DIR目录,删除1小时以前的图片文件
"""
import glob
import os
import time
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from config import logger, IMAGES_DIR, CLEANUP_INTERVAL_HOURS, CLEANUP_AGE_HOURS, BOS_CLEANUP_ENABLED
from utils import delete_file_from_bos
class ImageCleanupScheduler:
"""图片清理定时任务类"""
def __init__(self, images_dir=None, cleanup_hours=None, interval_hours=None):
"""
初始化清理调度器
Args:
images_dir (str): 图片目录路径,默认使用config中的IMAGES_DIR
cleanup_hours (float): 清理时间阈值(小时),默认使用环境变量CLEANUP_AGE_HOURS
interval_hours (float): 定时任务执行间隔(小时),默认使用环境变量CLEANUP_INTERVAL_HOURS
"""
self.images_dir = images_dir or IMAGES_DIR
self.cleanup_hours = cleanup_hours if cleanup_hours is not None else CLEANUP_AGE_HOURS
self.interval_hours = interval_hours if interval_hours is not None else CLEANUP_INTERVAL_HOURS
self.scheduler = BackgroundScheduler()
self.is_running = False
# 确保目录存在
os.makedirs(self.images_dir, exist_ok=True)
logger.info(f"Image cleanup scheduler initialized, monitoring directory: {self.images_dir}, cleanup threshold: {self.cleanup_hours} hours, execution interval: {self.interval_hours} hours")
def cleanup_old_images(self):
"""
清理过期的图片文件
删除超过指定时间的图片文件
"""
try:
current_time = time.time()
cutoff_time = current_time - (self.cleanup_hours * 3600) # 转换为秒
cutoff_datetime = datetime.fromtimestamp(cutoff_time)
# 支持的图片格式
image_extensions = ['*.jpg', '*.jpeg', '*.png', '*.webp', '*.gif', '*.bmp']
deleted_files = []
total_size_deleted = 0
logger.info(f"Starting to clean image directory: {self.images_dir}")
logger.info(f"Cleanup threshold time: {cutoff_datetime.strftime('%Y-%m-%d %H:%M:%S')}")
# 遍历所有图片文件
for extension in image_extensions:
pattern = os.path.join(self.images_dir, extension)
for file_path in glob.glob(pattern):
try:
# 获取文件修改时间
file_mtime = os.path.getmtime(file_path)
# 如果文件时间早于阈值时间,则删除
if file_mtime < cutoff_time:
file_size = os.path.getsize(file_path)
file_time = datetime.fromtimestamp(file_mtime)
# 删除文件
os.remove(file_path)
# 仅在 BOS_CLEANUP_ENABLED 为 True 时删除 BOS 上的文件
if BOS_CLEANUP_ENABLED:
delete_file_from_bos(file_path)
deleted_files.append(os.path.basename(file_path))
total_size_deleted += file_size
logger.info(f"Deleting expired file: {os.path.basename(file_path)} ")
except (OSError, IOError) as e:
logger.error(f"Failed to delete file {os.path.basename(file_path)}: {e}")
continue
logger.info(f"Cleanup completed! Deleted {len(deleted_files)} files, ")
logger.info(f"Deleted file list: {', '.join(deleted_files[:10])}")
else:
logger.info("Cleanup completed! No expired files found to clean")
return {
'success': True,
'deleted_count': len(deleted_files),
'deleted_size': total_size_deleted,
'deleted_files': deleted_files,
'cutoff_time': cutoff_datetime.isoformat()
}
except Exception as e:
error_msg = f"图片清理任务执行失败: {e}"
logger.error(error_msg)
return {
'success': False,
'error': str(e),
'deleted_count': 0,
'deleted_size': 0
}
def _format_size(self, size_bytes):
"""格式化文件大小显示"""
if size_bytes == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB"]
i = 0
while size_bytes >= 1024 and i < len(size_names) - 1:
size_bytes /= 1024.0
i += 1
return f"{size_bytes:.1f} {size_names[i]}"
def start(self):
"""启动定时清理任务"""
if self.is_running:
logger.warning("Image cleanup scheduler is already running")
return
try:
# 添加定时任务:使用可配置的执行间隔
self.scheduler.add_job(
func=self.cleanup_old_images,
trigger='interval',
hours=self.interval_hours, # 使用环境变量配置的执行间隔
id='image_cleanup',
name='image clean tast',
replace_existing=True
)
# 启动调度器
self.scheduler.start()
self.is_running = True
logger.info(f"Image cleanup scheduler started, will execute cleanup task every {self.interval_hours} hours")
# 立即执行一次清理(可选)
logger.info("Executing image cleanup task immediately...")
self.cleanup_old_images()
except Exception as e:
logger.error(f"Failed to start image cleanup scheduler: {e}")
raise
def stop(self):
"""停止定时清理任务"""
if not self.is_running:
logger.warning("Image cleanup scheduler is not running")
return
try:
self.scheduler.shutdown(wait=False)
self.is_running = False
logger.info("Image cleanup scheduler stopped")
except Exception as e:
logger.error(f"Failed to stop image cleanup scheduler: {e}")
def get_status(self):
"""获取调度器状态"""
return {
'running': self.is_running,
'images_dir': self.images_dir,
'cleanup_hours': self.cleanup_hours,
'interval_hours': self.interval_hours,
'next_run': self.scheduler.get_jobs()[0].next_run_time.isoformat()
if self.is_running and self.scheduler.get_jobs() else None
}
# 创建全局调度器实例
cleanup_scheduler = ImageCleanupScheduler()
def start_cleanup_scheduler():
"""启动图片清理调度器"""
cleanup_scheduler.start()
def stop_cleanup_scheduler():
"""停止图片清理调度器"""
cleanup_scheduler.stop()
def get_cleanup_status():
"""获取清理调度器状态"""
return cleanup_scheduler.get_status()
def manual_cleanup():
"""手动执行一次清理"""
return cleanup_scheduler.cleanup_old_images()
if __name__ == "__main__":
# 测试代码
print("测试图片清理功能...")
test_scheduler = ImageCleanupScheduler()
result = test_scheduler.cleanup_old_images()
print(f"清理结果: {result}")