|
|
import os |
|
|
import gradio as gr |
|
|
import tempfile |
|
|
import random |
|
|
import shutil |
|
|
from datetime import datetime |
|
|
|
|
|
try: |
|
|
from openai import OpenAI |
|
|
client = OpenAI(api_key=os.getenv("sk-ZYyxdh3NexC863OUdicSlKrunXEPlRlsLBMwc1vXMGEVwZdb")) if os.getenv("sk-ZYyxdh3NexC863OUdicSlKrunXEPlRlsLBMwc1vXMGEVwZdb") else None |
|
|
OPENAI_AVAILABLE = bool(client) |
|
|
except ImportError: |
|
|
OPENAI_AVAILABLE = False |
|
|
client = None |
|
|
|
|
|
try: |
|
|
from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, vfx |
|
|
MOVIEPY_AVAILABLE = True |
|
|
except ImportError: |
|
|
MOVIEPY_AVAILABLE = False |
|
|
|
|
|
try: |
|
|
from googleapiclient.discovery import build |
|
|
from googleapiclient.http import MediaFileUpload |
|
|
from google_auth_oauthlib.flow import InstalledAppFlow |
|
|
from google.auth.transport.requests import Request |
|
|
import pickle |
|
|
YOUTUBE_AVAILABLE = True |
|
|
except ImportError: |
|
|
YOUTUBE_AVAILABLE = False |
|
|
|
|
|
MAX_VIDEO_SIZE = 100 * 1024 * 1024 |
|
|
MAX_AUDIO_SIZE = 50 * 1024 * 1024 |
|
|
MAX_TOTAL_SIZE = 500 * 1024 * 1024 |
|
|
|
|
|
class YouTubeUploader: |
|
|
def __init__(self): |
|
|
self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload'] |
|
|
self.client_secrets_file = 'client_secrets.json' |
|
|
self.credentials_file = 'youtube_token.pickle' |
|
|
self.youtube = None |
|
|
|
|
|
def authenticate(self): |
|
|
creds = None |
|
|
if os.path.exists(self.credentials_file): |
|
|
with open(self.credentials_file, 'rb') as token: |
|
|
creds = pickle.load(token) |
|
|
if not creds or not creds.valid: |
|
|
if creds and creds.expired and creds.refresh_token: |
|
|
creds.refresh(Request()) |
|
|
else: |
|
|
flow = InstalledAppFlow.from_client_secrets_file( |
|
|
self.client_secrets_file, self.SCOPES |
|
|
) |
|
|
creds = flow.run_local_server(port=0, open_browser=False) |
|
|
print("请复制下面的链接到浏览器打开并完成授权") |
|
|
with open(self.credentials_file, 'wb') as token: |
|
|
pickle.dump(creds, token) |
|
|
self.youtube = build('youtube', 'v3', credentials=creds) |
|
|
return True |
|
|
|
|
|
def upload_video(self, video_path, title, description="", tags="", privacy="private"): |
|
|
if not os.path.exists(video_path): |
|
|
return {"success": False, "error": f"视频文件 {video_path} 不存在"} |
|
|
|
|
|
body = { |
|
|
"snippet": { |
|
|
"title": title, |
|
|
"description": description, |
|
|
"tags": tags.split(",") if tags else [], |
|
|
"categoryId": "22" |
|
|
}, |
|
|
"status": { |
|
|
"privacyStatus": privacy, |
|
|
"selfDeclaredMadeForKids": False |
|
|
} |
|
|
} |
|
|
|
|
|
media = MediaFileUpload(video_path, chunksize=-1, resumable=True, mimetype="video/*") |
|
|
request = self.youtube.videos().insert(part=",".join(body.keys()), body=body, media_body=media) |
|
|
|
|
|
response = request.execute() |
|
|
video_id = response.get("id") |
|
|
url = f"https://www.youtube.com/watch?v={video_id}" |
|
|
return {"success": True, "video_id": video_id, "url": url, "title": title} |
|
|
|
|
|
def check_filesize(video_files, audio_file): |
|
|
total_size = 0 |
|
|
for v in video_files: |
|
|
size = os.path.getsize(v.name) |
|
|
if size > MAX_VIDEO_SIZE: |
|
|
return False, f"单个视频文件 {os.path.basename(v.name)} 过大,超过100MB限制" |
|
|
total_size += size |
|
|
if audio_file: |
|
|
audio_size = os.path.getsize(audio_file.name) |
|
|
if audio_size > MAX_AUDIO_SIZE: |
|
|
return False, f"音频文件 {os.path.basename(audio_file.name)} 过大,超过50MB限制" |
|
|
total_size += audio_size |
|
|
if total_size > MAX_TOTAL_SIZE: |
|
|
return False, f"上传文件总大小过大 {total_size/(1024*1024):.1f}MB,超过500MB限制" |
|
|
return True, "" |
|
|
|
|
|
def process_videos(video_files, audio_file, clip_duration, num_output): |
|
|
ok, msg = check_filesize(video_files, audio_file) |
|
|
if not ok: |
|
|
return msg, [], "" |
|
|
|
|
|
out_files = [] |
|
|
try: |
|
|
clips = [] |
|
|
for video in video_files: |
|
|
clip = VideoFileClip(video.name) |
|
|
duration = clip.duration |
|
|
|
|
|
for start in range(0, int(duration), int(clip_duration)): |
|
|
end = min(start + clip_duration, duration) |
|
|
subclip = clip.subclip(start, end).without_audio() |
|
|
clips.append(subclip) |
|
|
|
|
|
random.shuffle(clips) |
|
|
|
|
|
clips_per_video = max(1, len(clips) // num_output) |
|
|
output_dir = tempfile.mkdtemp() |
|
|
|
|
|
for i in range(num_output): |
|
|
start_idx = i * clips_per_video |
|
|
end_idx = len(clips) if i == num_output - 1 else (start_idx + clips_per_video) |
|
|
selected_clips = clips[start_idx:end_idx] |
|
|
if not selected_clips: |
|
|
continue |
|
|
|
|
|
final_clip = concatenate_videoclips(selected_clips) |
|
|
|
|
|
if audio_file: |
|
|
audio_clip = AudioFileClip(audio_file.name) |
|
|
min_duration = min(final_clip.duration, audio_clip.duration) |
|
|
final_clip = final_clip.subclip(0, min_duration) |
|
|
audio_clip = audio_clip.subclip(0, min_duration) |
|
|
final_clip = final_clip.set_audio(audio_clip) |
|
|
|
|
|
output_path = os.path.join(output_dir, f"mixcut_{i+1}.mp4") |
|
|
final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac", verbose=False, logger=None) |
|
|
out_files.append(output_path) |
|
|
final_clip.close() |
|
|
|
|
|
shutil.rmtree(output_dir, ignore_errors=True) |
|
|
|
|
|
return f"成功生成 {len(out_files)} 个混剪视频", out_files, output_dir |
|
|
|
|
|
except Exception as e: |
|
|
return f"❌ 视频处理出错: {str(e)}", [], "" |
|
|
|
|
|
def generate_ai_analysis(video_count, file_names): |
|
|
if not OPENAI_AVAILABLE or video_count == 0: |
|
|
return "⚠️ 无视频文件或 AI 配置不可用。" |
|
|
try: |
|
|
file_info = ", ".join([f"'{name}'" for name in file_names[:3]]) |
|
|
prompt = f"""作为专业视频分析师,基于用户上传的 {video_count} 个视频文件(包括:{file_info} 等),请提供分析: |
|
|
1. 内容类型推测 |
|
|
2. 背景音乐建议 |
|
|
3. 剪辑策略 |
|
|
4. 目标用户 |
|
|
|
|
|
用中文回答,格式清晰。""" |
|
|
response = client.chat.completions.create( |
|
|
model="gpt-4o-mini", |
|
|
messages=[ |
|
|
{"role": "system", "content": "你是视频内容专家。"}, |
|
|
{"role": "user", "content": prompt} |
|
|
], |
|
|
max_tokens=600, |
|
|
temperature=0.7 |
|
|
) |
|
|
return response.choices[0].message.content |
|
|
except Exception as e: |
|
|
return f"❌ AI 分析失败: {str(e)}" |
|
|
|
|
|
def main_app(video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy): |
|
|
status, outfiles, _ = process_videos(video_files, audio_file, clip_duration, num_output) |
|
|
|
|
|
file_names = [os.path.basename(f.name) for f in (video_files or [])] |
|
|
ai_analysis = generate_ai_analysis(len(video_files or []), file_names) |
|
|
|
|
|
yt_result = "" |
|
|
if upload_yt: |
|
|
if not YOUTUBE_AVAILABLE: |
|
|
yt_result = "⚠️ 未安装 YouTube API 库" |
|
|
elif not os.path.exists("client_secrets.json"): |
|
|
yt_result = "⚠️ 缺少 client_secrets.json 文件" |
|
|
else: |
|
|
uploader = YouTubeUploader() |
|
|
if uploader.authenticate(): |
|
|
yt_result = "认证成功,上传功能请手动调用上传接口。" |
|
|
else: |
|
|
yt_result = "❌ 认证失败" |
|
|
|
|
|
return status, ai_analysis, yt_result |
|
|
|
|
|
try: |
|
|
import googleapiclient |
|
|
YOUTUBE_AVAILABLE = True |
|
|
except ImportError: |
|
|
YOUTUBE_AVAILABLE = False |
|
|
|
|
|
with gr.Blocks() as demo: |
|
|
gr.Markdown("## pyTovideo2 完整视频切割混剪 + YouTube上传(需预先配置)") |
|
|
|
|
|
video_files = gr.File(file_types=[".mp4", ".mov", ".avi"], label="上传视频文件", file_count="multiple") |
|
|
audio_file = gr.File(file_types=[".mp3", ".wav", ".m4a"], label="上传音频文件") |
|
|
clip_duration = gr.Number(value=2, label="切片时长 (秒)", minimum=1, maximum=10) |
|
|
num_output = gr.Number(value=3, label="输出视频数量", minimum=1, maximum=10) |
|
|
upload_yt = gr.Checkbox(label="处理后上传到 YouTube") |
|
|
yt_privacy = gr.Dropdown(choices=["public", "private", "unlisted"], value="private", label="选择 YouTube 视频隐私") |
|
|
btn = gr.Button("开始处理") |
|
|
|
|
|
status_out = gr.Textbox(label="处理状态", lines=4, interactive=False) |
|
|
ai_out = gr.Textbox(label="AI 内容分析", lines=12) |
|
|
yt_out = gr.Textbox(label="YouTube 上传状态", lines=6) |
|
|
|
|
|
btn.click( |
|
|
main_app, |
|
|
inputs=[video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy], |
|
|
outputs=[status_out, ai_out, yt_out] |
|
|
) |
|
|
|
|
|
demo.launch() |
|
|
|