Spaces:
Running
Running
| import os | |
| import glob | |
| import time | |
| import platform | |
| import shutil | |
| from uuid import uuid4 | |
| from loguru import logger | |
| from app.utils import utils | |
| def open_task_folder(root_dir, task_id): | |
| """打开任务文件夹 | |
| Args: | |
| root_dir: 项目根目录 | |
| task_id: 任务ID | |
| """ | |
| try: | |
| sys = platform.system() | |
| path = os.path.join(root_dir, "storage", "tasks", task_id) | |
| if os.path.exists(path): | |
| if sys == 'Windows': | |
| os.system(f"start {path}") | |
| if sys == 'Darwin': | |
| os.system(f"open {path}") | |
| if sys == 'Linux': | |
| os.system(f"xdg-open {path}") | |
| except Exception as e: | |
| logger.error(f"打开任务文件夹失败: {e}") | |
| def cleanup_temp_files(temp_dir, max_age=3600): | |
| """清理临时文件 | |
| Args: | |
| temp_dir: 临时文件目录 | |
| max_age: 文件最大保存时间(秒) | |
| """ | |
| if os.path.exists(temp_dir): | |
| for file in os.listdir(temp_dir): | |
| file_path = os.path.join(temp_dir, file) | |
| try: | |
| if os.path.getctime(file_path) < time.time() - max_age: | |
| if os.path.isfile(file_path): | |
| os.remove(file_path) | |
| elif os.path.isdir(file_path): | |
| shutil.rmtree(file_path) | |
| logger.debug(f"已清理临时文件: {file_path}") | |
| except Exception as e: | |
| logger.error(f"清理临时文件失败: {file_path}, 错误: {e}") | |
| def get_file_list(directory, file_types=None, sort_by='ctime', reverse=True): | |
| """获取指定目录下的文件列表 | |
| Args: | |
| directory: 目录路径 | |
| file_types: 文件类型列表,如 ['.mp4', '.mov'] | |
| sort_by: 排序方式,支持 'ctime'(创建时间), 'mtime'(修改时间), 'size'(文件大小), 'name'(文件名) | |
| reverse: 是否倒序排序 | |
| Returns: | |
| list: 文件信息列表 | |
| """ | |
| if not os.path.exists(directory): | |
| return [] | |
| files = [] | |
| if file_types: | |
| for file_type in file_types: | |
| files.extend(glob.glob(os.path.join(directory, f"*{file_type}"))) | |
| else: | |
| files = glob.glob(os.path.join(directory, "*")) | |
| file_list = [] | |
| for file_path in files: | |
| try: | |
| file_stat = os.stat(file_path) | |
| file_info = { | |
| "name": os.path.basename(file_path), | |
| "path": file_path, | |
| "size": file_stat.st_size, | |
| "ctime": file_stat.st_ctime, | |
| "mtime": file_stat.st_mtime | |
| } | |
| file_list.append(file_info) | |
| except Exception as e: | |
| logger.error(f"获取文件信息失败: {file_path}, 错误: {e}") | |
| # 排序 | |
| if sort_by in ['ctime', 'mtime', 'size', 'name']: | |
| file_list.sort(key=lambda x: x.get(sort_by, ''), reverse=reverse) | |
| return file_list | |
| def save_uploaded_file(uploaded_file, save_dir, allowed_types=None): | |
| """保存上传的文件 | |
| Args: | |
| uploaded_file: StreamlitUploadedFile对象 | |
| save_dir: 保存目录 | |
| allowed_types: 允许的文件类型列表,如 ['.mp4', '.mov'] | |
| Returns: | |
| str: 保存后的文件路径,失败返回None | |
| """ | |
| try: | |
| if not os.path.exists(save_dir): | |
| os.makedirs(save_dir) | |
| file_name, file_extension = os.path.splitext(uploaded_file.name) | |
| # 检查文件类型 | |
| if allowed_types and file_extension.lower() not in allowed_types: | |
| logger.error(f"不支持的文件类型: {file_extension}") | |
| return None | |
| # 如果文件已存在,添加时间戳 | |
| save_path = os.path.join(save_dir, uploaded_file.name) | |
| if os.path.exists(save_path): | |
| timestamp = time.strftime("%Y%m%d%H%M%S") | |
| new_file_name = f"{file_name}_{timestamp}{file_extension}" | |
| save_path = os.path.join(save_dir, new_file_name) | |
| # 保存文件 | |
| with open(save_path, "wb") as f: | |
| f.write(uploaded_file.read()) | |
| logger.info(f"文件保存成功: {save_path}") | |
| return save_path | |
| except Exception as e: | |
| logger.error(f"保存上传文件失败: {e}") | |
| return None | |
| def create_temp_file(prefix='tmp', suffix='', directory=None): | |
| """创建临时文件 | |
| Args: | |
| prefix: 文件名前缀 | |
| suffix: 文件扩展名 | |
| directory: 临时文件目录,默认使用系统临时目录 | |
| Returns: | |
| str: 临时文件路径 | |
| """ | |
| try: | |
| if directory is None: | |
| directory = utils.storage_dir("temp", create=True) | |
| if not os.path.exists(directory): | |
| os.makedirs(directory) | |
| temp_file = os.path.join(directory, f"{prefix}-{str(uuid4())}{suffix}") | |
| return temp_file | |
| except Exception as e: | |
| logger.error(f"创建临时文件失败: {e}") | |
| return None | |
| def get_file_size(file_path, format='MB'): | |
| """获取文件大小 | |
| Args: | |
| file_path: 文件路径 | |
| format: 返回格式,支持 'B', 'KB', 'MB', 'GB' | |
| Returns: | |
| float: 文件大小 | |
| """ | |
| try: | |
| size_bytes = os.path.getsize(file_path) | |
| if format.upper() == 'B': | |
| return size_bytes | |
| elif format.upper() == 'KB': | |
| return size_bytes / 1024 | |
| elif format.upper() == 'MB': | |
| return size_bytes / (1024 * 1024) | |
| elif format.upper() == 'GB': | |
| return size_bytes / (1024 * 1024 * 1024) | |
| else: | |
| return size_bytes | |
| except Exception as e: | |
| logger.error(f"获取文件大小失败: {file_path}, 错误: {e}") | |
| return 0 | |
| def ensure_directory(directory): | |
| """确保目录存在,如果不存在则创建 | |
| Args: | |
| directory: 目录路径 | |
| Returns: | |
| bool: 是否成功 | |
| """ | |
| try: | |
| if not os.path.exists(directory): | |
| os.makedirs(directory) | |
| return True | |
| except Exception as e: | |
| logger.error(f"创建目录失败: {directory}, 错误: {e}") | |
| return False | |
| def create_zip(files: list, zip_path: str, base_dir: str = None, folder_name: str = "demo") -> bool: | |
| """ | |
| 创建zip文件 | |
| Args: | |
| files: 要打包的文件列表 | |
| zip_path: zip文件保存路径 | |
| base_dir: 基础目录,用于保持目录结构 | |
| folder_name: zip解压后的文件夹名称,默认为frames | |
| Returns: | |
| bool: 是否成功 | |
| """ | |
| try: | |
| import zipfile | |
| # 确保目标目录存在 | |
| os.makedirs(os.path.dirname(zip_path), exist_ok=True) | |
| with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: | |
| for file in files: | |
| if not os.path.exists(file): | |
| logger.warning(f"文件不存在,跳过: {file}") | |
| continue | |
| # 计算文件在zip中的路径,添加folder_name作为前缀目录 | |
| if base_dir: | |
| arcname = os.path.join(folder_name, os.path.relpath(file, base_dir)) | |
| else: | |
| arcname = os.path.join(folder_name, os.path.basename(file)) | |
| try: | |
| zipf.write(file, arcname) | |
| except Exception as e: | |
| logger.error(f"添加文件到zip失败: {file}, 错误: {e}") | |
| continue | |
| return True | |
| except Exception as e: | |
| logger.error(f"创建zip文件失败: {e}") | |
| return False |