Py / app.py
Ryanus's picture
Update app.py
a96293b verified
raw
history blame
7.57 kB
import os
import gradio as gr
import tempfile
import random
from datetime import datetime
# Optional AI dependency.
# The OpenAI client is only created when the `openai` package is importable
# AND the OPENAI_API_KEY environment variable is set; otherwise `client` is
# None and OPENAI_AVAILABLE is False, which disables AI analysis.
try:
    from openai import OpenAI
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) if os.getenv("OPENAI_API_KEY") else None
    OPENAI_AVAILABLE = bool(client)
except ImportError:
    OPENAI_AVAILABLE = False
    client = None
# Optional video-processing dependency (MoviePy).
# NOTE(review): `moviepy.editor` appears to have been removed in MoviePy 2.x;
# confirm the pinned MoviePy version, otherwise this flag is always False.
try:
    from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, vfx
    MOVIEPY_AVAILABLE = True
except ImportError:
    MOVIEPY_AVAILABLE = False
# Optional YouTube Data API dependencies.
# If any of these imports fails, YOUTUBE_AVAILABLE is False and the names
# below (including `pickle`) are not bound in this module.
try:
    from googleapiclient.discovery import build
    from googleapiclient.http import MediaFileUpload
    from google_auth_oauthlib.flow import InstalledAppFlow
    from google.auth.transport.requests import Request
    import pickle
    YOUTUBE_AVAILABLE = True
except ImportError:
    YOUTUBE_AVAILABLE = False
# Upload size limits in bytes, compared against file sizes in check_filesize().
MAX_VIDEO_SIZE = 100 * 1024 * 1024  # 100MB, limit for each single video file
MAX_AUDIO_SIZE = 50 * 1024 * 1024  # 50MB, limit for the audio file
MAX_TOTAL_SIZE = 500 * 1024 * 1024  # 500MB, limit for all uploaded files combined
# YouTube uploader class
class YouTubeUploader:
    """Small wrapper around the YouTube Data API v3 video upload flow.

    Call :meth:`authenticate` first; on success ``self.youtube`` holds the API
    client that :meth:`upload_video` uses. Both methods report failure through
    their return value instead of raising, and the text of the most recent
    authentication failure is kept in ``self.last_error``.
    """

    def __init__(self):
        self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload']
        self.youtube = None  # API client, set by authenticate()
        self.credentials_file = 'youtube_token.pickle'  # cached OAuth token
        self.client_secrets_file = 'client_secrets.json'  # OAuth client config
        self.last_error = None  # message of the last authenticate() failure

    def authenticate(self):
        """Obtain credentials and build the YouTube API client.

        Cached credentials are loaded from ``self.credentials_file`` when it
        exists. Invalid credentials are refreshed when possible; otherwise the
        local-server OAuth flow is run. Fresh credentials are written back to
        the cache file.

        Returns:
            True when ``self.youtube`` is ready to use, False on any failure
            (the reason is stored in ``self.last_error``).
        """
        self.last_error = None
        try:
            creds = None
            if os.path.exists(self.credentials_file):
                # NOTE(security): unpickling executes arbitrary code if the
                # token file was tampered with; only load a file this app
                # wrote itself and keep it out of user-writable locations.
                with open(self.credentials_file, 'rb') as token:
                    creds = pickle.load(token)
            if not creds or not creds.valid:
                if creds and creds.expired and creds.refresh_token:
                    creds.refresh(Request())
                else:
                    flow = InstalledAppFlow.from_client_secrets_file(
                        self.client_secrets_file, self.SCOPES
                    )
                    creds = flow.run_local_server(port=0)
                with open(self.credentials_file, 'wb') as token:
                    pickle.dump(creds, token)
            self.youtube = build('youtube', 'v3', credentials=creds)
        except Exception as exc:
            # Callers branch on the boolean result, so a failed login must
            # come back as False rather than escape as an exception.
            self.youtube = None
            self.last_error = str(exc)
            return False
        return True

    @staticmethod
    def _parse_tags(tags):
        """Turn a comma-separated string (or iterable) into a clean tag list."""
        if not tags:
            return []
        if isinstance(tags, str):
            tags = tags.split(',')
        # Drop surrounding whitespace and empty entries such as "a,,b".
        return [tag.strip() for tag in tags if tag and tag.strip()]

    def upload_video(self, video_path, title, description="", tags="", privacy="private"):
        """Upload a local video file to YouTube.

        Args:
            video_path: Path of the video file to upload.
            title: Video title.
            description: Video description.
            tags: Comma-separated tag string (an iterable of tags also works).
            privacy: Value for the video's ``privacyStatus``.

        Returns:
            ``{"success": True, "video_id": ..., "url": ...}`` on success, or
            ``{"success": False, "error": ...}`` when the file is missing, the
            uploader is not authenticated, or the API call fails.
        """
        if not os.path.exists(video_path):
            return {"success": False, "error": "视频文件不存在"}
        if self.youtube is None:
            # Without this guard the call below fails with AttributeError.
            return {"success": False, "error": "尚未通过 YouTube 认证,请先调用 authenticate()"}
        body = {
            'snippet': {
                'title': title,
                'description': description,
                'tags': self._parse_tags(tags),
                'categoryId': '22',  # People & Blogs
            },
            'status': {
                'privacyStatus': privacy,
                'selfDeclaredMadeForKids': False,
            },
        }
        try:
            media = MediaFileUpload(video_path, resumable=True, mimetype='video/*')
            # `part` must name exactly the top-level sections present in body.
            request = self.youtube.videos().insert(
                part=','.join(body.keys()), body=body, media_body=media
            )
            response = request.execute()
        except Exception as exc:
            return {"success": False, "error": f"上传失败:{exc}"}
        video_id = response.get('id')
        if not video_id:
            return {"success": False, "error": "上传响应中缺少视频 ID"}
        video_url = f"https://www.youtube.com/watch?v={video_id}"
        return {"success": True, "video_id": video_id, "url": video_url}
# AI video analysis
def generate_ai_analysis(video_count, file_names):
    """Request a Chinese-language content analysis for the uploaded videos.

    Args:
        video_count: Number of uploaded video files (inserted into the prompt).
        file_names: File names of the uploads; only the first three are
            quoted in the prompt.

    Returns:
        The model's reply text, a warning string when OpenAI is not
        configured, or an error string when the request fails.
    """
    if not OPENAI_AVAILABLE:
        return "⚠️ 未配置 OpenAI,无法生成 AI 分析。"
    try:
        quoted_names = [f"'{name}'" for name in file_names[:3]]
        file_info = ", ".join(quoted_names)
        prompt = f"""作为专业视频分析师,基于用户上传的 {video_count} 个视频文件(文件名包括:{file_info} 等),请提供详细分析:
1. 内容类型推测和特点
2. 最佳背景音乐风格和节拍
3. 推荐剪辑策略和转场效果
4. 目标受众分析
请用中文回答,格式清晰。"""
        messages = [
            {"role": "system", "content": "你是专业的视频内容分析专家。"},
            {"role": "user", "content": prompt},
        ]
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            max_tokens=600,
            temperature=0.7,
        )
        first_choice = completion.choices[0]
        return first_choice.message.content
    except Exception as exc:
        # Any failure (bad input, network, API) is reported as text to the UI.
        return f"❌ AI 分析失败:{str(exc)}"
# Upload file size check
def check_filesize(video_files, audio_file, *, max_video_size=None,
                   max_audio_size=None, max_total_size=None):
    """Validate uploaded files against per-file and total size limits.

    Args:
        video_files: Iterable of uploaded videos, or None/empty when nothing
            was uploaded. Each item may be a path (``str`` / ``os.PathLike``)
            or an object whose ``.name`` attribute is the file path.
        audio_file: Optional uploaded audio file, in the same forms.
        max_video_size: Per-video limit in bytes (default ``MAX_VIDEO_SIZE``).
        max_audio_size: Audio limit in bytes (default ``MAX_AUDIO_SIZE``).
        max_total_size: Combined limit in bytes (default ``MAX_TOTAL_SIZE``).

    Returns:
        ``(True, "")`` when every limit is respected, otherwise
        ``(False, message)`` describing the first violated limit.

    Raises:
        OSError: If a referenced file does not exist or cannot be inspected.
    """
    # Resolve defaults lazily so the module constants stay the single source
    # of truth while callers can still override a limit.
    if max_video_size is None:
        max_video_size = MAX_VIDEO_SIZE
    if max_audio_size is None:
        max_audio_size = MAX_AUDIO_SIZE
    if max_total_size is None:
        max_total_size = MAX_TOTAL_SIZE

    bytes_per_mb = 1024 * 1024

    def _path_of(upload):
        # Plain paths are used as-is; file-like uploads expose the path as .name.
        # (Checked first because pathlib paths have a .name that is only the
        # base name, not the full path.)
        if isinstance(upload, (str, os.PathLike)):
            return os.fspath(upload)
        return upload.name

    total_size = 0
    # `or []` makes "no upload" (None) behave like an empty list instead of
    # raising TypeError.
    for video in video_files or []:
        path = _path_of(video)
        size = os.path.getsize(path)
        if size > max_video_size:
            return False, (
                f"单个视频文件 {os.path.basename(path)} 过大 {size / bytes_per_mb:.1f}MB,"
                f"最大限制为 {max_video_size / bytes_per_mb:g}MB"
            )
        total_size += size
    if audio_file:
        audio_path = _path_of(audio_file)
        audio_size = os.path.getsize(audio_path)
        if audio_size > max_audio_size:
            return False, (
                f"音频文件 {os.path.basename(audio_path)} 过大 {audio_size / bytes_per_mb:.1f}MB,"
                f"最大限制为 {max_audio_size / bytes_per_mb:g}MB"
            )
        total_size += audio_size
    if total_size > max_total_size:
        return False, (
            f"上传文件总大小过大 {total_size / bytes_per_mb:.1f}MB,"
            f"最大限制为 {max_total_size / bytes_per_mb:g}MB"
        )
    return True, ""
# Video processing demo (simplified, not fully implemented)
def process_videos(video_files, audio_file, clip_duration=2, num_output=3):
    """Validate the uploads and (in this demo) pretend to process them.

    Args:
        video_files: Uploaded video files; None or empty when nothing was
            uploaded.
        audio_file: Optional uploaded audio file.
        clip_duration: Requested clip length in seconds (unused in demo mode).
        num_output: Requested number of output videos (unused in demo mode).

    Returns:
        A ``(status_message, output_files, extra)`` tuple. ``output_files`` is
        always an empty list and ``extra`` an empty string in demo mode.
    """
    # Without any video there is nothing to validate or process; report that
    # explicitly instead of failing inside the size check or claiming success.
    if not video_files:
        return "⚠️ 请先上传至少一个视频文件", [], ""
    ok, msg = check_filesize(video_files, audio_file)
    if not ok:
        return msg, [], ""
    # MoviePy processing would go here; currently this only simulates it.
    out_files = []  # a real implementation returns the generated video paths
    return "✅ 视频上传成功,处理已完成(演示模式,不生成文件)", out_files, ""
# Gradio main interface
def main_app(video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy):
    """Handle one click of the process button.

    Runs the (demo) video processing, requests the AI analysis when OpenAI is
    available, and, if requested, checks whether a YouTube upload is possible.

    Args:
        video_files: Uploaded video files, or None when nothing was uploaded.
        audio_file: Optional uploaded audio file.
        clip_duration: Clip length forwarded to process_videos().
        num_output: Output count forwarded to process_videos().
        upload_yt: Whether the YouTube upload step was requested.
        yt_privacy: Selected privacy status (currently unused).

    Returns:
        A ``(status, ai_analysis, youtube_result)`` tuple of display strings.
    """
    status, _generated, _ = process_videos(video_files, audio_file, clip_duration, num_output)

    file_names = [os.path.basename(f.name) for f in (video_files or [])]
    if OPENAI_AVAILABLE:
        ai_analysis = generate_ai_analysis(len(file_names), file_names)
    else:
        ai_analysis = "未启用 AI 分析功能"

    # Work through the upload preconditions in order; the first failing one
    # decides the message.
    if not upload_yt:
        yt_upload_res = ""
    elif not YOUTUBE_AVAILABLE:
        yt_upload_res = "⚠️ 未安装 YouTube API 库,上传功能不可用"
    elif not os.path.exists("client_secrets.json"):
        yt_upload_res = "⚠️ client_secrets.json 文件缺失,无法上传"
    elif not YouTubeUploader().authenticate():
        yt_upload_res = "❌ YouTube 认证失败"
    else:
        # Placeholder: there are no generated files yet, so nothing is uploaded.
        yt_upload_res = "上传功能暂未实现,请在本地运行上传脚本"

    return status, ai_analysis, yt_upload_res
# Build the Gradio UI: inputs first, then the trigger button, then the three
# read-only result boxes. Components are created in display order.
with gr.Blocks() as demo:
    gr.Markdown("## pyTovideo2 视频处理 + YouTube 上传示范")

    # --- Inputs ---
    video_files = gr.File(
        label="上传视频文件(s)",
        file_types=[".mp4", ".mov", ".avi"],
        file_count="multiple",
    )
    audio_file = gr.File(
        label="上传音频文件",
        file_types=[".mp3", ".wav", ".m4a"],
        file_count="single",
    )
    clip_duration = gr.Number(label="切片时长 (秒)", value=2, minimum=1, maximum=10)
    num_output = gr.Number(label="输出视频数量", value=3, minimum=1, maximum=10)
    upload_yt = gr.Checkbox(label="处理后自动上传至 YouTube")
    yt_privacy = gr.Dropdown(
        label="YouTube 视频隐私状态",
        choices=["public", "private", "unlisted"],
        value="private",
    )

    btn = gr.Button("开始处理")

    # --- Outputs ---
    status_out = gr.Textbox(label="处理状态", lines=4)
    ai_out = gr.Textbox(label="AI 内容分析", lines=12)
    yt_out = gr.Textbox(label="YouTube 上传结果", lines=8)

    # The input order must match main_app's parameter order.
    btn.click(
        main_app,
        inputs=[
            video_files,
            audio_file,
            clip_duration,
            num_output,
            upload_yt,
            yt_privacy,
        ],
        outputs=[status_out, ai_out, yt_out],
    )

demo.launch()