Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8 -*- | |
| import os | |
| import sys | |
| import string | |
| import time | |
| import datetime | |
| import json | |
| from io import BytesIO | |
| import oss2 | |
| import random | |
| import requests | |
| import shutil | |
| from myconfigs import * | |
# When True, ossService.__init__ replaces the ".aliyuncs.com" suffix of
# OSSEndpoint with "-internal.aliyuncs.com" before creating the bucket client.
| use_internal_network = False | |
def get_random_string():
    """Return a timestamped identifier with a random uppercase suffix.

    The result has the form ``YYYYMMDD-HHMMSS-ffffff-XXXXXX``: the current
    local date, time and microsecond (from ``datetime.datetime.now()``)
    followed by six random ASCII uppercase letters.
    """
    # A single strftime call keeps the three fields consistent and avoids a
    # local named ``time`` that would shadow the imported module. "%f" is
    # always six digits, so no truncation is needed.
    timestamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')
    suffix = ''.join(random.choices(string.ascii_uppercase, k=6))
    return timestamp + "-" + suffix
class ossService():
    """Small helper around an ``oss2.Bucket`` for signing, upload and download.

    Credentials, endpoint, bucket name and object name are read from the
    module-level ``OSS*`` settings (brought in by ``from myconfigs import *``).
    Methods report failure through a status value (1 = ok, 0 = error) and a
    printed message rather than by raising.
    """

    _PUBLIC_SUFFIX = ".aliyuncs.com"
    _INTERNAL_SUFFIX = "-internal.aliyuncs.com"

    def __init__(self):
        """Build the bucket client from the configured OSS settings."""
        self.AccessKeyId = OSSAccessKeyId
        self.AccessKeySecret = OSSAccessKeySecret
        self.Endpoint = OSSEndpoint
        # Only rewrite endpoints that really end in ".aliyuncs.com" and are not
        # already internal; blindly truncating would corrupt any other value.
        if (use_internal_network
                and OSSEndpoint.endswith(self._PUBLIC_SUFFIX)
                and not OSSEndpoint.endswith(self._INTERNAL_SUFFIX)):
            self.Endpoint = (
                OSSEndpoint[:-len(self._PUBLIC_SUFFIX)] + self._INTERNAL_SUFFIX
            )
        self.BucketName = OSSBucketName  # e.g. "vigen-invi" / "vigen-video"
        self.ObjectName = OSSObjectName  # e.g. "video_generation" / "VideoGeneration"
        self.Prefix = "oss://" + self.BucketName
        auth = oss2.Auth(self.AccessKeyId, self.AccessKeySecret)
        self.bucket = oss2.Bucket(auth, self.Endpoint, self.BucketName)

    def sign(self, oss_url, timeout=3600, params=None):
        """Create a signed GET URL for an object in this bucket.

        Args:
            oss_url: Full object URL, e.g. ``oss://BucketName/ObjectName/xxx.mp4``.
            timeout: Expiry passed to ``sign_url`` (e.g. ``3600 * 100``).
            params: Optional query parameters, e.g. ``{'x-oss-process': style}``.

        Returns:
            ``(1, signed_url)`` on success, ``(0, "")`` on any error, including
            a URL that does not start with ``oss://<BucketName>/``.
        """
        prefix = "oss://" + self.BucketName + "/"
        try:
            # Validate before slicing: stripping a fixed-length prefix from a
            # URL of another bucket would silently sign the wrong key.
            if not oss_url.startswith(prefix):
                raise ValueError(
                    "url {!r} does not start with {!r}".format(oss_url, prefix)
                )
            oss_path = oss_url[len(prefix):]
            signed = self.bucket.sign_url(
                'GET', oss_path, timeout, slash_safe=True, params=params
            )
            return 1, signed
        except Exception as e:
            print("sign error: {}".format(e))
            return 0, ""

    def uploadOssFile(self, oss_full_path, local_full_path):
        """Upload a local file to ``oss_full_path`` and sign it for one hour.

        Returns:
            The ``(status, url)`` pair from :meth:`sign`, or ``(0, "")`` when
            the upload raises ``oss2.exceptions.OssError``.
        """
        try:
            self.bucket.put_object_from_file(oss_full_path, local_full_path)
            return self.sign(self.Prefix + "/" + oss_full_path, timeout=3600)
        except oss2.exceptions.OssError as e:
            print("oss upload error: ", e)
            return 0, ""

    def downloadOssFile(self, oss_full_path, local_full_path):
        """Download the object ``oss_full_path`` to ``local_full_path``.

        Returns:
            1 on success, 0 when ``oss2.exceptions.OssError`` is raised.
        """
        try:
            self.bucket.get_object_to_file(oss_full_path, local_full_path)
        except oss2.exceptions.OssError as e:
            print("oss download error: ", e)
            return 0
        return 1

    def downloadFile(self, file_full_url, local_full_path, timeout=60):
        """Fetch ``file_full_url`` over HTTP and write the body to a local file.

        Args:
            file_full_url: URL to GET.
            local_full_path: Destination path, opened in binary write mode.
            timeout: Passed to ``requests.get`` so a stalled server cannot
                block the caller indefinitely.

        Returns:
            1 when the response status is 200 and the file was written,
            0 on a request failure or any other status code.
        """
        try:
            response = requests.get(file_full_url, timeout=timeout)
        except requests.RequestException as e:
            # Report network failures through the status value, matching the
            # other download/upload methods.
            print("oss download error: ", e)
            return 0
        if response.status_code != 200:
            print("oss download error. ")
            return 0
        with open(local_full_path, "wb") as f:
            f.write(response.content)
        return 1