Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import requests | |
| import oss2 | |
| from pydub import AudioSegment # For audio conversion | |
| from requests.cookies import RequestsCookieJar | |
def convert_to_mp3(input_path, output_path):
    """
    Re-encode an audio file as MP3 via pydub.

    Args:
        input_path (str): Location of the audio file to read.
        output_path (str): Destination path the MP3 is written to.

    Returns:
        None
    """
    # Load the source audio and write it straight back out as MP3.
    AudioSegment.from_file(input_path).export(output_path, format="mp3")
def upload_image_to_cdn(base_url, auth_token, image_path):
    """
    Uploads an image to the CDN using STS credentials.

    The function first requests temporary STS credentials from
    ``{base_url}/api/v1/files/getstsToken`` and then writes the file to the
    OSS bucket named in that response.

    Args:
        base_url (str): The base URL of the API endpoint (e.g., "https://chat.qwen.ai").
        auth_token (str): The Bearer token for authentication.
        image_path (str): The local path to the image file to upload.

    Returns:
        str: The ``file_url`` value taken from the STS response.

    Raises:
        Exception: If the upload fails at any step. The message is prefixed
            with "Image upload failed: " and the underlying error is chained
            as ``__cause__``.
    """
    # Seconds to wait on the STS endpoint. Without a timeout, requests can
    # block indefinitely on an unresponsive server.
    request_timeout = 30

    class ImageUploader:
        """Holds the HTTP session, headers and cookies used for one upload."""

        def __init__(self, base_url, auth_token):
            self.session = requests.Session()
            self.base_url = base_url
            self.auth_token = auth_token
            self.headers = self._create_headers()
            self.cookies = RequestsCookieJar()

        def _create_headers(self):
            """Build the request headers, including a fresh request id."""
            return {
                'x-request-id': str(uuid.uuid4()),
                'Authorization': f'Bearer {self.auth_token}',
                'Content-Type': 'application/json',
                'User-Agent': (
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                    'AppleWebKit/537.36 (KHTML, like Gecko) '
                    'Chrome/134.0.0.0 Safari/537.36'
                ),
            }

        def get_sts_token(self, filename, filesize, filetype):
            """POST the file metadata and return the decoded JSON response."""
            endpoint = '/api/v1/files/getstsToken'
            payload = {
                'filename': filename,
                'filesize': filesize,
                'filetype': filetype,
            }
            response = self.session.post(
                f'{self.base_url}{endpoint}',
                json=payload,
                headers=self.headers,
                cookies=self.cookies,
                timeout=request_timeout,
            )
            response.raise_for_status()
            return response.json()

        def upload_to_oss(self, sts_data, file_path):
            """Put the file into the OSS bucket; return True on HTTP 200."""
            auth = oss2.StsAuth(
                sts_data['access_key_id'],
                sts_data['access_key_secret'],
                sts_data['security_token'],
            )
            endpoint = f"https://{sts_data['region']}.aliyuncs.com"
            bucket = oss2.Bucket(auth, endpoint, sts_data['bucketname'])
            with open(file_path, 'rb') as file:
                result = bucket.put_object(sts_data['file_path'], file)
            return result.status == 200

        def upload_file(self, file_path, filetype):
            """Run both steps and return the resulting file URL."""
            filename = os.path.basename(file_path)
            filesize = os.path.getsize(file_path)
            # Step 1: Get STS token
            sts_data = self.get_sts_token(filename, filesize, filetype)
            # Step 2: Upload to OSS
            if not self.upload_to_oss(sts_data, file_path):
                raise Exception("Upload failed")
            return sts_data['file_url']

        def close(self):
            """Release the pooled connections held by the session."""
            self.session.close()

    # Initialize the uploader and perform the upload
    uploader = ImageUploader(base_url, auth_token)
    try:
        return uploader.upload_file(image_path, filetype='image')
    except Exception as e:
        # Chain explicitly so the root cause stays visible in tracebacks.
        raise Exception(f"Image upload failed: {str(e)}") from e
    finally:
        # Always release the session, whether the upload succeeded or not.
        uploader.close()
def upload_audio_to_cdn(base_url, auth_token, audio_path):
    """
    Uploads an audio file to the CDN using STS credentials.

    The input is first converted to MP3 inside a temporary directory, so the
    source file and any sibling ``<name>.mp3`` are never overwritten and no
    converted file is left behind. Temporary STS credentials are then
    requested from ``{base_url}/api/v1/files/getstsToken`` and the MP3 is
    written to the OSS bucket named in that response.

    Args:
        base_url (str): The base URL of the API endpoint (e.g., "https://chat.qwen.ai").
        auth_token (str): The Bearer token for authentication.
        audio_path (str): The local path to the audio file to upload.

    Returns:
        str: The ``file_url`` value taken from the STS response.

    Raises:
        Exception: If the upload fails at any step. The message is prefixed
            with "Audio upload failed: " and the underlying error is chained
            as ``__cause__``.
    """
    # Function-scope import keeps this block self-contained.
    import tempfile

    # Seconds to wait on the STS endpoint. Without a timeout, requests can
    # block indefinitely on an unresponsive server.
    request_timeout = 30

    class AudioUploader:
        """Holds the HTTP session, headers and cookies used for one upload."""

        def __init__(self, base_url, auth_token):
            self.session = requests.Session()
            self.base_url = base_url
            self.auth_token = auth_token
            self.headers = self._create_headers()
            self.cookies = RequestsCookieJar()

        def _create_headers(self):
            """Build the request headers, including a fresh request id."""
            return {
                'x-request-id': str(uuid.uuid4()),
                'Authorization': f'Bearer {self.auth_token}',
                'Content-Type': 'application/json',
                'User-Agent': (
                    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                    'AppleWebKit/537.36 (KHTML, like Gecko) '
                    'Chrome/134.0.0.0 Safari/537.36'
                ),
            }

        def get_sts_token(self, filename, filesize, filetype):
            """POST the file metadata and return the decoded JSON response."""
            endpoint = '/api/v1/files/getstsToken'
            payload = {
                'filename': filename,
                'filesize': filesize,
                'filetype': filetype,
            }
            response = self.session.post(
                f'{self.base_url}{endpoint}',
                json=payload,
                headers=self.headers,
                cookies=self.cookies,
                timeout=request_timeout,
            )
            response.raise_for_status()
            return response.json()

        def upload_to_oss(self, sts_data, file_path):
            """Put the MP3 into the OSS bucket; return True on HTTP 200."""
            auth = oss2.StsAuth(
                sts_data['access_key_id'],
                sts_data['access_key_secret'],
                sts_data['security_token'],
            )
            endpoint = f"https://{sts_data['region']}.aliyuncs.com"
            bucket = oss2.Bucket(auth, endpoint, sts_data['bucketname'])
            with open(file_path, 'rb') as file:
                result = bucket.put_object(
                    sts_data['file_path'],
                    file,
                    headers={'Content-Type': 'audio/mpeg'},  # Set correct MIME type
                )
            return result.status == 200

        def upload_file(self, file_path, filetype):
            """Convert to MP3, upload it, and return the resulting file URL."""
            # Same "<stem>.mp3" name as before, but placed in a scratch
            # directory that is removed once the upload attempt finishes.
            stem = os.path.splitext(os.path.basename(file_path))[0]
            with tempfile.TemporaryDirectory() as workdir:
                mp3_path = os.path.join(workdir, f"{stem}.mp3")
                # Convert the audio file to MP3 format
                convert_to_mp3(file_path, mp3_path)
                # Use the converted MP3 file for upload
                filename = os.path.basename(mp3_path)
                filesize = os.path.getsize(mp3_path)
                # Step 1: Get STS token
                sts_data = self.get_sts_token(filename, filesize, filetype)
                # Step 2: Upload to OSS
                upload_success = self.upload_to_oss(sts_data, mp3_path)
            if not upload_success:
                raise Exception("Upload failed")
            return sts_data['file_url']

        def close(self):
            """Release the pooled connections held by the session."""
            self.session.close()

    # Initialize the uploader and perform the upload
    uploader = AudioUploader(base_url, auth_token)
    try:
        return uploader.upload_file(audio_path, filetype='audio')
    except Exception as e:
        # Chain explicitly so the root cause stays visible in tracebacks.
        raise Exception(f"Audio upload failed: {str(e)}") from e
    finally:
        # Always release the session, whether the upload succeeded or not.
        uploader.close()