Spaces:
Runtime error
Runtime error
| import discord | |
| from discord.ext import commands | |
| import os | |
| import re | |
| import time | |
| import requests | |
| import pathlib | |
| import tempfile | |
| import random | |
| from io import BytesIO | |
| class FailedToGenerateResponseError(Exception): | |
| pass | |
| class OpenAIFMTTS: | |
| headers = { | |
| "accept": "*/*", | |
| "accept-language": "en-US,en;q=0.9", | |
| "cache-control": "no-cache", | |
| "pragma": "no-cache", | |
| "sec-fetch-dest": "audio", | |
| "sec-fetch-mode": "no-cors", | |
| "sec-fetch-site": "same-origin", | |
| "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36", | |
| "referer": "https://www.openai.fm" | |
| } | |
| SUPPORTED_MODELS = [ | |
| "gpt-4o-mini-tts", | |
| "tts-1", | |
| "tts-1-hd" | |
| ] | |
| SUPPORTED_VOICES = [ | |
| "alloy", | |
| "ash", | |
| "ballad", | |
| "coral", | |
| "echo", | |
| "fable", | |
| "nova", | |
| "onyx", | |
| "sage", | |
| "shimmer" | |
| ] | |
| voice_mapping = { | |
| "alloy": "alloy", | |
| "ash": "ash", | |
| "ballad": "ballad", | |
| "coral": "coral", | |
| "echo": "echo", | |
| "fable": "fable", | |
| "nova": "nova", | |
| "onyx": "onyx", | |
| "sage": "sage", | |
| "shimmer": "shimmer" | |
| } | |
| def __init__(self, timeout: int = 20, proxies: dict = None): | |
| self.api_url = "https://www.openai.fm/api/generate" | |
| self.session = requests.Session() | |
| self.session.headers.update(self.headers) | |
| if proxies: | |
| self.session.proxies.update(proxies) | |
| self.timeout = timeout | |
| self.temp_dir = tempfile.gettempdir() | |
| self.SUPPORTED_FORMATS = ["mp3", "opus", "aac", "flac", "wav", "pcm"] | |
| def validate_model(self, model: str) -> str: | |
| if model not in self.SUPPORTED_MODELS: | |
| raise ValueError(f"Unsupported model: {model}. Supported models: {self.SUPPORTED_MODELS}") | |
| return model | |
| def validate_voice(self, voice: str) -> str: | |
| if voice not in self.SUPPORTED_VOICES: | |
| raise ValueError(f"Unsupported voice: {voice}. Supported voices: {self.SUPPORTED_VOICES}") | |
| return voice | |
| def validate_format(self, response_format: str) -> str: | |
| if response_format not in self.SUPPORTED_FORMATS: | |
| raise ValueError(f"Unsupported format: {response_format}. Supported formats: {self.SUPPORTED_FORMATS}") | |
| return response_format | |
| def tts(self, text: str, model: str = "gpt-4o-mini-tts", voice: str = "coral", response_format: str = "mp3", instructions: str = None, verbose: bool = True) -> str: | |
| if not text or not isinstance(text, str): | |
| raise ValueError("Input text must be a non-empty string") | |
| if len(text) > 10000: | |
| raise ValueError("Input text exceeds maximum allowed length of 10,000 characters") | |
| model = self.validate_model(model) | |
| voice = self.validate_voice(voice) | |
| response_format = self.validate_format(response_format) | |
| voice_id = self.voice_mapping.get(voice, voice) | |
| if instructions is None: | |
| instructions = "Speak in a cheerful and positive tone." | |
| file_extension = f".{response_format}" if response_format != "pcm" else ".wav" | |
| with tempfile.NamedTemporaryFile(suffix=file_extension, dir=self.temp_dir, delete=False) as temp_file: | |
| filename = pathlib.Path(temp_file.name) | |
| params = { | |
| "input": text, | |
| "prompt": instructions, | |
| "voice": voice_id, | |
| "model": model, | |
| "response_format": response_format | |
| } | |
| try: | |
| response = self.session.get( | |
| self.api_url, | |
| params=params, | |
| timeout=self.timeout | |
| ) | |
| response.raise_for_status() | |
| if not response.content: | |
| raise FailedToGenerateResponseError("Empty response from API") | |
| with open(filename, "wb") as f: | |
| f.write(response.content) | |
| if verbose: | |
| print(f"[debug] Speech generated successfully") | |
| print(f"[debug] Model: {model}") | |
| print(f"[debug] Voice: {voice}") | |
| print(f"[debug] Format: {response_format}") | |
| print(f"[debug] Audio saved to {filename}") | |
| return filename.as_posix() | |
| except requests.exceptions.RequestException as e: | |
| if verbose: | |
| print(f"[debug] Failed to generate speech: {e}") | |
| raise FailedToGenerateResponseError(f"Failed to generate speech: {e}") | |
| except Exception as e: | |
| if verbose: | |
| print(f"[debug] Unexpected error: {e}") | |
| raise FailedToGenerateResponseError(f"Unexpected error during speech generation: {e}") | |
| def create_speech(self, input: str, model: str = "gpt-4o-mini-tts", voice: str = "coral", response_format: str = "mp3", instructions: str = None, verbose: bool = False) -> str: | |
| return self.tts(text=input, model=model, voice=voice, response_format=response_format, instructions=instructions, verbose=verbose) | |
| def with_streaming_response(self): | |
| return StreamingResponseContextManager(self) | |
| class StreamingResponseContextManager: | |
| def __init__(self, tts_provider: OpenAIFMTTS): | |
| self.tts_provider = tts_provider | |
| self.audio_file = None | |
| def create(self, input: str, model: str = "gpt-4o-mini-tts", voice: str = "coral", response_format: str = "mp3", instructions: str = None): | |
| self.audio_file = self.tts_provider.create_speech(input=input, model=model, voice=voice, response_format=response_format, instructions=instructions) | |
| return StreamingResponse(self.audio_file) | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| pass | |
| class StreamingResponse: | |
| def __init__(self, audio_file: str): | |
| self.audio_file = audio_file | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| pass | |
| def stream_to_file(self, file_path: str, chunk_size: int = 1024): | |
| import shutil | |
| shutil.copy2(self.audio_file, file_path) | |
| def iter_bytes(self, chunk_size: int = 1024): | |
| with open(self.audio_file, 'rb') as f: | |
| while chunk := f.read(chunk_size): | |
| yield chunk | |
| intents = discord.Intents.default() | |
| intents.message_content = True | |
| intents.members = True | |
| bot = commands.Bot(command_prefix='!', intents=intents) | |
| tts_provider = OpenAIFMTTS() | |
| async def on_ready(): | |
| print(f'Logged in as {bot.user.name} ({bot.user.id})') | |
| print('------') | |
| async def text_to_speech(ctx, *, text: str = None): | |
| try: | |
| if not text: | |
| await ctx.send("Please provide text to convert to speech. Usage: `!tts [text]` or `!tts [text] --voice [voice]`") | |
| return | |
| voice = "coral" | |
| supported_voices = ["alloy", "ash", "ballad", "coral", "echo", "fable", "nova", "onyx", "sage", "shimmer"] | |
| voice_match_end = re.search(r'\s+--voice\s+(\w+)$', text) | |
| voice_match_start = re.search(r'^--voice\s+(\w+)\s+', text) | |
| if voice_match_end: | |
| requested_voice = voice_match_end.group(1).lower() | |
| text = re.sub(r'\s+--voice\s+\w+$', '', text).strip() | |
| elif voice_match_start: | |
| requested_voice = voice_match_start.group(1).lower() | |
| text = re.sub(r'^--voice\s+\w+\s+', '', text).strip() | |
| else: | |
| requested_voice = None | |
| if requested_voice: | |
| if requested_voice in supported_voices: | |
| voice = requested_voice | |
| else: | |
| await ctx.send(f"❌ Invalid voice: `{requested_voice}`") | |
| return | |
| if not text.strip(): | |
| await ctx.send("Please provide text to convert to speech. Usage: `!tts [text]` or `!tts [text] --voice [voice]`") | |
| return | |
| if len(text) > 1000: | |
| await ctx.send("❌ Text is too long. Please keep it under 1000 characters.") | |
| return | |
| processing_msg = await ctx.send("🎵 Generating speech...") | |
| audio_file = tts_provider.create_speech( | |
| input=text, | |
| voice=voice, | |
| response_format="mp3", | |
| instructions="Speak clearly and naturally." | |
| ) | |
| await processing_msg.delete() | |
| await ctx.send( | |
| content=f"🎵 TTS generated with voice: **{voice}**", | |
| file=discord.File(audio_file, filename=f"tts_{voice}.mp3") | |
| ) | |
| os.remove(audio_file) | |
| except FailedToGenerateResponseError as e: | |
| await ctx.send(f"❌ Failed to generate speech: {str(e)}") | |
| except Exception as e: | |
| await ctx.send(f"❌ An error occurred: {str(e)}") | |
| print(f"TTS Error: {e}") | |
| async def on_message(message): | |
| if message.author == bot.user: | |
| return | |
| await bot.process_commands(message) | |
| TOKEN = os.getenv("DISCORD_BOT_TOKEN") | |
| if TOKEN: | |
| bot.run(TOKEN) | |
| else: | |
| print("Error: No bot token provided!") | |