Ryanus commited on
Commit
7762994
·
verified ·
1 Parent(s): 0439da2

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +87 -222
app.py CHANGED
@@ -1,226 +1,91 @@
1
  import os
2
- import gradio as gr
3
- import tempfile
4
- import random
5
- import shutil
6
- from datetime import datetime
7
-
8
- try:
9
- from openai import OpenAI
10
- client = OpenAI(api_key=os.getenv("sk-ZYyxdh3NexC863OUdicSlKrunXEPlRlsLBMwc1vXMGEVwZdb")) if os.getenv("sk-ZYyxdh3NexC863OUdicSlKrunXEPlRlsLBMwc1vXMGEVwZdb") else None
11
- OPENAI_AVAILABLE = bool(client)
12
- except ImportError:
13
- OPENAI_AVAILABLE = False
14
- client = None
15
-
16
- try:
17
- from moviepy.editor import VideoFileClip, concatenate_videoclips, AudioFileClip, vfx
18
- MOVIEPY_AVAILABLE = True
19
- except ImportError:
20
- MOVIEPY_AVAILABLE = False
21
-
22
- try:
23
- from googleapiclient.discovery import build
24
- from googleapiclient.http import MediaFileUpload
25
- from google_auth_oauthlib.flow import InstalledAppFlow
26
- from google.auth.transport.requests import Request
27
- import pickle
28
- YOUTUBE_AVAILABLE = True
29
- except ImportError:
30
- YOUTUBE_AVAILABLE = False
31
-
32
- MAX_VIDEO_SIZE = 100 * 1024 * 1024 # 100MB
33
- MAX_AUDIO_SIZE = 50 * 1024 * 1024 # 50MB
34
- MAX_TOTAL_SIZE = 500 * 1024 * 1024 # 500MB
35
-
36
- class YouTubeUploader:
37
- def __init__(self):
38
- self.SCOPES = ['https://www.googleapis.com/auth/youtube.upload']
39
- self.client_secrets_file = 'client_secrets.json'
40
- self.credentials_file = 'youtube_token.pickle'
41
- self.youtube = None
42
-
43
- def authenticate(self):
44
- creds = None
45
- if os.path.exists(self.credentials_file):
46
- with open(self.credentials_file, 'rb') as token:
47
- creds = pickle.load(token)
48
- if not creds or not creds.valid:
49
- if creds and creds.expired and creds.refresh_token:
50
- creds.refresh(Request())
51
- else:
52
- flow = InstalledAppFlow.from_client_secrets_file(
53
- self.client_secrets_file, self.SCOPES
54
- )
55
- creds = flow.run_local_server(port=0, open_browser=False)
56
- print("请复制下面的链接到浏览器打开并完成授权")
57
- with open(self.credentials_file, 'wb') as token:
58
- pickle.dump(creds, token)
59
- self.youtube = build('youtube', 'v3', credentials=creds)
60
- return True
61
-
62
- def upload_video(self, video_path, title, description="", tags="", privacy="private"):
63
- if not os.path.exists(video_path):
64
- return {"success": False, "error": f"视频文件 {video_path} 不存在"}
65
-
66
- body = {
67
- "snippet": {
68
- "title": title,
69
- "description": description,
70
- "tags": tags.split(",") if tags else [],
71
- "categoryId": "22"
72
- },
73
- "status": {
74
- "privacyStatus": privacy,
75
- "selfDeclaredMadeForKids": False
76
- }
77
- }
78
-
79
- media = MediaFileUpload(video_path, chunksize=-1, resumable=True, mimetype="video/*")
80
- request = self.youtube.videos().insert(part=",".join(body.keys()), body=body, media_body=media)
81
-
82
- response = request.execute()
83
- video_id = response.get("id")
84
- url = f"https://www.youtube.com/watch?v={video_id}"
85
- return {"success": True, "video_id": video_id, "url": url, "title": title}
86
-
87
- def check_filesize(video_files, audio_file):
88
- total_size = 0
89
- for v in video_files:
90
- size = os.path.getsize(v.name)
91
- if size > MAX_VIDEO_SIZE:
92
- return False, f"单个视频文件 {os.path.basename(v.name)} 过大,超过100MB限制"
93
- total_size += size
94
- if audio_file:
95
- audio_size = os.path.getsize(audio_file.name)
96
- if audio_size > MAX_AUDIO_SIZE:
97
- return False, f"音频文件 {os.path.basename(audio_file.name)} 过大,超过50MB限制"
98
- total_size += audio_size
99
- if total_size > MAX_TOTAL_SIZE:
100
- return False, f"上传文件总大小过大 {total_size/(1024*1024):.1f}MB,超过500MB限制"
101
- return True, ""
102
-
103
- def process_videos(video_files, audio_file, clip_duration, num_output):
104
- ok, msg = check_filesize(video_files, audio_file)
105
- if not ok:
106
- return msg, [], ""
107
-
108
- out_files = []
109
- try:
110
- clips = []
111
- for video in video_files:
112
- clip = VideoFileClip(video.name)
113
- duration = clip.duration
114
-
115
- for start in range(0, int(duration), int(clip_duration)):
116
- end = min(start + clip_duration, duration)
117
- subclip = clip.subclip(start, end).without_audio()
118
- clips.append(subclip)
119
-
120
- random.shuffle(clips)
121
-
122
- clips_per_video = max(1, len(clips) // num_output)
123
- output_dir = tempfile.mkdtemp()
124
-
125
- for i in range(num_output):
126
- start_idx = i * clips_per_video
127
- end_idx = len(clips) if i == num_output - 1 else (start_idx + clips_per_video)
128
- selected_clips = clips[start_idx:end_idx]
129
- if not selected_clips:
130
- continue
131
-
132
- final_clip = concatenate_videoclips(selected_clips)
133
-
134
- if audio_file:
135
- audio_clip = AudioFileClip(audio_file.name)
136
- min_duration = min(final_clip.duration, audio_clip.duration)
137
- final_clip = final_clip.subclip(0, min_duration)
138
- audio_clip = audio_clip.subclip(0, min_duration)
139
- final_clip = final_clip.set_audio(audio_clip)
140
-
141
- output_path = os.path.join(output_dir, f"mixcut_{i+1}.mp4")
142
- final_clip.write_videofile(output_path, codec="libx264", audio_codec="aac", verbose=False, logger=None)
143
- out_files.append(output_path)
144
- final_clip.close()
145
-
146
- shutil.rmtree(output_dir, ignore_errors=True) # 可选,处理完删除临时文件
147
-
148
- return f"成功生成 {len(out_files)} 个混剪视频", out_files, output_dir
149
-
150
- except Exception as e:
151
- return f"❌ 视频处理出错: {str(e)}", [], ""
152
-
153
- def generate_ai_analysis(video_count, file_names):
154
- if not OPENAI_AVAILABLE or video_count == 0:
155
- return "⚠️ 无视频文件或 AI 配置不可用。"
156
- try:
157
- file_info = ", ".join([f"'{name}'" for name in file_names[:3]])
158
- prompt = f"""作为专业视频分析师,基于用户上传的 {video_count} 个视频文件(包括:{file_info} 等),请提供分析:
159
- 1. 内容类型推测
160
- 2. 背景音乐建议
161
- 3. 剪辑策略
162
- 4. 目标用户
163
-
164
- 用中文回答,格式清晰。"""
165
- response = client.chat.completions.create(
166
- model="gpt-4o-mini",
167
- messages=[
168
- {"role": "system", "content": "你是视频内容专家。"},
169
- {"role": "user", "content": prompt}
170
- ],
171
- max_tokens=600,
172
- temperature=0.7
173
- )
174
- return response.choices[0].message.content
175
- except Exception as e:
176
- return f"❌ AI 分析失败: {str(e)}"
177
-
178
- def main_app(video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy):
179
- status, outfiles, _ = process_videos(video_files, audio_file, clip_duration, num_output)
180
-
181
- file_names = [os.path.basename(f.name) for f in (video_files or [])]
182
- ai_analysis = generate_ai_analysis(len(video_files or []), file_names)
183
-
184
- yt_result = ""
185
- if upload_yt:
186
- if not YOUTUBE_AVAILABLE:
187
- yt_result = "⚠️ 未安装 YouTube API 库"
188
- elif not os.path.exists("client_secrets.json"):
189
- yt_result = "⚠️ 缺少 client_secrets.json 文件"
190
  else:
191
- uploader = YouTubeUploader()
192
- if uploader.authenticate():
193
- yt_result = "认证成功,上传功能请手动调用上传接口。"
194
- else:
195
- yt_result = "❌ 认证失败"
196
-
197
- return status, ai_analysis, yt_result
198
-
199
- try:
200
- import googleapiclient
201
- YOUTUBE_AVAILABLE = True
202
- except ImportError:
203
- YOUTUBE_AVAILABLE = False
204
-
205
- with gr.Blocks() as demo:
206
- gr.Markdown("## pyTovideo2 完整视频切割混剪 + YouTube上传(需预先配置)")
207
-
208
- video_files = gr.File(file_types=[".mp4", ".mov", ".avi"], label="上传视频文件", file_count="multiple")
209
- audio_file = gr.File(file_types=[".mp3", ".wav", ".m4a"], label="上传音频文件")
210
- clip_duration = gr.Number(value=2, label="切片时长 (秒)", minimum=1, maximum=10)
211
- num_output = gr.Number(value=3, label="输出视频数量", minimum=1, maximum=10)
212
- upload_yt = gr.Checkbox(label="处理后上传到 YouTube")
213
- yt_privacy = gr.Dropdown(choices=["public", "private", "unlisted"], value="private", label="选择 YouTube 视频隐私")
214
- btn = gr.Button("开始处理")
215
-
216
- status_out = gr.Textbox(label="处理状态", lines=4, interactive=False)
217
- ai_out = gr.Textbox(label="AI 内容分析", lines=12)
218
- yt_out = gr.Textbox(label="YouTube 上传状态", lines=6)
219
-
220
- btn.click(
221
- main_app,
222
- inputs=[video_files, audio_file, clip_duration, num_output, upload_yt, yt_privacy],
223
- outputs=[status_out, ai_out, yt_out]
224
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
225
 
226
- demo.launch()
 
 
1
  import os
2
+ import pickle
3
+ from googleapiclient.discovery import build
4
+ from googleapiclient.http import MediaFileUpload
5
+ from google_auth_oauthlib.flow import InstalledAppFlow
6
+ from google.auth.transport.requests import Request
7
+
8
+ CLIENT_SECRETS_FILE = 'client_secrets.json'
9
+ CREDENTIALS_FILE = 'youtube_token.pickle'
10
+ SCOPES = ['https://www.googleapis.com/auth/youtube.upload']
11
+
12
+ def authenticate_youtube():
13
+ creds = None
14
+ if os.path.exists(CREDENTIALS_FILE):
15
+ with open(CREDENTIALS_FILE, 'rb') as token:
16
+ creds = pickle.load(token)
17
+ if not creds or not creds.valid:
18
+ if creds and creds.expired and creds.refresh_token:
19
+ creds.refresh(Request())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
  else:
21
+ flow = InstalledAppFlow.from_client_secrets_file(
22
+ CLIENT_SECRETS_FILE, SCOPES)
23
+ # 手动认证:命令行打印链接,复制粘贴访问授权
24
+ creds = flow.run_console()
25
+ with open(CREDENTIALS_FILE, 'wb') as token:
26
+ pickle.dump(creds, token)
27
+ youtube = build('youtube', 'v3', credentials=creds)
28
+ return youtube
29
+
30
+ def upload_video(youtube, video_path, title, description, tags=None, privacy_status="private"):
31
+ if not os.path.exists(video_path):
32
+ print(f"错误:找不到视频文件 {video_path}")
33
+ return None
34
+
35
+ body = {
36
+ 'snippet': {
37
+ 'title': title,
38
+ 'description': description,
39
+ 'tags': tags or [],
40
+ 'categoryId': '22' # People & Blogs
41
+ },
42
+ 'status': {
43
+ 'privacyStatus': privacy_status,
44
+ 'selfDeclaredMadeForKids': False
45
+ }
46
+ }
47
+
48
+ media = MediaFileUpload(video_path, chunksize=-1, resumable=True, mimetype='video/*')
49
+ request = youtube.videos().insert(
50
+ part=','.join(body.keys()),
51
+ body=body,
52
+ media_body=media
 
53
  )
54
+ response = request.execute()
55
+
56
+ print(f"上传成功!视频链接:https://www.youtube.com/watch?v={response['id']}")
57
+ return response['id']
58
+
59
+ def main():
60
+ youtube = authenticate_youtube()
61
+
62
+ # 在这里替换为你想上传的视频文件及信息
63
+ videos_to_upload = [
64
+ {
65
+ 'path': 'your_video_1.mp4',
66
+ 'title': '视频标题1',
67
+ 'description': '这是视频描述1',
68
+ 'tags': ['示例', '测试'],
69
+ 'privacy_status': 'private'
70
+ },
71
+ {
72
+ 'path': 'your_video_2.mp4',
73
+ 'title': '视频标题2',
74
+ 'description': '这是视频描述2',
75
+ 'tags': ['样例', '上传'],
76
+ 'privacy_status': 'unlisted'
77
+ }
78
+ ]
79
+
80
+ for video_info in videos_to_upload:
81
+ upload_video(
82
+ youtube,
83
+ video_info['path'],
84
+ video_info['title'],
85
+ video_info['description'],
86
+ video_info.get('tags'),
87
+ video_info.get('privacy_status', 'private')
88
+ )
89
 
90
+ if __name__ == '__main__':
91
+ main()