| import os |
| import gradio as gr |
| import tempfile |
| import random |
| import shutil |
| from datetime import datetime |
|
|
| try: |
| from openai import OpenAI |
| client = OpenAI(api_key=os.getenv("sk-ZYyxdh3NexC863OUdicSlKrunXEPlRlsLBMwc1vXMGEVwZdb")) if os.getenv("sk-ZYyxdh3NexC863OUdicSlKrunXEPlRlsLBMwc1vXMGEVwZdb") else None |
| OPENAI_AVAILABLE = bool(client) |
| except ImportError: |
| OPENAI_AVAILABLE = False |
| client = None |
|
|
| try: |
| from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, vfx |
| MOVIEPY_AVAILABLE = True |
| except ImportError: |
| MOVIEPY_AVAILABLE = False |
|
|
| try: |
| from googleapiclient.discovery import build |
| from googleapiclient.http import MediaFileUpload |
| from google_auth_oauthlib.flow import InstalledAppFlow |
| from google.auth.transport.requests import Request |
| import pickle |
| YOUTUBE_AVAILABLE = True |
| except ImportError: |
| YOUTUBE_AVAILABLE = False |
|
|
| MAX_VIDEO_SIZE = 100 * 1024 * 1024 |
| MAX_AUDIO_SIZE = 50 * 1024 * 1024 |
| MAX_TOTAL_SIZE = 500 * 1024 * 1024 |
|
|
| class YouTubeUploader: |
| def __init__(self): |
| self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload'] |
| self.client_secrets_file = 'client_secrets.json' |
| self.credentials_file = 'youtube_token.pickle' |
| self.youtube = None |
|
|
| def authenticate(self): |
| creds = None |
| if os.path.exists(self.credentials_file): |
| with open(self.credentials_file, 'rb') as token: |
| creds = pickle.load(token) |
| if not creds or not creds.valid: |
| if creds and creds.expired and creds.refresh_token: |
| creds.refresh(Request()) |
| else: |
| flow = InstalledAppFlow.from_client_secrets_file( |
| self.client_secrets_file, self.SCOPES |
| ) |
| creds = flow.run_local_server(port=0, open_browser=False) |
| print("请复制下面的链接到浏览器打开并完成授权") |
| with open(self.credentials_file, 'wb') as token: |
| pickle.dump(creds, token) |
| self.youtube = build('youtube', 'v3', credentials=creds) |
| return True |
|
|
| def upload_video(self, video_path, title, description="", tags="", privacy="private"): |
| if not os.path.exists(video_path): |
| return {"success": False, "error": f"视频文件 {video_path} 不存在"} |
|
|
| body = { |
| "snippet": { |
| "title": title, |
| "description": description, |
| "tags": tags.split(",") if tags else [], |
| "categoryId": "22" |
| }, |
| "status": { |
| "privacyStatus": privacy, |
| "selfDeclaredMadeForKids": False |
| } |
| } |
|
|
| media = MediaFileUpload(video_path, chunksize=-1, resumable=True, mimetype="video/*") |
| request = self.youtube.videos().insert(part=",".join(body.keys()), body=body, media_body=media) |
|
|
| response = request.execute() |
| video_id = response.get("id") |
| url = f"https://www.youtube.com/watch?v={video_id}" |
| return {"success": True, "video_id": video_id, "url": url, "title": title} |
|
|
| def check_filesize(video_files, audio_file): |
| total_size = 0 |
| for v in video_files: |
| size = os.path.getsize(v.name) |
| if size > MAX_VIDEO_SIZE: |
| return False, f"单个视频文件 {os.path.basename(v.name)} 过大,超过100MB限制" |
| total_size += size |
| if audio_file: |
| audio_size = os.path.getsize(audio_file.name) |
| if audio_size > MAX_AUDIO_SIZE: |
| return False, f"音频文件 {os.path.basename(audio_file.name)} 过大,超过50MB限制" |
| total_size += audio_size |
| if total_size > MAX_TOTAL_SIZE: |
| return False, f"上传文件总大小过大 {total_size/(1024*1024):.1f}MB,超过500MB限制" |
| return True, "" |
|
|
| def process_videos(video_files, audio_file, clip_duration, num_output): |
| ok, msg = check_filesize(video_files, audio_file) |
| if not ok: |
| return msg, [], "" |
|
|
| out_files = [] |
| try: |
| clips = [] |
| for video in video_files: |
| clip = VideoFileClip(video.name) |
| duration = clip.duration |
|
|
| for start in range(0, int(duration), int(clip_duration)): |
| end = min(start + clip_duration, duration) |
| subclip = clip.subclip(start, end).without_audio() |
| clips.append(subclip) |
|
|
| random.shuffle(clips) |
|
|
| clips_per_video = max(1, len(clips) // num_output) |
| output_dir = tempfile.mkdtemp() |
|
|
| for i in range(num_output): |
| start_idx = i * clips_per_video |
| end_idx = len(clips) if i == num_output - 1 else (start_idx + clips_per_video) |
| selected_clips = clips[start_idx:end_idx] |
| if not selected_clips: |
| continue |
|
|
| final_clip = concatenate_videoclips(selected_clips) |
|
|
| if audio_file: |
| audio_clip = AudioFileClip(audio_file.name) |
| min_duration = min(final_clip.duration, audio_clip.duration) |
| final_clip = final_clip.subclip(0, min_duration) |
| audio_clip = audio_clip.subclip(0, min_duration) |
| final_clip = final_clip.set_audio(audio_clip) |
|
|
| output_path = os.path.join(output_dir, f"mixcut_{i+1}.mp4") |
| final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac", verbose=False, logger=None) |
| out_files.append(output_path) |
| final_clip.close() |
|
|
| shutil.rmtree(output_dir, ignore_errors=True) |
|
|
| return f"成功生成 {len(out_files)} 个混剪视频", out_files, output_dir |
|
|
| except Exception as e: |
| return f"❌ 视频处理出错: {str(e)}", [], "" |
|
|
| def generate_ai_analysis(video_count, file_names): |
| if not OPENAI_AVAILABLE or video_count == 0: |
| return "⚠️ 无视频文件或 AI 配置不可用。" |
| try: |
| file_info = ", ".join([f"'{name}'" for name in file_names[:3]]) |
| prompt = f"""作为专业视频分析师,基于用户上传的 {video_count} 个视频文件(包括:{file_info} 等),请提供分析: |
| 1. 内容类型推测 |
| 2. 背景音乐建议 |
| 3. 剪辑策略 |
| 4. 目标用户 |
| |
| 用中文回答,格式清晰。""" |
| response = client.chat.completions.create( |
| model="gpt-4o-mini", |
| messages=[ |
| {"role": "system", "content": "你是视频内容专家。"}, |
| {"role": "user", "content": prompt} |
| ], |
| max_tokens=600, |
| temperature=0.7 |
| ) |
| return response.choices[0].message.content |
| except Exception as e: |
| return f"❌ AI 分析失败: {str(e)}" |
|
|
| def main_app(video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy): |
| status, outfiles, _ = process_videos(video_files, audio_file, clip_duration, num_output) |
|
|
| file_names = [os.path.basename(f.name) for f in (video_files or [])] |
| ai_analysis = generate_ai_analysis(len(video_files or []), file_names) |
|
|
| yt_result = "" |
| if upload_yt: |
| if not YOUTUBE_AVAILABLE: |
| yt_result = "⚠️ 未安装 YouTube API 库" |
| elif not os.path.exists("client_secrets.json"): |
| yt_result = "⚠️ 缺少 client_secrets.json 文件" |
| else: |
| uploader = YouTubeUploader() |
| if uploader.authenticate(): |
| yt_result = "认证成功,上传功能请手动调用上传接口。" |
| else: |
| yt_result = "❌ 认证失败" |
|
|
| return status, ai_analysis, yt_result |
|
|
| try: |
| import googleapiclient |
| YOUTUBE_AVAILABLE = True |
| except ImportError: |
| YOUTUBE_AVAILABLE = False |
|
|
| with gr.Blocks() as demo: |
| gr.Markdown("## pyTovideo2 完整视频切割混剪 + YouTube上传(需预先配置)") |
|
|
| video_files = gr.File(file_types=[".mp4", ".mov", ".avi"], label="上传视频文件", file_count="multiple") |
| audio_file = gr.File(file_types=[".mp3", ".wav", ".m4a"], label="上传音频文件") |
| clip_duration = gr.Number(value=2, label="切片时长 (秒)", minimum=1, maximum=10) |
| num_output = gr.Number(value=3, label="输出视频数量", minimum=1, maximum=10) |
| upload_yt = gr.Checkbox(label="处理后上传到 YouTube") |
| yt_privacy = gr.Dropdown(choices=["public", "private", "unlisted"], value="private", label="选择 YouTube 视频隐私") |
| btn = gr.Button("开始处理") |
|
|
| status_out = gr.Textbox(label="处理状态", lines=4, interactive=False) |
| ai_out = gr.Textbox(label="AI 内容分析", lines=12) |
| yt_out = gr.Textbox(label="YouTube 上传状态", lines=6) |
|
|
| btn.click( |
| main_app, |
| inputs=[video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy], |
| outputs=[status_out, ai_out, yt_out] |
| ) |
|
|
| demo.launch() |
|
|