"""Upload local files to an Aliyun OSS bucket and return their URL."""

import os
import uuid
from datetime import datetime

import oss2
from retry import retry

# OSS connection settings, read from environment variables.
oss_ak = os.environ.get("ALIYUN_ACCESS_KEY_ID")
oss_sk = os.environ.get("ALIYUN_ACCESS_KEY_SECRET")
oss_endpoint = os.environ.get("ALIYUN_ENDPOINT")
oss_bucket_name = os.environ.get("ALIYUN_BUCKET_NAME")

auth = oss2.Auth(oss_ak, oss_sk)
# NOTE(review): only connect_timeout is configured here; confirm against the
# oss2 documentation whether a separate read timeout is also required.
bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name, connect_timeout=60)


# Retry only upload failures (always surfaced as RuntimeError below); a missing
# local file will not appear on a second attempt, so it is not retried.
@retry(exceptions=RuntimeError, tries=2)
def upload_to_oss(file_path: str) -> str:
    """Upload a local file to the configured OSS bucket.

    The object is stored under ``gradio_upload/`` with a key built from the
    current local timestamp, four random hex characters and the original
    file extension (if the file name has one).

    Args:
        file_path: Path of the local file to upload.

    Returns:
        An ``https://`` URL composed of the bucket name, the endpoint host
        and the object key.

    Raises:
        FileNotFoundError: If ``file_path`` is not an existing regular file.
        RuntimeError: If reading the file or the upload raises, or if the
            upload response status is not 200.
    """
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"文件不存在: {file_path}")

    file_name = os.path.basename(file_path)
    # Text after the last dot; only add a ".ext" suffix when it is non-empty
    # so extension-less names do not yield a key with a dangling ".".
    _, dot, file_ext = file_name.rpartition(".")
    suffix = f".{file_ext}" if dot and file_ext else ""

    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    rand4 = uuid.uuid4().hex[:4]
    unique_key = f"gradio_upload/{timestamp}_{rand4}{suffix}"

    # Keep the try body limited to the I/O so the status error raised further
    # down is not caught and wrapped a second time.
    try:
        with open(file_path, "rb") as f:
            result = bucket.put_object(unique_key, f)
    except Exception as e:
        print(f"上传出错:{e}")
        raise RuntimeError(f"上传出错:{e}") from e

    if result.status != 200:
        print(f"上传失败,错误码:{result.status}")
        raise RuntimeError(f"上传失败,错误码:{result.status}")

    # Drop the scheme from the endpoint to build the bucket-style host name.
    endpoint_host = bucket.endpoint.split("//")[-1]
    return f"https://{bucket.bucket_name}.{endpoint_host}/{unique_key}"