Arvin9496 commited on
Commit
72ad91d
·
verified ·
1 Parent(s): 1ae5125

Create gcs_uploader.py

Browse files
Files changed (1) hide show
  1. gcs_uploader.py +64 -0
gcs_uploader.py ADDED
@@ -0,0 +1,64 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # gcs_uploader.py - Google Cloud Storage 上传工具
2
+ import os
3
+ from google.cloud import storage
4
+ from datetime import timedelta
5
+
6
+ def upload_video_to_gcs(
7
+ video_file_path: str,
8
+ bucket_name: str = "n8n-33-arvin",
9
+ destination_prefix: str = "video-demo/",
10
+ credentials_path: str = None,
11
+ expires: int = 3600
12
+ ):
13
+ """
14
+ 上传视频到 Google Cloud Storage,返回 (local_path, signed_url)
15
+
16
+ Args:
17
+ video_file_path (str): 本地视频文件路径
18
+ bucket_name (str): GCS 存储桶名称
19
+ destination_prefix (str): GCS 对象前缀(如 "video-demo/")
20
+ credentials_path (str): 服务账号 JSON 路径(可选)
21
+ expires (int): 签名 URL 有效期(秒),默认 1 小时
22
+
23
+ Returns:
24
+ tuple: (local_path, signed_url) 或 (None, error_message)
25
+ """
26
+ if not video_file_path or not os.path.isfile(video_file_path):
27
+ return None, "❌ 视频文件无效或不存在"
28
+
29
+ try:
30
+ # 设置凭据(优先使用传入路径,其次环境变量)
31
+ if credentials_path:
32
+ os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path
33
+ elif "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ:
34
+ # 尝试从 HF Secrets 获取 base64 编码的凭据(可选扩展)
35
+ pass
36
+
37
+ # 初始化客户端
38
+ storage_client = storage.Client()
39
+ bucket = storage_client.bucket(bucket_name)
40
+
41
+ # 构造目标对象名
42
+ original_name = os.path.basename(video_file_path)
43
+ destination_blob_name = f"{destination_prefix.rstrip('/')}/{original_name}"
44
+
45
+ blob = bucket.blob(destination_blob_name)
46
+
47
+ # 上传文件
48
+ blob.upload_from_filename(video_file_path)
49
+
50
+ # 生成签名 URL
51
+ signed_url = blob.generate_signed_url(
52
+ version="v4",
53
+ expiration=timedelta(seconds=expires),
54
+ method="GET"
55
+ )
56
+
57
+ print(f"[GCS] ✅ 上传成功: {destination_blob_name}")
58
+ print(f"[GCS] 🔗 签名链接: {signed_url}")
59
+ return video_file_path, signed_url
60
+
61
+ except Exception as e:
62
+ error_msg = f"❌ GCS 上传失败: {type(e).__name__} - {str(e)}"
63
+ print(f"[GCS] {error_msg}")
64
+ return None, error_msg