Spaces:
Runtime error
Runtime error
| import os | |
| from datetime import datetime | |
| import uuid | |
| import oss2 | |
| from retry import retry | |
| oss_ak = os.environ.get("ALIYUN_ACCESS_KEY_ID") | |
| oss_sk = os.environ.get("ALIYUN_ACCESS_KEY_SECRET") | |
| oss_endpoint = os.environ.get("ALIYUN_ENDPOINT") | |
| oss_bucket_name = os.environ.get("ALIYUN_BUCKET_NAME") | |
| # 配置 OSS 连接信息 - 请替换为你的实际信息 | |
| auth = oss2.Auth(oss_ak, oss_sk) | |
| # Configure timeouts via config; use both connect and read timeout | |
| # service = oss2.Service(auth, oss_endpoint, connect_timeout=60) | |
| bucket = oss2.Bucket(auth, oss_endpoint, oss_bucket_name, connect_timeout=60) | |
| def upload_to_oss(file_path): | |
| if not os.path.isfile(file_path): | |
| raise FileNotFoundError(f"文件不存在: {file_path}") | |
| file_name = os.path.basename(file_path) | |
| file_ext = file_name.split(".")[-1] if "." in file_name else "" | |
| rand4 = uuid.uuid4().hex[:4] | |
| unique_key = ( | |
| f"gradio_upload/{datetime.now().strftime('%Y%m%d%H%M%S')}_{rand4}.{file_ext}" | |
| ) | |
| try: | |
| with open(file_path, "rb") as f: | |
| result = bucket.put_object(unique_key, f) | |
| if result.status == 200: | |
| oss_url = f"https://{bucket.bucket_name}.{bucket.endpoint.split('//')[1]}/{unique_key}" | |
| return oss_url | |
| else: | |
| print(f"上传失败,错误码:{result.status}") | |
| raise RuntimeError(f"上传失败,错误码:{result.status}") | |
| except Exception as e: | |
| print(f"上传出错:{e}") | |
| raise RuntimeError(f"上传出错:{e}") from e | |