|
|
""" |
|
|
文件处理工具函数 |
|
|
""" |
|
|
|
|
|
import os |
|
|
import time |
|
|
import threading |
|
|
from typing import List, Set |
|
|
|
|
|
|
|
|
|
|
|
used_temp_dirs: Set[str] = set() |
|
|
|
|
|
|
|
|
def cleanup_temp_files(temp_dirs=None): |
|
|
""" |
|
|
清理指定的临时文件夹中的旧文件 |
|
|
temp_dirs: 要清理的目录列表,如果为None则只清理记录的目录 |
|
|
""" |
|
|
try: |
|
|
|
|
|
if temp_dirs is None: |
|
|
temp_dirs = [] |
|
|
|
|
|
current_time = time.time() |
|
|
cleaned_count = 0 |
|
|
|
|
|
for temp_dir in temp_dirs: |
|
|
try: |
|
|
|
|
|
if not os.path.exists(temp_dir) or not os.path.isdir(temp_dir): |
|
|
continue |
|
|
|
|
|
|
|
|
for root, dirs, files in os.walk(temp_dir): |
|
|
for file_name in files: |
|
|
file_path = os.path.join(root, file_name) |
|
|
try: |
|
|
|
|
|
file_mtime = os.path.getmtime(file_path) |
|
|
|
|
|
if current_time - file_mtime > 1800: |
|
|
|
|
|
if any(file_path.lower().endswith(ext) for ext in |
|
|
['.jpg', '.jpeg', '.png', '.gif', '.webp', '.tmp']): |
|
|
os.remove(file_path) |
|
|
cleaned_count += 1 |
|
|
print(f"Cleaned old temp file: {file_path}") |
|
|
except Exception as e: |
|
|
print(f"Error cleaning {file_path}: {e}") |
|
|
|
|
|
|
|
|
for dir_name in dirs: |
|
|
dir_path = os.path.join(root, dir_name) |
|
|
try: |
|
|
if os.path.exists(dir_path) and not os.listdir(dir_path): |
|
|
os.rmdir(dir_path) |
|
|
print(f"Removed empty temp directory: {dir_path}") |
|
|
except Exception as e: |
|
|
print(f"Error removing directory {dir_path}: {e}") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error processing directory {temp_dir}: {e}") |
|
|
|
|
|
if cleaned_count > 0: |
|
|
print(f"Cleanup completed: removed {cleaned_count} temporary files") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error during temp cleanup: {e}") |
|
|
|
|
|
|
|
|
def start_cleanup_scheduler(): |
|
|
""" |
|
|
启动定时清理任务 |
|
|
""" |
|
|
def cleanup_worker(): |
|
|
while True: |
|
|
try: |
|
|
|
|
|
time.sleep(1800) |
|
|
print("Starting scheduled cleanup...") |
|
|
|
|
|
cleanup_temp_files(list(used_temp_dirs)) |
|
|
except Exception as e: |
|
|
print(f"Error in cleanup scheduler: {e}") |
|
|
|
|
|
|
|
|
cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True) |
|
|
cleanup_thread.start() |
|
|
|
|
|
|
|
|
def handle_file_upload(current_files, new_files): |
|
|
"""简化的文件上传处理 - 支持单张图片""" |
|
|
if not new_files: |
|
|
return [], "No image uploaded - will generate from text only" |
|
|
|
|
|
|
|
|
current_list = current_files or [] |
|
|
new_list = new_files if isinstance(new_files, list) else [new_files] |
|
|
|
|
|
|
|
|
all_files = current_list + [f for f in new_list if f] |
|
|
if len(all_files) > 1: |
|
|
all_files = all_files[:1] |
|
|
message = f"Uploaded 1 image (limit reached)" |
|
|
else: |
|
|
message = f"Uploaded {len(all_files)} image" |
|
|
|
|
|
return all_files, message |
|
|
|
|
|
|
|
|
def cleanup_temp_file(file_path: str): |
|
|
""" |
|
|
清理单个临时文件 |
|
|
""" |
|
|
try: |
|
|
if os.path.exists(file_path): |
|
|
|
|
|
temp_dir = os.path.dirname(file_path) |
|
|
used_temp_dirs.add(temp_dir) |
|
|
os.remove(file_path) |
|
|
except Exception as e: |
|
|
print(f"Error cleaning temp file {file_path}: {e}") |
|
|
|
|
|
|
|
|
def get_temp_dirs() -> Set[str]: |
|
|
""" |
|
|
获取所有使用过的临时目录 |
|
|
""" |
|
|
return used_temp_dirs.copy() |
|
|
|