| | import aiohttp |
| | import asyncio |
| | import json, pprint, uuid, os, datetime |
| | import tempfile, shutil |
| | from typing import List, Optional |
| | from datetime import datetime, timedelta |
| | from pydantic import BaseModel, HttpUrl |
| |
|
| |
|
| | class Metadata(BaseModel): |
| | filename: str |
| | type: str |
| |
|
| |
|
| | class Artifact(BaseModel): |
| | asset_id: str |
| | created_at: datetime |
| | file_extension: str |
| | id: str |
| | is_segmented: bool |
| | lookup_key: HttpUrl |
| | md5: str |
| | metadata: Metadata |
| | read_url: HttpUrl |
| | size: int |
| | status: str |
| | uploaded_by: str |
| |
|
| |
|
| | class TTSResponse(BaseModel): |
| | artifacts: List[Artifact] |
| | created_at: datetime |
| | created_by: str |
| | id: str |
| | lookup_key: HttpUrl |
| | metadata: Optional[dict] |
| |
|
| |
|
| | class DescriptTTS: |
| | def __init__(self, refresh_token=None): |
| | self.client_id = "VDfu7rg4pdCELWsrQjcw2tG63a8Qlymi" |
| | self.refresh_token_url = "https://auth0.descript.com/oauth/token" |
| | self.project_id = "f734c6d7-e39d-4c1d-8f41-417f94cd37ce" |
| | self.bearer_token = None |
| | self.voice_ids = { |
| | "Henry": "569fffb0-05a3-48a2-96a3-bf411c376477", |
| | "Malcom": "75f8b86e-d05d-4862-a228-8d96fdf55258", |
| | "Lawrance": "042460c0-98a5-41ae-9f31-33672ebb9016", |
| | |
| | } |
| |
|
| | self.refresh_token = refresh_token |
| | self.tau_id = "90f9e0ad-594e-4203-9297-d4c7cc691e5x" |
| |
|
| | async def login_and_get_bearer_token(self): |
| | |
| | new_bearer_token, new_refresh_token = await self.refresh_access_token() |
| |
|
| | |
| | await self.update_refresh_token(new_refresh_token) |
| |
|
| | |
| | self.bearer_token = new_bearer_token |
| | self.refresh_token = new_refresh_token |
| |
|
| | async def refresh_access_token(self): |
| | |
| | if self.refresh_token == None: |
| | await self.load_existing_refresh_token() |
| |
|
| | |
| | payload = { |
| | "grant_type": "refresh_token", |
| | "refresh_token": self.refresh_token, |
| | "client_id": self.client_id, |
| | } |
| |
|
| | |
| | async with aiohttp.ClientSession() as session: |
| | async with session.post(self.refresh_token_url, data=payload) as response: |
| | if response.status == 200: |
| | |
| | response_data = await response.json() |
| | new_bearer_token = response_data.get("access_token") |
| | new_refresh_token = response_data.get("refresh_token") |
| |
|
| | return new_bearer_token, new_refresh_token |
| | else: |
| | raise Exception( |
| | f"Failed to refresh access token. Status code: {response.status}, Error: {await response.text()}" |
| | ) |
| |
|
| | async def load_existing_refresh_token(self): |
| | |
| | async with aiohttp.ClientSession() as session: |
| | async with session.get( |
| | "https://herokuserver-185316.firebaseio.com/refresh_token_descript.json" |
| | ) as response: |
| | if response.status == 200: |
| | |
| | data = await response.json() |
| | self.refresh_token = data.get("refresh_token") |
| | else: |
| | raise Exception( |
| | f"Failed to load existing refresh token. Status code: {response.status}, Error: {await response.text()}" |
| | ) |
| |
|
| | async def download_and_store_file(self, access_url): |
| | temp_dir = tempfile.mkdtemp() |
| | |
| | random_filename = str(uuid.uuid4()) + ".wav" |
| | file_path = os.path.join(temp_dir, random_filename) |
| |
|
| | async with aiohttp.ClientSession() as session: |
| | async with session.get(access_url) as response: |
| | if response.status == 200: |
| | with open(file_path, "wb") as file: |
| | while True: |
| | chunk = await response.content.read(1024) |
| | if not chunk: |
| | break |
| | file.write(chunk) |
| |
|
| | |
| | delete_time = datetime.now() + timedelta(minutes=10) |
| |
|
| | async def schedule_delete(): |
| | while datetime.now() < delete_time: |
| | await asyncio.sleep(60) |
| | shutil.rmtree( |
| | temp_dir, ignore_errors=True |
| | ) |
| |
|
| | asyncio.ensure_future(schedule_delete()) |
| |
|
| | return file_path |
| |
|
| | async def search_unsplash_images(self, query_terms): |
| | url = "https://api.descript.com/v2/cloud_libraries/providers/unsplash/image/search" |
| | data = { |
| | 'tracking_info': {'project_id': self.project_id}, |
| | 'pagination_info': {'page': 2, 'page_size': 25}, |
| | 'query': {'terms': query_terms} |
| | } |
| |
|
| | try: |
| | response = await self.make_authenticated_request(url, method="POST", data=data) |
| | return response |
| | except Exception as e: |
| | print(f"Failed to search Unsplash images: {e}") |
| | return None |
| |
|
| |
|
| |
|
| |
|
| | async def search_sound_effects(self, query_terms): |
| | url = "https://api.descript.com/v2/cloud_libraries/providers/stock-sfx/audio/search" |
| | headers = { |
| | 'accept': 'application/json, text/plain, */*', |
| | 'accept-language': 'en-US,en;q=0.9', |
| | 'content-type': 'application/json', |
| | |
| | 'authorization': f'Bearer {self.bearer_token}', |
| | } |
| | data = { |
| | 'tracking_info': {'project_id': self.project_id}, |
| | 'pagination_info': {'page': 1, 'page_size': 25}, |
| | 'query': {'terms': query_terms} |
| | } |
| |
|
| | try: |
| | response = await self.make_authenticated_request(url, method="POST", data=data) |
| | return response |
| | except Exception as e: |
| | print(f"Failed to search sound effects: {e}") |
| | return {'status':str(e)} |
| |
|
| |
|
| | async def get_voices(self): |
| | url = "https://api.descript.com/v2/users/me/voices" |
| | try: |
| | response = await self.make_authenticated_request(url) |
| | voices = response |
| | self.voice_ids = {voice['name']: voice['id'] for voice in voices} |
| |
|
| | return voices |
| | except Exception as e: |
| | print(f"Failed to fetch voices: {e}") |
| | return None |
| |
|
| |
|
| | async def start_token_refresh_schedule(self): |
| | while True: |
| | try: |
| | new_bearer_token, new_refresh_token = await self.refresh_access_token() |
| | self.bearer_token = new_bearer_token |
| | self.refresh_token = new_refresh_token |
| |
|
| | |
| | await self.update_refresh_token(new_refresh_token) |
| |
|
| | print("Token refreshed successfully") |
| | except Exception as e: |
| | print(f"Failed to refresh token: {e}") |
| |
|
| | |
| | await asyncio.sleep(24 * 60 * 60) |
| |
|
| |
|
| | async def update_refresh_token(self, new_refresh_token): |
| | |
| | data = {"refresh_token": new_refresh_token} |
| | async with aiohttp.ClientSession() as session: |
| | async with session.put( |
| | "https://herokuserver-185316.firebaseio.com/refresh_token_descript.json", |
| | json=data, |
| | ) as response: |
| | if response.status != 200: |
| | raise Exception( |
| | f"Failed to update refresh token. Status code: {response.status}, Error: {await response.text()}" |
| | ) |
| |
|
| | async def make_authenticated_request(self, url, method="GET", data=None): |
| | if not self.bearer_token: |
| | await self.login_and_get_bearer_token() |
| |
|
| | headers = { |
| | "authority": "api.descript.com", |
| | "accept": "application/json, text/plain, */*", |
| | "accept-language": "en-US,en;q=0.9", |
| | "accept-version": "v1", |
| | "authorization": f"Bearer {self.bearer_token}", |
| | "cache-control": "no-cache", |
| | "content-type": "application/json", |
| | "origin": "https://web.descript.com", |
| | "pragma": "no-cache", |
| | "referer": "https://web.descript.com/", |
| | "sec-ch-ua": '"Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"', |
| | "sec-ch-ua-mobile": "?0", |
| | "sec-ch-ua-platform": '"Windows"', |
| | "sec-fetch-dest": "empty", |
| | "sec-fetch-mode": "cors", |
| | "sec-fetch-site": "same-site", |
| | "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36", |
| | "x-descript-app-build-number": "20231206.146", |
| | "x-descript-app-build-type": "release", |
| | "x-descript-app-id": "48db7358-5ebc-4866-b672-10b412ac39c1", |
| | "x-descript-app-name": "web", |
| | "x-descript-app-version": "78.2.4", |
| | "x-descript-auth": "auth0", |
| | } |
| |
|
| | async with aiohttp.ClientSession() as session: |
| | async with session.request( |
| | method, url, headers=headers, json=data |
| | ) as response: |
| | if response.status < 300: |
| | return await response.json() |
| | elif response.status == 401: |
| | |
| | await self.login_and_get_bearer_token() |
| | headers["authorization"] = f"Bearer {self.bearer_token}" |
| | async with session.request( |
| | method, url, headers=headers, json=data |
| | ) as retry_response: |
| | if retry_response.status == 200: |
| | return await retry_response.json() |
| | else: |
| | raise Exception( |
| | f"Request failed even after refreshing token. Status code: {retry_response.status}, Error: {await retry_response.text()}" |
| | ) |
| | else: |
| | raise Exception( |
| | f"Request failed. Status code: {response.status}, Error: {await response.text()}" |
| | ) |
| |
|
| | async def get_assets(self): |
| | url = "https://api.descript.com/v2/projects/f734c6d7-e39d-4c1d-8f41-417f94cd37ce/media_assets?include_artifacts=true&cursor=1702016922390&include_placeholder=true" |
| | try: |
| | result = await self.make_authenticated_request(url) |
| | return result |
| | except Exception as e: |
| | print(f"Failed to get assets: {str(e)}") |
| |
|
| | async def overdub_text(self, text, speaker="Lawrance",_voice_id=None): |
| | url = "https://api.descript.com/v2/projects/f734c6d7-e39d-4c1d-8f41-417f94cd37ce/overdub" |
| | voice_id = _voice_id or self.voice_ids[speaker] |
| | data = { |
| | "text": text, |
| | "voice_id": voice_id, |
| | "concatenate_audio": True, |
| | "tau_id": self.tau_id, |
| | "allow_prefix_expansion": True, |
| | "allow_suffix_expansion": True, |
| | } |
| |
|
| | try: |
| | result = await self.make_authenticated_request( |
| | url, method="POST", data=data |
| | ) |
| | return result |
| | except Exception as e: |
| | |
| | if "authorization" in str(e).lower(): |
| | await self.login_and_get_bearer_token() |
| | result = await self.make_authenticated_request( |
| | url, method="POST", data=data |
| | ) |
| | print(result) |
| | return result |
| | else: |
| | print(f"Failed to perform overdub: {str(e)}") |
| |
|
| | async def overdub_staus(self, id): |
| | url = f"https://api.descript.com/v2/projects/f734c6d7-e39d-4c1d-8f41-417f94cd37ce/overdub/{id}" |
| |
|
| | try: |
| | result = await self.make_authenticated_request(url, method="GET") |
| | print(result) |
| | return result |
| | except Exception as e: |
| | |
| | if "authorization" in str(e).lower(): |
| | await self.login_and_get_bearer_token() |
| | result = await self.make_authenticated_request( |
| | url, method="POST", data=data |
| | ) |
| | print(result) |
| | return result |
| | else: |
| | print(f"Failed to perform overdub: {str(e)}") |
| |
|
| | async def request_status(self, id): |
| | status = await self.overdub_staus(id) |
| | if status["state"] == "done": |
| | asset_id=status["result"]["imputation_audio_asset_id"] |
| | overdub = await self.get_assets() |
| | for asset in overdub["data"]: |
| | if asset["id"] == asset_id: |
| | data = TTSResponse(**asset) |
| | url = data.artifacts[0].read_url |
| | return {'url':url,'status':'done'} |
| | return status |
| |
|
| |
|
| |
|
| | async def say(self, text, speaker="Henry"): |
| | overdub = await self.overdub_text(text, speaker=speaker) |
| | |
| | asset_id = None |
| | while True: |
| | status = await self.overdub_staus(overdub["id"]) |
| | |
| | if status["state"] == "done": |
| | |
| | asset_id = status["result"]["imputation_audio_asset_id"] |
| | break |
| | await asyncio.sleep(3) |
| |
|
| | overdub = await self.get_assets() |
| | for asset in overdub["data"]: |
| | if asset["id"] == asset_id: |
| | data = TTSResponse(**asset) |
| | url = data.artifacts[0].read_url |
| | print(url) |
| | path = await self.download_and_store_file(str(url)) |
| | return path, url |
| |
|
| |
|
| |
|