import gradio as gr import gdown import os import requests import zipfile from pathlib import Path from urllib.parse import urlparse import re from datetime import datetime # 创建下载目录 DOWNLOAD_DIR = Path("download") DOWNLOAD_DIR.mkdir(exist_ok=True) def is_google_drive_link(url): """检查是否为Google Drive链接""" patterns = [ r'drive\.google\.com', r'docs\.google\.com' ] return any(re.search(pattern, url) for pattern in patterns) def get_file_size_str(size_bytes): """将字节转换为可读的文件大小""" for unit in ['B', 'KB', 'MB', 'GB']: if size_bytes < 1024.0: return f"{size_bytes:.2f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.2f} TB" def compress_folder(folder_path): """压缩文件夹为zip文件""" zip_path = f"{folder_path}.zip" with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(folder_path): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, os.path.dirname(folder_path)) zipf.write(file_path, arcname) return zip_path def download_file(url, headers_dict=None, progress=gr.Progress()): """下载文件的主函数""" try: progress(0, desc="开始下载...") # 解析请求头 headers = {} if headers_dict: for key, value in headers_dict.items(): if key and value: headers[key] = value # 判断是否为Google Drive链接 if is_google_drive_link(url): progress(0.3, desc="检测到Google Drive链接,使用gdown下载...") # 使用gdown下载 output_path = str(DOWNLOAD_DIR) + '/' # 检测是否为文件夹链接 is_folder = 'folders' in url if is_folder: downloaded = gdown.download_folder(url, output=output_path, quiet=False) if downloaded: folder_path = os.path.dirname(downloaded[0]) progress(0.8, desc="压缩文件夹...") zip_path = compress_folder(folder_path) file_size = os.path.getsize(zip_path) return ( zip_path, f"✅ 下载成功!\n文件: {os.path.basename(zip_path)}\n大小: {get_file_size_str(file_size)}" ) else: output_file = gdown.download(url, output=output_path, quiet=False, fuzzy=True) if output_file: file_size = os.path.getsize(output_file) return ( output_file, f"✅ 下载成功!\n文件: {os.path.basename(output_file)}\n大小: {get_file_size_str(file_size)}" ) else: return None, "❌ Google Drive下载失败" else: progress(0.3, desc="使用HTTP方式下载...") # 普通HTTP下载 response = requests.get(url, headers=headers, stream=True, timeout=30) response.raise_for_status() # 获取文件名 content_disposition = response.headers.get('content-disposition') if content_disposition: filename = re.findall('filename="(.+?)"', content_disposition) filename = filename[0] if filename else None else: filename = os.path.basename(urlparse(url).path) if not filename: filename = f"download_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # 保存文件 file_path = DOWNLOAD_DIR / filename total_size = int(response.headers.get('content-length', 0)) downloaded = 0 with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=256*1024): if chunk: f.write(chunk) downloaded += len(chunk) if total_size: progress(0.3 + 0.6 * (downloaded / total_size), desc=f"下载中... {get_file_size_str(downloaded)}/{get_file_size_str(total_size)}") file_size = os.path.getsize(file_path) return ( str(file_path), f"✅ 下载成功!\n文件: {filename}\n大小: {get_file_size_str(file_size)}" ) except Exception as e: return None, f"❌ 下载失败: {str(e)}" def get_downloaded_files(): """获取已下载的文件列表""" files = [] for item in DOWNLOAD_DIR.iterdir(): if item.is_file(): size = item.stat().st_size modified = datetime.fromtimestamp(item.stat().st_mtime).strftime('%Y-%m-%d %H:%M:%S') files.append({ 'name': item.name, 'path': str(item), 'size': get_file_size_str(size), 'modified': modified }) return sorted(files, key=lambda x: x['modified'], reverse=True) def delete_file(file_path): """删除文件""" try: if os.path.exists(file_path): os.remove(file_path) return f"✅ 已删除: {os.path.basename(file_path)}", refresh_file_list() else: return "❌ 文件不存在", refresh_file_list() except Exception as e: return f"❌ 删除失败: {str(e)}", refresh_file_list() def refresh_file_list(): """刷新文件列表""" files = get_downloaded_files() if not files: return "📁 暂无已下载的文件" html = "