Py / app.py
Ryanus's picture
Update app.py
310da0f verified
raw
history blame
8.87 kB
import os
import gradio as gr
import tempfile
import random
import shutil
from datetime import datetime
try:
from openai import OpenAI
client = OpenAI(api_key=os.getenv("sk-ZYyxdh3NexC863OUdicSlKrunXEPlRlsLBMwc1vXMGEVwZdb")) if os.getenv("sk-ZYyxdh3NexC863OUdicSlKrunXEPlRlsLBMwc1vXMGEVwZdb") else None
OPENAI_AVAILABLE = bool(client)
except ImportError:
OPENAI_AVAILABLE = False
client = None
try:
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, vfx
MOVIEPY_AVAILABLE = True
except ImportError:
MOVIEPY_AVAILABLE = False
try:
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import pickle
YOUTUBE_AVAILABLE = True
except ImportError:
YOUTUBE_AVAILABLE = False
MAX_VIDEO_SIZE = 100 * 1024 * 1024 # 100MB
MAX_AUDIO_SIZE = 50 * 1024 * 1024 # 50MB
MAX_TOTAL_SIZE = 500 * 1024 * 1024 # 500MB
class YouTubeUploader:
def __init__(self):
self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload']
self.client_secrets_file = 'client_secrets.json'
self.credentials_file = 'youtube_token.pickle'
self.youtube = None
def authenticate(self):
creds = None
if os.path.exists(self.credentials_file):
with open(self.credentials_file, 'rb') as token:
creds = pickle.load(token)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
self.client_secrets_file, self.SCOPES
)
creds = flow.run_local_server(port=0, open_browser=False)
print("请复制下面的链接到浏览器打开并完成授权")
with open(self.credentials_file, 'wb') as token:
pickle.dump(creds, token)
self.youtube = build('youtube', 'v3', credentials=creds)
return True
def upload_video(self, video_path, title, description="", tags="", privacy="private"):
if not os.path.exists(video_path):
return {"success": False, "error": f"视频文件 {video_path} 不存在"}
body = {
"snippet": {
"title": title,
"description": description,
"tags": tags.split(",") if tags else [],
"categoryId": "22"
},
"status": {
"privacyStatus": privacy,
"selfDeclaredMadeForKids": False
}
}
media = MediaFileUpload(video_path, chunksize=-1, resumable=True, mimetype="video/*")
request = self.youtube.videos().insert(part=",".join(body.keys()), body=body, media_body=media)
response = request.execute()
video_id = response.get("id")
url = f"https://www.youtube.com/watch?v={video_id}"
return {"success": True, "video_id": video_id, "url": url, "title": title}
def check_filesize(video_files, audio_file):
total_size = 0
for v in video_files:
size = os.path.getsize(v.name)
if size > MAX_VIDEO_SIZE:
return False, f"单个视频文件 {os.path.basename(v.name)} 过大,超过100MB限制"
total_size += size
if audio_file:
audio_size = os.path.getsize(audio_file.name)
if audio_size > MAX_AUDIO_SIZE:
return False, f"音频文件 {os.path.basename(audio_file.name)} 过大,超过50MB限制"
total_size += audio_size
if total_size > MAX_TOTAL_SIZE:
return False, f"上传文件总大小过大 {total_size/(1024*1024):.1f}MB,超过500MB限制"
return True, ""
def process_videos(video_files, audio_file, clip_duration, num_output):
ok, msg = check_filesize(video_files, audio_file)
if not ok:
return msg, [], ""
out_files = []
try:
clips = []
for video in video_files:
clip = VideoFileClip(video.name)
duration = clip.duration
for start in range(0, int(duration), int(clip_duration)):
end = min(start + clip_duration, duration)
subclip = clip.subclip(start, end).without_audio()
clips.append(subclip)
random.shuffle(clips)
clips_per_video = max(1, len(clips) // num_output)
output_dir = tempfile.mkdtemp()
for i in range(num_output):
start_idx = i * clips_per_video
end_idx = len(clips) if i == num_output - 1 else (start_idx + clips_per_video)
selected_clips = clips[start_idx:end_idx]
if not selected_clips:
continue
final_clip = concatenate_videoclips(selected_clips)
if audio_file:
audio_clip = AudioFileClip(audio_file.name)
min_duration = min(final_clip.duration, audio_clip.duration)
final_clip = final_clip.subclip(0, min_duration)
audio_clip = audio_clip.subclip(0, min_duration)
final_clip = final_clip.set_audio(audio_clip)
output_path = os.path.join(output_dir, f"mixcut_{i+1}.mp4")
final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac", verbose=False, logger=None)
out_files.append(output_path)
final_clip.close()
shutil.rmtree(output_dir, ignore_errors=True) # 可选,处理完删除临时文件
return f"成功生成 {len(out_files)} 个混剪视频", out_files, output_dir
except Exception as e:
return f"❌ 视频处理出错: {str(e)}", [], ""
def generate_ai_analysis(video_count, file_names):
if not OPENAI_AVAILABLE or video_count == 0:
return "⚠️ 无视频文件或 AI 配置不可用。"
try:
file_info = ", ".join([f"'{name}'" for name in file_names[:3]])
prompt = f"""作为专业视频分析师,基于用户上传的 {video_count} 个视频文件(包括:{file_info} 等),请提供分析:
1. 内容类型推测
2. 背景音乐建议
3. 剪辑策略
4. 目标用户
用中文回答,格式清晰。"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "你是视频内容专家。"},
{"role": "user", "content": prompt}
],
max_tokens=600,
temperature=0.7
)
return response.choices[0].message.content
except Exception as e:
return f"❌ AI 分析失败: {str(e)}"
def main_app(video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy):
status, outfiles, _ = process_videos(video_files, audio_file, clip_duration, num_output)
file_names = [os.path.basename(f.name) for f in (video_files or [])]
ai_analysis = generate_ai_analysis(len(video_files or []), file_names)
yt_result = ""
if upload_yt:
if not YOUTUBE_AVAILABLE:
yt_result = "⚠️ 未安装 YouTube API 库"
elif not os.path.exists("client_secrets.json"):
yt_result = "⚠️ 缺少 client_secrets.json 文件"
else:
uploader = YouTubeUploader()
if uploader.authenticate():
yt_result = "认证成功,上传功能请手动调用上传接口。"
else:
yt_result = "❌ 认证失败"
return status, ai_analysis, yt_result
try:
import googleapiclient
YOUTUBE_AVAILABLE = True
except ImportError:
YOUTUBE_AVAILABLE = False
with gr.Blocks() as demo:
gr.Markdown("## pyTovideo2 完整视频切割混剪 + YouTube上传(需预先配置)")
video_files = gr.File(file_types=[".mp4", ".mov", ".avi"], label="上传视频文件", file_count="multiple")
audio_file = gr.File(file_types=[".mp3", ".wav", ".m4a"], label="上传音频文件")
clip_duration = gr.Number(value=2, label="切片时长 (秒)", minimum=1, maximum=10)
num_output = gr.Number(value=3, label="输出视频数量", minimum=1, maximum=10)
upload_yt = gr.Checkbox(label="处理后上传到 YouTube")
yt_privacy = gr.Dropdown(choices=["public", "private", "unlisted"], value="private", label="选择 YouTube 视频隐私")
btn = gr.Button("开始处理")
status_out = gr.Textbox(label="处理状态", lines=4, interactive=False)
ai_out = gr.Textbox(label="AI 内容分析", lines=12)
yt_out = gr.Textbox(label="YouTube 上传状态", lines=6)
btn.click(
main_app,
inputs=[video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy],
outputs=[status_out, ai_out, yt_out]
)
demo.launch()