File size: 1,547 Bytes
63ff2ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import os
from datetime import datetime
import uuid

import oss2
from retry import retry

oss_ak = os.environ.get("ALIYUN_ACCESS_KEY_ID")
oss_sk = os.environ.get("ALIYUN_ACCESS_KEY_SECRET")
oss_endpoint = os.environ.get("ALIYUN_ENDPOINT")
oss_bucket_name = os.environ.get("ALIYUN_BUCKET_NAME")

# 配置 OSS 连接信息 - 请替换为你的实际信息

auth = oss2.Auth(oss_ak, oss_sk)
# Configure timeouts via config; use both connect and read timeout
# service = oss2.Service(auth, oss_endpoint, connect_timeout=60)
bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name, connect_timeout=60)


@retry(tries=2)
def upload_to_oss(file_path):
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"文件不存在: {file_path}")
    file_name = os.path.basename(file_path)
    file_ext = file_name.split(".")[-1] if "." in file_name else ""
    rand4 = uuid.uuid4().hex[:4]
    unique_key = (
        f"gradio_upload/{datetime.now().strftime('%Y%m%d%H%M%S')}_{rand4}.{file_ext}"
    )
    try:
        with open(file_path, "rb") as f:
            result = bucket.put_object(unique_key, f)
        if result.status == 200:
            oss_url = f"https://{bucket.bucket_name}.{bucket.endpoint.split('//')[1]}/{unique_key}"
            return oss_url
        else:
            print(f"上传失败,错误码:{result.status}")
            raise RuntimeError(f"上传失败,错误码:{result.status}")
    except Exception as e:
        print(f"上传出错:{e}")
        raise RuntimeError(f"上传出错:{e}") from e