import os import gradio as gr import tempfile import random import subprocess import shutil import zipfile import threading import concurrent.futures from datetime import datetime import time class FastVideoProcessor: def __init__(self, max_processing_time=30): self.max_processing_time = max_processing_time self.start_time = None def check_time_limit(self): """检查是否超过时间限制""" if self.start_time and time.time() - self.start_time > self.max_processing_time: return True return False def ffmpeg_cut_video_fast(self, input_path, start_time, duration, output_path): """高速剪切视频片段""" command = [ 'ffmpeg', '-ss', str(start_time), '-i', input_path, '-t', str(duration), '-c:v', 'libx264', # H264编码器 '-preset', 'ultrafast', # 最快编码预设 '-crf', '20', # 高质量(18-24范围) '-c:a', 'aac', '-b:a', '128k', '-threads', '0', # 使用所有CPU核心 '-y', output_path ] process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return process.returncode == 0 def ffmpeg_resize_video_fast(self, input_path, output_path, target_ratio): """高速调整视频比例""" if target_ratio == '9:16': scale_filter = "scale=1080:1920:force_original_aspect_ratio=decrease,pad=1080:1920:(ow-iw)/2:(oh-ih)/2" else: scale_filter = "scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2" command = [ 'ffmpeg', '-i', input_path, '-vf', scale_filter, '-c:v', 'libx264', '-preset', 'ultrafast', '-crf', '20', '-c:a', 'copy', # 音频直接复制,不重编码 '-threads', '0', '-y', output_path ] process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return process.returncode == 0 def concat_videos_fast(self, file_list, output_path): """高速合并视频""" list_file = tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') try: for f in file_list: list_file.write("file '{}'\n".format(f.replace("'", r"'\''"))) list_file.close() command = [ 'ffmpeg', '-f', 'concat', '-safe', '0', '-i', list_file.name, '-c', 'copy', # 直接复制,无重编码 '-threads', '0', '-y', output_path ] process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return process.returncode == 0 finally: os.unlink(list_file.name) def extract_clips_parallel(self, video_files, clip_duration): """并行提取视频片段""" self.start_time = time.time() clips = [] temp_dir = tempfile.mkdtemp() # 计算合理的片段数量,避免处理过多 max_clips_per_video = max(3, int(20 / len(video_files))) # 动态调整 def process_single_video(args): idx, video_file = args video_clips = [] try: video_path = video_file.name # 快速获取视频时长 cmd_duration = [ 'ffprobe', '-v', 'quiet', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', video_path ] result = subprocess.run(cmd_duration, capture_output=True, text=True, timeout=5) total_duration = float(result.stdout.strip()) # 智能选择起始点,避免从头开始 step = max(clip_duration, total_duration / max_clips_per_video) count = 0 start = 0.0 while start < total_duration and count < max_clips_per_video: if self.check_time_limit(): break duration = min(clip_duration, total_duration - start) clip_path = os.path.join(temp_dir, f"clip_{idx}_{count}.mp4") success = self.ffmpeg_cut_video_fast(video_path, start, duration, clip_path) if success: video_clips.append(clip_path) start += step # 跳跃式选择片段,提高多样性 count += 1 return video_clips except Exception as e: print(f"处理视频 {idx} 出错: {e}") return [] # 并行处理所有视频 with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: video_args = [(idx, vf) for idx, vf in enumerate(video_files)] results = executor.map(process_single_video, video_args) for video_clips in results: clips.extend(video_clips) return clips, temp_dir def generate_mixed_videos_fast(self, clips, num_output_videos, target_ratio): """高速生成混剪视频""" temp_dir = tempfile.mkdtemp() random.shuffle(clips) # 智能分配片段 clips_per_video = max(2, len(clips) // num_output_videos) output_files = [] def create_single_mixed_video(args): i, selected_clips = args try: if self.check_time_limit(): return None # 先快速合并 temp_video_path = os.path.join(temp_dir, f"temp_mixed_{i+1}.mp4") success = self.concat_videos_fast(selected_clips, temp_video_path) if not success: return None # 再调整比例 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_path = os.path.join(temp_dir, f"混剪视频_{target_ratio.replace(':', 'x')}_{i+1}_{timestamp}.mp4") success = self.ffmpeg_resize_video_fast(temp_video_path, output_path, target_ratio) if success: return output_path return None except Exception as e: print(f"生成混剪视频 {i+1} 出错: {e}") return None # 准备任务 tasks = [] for i in range(num_output_videos): start_idx = i * clips_per_video end_idx = len(clips) if i == num_output_videos - 1 else (start_idx + clips_per_video) selected_clips = clips[start_idx:end_idx] if selected_clips: tasks.append((i, selected_clips)) # 并行生成 with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: results = executor.map(create_single_mixed_video, tasks) for result in results: if result: output_files.append(result) return output_files, temp_dir processor = FastVideoProcessor() def create_video_package_fast(output_files, original_file_names, target_ratio, processing_time): """快速打包视频文件""" package_dir = tempfile.mkdtemp() zip_path = os.path.join(package_dir, f"混剪视频包_{target_ratio.replace(':', 'x')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip") try: with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED, compresslevel=1) as zipf: # 最低压缩级别 for video_file in output_files: arcname = os.path.basename(video_file) zipf.write(video_file, arcname) readme_content = f"""# 高速混剪视频包 ## ⚡ 处理信息 - 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - 处理耗时: {processing_time:.1f} 秒 ⚡ - 视频数量: {len(output_files)} 个 - 视频比例: {target_ratio} - 优化级别: 高速模式 (30秒内完成) ## 🚀 高速优化技术 - 并行处理: 多线程同时处理 - 快速编码: ultrafast 预设 - 智能跳跃: 避免连续片段 - 零拷贝合并: 减少重编码 - 动态调整: 根据文件数量优化 ## 📁 文件列表 """ for i, video_file in enumerate(output_files, 1): file_size = os.path.getsize(video_file) / (1024 * 1024) readme_content += f"- 混剪视频_{i}.mp4 ({file_size:.1f}MB)\n" readme_content += f""" ## 🎬 质量保证 - CRF 20: 高质量编码 - 原始音频: 保持音质 - 智能填充: 完美适配比例 - 多核加速: 充分利用CPU ## 📱 发布就绪 视频已经过专业优化,可直接发布到各大平台! --- ⚡ 高速混剪工具 - 30秒完成专业混剪 """ zipf.writestr("README.txt", readme_content.encode('utf-8')) return zip_path except Exception as e: shutil.rmtree(package_dir, ignore_errors=True) raise e def process_and_package_ultra_fast(video_files, clip_duration, num_output_videos, target_ratio): if not video_files or len(video_files) == 0: return "❌ 请上传至少一个视频文件", None, "" start_time = time.time() try: original_names = [os.path.basename(f.name) for f in video_files] # 1. 高速提取片段 clips, clips_temp_dir = processor.extract_clips_parallel(video_files, clip_duration) if not clips: return "❌ 未能提取到有效片段", None, "" # 2. 高速生成混剪视频 output_files, output_temp_dir = processor.generate_mixed_videos_fast(clips, num_output_videos, target_ratio) # 清理切片临时目录 shutil.rmtree(clips_temp_dir, ignore_errors=True) if not output_files: return "❌ 未能生成混剪视频", None, "" # 3. 快速打包 processing_time = time.time() - start_time zip_file_path = create_video_package_fast(output_files, original_names, target_ratio, processing_time) total_size = os.path.getsize(zip_file_path) / (1024 * 1024) platform_info = "📱 短视频平台" if target_ratio == '9:16' else "🖥️ 长视频平台" status_msg = f"""⚡ 高速处理完成! 📊 **处理统计:** • ⚡ 处理时间: {processing_time:.1f} 秒 • 🎬 混剪视频: {len(output_files)} 个 • 📐 视频比例: {target_ratio} • 🎯 适合平台: {platform_info} • 📦 文件大小: {total_size:.1f}MB 🚀 **高速优化:** • 多线程并行处理 • 智能片段选择 • 快速编码预设 • 零拷贝合并技术 💾 **立即下载:** 点击下方按钮保存到本地 """ details = f"""⚡ **高速处理详情:** 🎬 **视频信息:** """ for i, video_file in enumerate(output_files, 1): file_size = os.path.getsize(video_file) / (1024 * 1024) details += f"• 混剪视频_{i}: {file_size:.1f}MB\n" details += f""" ⚡ **优化技术:** • 处理时间: {processing_time:.1f}秒 (目标: 30秒内) • 并行处理: {'✅ 已启用' if len(video_files) > 1 else '单文件处理'} • 快速编码: ultrafast + CRF 20 • 智能跳跃: 避免连续片段 • 多核心: CPU全核心利用 🎯 **质量保证:** • 视频编码: H.264 高质量 • 音频处理: AAC 128k • 比例适配: 智能填充黑边 • 兼容性: 支持所有主流播放器 📱 **发布优势:** """ if target_ratio == '9:16': details += """• 完美适配移动端全屏 • 抖音算法友好格式 • 高清1080x1920分辨率 • 即刻发布无需二次处理""" else: details += """• 传统影视观看体验 • YouTube推荐格式 • 高清1920x1080分辨率 • 适合详细内容展示""" shutil.rmtree(output_temp_dir, ignore_errors=True) return status_msg, zip_file_path, details except Exception as e: processing_time = time.time() - start_time return f"❌ 处理失败 (耗时{processing_time:.1f}秒): {str(e)}", None, "" def main(): with gr.Blocks( title="⚡高速FFmpeg剪辑工具", theme=gr.themes.Soft() ) as demo: gr.HTML("""
🚀 30秒内完成 → 智能并行 → 保证品质 → 一键下载