Spaces:
Paused
Paused
| #!/usr/bin/python3 | |
| # -*- coding: utf-8 -*- | |
| import json | |
| import logging | |
| import time | |
| import oss2 | |
| from oss2.credentials import StaticCredentialsProvider | |
| from tenacity import before_sleep_log, retry, retry_if_exception_type, stop_after_attempt, wait_fixed | |
| api_logger = logging.getLogger("api") | |
class AliyunOSS(object):
    """Small upload client bound to a single Aliyun OSS bucket.

    The constructor builds an ``oss2.Bucket`` that signs requests with V4 auth
    from static credentials. Every upload attempt writes one pipe-delimited
    line to the ``api`` logger (outcome, elapsed seconds, endpoint, bucket
    name, object key, response summary).

    SDK guide:
    https://help.aliyun.com/zh/oss/developer-reference/getting-started-with-oss-sdk-for-python#0cf90ff8b28eg

    Attributes:
        endpoint_url: OSS endpoint passed to ``oss2.Bucket``.
        region: Region passed to ``oss2.Bucket``.
        secret_key: First credential value handed to the provider.
        secret_id: Second credential value handed to the provider.
        bucket_name: Bucket name as given by the caller (used in log lines).
        auth: The ``oss2.ProviderAuthV4`` signer.
        bucket: The ``oss2.Bucket`` used for uploads.
    """

    def __init__(self,
                 endpoint_url: str,
                 region: str,
                 secret_key: str,
                 secret_id: str,
                 bucket: str,
                 ):
        self.endpoint_url = endpoint_url
        self.region = region
        self.secret_key = secret_key
        self.secret_id = secret_id
        # Keep the plain name separately: ``self.bucket`` holds the SDK object,
        # whose repr is useless in log lines.
        self.bucket_name = bucket

        # NOTE(review): oss2's StaticCredentialsProvider signature is
        # (access_key_id, access_key_secret). Passing secret_key first looks
        # swapped unless callers already compensate -- confirm before changing.
        self.auth = oss2.ProviderAuthV4(StaticCredentialsProvider(
            self.secret_key, self.secret_id
        ))
        self.bucket = oss2.Bucket(self.auth, self.endpoint_url, self.bucket_name, region=self.region)

    def _log_upload(self, msg: str, start_time: float, url_path: str, rsp_text: str) -> None:
        """Write the one-line record for an upload attempt to the ``api`` logger."""
        time_cost = time.monotonic() - start_time
        api_logger.info(f"s3|{msg}|{time_cost:.3f}s|{self.endpoint_url}|{self.bucket_name}|{url_path}|{rsp_text}")

    async def upload_by_filename(self, local_filename: str, cos_filename: str):
        """Upload the local file ``local_filename`` to the object key ``cos_filename``.

        The file is opened in binary mode and handed to ``upload_by_bytes`` as
        an open file object; it is closed when the upload finishes or fails.

        Returns:
            Whatever ``upload_by_bytes`` returns.

        Raises:
            OSError: If the local file cannot be opened.
            Exception: Anything raised by ``upload_by_bytes``.
        """
        with open(local_filename, "rb") as file_in_bytes:
            return await self.upload_by_bytes(file_in_bytes, cos_filename)

    async def upload_by_bytes(self, data_bytes: bytes, url_path: str) -> "oss2.models.PutObjectResult":
        """Upload ``data_bytes`` to the object key ``url_path`` and log the outcome.

        ``put_object`` is called synchronously, so the event loop is blocked
        for the duration of the request.

        Args:
            data_bytes: Payload passed straight to ``Bucket.put_object``
                (``upload_by_filename`` passes an open binary file here).
            url_path: Object key inside the bucket.

        Returns:
            The result object returned by ``Bucket.put_object``.

        Raises:
            Exception: Any error from the upload is logged and re-raised
                unchanged.
        """
        start_time = time.monotonic()
        try:
            response = self.bucket.put_object(url_path, data_bytes)
            rsp_text = json.dumps({
                "status": response.status,
                "etag": response.etag,
            })
        except Exception as e:
            # Failures go to the same log stream/format as successes, with the
            # exception text in the outcome field and an empty response field.
            self._log_upload(f"{type(e)}: {str(e)}", start_time, url_path, "")
            raise
        self._log_upload("success", start_time, url_path, rsp_text)
        return response
# Script entry point: intentionally a no-op; this module is meant to be imported.
if __name__ == "__main__":
    pass