| import os |
| import gradio as gr |
| import tempfile |
| import random |
| from datetime import datetime |
|
|
| |
| try: |
| from openai import OpenAI |
| client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) if os.getenv("OPENAI_API_KEY") else None |
| OPENAI_AVAILABLE = bool(client) |
| except ImportError: |
| OPENAI_AVAILABLE = False |
| client = None |
|
|
| |
| try: |
| from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, vfx |
| MOVIEPY_AVAILABLE = True |
| except ImportError: |
| MOVIEPY_AVAILABLE = False |
|
|
| |
| try: |
| from googleapiclient.discovery import build |
| from googleapiclient.http import MediaFileUpload |
| from google_auth_oauthlib.flow import InstalledAppFlow |
| from google.auth.transport.requests import Request |
| import pickle |
| YOUTUBE_AVAILABLE = True |
| except ImportError: |
| YOUTUBE_AVAILABLE = False |
|
|
| MAX_VIDEO_SIZE = 100 * 1024 * 1024 |
| MAX_AUDIO_SIZE = 50 * 1024 * 1024 |
| MAX_TOTAL_SIZE = 500 * 1024 * 1024 |
|
|
| |
| class YouTubeUploader: |
| def __init__(self): |
| self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload'] |
| self.youtube = None |
| self.credentials_file = 'youtube_token.pickle' |
| self.client_secrets_file = 'client_secrets.json' |
| |
| def authenticate(self): |
| creds = None |
| if os.path.exists(self.credentials_file): |
| with open(self.credentials_file, 'rb') as token: |
| creds = pickle.load(token) |
| if not creds or not creds.valid: |
| if creds and creds.expired and creds.refresh_token: |
| creds.refresh(Request()) |
| else: |
| flow = InstalledAppFlow.from_client_secrets_file(self.client_secrets_file, self.SCOPES) |
| creds = flow.run_local_server(port=0) |
| with open(self.credentials_file, 'wb') as token: |
| pickle.dump(creds, token) |
| self.youtube = build('youtube', 'v3', credentials=creds) |
| return True |
| |
| def upload_video(self, video_path, title, description="", tags="", privacy="private"): |
| if not os.path.exists(video_path): |
| return {"success": False, "error": "视频文件不存在"} |
|
|
| body = { |
| 'snippet': { |
| 'title': title, |
| 'description': description, |
| 'tags': tags.split(',') if tags else [], |
| 'categoryId': '22' |
| }, |
| 'status': { |
| 'privacyStatus': privacy, |
| 'selfDeclaredMadeForKids': False |
| } |
| } |
| media = MediaFileUpload(video_path, resumable=True, mimetype='video/*') |
| request = self.youtube.videos().insert(part=','.join(body.keys()), body=body, media_body=media) |
| response = request.execute() |
| video_id = response.get('id') |
| video_url = f"https://www.youtube.com/watch?v={video_id}" |
| return {"success": True, "video_id": video_id, "url": video_url} |
|
|
| |
| def generate_ai_analysis(video_count, file_names): |
| if not OPENAI_AVAILABLE: |
| return "⚠️ 未配置 OpenAI,无法生成 AI 分析。" |
| try: |
| file_info = ", ".join([f"'{name}'" for name in file_names[:3]]) |
| prompt = f"""作为专业视频分析师,基于用户上传的 {video_count} 个视频文件(文件名包括:{file_info} 等),请提供详细分析: |
| 1. 内容类型推测和特点 |
| 2. 最佳背景音乐风格和节拍 |
| 3. 推荐剪辑策略和转场效果 |
| 4. 目标受众分析 |
| |
| 请用中文回答,格式清晰。""" |
| response = client.chat.completions.create( |
| model="gpt-4o-mini", |
| messages=[ |
| {"role": "system", "content": "你是专业的视频内容分析专家。"}, |
| {"role": "user", "content": prompt} |
| ], |
| max_tokens=600, |
| temperature=0.7 |
| ) |
| return response.choices[0].message.content |
| except Exception as e: |
| return f"❌ AI 分析失败:{str(e)}" |
|
|
| |
| def check_filesize(video_files, audio_file): |
| total_size = 0 |
| for v in video_files: |
| size = os.path.getsize(v.name) |
| if size > MAX_VIDEO_SIZE: |
| return False, f"单个视频文件 {os.path.basename(v.name)} 过大 {size/(1024*1024):.1f}MB,最大限制为 100MB" |
| total_size += size |
| if audio_file: |
| audio_size = os.path.getsize(audio_file.name) |
| if audio_size > MAX_AUDIO_SIZE: |
| return False, f"音频文件 {os.path.basename(audio_file.name)} 过大 {audio_size/(1024*1024):.1f}MB,最大限制为 50MB" |
| total_size += audio_size |
| if total_size > MAX_TOTAL_SIZE: |
| return False, f"上传文件总大小过大 {total_size/(1024*1024):.1f}MB,最大限制为 500MB" |
| return True, "" |
|
|
| |
| def process_videos(video_files, audio_file, clip_duration=2, num_output=3): |
| ok, msg = check_filesize(video_files, audio_file) |
| if not ok: |
| return msg, [], "" |
| |
| out_files = [] |
| return "✅ 视频上传成功,处理已完成(演示模式,不生成文件)", out_files, "" |
|
|
| |
| def main_app(video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy): |
| status, outfiles, _ = process_videos(video_files, audio_file, clip_duration, num_output) |
| file_names = [os.path.basename(f.name) for f in (video_files or [])] |
| ai_analysis = generate_ai_analysis(len(video_files or []), file_names) if OPENAI_AVAILABLE else "未启用 AI 分析功能" |
| |
| yt_upload_res = "" |
| if upload_yt: |
| if not YOUTUBE_AVAILABLE: |
| yt_upload_res = "⚠️ 未安装 YouTube API 库,上传功能不可用" |
| elif not os.path.exists("client_secrets.json"): |
| yt_upload_res = "⚠️ client_secrets.json 文件缺失,无法上传" |
| else: |
| uploader = YouTubeUploader() |
| auth_ok = uploader.authenticate() |
| if not auth_ok: |
| yt_upload_res = "❌ YouTube 认证失败" |
| else: |
| |
| yt_upload_res = "上传功能暂未实现,请在本地运行上传脚本" |
|
|
| return status, ai_analysis, yt_upload_res |
|
|
| with gr.Blocks() as demo: |
| gr.Markdown("## pyTovideo2 视频处理 + YouTube 上传示范") |
| video_files = gr.File(file_types=[".mp4", ".mov", ".avi"], label="上传视频文件(s)", file_count="multiple") |
| audio_file = gr.File(file_types=[".mp3", ".wav", ".m4a"], label="上传音频文件", file_count="single") |
| clip_duration = gr.Number(value=2, label="切片时长 (秒)", minimum=1, maximum=10) |
| num_output = gr.Number(value=3, label="输出视频数量", minimum=1, maximum=10) |
| upload_yt = gr.Checkbox(label="处理后自动上传至 YouTube") |
| yt_privacy = gr.Dropdown(choices=["public", "private", "unlisted"], value="private", label="YouTube 视频隐私状态") |
| btn = gr.Button("开始处理") |
| status_out = gr.Textbox(label="处理状态", lines=4) |
| ai_out = gr.Textbox(label="AI 内容分析", lines=12) |
| yt_out = gr.Textbox(label="YouTube 上传结果", lines=8) |
| |
| btn.click(main_app, inputs=[video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy], |
| outputs=[status_out, ai_out, yt_out]) |
|
|
| demo.launch() |
|
|