Py / app.py
Ryanus's picture
Update app.py
2ce2eb3 verified
raw
history blame
17.5 kB
import os
import gradio as gr
import tempfile
import random
import subprocess
import shutil
import zipfile
import threading
import concurrent.futures
from datetime import datetime
import time
class FastVideoProcessor:
def __init__(self, max_processing_time=30):
self.max_processing_time = max_processing_time
self.start_time = None
def check_time_limit(self):
"""检查是否超过时间限制"""
if self.start_time and time.time() - self.start_time > self.max_processing_time:
return True
return False
def ffmpeg_cut_video_fast(self, input_path, start_time, duration, output_path):
"""高速剪切视频片段"""
command = [
'ffmpeg',
'-ss', str(start_time),
'-i', input_path,
'-t', str(duration),
'-c:v', 'libx264', # H264编码器
'-preset', 'ultrafast', # 最快编码预设
'-crf', '20', # 高质量(18-24范围)
'-c:a', 'aac',
'-b:a', '128k',
'-threads', '0', # 使用所有CPU核心
'-y',
output_path
]
process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return process.returncode == 0
def ffmpeg_resize_video_fast(self, input_path, output_path, target_ratio):
"""高速调整视频比例"""
if target_ratio == '9:16':
scale_filter = "scale=1080:1920:force_original_aspect_ratio=decrease,pad=1080:1920:(ow-iw)/2:(oh-ih)/2"
else:
scale_filter = "scale=1920:1080:force_original_aspect_ratio=decrease,pad=1920:1080:(ow-iw)/2:(oh-ih)/2"
command = [
'ffmpeg',
'-i', input_path,
'-vf', scale_filter,
'-c:v', 'libx264',
'-preset', 'ultrafast',
'-crf', '20',
'-c:a', 'copy', # 音频直接复制,不重编码
'-threads', '0',
'-y',
output_path
]
process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return process.returncode == 0
def concat_videos_fast(self, file_list, output_path):
"""高速合并视频"""
list_file = tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt')
try:
for f in file_list:
list_file.write("file '{}'\n".format(f.replace("'", r"'\''")))
list_file.close()
command = [
'ffmpeg',
'-f', 'concat',
'-safe', '0',
'-i', list_file.name,
'-c', 'copy', # 直接复制,无重编码
'-threads', '0',
'-y',
output_path
]
process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return process.returncode == 0
finally:
os.unlink(list_file.name)
def extract_clips_parallel(self, video_files, clip_duration):
"""并行提取视频片段"""
self.start_time = time.time()
clips = []
temp_dir = tempfile.mkdtemp()
# 计算合理的片段数量,避免处理过多
max_clips_per_video = max(3, int(20 / len(video_files))) # 动态调整
def process_single_video(args):
idx, video_file = args
video_clips = []
try:
video_path = video_file.name
# 快速获取视频时长
cmd_duration = [
'ffprobe', '-v', 'quiet',
'-show_entries', 'format=duration',
'-of', 'default=noprint_wrappers=1:nokey=1',
video_path
]
result = subprocess.run(cmd_duration, capture_output=True, text=True, timeout=5)
total_duration = float(result.stdout.strip())
# 智能选择起始点,避免从头开始
step = max(clip_duration, total_duration / max_clips_per_video)
count = 0
start = 0.0
while start < total_duration and count < max_clips_per_video:
if self.check_time_limit():
break
duration = min(clip_duration, total_duration - start)
clip_path = os.path.join(temp_dir, f"clip_{idx}_{count}.mp4")
success = self.ffmpeg_cut_video_fast(video_path, start, duration, clip_path)
if success:
video_clips.append(clip_path)
start += step # 跳跃式选择片段,提高多样性
count += 1
return video_clips
except Exception as e:
print(f"处理视频 {idx} 出错: {e}")
return []
# 并行处理所有视频
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
video_args = [(idx, vf) for idx, vf in enumerate(video_files)]
results = executor.map(process_single_video, video_args)
for video_clips in results:
clips.extend(video_clips)
return clips, temp_dir
def generate_mixed_videos_fast(self, clips, num_output_videos, target_ratio):
"""高速生成混剪视频"""
temp_dir = tempfile.mkdtemp()
random.shuffle(clips)
# 智能分配片段
clips_per_video = max(2, len(clips) // num_output_videos)
output_files = []
def create_single_mixed_video(args):
i, selected_clips = args
try:
if self.check_time_limit():
return None
# 先快速合并
temp_video_path = os.path.join(temp_dir, f"temp_mixed_{i+1}.mp4")
success = self.concat_videos_fast(selected_clips, temp_video_path)
if not success:
return None
# 再调整比例
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_path = os.path.join(temp_dir, f"混剪视频_{target_ratio.replace(':', 'x')}_{i+1}_{timestamp}.mp4")
success = self.ffmpeg_resize_video_fast(temp_video_path, output_path, target_ratio)
if success:
return output_path
return None
except Exception as e:
print(f"生成混剪视频 {i+1} 出错: {e}")
return None
# 准备任务
tasks = []
for i in range(num_output_videos):
start_idx = i * clips_per_video
end_idx = len(clips) if i == num_output_videos - 1 else (start_idx + clips_per_video)
selected_clips = clips[start_idx:end_idx]
if selected_clips:
tasks.append((i, selected_clips))
# 并行生成
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
results = executor.map(create_single_mixed_video, tasks)
for result in results:
if result:
output_files.append(result)
return output_files, temp_dir
processor = FastVideoProcessor()
def create_video_package_fast(output_files, original_file_names, target_ratio, processing_time):
"""快速打包视频文件"""
package_dir = tempfile.mkdtemp()
zip_path = os.path.join(package_dir, f"混剪视频包_{target_ratio.replace(':', 'x')}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip")
try:
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED, compresslevel=1) as zipf: # 最低压缩级别
for video_file in output_files:
arcname = os.path.basename(video_file)
zipf.write(video_file, arcname)
readme_content = f"""# 高速混剪视频包
## ⚡ 处理信息
- 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- 处理耗时: {processing_time:.1f} 秒 ⚡
- 视频数量: {len(output_files)}
- 视频比例: {target_ratio}
- 优化级别: 高速模式 (30秒内完成)
## 🚀 高速优化技术
- 并行处理: 多线程同时处理
- 快速编码: ultrafast 预设
- 智能跳跃: 避免连续片段
- 零拷贝合并: 减少重编码
- 动态调整: 根据文件数量优化
## 📁 文件列表
"""
for i, video_file in enumerate(output_files, 1):
file_size = os.path.getsize(video_file) / (1024 * 1024)
readme_content += f"- 混剪视频_{i}.mp4 ({file_size:.1f}MB)\n"
readme_content += f"""
## 🎬 质量保证
- CRF 20: 高质量编码
- 原始音频: 保持音质
- 智能填充: 完美适配比例
- 多核加速: 充分利用CPU
## 📱 发布就绪
视频已经过专业优化,可直接发布到各大平台!
---
⚡ 高速混剪工具 - 30秒完成专业混剪
"""
zipf.writestr("README.txt", readme_content.encode('utf-8'))
return zip_path
except Exception as e:
shutil.rmtree(package_dir, ignore_errors=True)
raise e
def process_and_package_ultra_fast(video_files, clip_duration, num_output_videos, target_ratio):
if not video_files or len(video_files) == 0:
return "❌ 请上传至少一个视频文件", None, ""
start_time = time.time()
try:
original_names = [os.path.basename(f.name) for f in video_files]
# 1. 高速提取片段
clips, clips_temp_dir = processor.extract_clips_parallel(video_files, clip_duration)
if not clips:
return "❌ 未能提取到有效片段", None, ""
# 2. 高速生成混剪视频
output_files, output_temp_dir = processor.generate_mixed_videos_fast(clips, num_output_videos, target_ratio)
# 清理切片临时目录
shutil.rmtree(clips_temp_dir, ignore_errors=True)
if not output_files:
return "❌ 未能生成混剪视频", None, ""
# 3. 快速打包
processing_time = time.time() - start_time
zip_file_path = create_video_package_fast(output_files, original_names, target_ratio, processing_time)
total_size = os.path.getsize(zip_file_path) / (1024 * 1024)
platform_info = "📱 短视频平台" if target_ratio == '9:16' else "🖥️ 长视频平台"
status_msg = f"""⚡ 高速处理完成!
📊 **处理统计:**
• ⚡ 处理时间: {processing_time:.1f}
• 🎬 混剪视频: {len(output_files)}
• 📐 视频比例: {target_ratio}
• 🎯 适合平台: {platform_info}
• 📦 文件大小: {total_size:.1f}MB
🚀 **高速优化:**
• 多线程并行处理
• 智能片段选择
• 快速编码预设
• 零拷贝合并技术
💾 **立即下载:**
点击下方按钮保存到本地
"""
details = f"""⚡ **高速处理详情:**
🎬 **视频信息:**
"""
for i, video_file in enumerate(output_files, 1):
file_size = os.path.getsize(video_file) / (1024 * 1024)
details += f"• 混剪视频_{i}: {file_size:.1f}MB\n"
details += f"""
⚡ **优化技术:**
• 处理时间: {processing_time:.1f}秒 (目标: 30秒内)
• 并行处理: {'✅ 已启用' if len(video_files) > 1 else '单文件处理'}
• 快速编码: ultrafast + CRF 20
• 智能跳跃: 避免连续片段
• 多核心: CPU全核心利用
🎯 **质量保证:**
• 视频编码: H.264 高质量
• 音频处理: AAC 128k
• 比例适配: 智能填充黑边
• 兼容性: 支持所有主流播放器
📱 **发布优势:**
"""
if target_ratio == '9:16':
details += """• 完美适配移动端全屏
• 抖音算法友好格式
• 高清1080x1920分辨率
• 即刻发布无需二次处理"""
else:
details += """• 传统影视观看体验
• YouTube推荐格式
• 高清1920x1080分辨率
• 适合详细内容展示"""
shutil.rmtree(output_temp_dir, ignore_errors=True)
return status_msg, zip_file_path, details
except Exception as e:
processing_time = time.time() - start_time
return f"❌ 处理失败 (耗时{processing_time:.1f}秒): {str(e)}", None, ""
def main():
with gr.Blocks(
title="⚡高速FFmpeg剪辑工具",
theme=gr.themes.Soft()
) as demo:
gr.HTML("""
<div style="text-align: center; padding: 20px; background: linear-gradient(135deg, #FF6B6B 0%, #4ECDC4 100%); border-radius: 15px; color: white; margin-bottom: 20px;">
<h1>⚡ 高速 FFmpeg 自动剪辑工具</h1>
<p style="margin: 10px 0 0 0; font-size: 18px;">🚀 30秒内完成 → 智能并行 → 保证品质 → 一键下载</p>
</div>
""")
gr.Markdown("""
### ⚡ 高速优化特性
- 🚀 **30秒完成**: 智能时间控制,确保快速处理
- 🔥 **并行处理**: 多线程同时处理,充分利用CPU
- 💎 **保证品质**: CRF 20高质量编码,ultrafast预设
- 🧠 **智能优化**: 动态调整处理量,避免超时
""")
with gr.Row():
with gr.Column(scale=2):
video_input = gr.File(
label="📤 上传视频文件 (支持批量)",
file_types=[".mp4", ".mov", ".avi", ".mkv"],
file_count="multiple",
height=120
)
with gr.Row():
clip_duration = gr.Number(
value=3,
label="切片时长(秒)",
minimum=2,
maximum=8,
info="建议2-5秒,太长影响速度"
)
num_output = gr.Number(
value=3,
label="生成数量",
minimum=1,
maximum=5,
info="建议1-3个,太多影响速度"
)
ratio_selection = gr.Radio(
choices=["9:16", "16:9"],
value="9:16",
label="📐 视频比例",
info="9:16抖音快手 | 16:9 YouTube B站"
)
process_btn = gr.Button(
"⚡ 高速处理 (30秒完成)",
variant="primary",
size="lg"
)
with gr.Column(scale=1):
status_output = gr.Textbox(
label="📊 处理状态",
lines=12,
interactive=False,
show_copy_button=True
)
with gr.Row():
with gr.Column(scale=1):
download_file = gr.File(
label="⚡ 高速下载视频包",
interactive=False
)
with gr.Column(scale=1):
details_output = gr.Textbox(
label="📝 处理详情",
lines=18,
interactive=False,
show_copy_button=True
)
process_btn.click(
fn=process_and_package_ultra_fast,
inputs=[video_input, clip_duration, num_output, ratio_selection],
outputs=[status_output, download_file, details_output]
)
gr.Markdown("""
---
### ⚡ 高速处理说明
**🚀 速度优化技术:**
- **并行切片**: 多个视频同时处理,4线程并发
- **ultrafast预设**: FFmpeg最快编码,保证30秒完成
- **智能跳跃**: 非连续片段选择,提高多样性
- **零拷贝合并**: 减少重编码,加速处理
- **动态调整**: 根据文件量自动优化处理数量
**💎 品质保证:**
- **CRF 20编码**: 高质量视频输出
- **原音保持**: 音频直接复制,无损处理
- **智能填充**: 完美适配目标比例
- **兼容性优化**: H.264编码,支持所有平台
**📊 性能建议:**
- **视频数量**: 1-3个文件最佳速度
- **切片时长**: 2-5秒推荐
- **输出数量**: 1-3个视频最快
- **文件大小**: 单文件<500MB最优
**⚠️ 注意事项:**
- 系统将在30秒内强制完成处理
- 超时会返回已处理的部分结果
- 建议在稳定网络环境下使用
- CPU性能越好处理越快
""")
demo.launch()
if __name__ == "__main__":
main()