import os import gradio as gr import tempfile import random import shutil from datetime import datetime try: from openai import OpenAI client = OpenAI(api_key=os.getenv("sk-ZYyxdh3NexC863OUdicSlKrunXEPlRlsLBMwc1vXMGEVwZdb")) if os.getenv("sk-ZYyxdh3NexC863OUdicSlKrunXEPlRlsLBMwc1vXMGEVwZdb") else None OPENAI_AVAILABLE = bool(client) except ImportError: OPENAI_AVAILABLE = False client = None try: from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, vfx MOVIEPY_AVAILABLE = True except ImportError: MOVIEPY_AVAILABLE = False try: from googleapiclient.discovery import build from googleapiclient.http import MediaFileUpload from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request import pickle YOUTUBE_AVAILABLE = True except ImportError: YOUTUBE_AVAILABLE = False MAX_VIDEO_SIZE = 100 * 1024 * 1024 # 100MB MAX_AUDIO_SIZE = 50 * 1024 * 1024 # 50MB MAX_TOTAL_SIZE = 500 * 1024 * 1024 # 500MB class YouTubeUploader: def __init__(self): self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload'] self.client_secrets_file = 'client_secrets.json' self.credentials_file = 'youtube_token.pickle' self.youtube = None def authenticate(self): creds = None if os.path.exists(self.credentials_file): with open(self.credentials_file, 'rb') as token: creds = pickle.load(token) if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( self.client_secrets_file, self.SCOPES ) creds = flow.run_local_server(port=0, open_browser=False) print("请复制下面的链接到浏览器打开并完成授权") with open(self.credentials_file, 'wb') as token: pickle.dump(creds, token) self.youtube = build('youtube', 'v3', credentials=creds) return True def upload_video(self, video_path, title, description="", tags="", privacy="private"): if not os.path.exists(video_path): return {"success": False, "error": f"视频文件 {video_path} 不存在"} body = { "snippet": { "title": title, "description": description, "tags": tags.split(",") if tags else [], "categoryId": "22" }, "status": { "privacyStatus": privacy, "selfDeclaredMadeForKids": False } } media = MediaFileUpload(video_path, chunksize=-1, resumable=True, mimetype="video/*") request = self.youtube.videos().insert(part=",".join(body.keys()), body=body, media_body=media) response = request.execute() video_id = response.get("id") url = f"https://www.youtube.com/watch?v={video_id}" return {"success": True, "video_id": video_id, "url": url, "title": title} def check_filesize(video_files, audio_file): total_size = 0 for v in video_files: size = os.path.getsize(v.name) if size > MAX_VIDEO_SIZE: return False, f"单个视频文件 {os.path.basename(v.name)} 过大,超过100MB限制" total_size += size if audio_file: audio_size = os.path.getsize(audio_file.name) if audio_size > MAX_AUDIO_SIZE: return False, f"音频文件 {os.path.basename(audio_file.name)} 过大,超过50MB限制" total_size += audio_size if total_size > MAX_TOTAL_SIZE: return False, f"上传文件总大小过大 {total_size/(1024*1024):.1f}MB,超过500MB限制" return True, "" def process_videos(video_files, audio_file, clip_duration, num_output): ok, msg = check_filesize(video_files, audio_file) if not ok: return msg, [], "" out_files = [] try: clips = [] for video in video_files: clip = VideoFileClip(video.name) duration = clip.duration for start in range(0, int(duration), int(clip_duration)): end = min(start + clip_duration, duration) subclip = clip.subclip(start, end).without_audio() clips.append(subclip) random.shuffle(clips) clips_per_video = max(1, len(clips) // num_output) output_dir = tempfile.mkdtemp() for i in range(num_output): start_idx = i * clips_per_video end_idx = len(clips) if i == num_output - 1 else (start_idx + clips_per_video) selected_clips = clips[start_idx:end_idx] if not selected_clips: continue final_clip = concatenate_videoclips(selected_clips) if audio_file: audio_clip = AudioFileClip(audio_file.name) min_duration = min(final_clip.duration, audio_clip.duration) final_clip = final_clip.subclip(0, min_duration) audio_clip = audio_clip.subclip(0, min_duration) final_clip = final_clip.set_audio(audio_clip) output_path = os.path.join(output_dir, f"mixcut_{i+1}.mp4") final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac", verbose=False, logger=None) out_files.append(output_path) final_clip.close() shutil.rmtree(output_dir, ignore_errors=True) # 可选,处理完删除临时文件 return f"成功生成 {len(out_files)} 个混剪视频", out_files, output_dir except Exception as e: return f"❌ 视频处理出错: {str(e)}", [], "" def generate_ai_analysis(video_count, file_names): if not OPENAI_AVAILABLE or video_count == 0: return "⚠️ 无视频文件或 AI 配置不可用。" try: file_info = ", ".join([f"'{name}'" for name in file_names[:3]]) prompt = f"""作为专业视频分析师,基于用户上传的 {video_count} 个视频文件(包括:{file_info} 等),请提供分析: 1. 内容类型推测 2. 背景音乐建议 3. 剪辑策略 4. 目标用户 用中文回答,格式清晰。""" response = client.chat.completions.create( model="gpt-4o-mini", messages=[ {"role": "system", "content": "你是视频内容专家。"}, {"role": "user", "content": prompt} ], max_tokens=600, temperature=0.7 ) return response.choices[0].message.content except Exception as e: return f"❌ AI 分析失败: {str(e)}" def main_app(video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy): status, outfiles, _ = process_videos(video_files, audio_file, clip_duration, num_output) file_names = [os.path.basename(f.name) for f in (video_files or [])] ai_analysis = generate_ai_analysis(len(video_files or []), file_names) yt_result = "" if upload_yt: if not YOUTUBE_AVAILABLE: yt_result = "⚠️ 未安装 YouTube API 库" elif not os.path.exists("client_secrets.json"): yt_result = "⚠️ 缺少 client_secrets.json 文件" else: uploader = YouTubeUploader() if uploader.authenticate(): yt_result = "认证成功,上传功能请手动调用上传接口。" else: yt_result = "❌ 认证失败" return status, ai_analysis, yt_result try: import googleapiclient YOUTUBE_AVAILABLE = True except ImportError: YOUTUBE_AVAILABLE = False with gr.Blocks() as demo: gr.Markdown("## pyTovideo2 完整视频切割混剪 + YouTube上传(需预先配置)") video_files = gr.File(file_types=[".mp4", ".mov", ".avi"], label="上传视频文件", file_count="multiple") audio_file = gr.File(file_types=[".mp3", ".wav", ".m4a"], label="上传音频文件") clip_duration = gr.Number(value=2, label="切片时长 (秒)", minimum=1, maximum=10) num_output = gr.Number(value=3, label="输出视频数量", minimum=1, maximum=10) upload_yt = gr.Checkbox(label="处理后上传到 YouTube") yt_privacy = gr.Dropdown(choices=["public", "private", "unlisted"], value="private", label="选择 YouTube 视频隐私") btn = gr.Button("开始处理") status_out = gr.Textbox(label="处理状态", lines=4, interactive=False) ai_out = gr.Textbox(label="AI 内容分析", lines=12) yt_out = gr.Textbox(label="YouTube 上传状态", lines=6) btn.click( main_app, inputs=[video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy], outputs=[status_out, ai_out, yt_out] ) demo.launch()