File size: 4,476 Bytes
0ab5288 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
"""
文件处理工具函数
"""
import os
import time
import threading
from typing import List, Set
# 全局变量记录使用过的临时目录
used_temp_dirs: Set[str] = set()
def cleanup_temp_files(temp_dirs=None):
"""
清理指定的临时文件夹中的旧文件
temp_dirs: 要清理的目录列表,如果为None则只清理记录的目录
"""
try:
# 如果没有指定目录,只清理我们记录的目录
if temp_dirs is None:
temp_dirs = []
current_time = time.time()
cleaned_count = 0
for temp_dir in temp_dirs:
try:
# 确保目录存在且是目录
if not os.path.exists(temp_dir) or not os.path.isdir(temp_dir):
continue
# 遍历目录中的文件
for root, dirs, files in os.walk(temp_dir):
for file_name in files:
file_path = os.path.join(root, file_name)
try:
# 检查文件修改时间
file_mtime = os.path.getmtime(file_path)
# 如果文件超过30分钟未修改,则删除
if current_time - file_mtime > 1800: # 1800秒 = 30分钟
# 检查是否是图片文件或临时文件
if any(file_path.lower().endswith(ext) for ext in
['.jpg', '.jpeg', '.png', '.gif', '.webp', '.tmp']):
os.remove(file_path)
cleaned_count += 1
print(f"Cleaned old temp file: {file_path}")
except Exception as e:
print(f"Error cleaning {file_path}: {e}")
# 清理空目录
for dir_name in dirs:
dir_path = os.path.join(root, dir_name)
try:
if os.path.exists(dir_path) and not os.listdir(dir_path):
os.rmdir(dir_path)
print(f"Removed empty temp directory: {dir_path}")
except Exception as e:
print(f"Error removing directory {dir_path}: {e}")
except Exception as e:
print(f"Error processing directory {temp_dir}: {e}")
if cleaned_count > 0:
print(f"Cleanup completed: removed {cleaned_count} temporary files")
except Exception as e:
print(f"Error during temp cleanup: {e}")
def start_cleanup_scheduler():
"""
启动定时清理任务
"""
def cleanup_worker():
while True:
try:
# 每30分钟清理一次
time.sleep(1800) # 1800秒 = 30分钟
print("Starting scheduled cleanup...")
# 只清理我们记录的临时目录
cleanup_temp_files(list(used_temp_dirs))
except Exception as e:
print(f"Error in cleanup scheduler: {e}")
# 创建守护线程
cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
cleanup_thread.start()
def handle_file_upload(current_files, new_files):
"""简化的文件上传处理 - 支持单张图片"""
if not new_files:
return [], "No image uploaded - will generate from text only"
# 确保是列表格式
current_list = current_files or []
new_list = new_files if isinstance(new_files, list) else [new_files]
# 合并并限制数量为1张图片
all_files = current_list + [f for f in new_list if f]
if len(all_files) > 1:
all_files = all_files[:1]
message = f"Uploaded 1 image (limit reached)"
else:
message = f"Uploaded {len(all_files)} image"
return all_files, message
def cleanup_temp_file(file_path: str):
"""
清理单个临时文件
"""
try:
if os.path.exists(file_path):
# 记录文件所在的目录
temp_dir = os.path.dirname(file_path)
used_temp_dirs.add(temp_dir)
os.remove(file_path)
except Exception as e:
print(f"Error cleaning temp file {file_path}: {e}")
def get_temp_dirs() -> Set[str]:
"""
获取所有使用过的临时目录
"""
return used_temp_dirs.copy()
|