Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import yt_dlp | |
| import os | |
| import re | |
| import json | |
| from pathlib import Path | |
| import tempfile | |
| import shutil | |
| from urllib.parse import urlparse, parse_qs | |
| import threading | |
| from concurrent.futures import ThreadPoolExecutor | |
| import time | |
| SUPPORTED_PLATFORMS = { | |
| "抖音": r'(https?://)?(v\.douyin\.com|www\.douyin\.com)', | |
| "快手": r'(https?://)?(v\.kuaishou\.com|www\.kuaishou\.com)', | |
| "哔哩哔哩": r'(https?://)?(www\.bilibili\.com|b23\.tv)', | |
| "YouTube": r'(https?://)?(www\.youtube\.com|youtu\.be)', | |
| "小红书": r'(https?://)?(www\.xiaohongshu\.com|xhslink\.com)', | |
| "微博": r'(https?://)?(weibo\.com|t\.cn)', | |
| "西瓜视频": r'(https?://)?(www\.ixigua\.com)', | |
| "腾讯视频": r'(https?://)?(v\.qq\.com)' | |
| } | |
| def get_platform_from_url(url): | |
| """ | |
| 自动识别URL所属平台 | |
| """ | |
| if not url: | |
| return None | |
| for platform, pattern in SUPPORTED_PLATFORMS.items(): | |
| if re.search(pattern, url): | |
| return platform | |
| return None | |
| def get_platform_config(url, format_id=None): | |
| """ | |
| 根据URL返回对的配置 | |
| """ | |
| platform = get_platform_from_url(url) | |
| if not platform: | |
| return None | |
| # 基础配置 | |
| base_config = { | |
| 'format': format_id if format_id else 'best', | |
| 'merge_output_format': 'mp4', | |
| # 网络相关设置 | |
| 'socket_timeout': 10, # 减少超时时间 | |
| 'retries': 2, # 减少重试次数 | |
| 'fragment_retries': 2, | |
| 'retry_sleep': 2, # 减少重试等待时间 | |
| 'concurrent_fragment_downloads': 8, | |
| } | |
| configs = { | |
| "抖音": { | |
| **base_config, | |
| 'format': format_id if format_id else 'best', | |
| }, | |
| "快手": { | |
| **base_config, | |
| 'format': format_id if format_id else 'best', | |
| }, | |
| "哔哩哔哩": { | |
| **base_config, | |
| 'format': format_id if format_id else 'bestvideo+bestaudio/best', | |
| # B站特定设置 | |
| 'concurrent_fragment_downloads': 16, | |
| 'file_access_retries': 2, | |
| 'extractor_retries': 2, | |
| 'fragment_retries': 2, | |
| 'retry_sleep': 2, | |
| }, | |
| "YouTube": { | |
| **base_config, | |
| 'format': format_id if format_id else 'bestvideo+bestaudio/best', | |
| }, | |
| "小红书": { | |
| **base_config, | |
| 'format': format_id if format_id else 'best', | |
| }, | |
| "微博": { | |
| **base_config, | |
| 'format': format_id if format_id else 'best', | |
| }, | |
| "西瓜视频": { | |
| **base_config, | |
| 'format': format_id if format_id else 'best', | |
| }, | |
| "腾讯视频": { | |
| **base_config, | |
| 'format': format_id if format_id else 'best', | |
| } | |
| } | |
| return configs.get(platform) | |
| def validate_url(url): | |
| """ | |
| 验证URL是否符合支持的平台格式 | |
| """ | |
| if not url: | |
| return False, "请输入视频链接" | |
| platform = get_platform_from_url(url) | |
| if not platform: | |
| return False, "不支持的平台或链接格式不正确" | |
| return True, f"识别为{platform}平台" | |
| def format_filesize(bytes): | |
| """ | |
| 格式化文件大小显示 | |
| """ | |
| if not bytes: | |
| return "未知大小" | |
| for unit in ['B', 'KB', 'MB', 'GB']: | |
| if bytes < 1024: | |
| return f"{bytes:.1f} {unit}" | |
| bytes /= 1024 | |
| return f"{bytes:.1f} TB" | |
| def parse_video_info(url): | |
| """ | |
| 解析视频信息 | |
| """ | |
| try: | |
| # 验证URL | |
| is_valid, message = validate_url(url) | |
| if not is_valid: | |
| return {"status": "error", "message": message} | |
| # 获取平台特定配置 | |
| ydl_opts = get_platform_config(url) | |
| if not ydl_opts: | |
| return {"status": "error", "message": "不支持的平台"} | |
| ydl_opts.update({ | |
| 'quiet': True, | |
| 'no_warnings': True, | |
| }) | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| info = ydl.extract_info(url, download=False) | |
| if not info: | |
| return {"status": "error", "message": "无法获取视频信息"} | |
| # 获取可用的格式 | |
| formats = [] | |
| seen_resolutions = set() # 用于去重 | |
| if 'formats' in info: | |
| # 过滤和排序格式 | |
| video_formats = [] | |
| for f in info['formats']: | |
| # 过滤音频格式和没有视频编码的格式 | |
| if f.get('vcodec') == 'none' or not f.get('vcodec'): | |
| continue | |
| # 获取分辨率 | |
| width = f.get('width', 0) | |
| height = f.get('height', 0) | |
| resolution = f.get('resolution', 'unknown') | |
| if width and height: | |
| resolution = f"{width}x{height}" | |
| # 获取格式说明 | |
| format_note = f.get('format_note', '') | |
| if not format_note and resolution != 'unknown': | |
| if height: | |
| format_note = f"{height}p" | |
| # 创建唯一标识用于去重 | |
| resolution_key = f"{height}_{width}" if height and width else resolution | |
| # 如果这个分辨率已经存在,跳过 | |
| if resolution_key in seen_resolutions: | |
| continue | |
| seen_resolutions.add(resolution_key) | |
| # 创建格式信息 | |
| format_info = { | |
| 'format_id': f.get('format_id', ''), | |
| 'ext': f.get('ext', ''), | |
| 'resolution': resolution, | |
| 'format_note': format_note, | |
| 'quality': height or 0 # 用于排序 | |
| } | |
| video_formats.append(format_info) | |
| # 按质量排序 | |
| video_formats.sort(key=lambda x: x['quality'], reverse=True) | |
| formats = video_formats | |
| # 获取��览图 | |
| thumbnail = info.get('thumbnail', '') | |
| if not thumbnail and 'thumbnails' in info: | |
| thumbnails = info['thumbnails'] | |
| if thumbnails: | |
| thumbnail = thumbnails[-1]['url'] | |
| platform = get_platform_from_url(url) | |
| return { | |
| "status": "success", | |
| "message": "解析成功", | |
| "platform": platform, | |
| "title": info.get('title', '未知标题'), | |
| "duration": info.get('duration', 0), | |
| "formats": formats, | |
| "thumbnail": thumbnail, | |
| "description": info.get('description', ''), | |
| "webpage_url": info.get('webpage_url', url), | |
| } | |
| except Exception as e: | |
| return {"status": "error", "message": f"解析失败: {str(e)}"} | |
| class DownloadProgress: | |
| def __init__(self): | |
| self.progress = 0 | |
| self.status = "准备下载" | |
| self.lock = threading.Lock() | |
| def update(self, d): | |
| with self.lock: | |
| if d.get('status') == 'downloading': | |
| total = d.get('total_bytes') | |
| downloaded = d.get('downloaded_bytes') | |
| if total and downloaded: | |
| self.progress = (downloaded / total) * 100 | |
| self.status = f"下载中: {d.get('_percent_str', '0%')} of {d.get('_total_bytes_str', 'unknown')}" | |
| elif d.get('status') == 'finished': | |
| self.progress = 100 | |
| self.status = "下载完成,正在处理..." | |
| def get_downloads_dir(): | |
| """ | |
| 获取用户的下载目录 | |
| """ | |
| # 获取用户主目录 | |
| home = str(Path.home()) | |
| # 获取下载目录 | |
| downloads_dir = os.path.join(home, "Downloads") | |
| # 如果下载目录不存在,则创建 | |
| if not os.path.exists(downloads_dir): | |
| downloads_dir = home | |
| return downloads_dir | |
| def clean_filename(title, platform): | |
| """ | |
| 清理并格式化文件名 | |
| """ | |
| # 移除非法字符 | |
| illegal_chars = r'[<>:"/\\|?*\n\r\t]' | |
| clean_title = re.sub(illegal_chars, '', title) | |
| # 移除多余的空格和特殊符号 | |
| clean_title = re.sub(r'\s+', ' ', clean_title).strip() | |
| clean_title = re.sub(r'[,.,。!!@#$%^&*()()+=\[\]{};:]+', '', clean_title) | |
| # 移除表情符号 | |
| clean_title = re.sub(r'[\U0001F300-\U0001F9FF]', '', clean_title) | |
| # 添加平台标识 | |
| platform_suffix = { | |
| "抖音": "抖音", | |
| "快手": "快手", | |
| "哔哩哔哩": "B站", | |
| "YouTube": "YT", | |
| "小红书": "XHS", | |
| "微博": "微博", | |
| "西瓜视频": "西瓜", | |
| "腾讯视频": "腾讯" | |
| } | |
| # 限制标题长度(考虑到平台标识的长度) | |
| max_length = 50 | |
| if len(clean_title) > max_length: | |
| clean_title = clean_title[:max_length-3] + '...' | |
| # 添加时间戳和平台标识 | |
| timestamp = time.strftime("%Y%m%d", time.localtime()) | |
| suffix = platform_suffix.get(platform, "视频") | |
| # 最终文件名格式:标题_时间_平台.mp4 | |
| final_name = f"{clean_title}_{timestamp}_{suffix}" | |
| return final_name | |
| def download_single_video(url, format_id, progress_tracker): | |
| """ | |
| 下载单个视频 | |
| """ | |
| try: | |
| # 创建临时目录 | |
| temp_dir = tempfile.mkdtemp() | |
| # 获取平台信息 | |
| platform = get_platform_from_url(url) | |
| if not platform: | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| return {"status": "error", "message": "不支持的平台"} | |
| # 获取视频信息 | |
| with yt_dlp.YoutubeDL({'quiet': True}) as ydl: | |
| info = ydl.extract_info(url, download=False) | |
| # 清理并格式化文件名 | |
| clean_title = clean_filename(info.get('title', 'video'), platform) | |
| ydl_opts = get_platform_config(url, format_id) | |
| if not ydl_opts: | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| return {"status": "error", "message": "不支持的平台"} | |
| # 更新下载配置 | |
| ydl_opts.update({ | |
| 'quiet': False, | |
| 'no_warnings': False, | |
| 'extract_flat': False, | |
| 'paths': {'home': temp_dir}, | |
| 'progress_hooks': [progress_tracker.update], | |
| 'outtmpl': clean_title + '.%(ext)s', # 不使用绝对路径 | |
| 'ignoreerrors': True, # 忽略部分错误继续下载 | |
| 'noprogress': False, # 显示进度 | |
| 'continuedl': True, # 支持断点续传 | |
| 'retries': float('inf'), # 无限重试 | |
| 'fragment_retries': float('inf'), # 片段无限重试 | |
| 'skip_unavailable_fragments': True, # 跳过不可用片段 | |
| 'no_abort_on_error': True, # 发生错误时不中止 | |
| }) | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| try: | |
| info = ydl.extract_info(url, download=True) | |
| if 'requested_downloads' in info: | |
| file_path = info['requested_downloads'][0]['filepath'] | |
| else: | |
| file_path = os.path.join(temp_dir, f"{clean_title}.mp4") | |
| if os.path.exists(file_path): | |
| # 检查文件大小 | |
| file_size = os.path.getsize(file_path) | |
| if file_size == 0: | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| return {"status": "error", "message": "下载的文件大小为0,可能下载失败"} | |
| # 创建一个新的临时文件 | |
| temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') | |
| temp_file.close() | |
| shutil.copy2(file_path, temp_file.name) | |
| # 清理原始临时目录 | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| return { | |
| "status": "success", | |
| "file_path": temp_file.name, | |
| "title": clean_title, | |
| "ext": "mp4" | |
| } | |
| else: | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| return {"status": "error", "message": "下载文件不存在"} | |
| except Exception as e: | |
| error_msg = str(e) | |
| # 如果是超时错误且进度不为0,继续下载 | |
| if ("timed out" in error_msg or "timeout" in error_msg) and progress_tracker.progress > 0: | |
| return { | |
| "status": "success", | |
| "file_path": file_path if 'file_path' in locals() else None, | |
| "title": clean_title, | |
| "ext": "mp4" | |
| } | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| return {"status": "error", "message": f"下载过程中出错: {error_msg}"} | |
| except Exception as e: | |
| if 'temp_dir' in locals(): | |
| shutil.rmtree(temp_dir, ignore_errors=True) | |
| return {"status": "error", "message": str(e)} | |
| def download_video(urls, format_id=None): | |
| """ | |
| 下载视频并返回文件 | |
| """ | |
| if isinstance(urls, str): | |
| urls = [url.strip() for url in urls.split('\n') if url.strip()] | |
| if not urls: | |
| return "请输入至少一个视频链接", None, 0, "未开始下载" | |
| progress_tracker = DownloadProgress() | |
| result = download_single_video(urls[0], format_id, progress_tracker) | |
| if result["status"] == "success": | |
| try: | |
| # 返回文件路径供Gradio处理下载 | |
| return "下载成功,正在传输...", result["file_path"], 100, "下载完成" | |
| except Exception as e: | |
| return f"文件处理失败: {str(e)}", None, 0, "下载失败" | |
| else: | |
| return f"下载失败: {result.get('message', '未知错误')}", None, 0, "下载失败" | |
| # 创建Gradio界面 | |
| with gr.Blocks(title="视频下载工具", theme=gr.themes.Soft()) as demo: | |
| # 存储视频信息的状态变量 | |
| video_info_state = gr.State({}) | |
| with gr.Column(elem_id="header"): | |
| gr.Markdown(""" | |
| # 🎥 视频下载工具 | |
| 一键下载各大平台视频,支持以下平台: | |
| """) | |
| with gr.Row(): | |
| for platform in SUPPORTED_PLATFORMS.keys(): | |
| gr.Markdown(f"<span class='platform-badge'>{platform}</span>", elem_classes="platform") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| # 输入部分 | |
| url_input = gr.Textbox( | |
| label="视频链接", | |
| placeholder="请输入视频链接,支持批量下载(每行一个链接)...", | |
| lines=3, | |
| info="支持多个平台的视频链接,自动识别平台类型" | |
| ) | |
| parse_btn = gr.Button("解析视频", variant="secondary", size="lg") | |
| # 视频信息显示(使用Accordion组件) | |
| with gr.Accordion("视频详细信息", open=False, visible=False) as video_info_accordion: | |
| video_info = gr.JSON(show_label=False) | |
| format_choice = gr.Dropdown( | |
| label="选择清晰度", | |
| choices=[], | |
| interactive=True, | |
| visible=False | |
| ) | |
| download_btn = gr.Button("开始下载", variant="primary", size="lg", interactive=False) | |
| with gr.Column(scale=3): | |
| # 预览和输出部分 | |
| with gr.Row(): | |
| preview_image = gr.Image(label="视频预览", visible=False) | |
| with gr.Row(): | |
| progress = gr.Slider( | |
| minimum=0, | |
| maximum=100, | |
| value=0, | |
| label="下载进度", | |
| interactive=False | |
| ) | |
| status = gr.Textbox( | |
| label="状态信息", | |
| value="等待开始下载...", | |
| interactive=False | |
| ) | |
| # 使用File组件来处理下载 | |
| output_file = gr.File(label="下载文件") | |
| # 添加自定义CSS | |
| gr.Markdown(""" | |
| <style> | |
| #header { | |
| text-align: center; | |
| margin-bottom: 2rem; | |
| } | |
| .platform-badge { | |
| display: inline-block; | |
| padding: 0.5rem 1rem; | |
| margin: 0.5rem; | |
| border-radius: 2rem; | |
| background-color: #2196F3; | |
| color: white; | |
| font-weight: bold; | |
| } | |
| .gradio-container { | |
| max-width: 1200px !important; | |
| } | |
| .contain { | |
| margin: 0 auto; | |
| padding: 2rem; | |
| } | |
| .download-link { | |
| display: inline-block; | |
| padding: 0.8rem 1.5rem; | |
| background-color: #4CAF50; | |
| color: white; | |
| text-decoration: none; | |
| border-radius: 0.5rem; | |
| margin-top: 1rem; | |
| font-weight: bold; | |
| transition: background-color 0.3s; | |
| } | |
| .download-link:hover { | |
| background-color: #45a049; | |
| } | |
| </style> | |
| """) | |
| def update_video_info(url): | |
| """更新视频信息""" | |
| # 只解析第一个链接 | |
| first_url = url.split('\n')[0].strip() | |
| info = parse_video_info(first_url) | |
| if info["status"] == "success": | |
| # 准备清晰度选项 | |
| format_choices = [] | |
| for fmt in info["formats"]: | |
| # 构建格式标签 | |
| label_parts = [] | |
| if fmt['format_note']: | |
| label_parts.append(fmt['format_note']) | |
| if fmt['resolution'] != 'unknown': | |
| label_parts.append(fmt['resolution']) | |
| label = " - ".join(filter(None, label_parts)) | |
| if not label: | |
| label = f"格式 {fmt['format_id']}" | |
| format_choices.append((label, fmt['format_id'])) | |
| return [ | |
| gr.update(visible=True, value=info), # video_info | |
| gr.update(visible=True, choices=format_choices, value=format_choices[0][1] if format_choices else None), # format_choice | |
| gr.update(interactive=True), # download_btn | |
| gr.update(visible=True, value=info["thumbnail"]), # preview_image | |
| f"解析成功: {info['title']} ({info['platform']})", # status | |
| gr.update(visible=True) # video_info_accordion | |
| ] | |
| else: | |
| return [ | |
| gr.update(visible=False), # video_info | |
| gr.update(visible=False), # format_choice | |
| gr.update(interactive=False), # download_btn | |
| gr.update(visible=False), # preview_image | |
| info["message"], # status | |
| gr.update(visible=False) # video_info_accordion | |
| ] | |
| # 绑定解析按钮事件 | |
| parse_btn.click( | |
| fn=update_video_info, | |
| inputs=[url_input], | |
| outputs=[video_info, format_choice, download_btn, preview_image, status, video_info_accordion] | |
| ) | |
| # 绑定下载按钮事件 | |
| download_btn.click( | |
| fn=download_video, | |
| inputs=[url_input, format_choice], | |
| outputs=[status, output_file, progress, status] | |
| ) | |
| # 启动应用 | |
| if __name__ == "__main__": | |
| demo.launch() |