Spaces:
Paused
Paused
File size: 7,105 Bytes
64cfce9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
"""
TTS API Server
This module provides a server that's compatible with OpenAI's TTS API format.
"""
import asyncio
import aiohttp
import logging
import ssl
from aiohttp import web, TCPConnector
from typing import Optional
import random
from utils.config import load_config
from server.handlers import handle_openai_speech, handle_queue_size, handle_static, process_tts_request, handle_voice_sample
# Logging setup: INFO level, every record prefixed with timestamp, logger name and level.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Read the configuration once at import time; its 'host', 'port',
# 'max_queue_size' and 'verify_ssl' entries become TTSServer's constructor defaults.
config = load_config()
class TTSServer:
    """Server that's compatible with OpenAI's TTS API.

    Requests accepted on the HTTP endpoints are handed to the functions in
    ``server.handlers`` together with a bounded ``asyncio.Queue``.  A single
    background task (``process_queue``) drains that queue one item at a time
    and enforces a minimum interval between two processed requests.

    Lifecycle: construct, ``await start()``, and finally ``await stop()``.
    """

    # NOTE: the defaults below are read from ``config`` once, when the class
    # body is executed (i.e. at import time), not on every instantiation.
    def __init__(
        self,
        host: str = config['host'],
        port: int = config['port'],
        max_queue_size: int = config['max_queue_size'],
        verify_ssl: bool = config['verify_ssl'],
    ):
        """Initialize the TTS server.

        Args:
            host: Host to bind to
            port: Port to bind to
            max_queue_size: Maximum number of tasks in queue. Values that are
                not convertible to an integer >= 1 fall back to 100.
            verify_ssl: Whether to verify SSL certificates when connecting to
                external services
        """
        self.host = host
        self.port = port
        self.app = web.Application()
        self.verify_ssl = verify_ssl

        # Validate and set queue size
        max_queue_size = self._validated_queue_size(max_queue_size)

        # Initialize queue system with rate limiting
        self.queue = asyncio.Queue(maxsize=max_queue_size)
        self.current_task = None
        self.processing_lock = asyncio.Lock()
        self.last_request_time = 0
        self.min_request_interval = 1.0  # Minimum time between requests in seconds

        # Handles created by start() and released by stop().
        self._runner = None
        self._processor_task = None

        # Set up routes
        self.setup_routes()
        self.session = None
        logger.info("Initialized TTS server with max queue size: %s", max_queue_size)

    @staticmethod
    def _validated_queue_size(value, default: int = 100) -> int:
        """Return ``value`` as a positive int, or ``default`` if it is not one."""
        try:
            size = int(value)
        except (ValueError, TypeError):
            size = 0
        if size < 1:
            logger.warning("Invalid max_queue_size %s, defaulting to %s", value, default)
            return default
        return size

    def setup_routes(self):
        """Set up the API routes."""
        router = self.app.router
        # OpenAI compatible endpoint
        router.add_post('/v1/audio/speech', self._handle_openai_speech)
        router.add_get('/api/queue-size', self._handle_queue_size)
        router.add_get('/api/voice-sample/{voice}', handle_voice_sample)
        # Catch-all route; must stay last so the specific GET routes above win.
        router.add_get('/{tail:.*}', handle_static)

    async def _handle_openai_speech(self, request):
        """Handler for OpenAI speech endpoint.

        Delegates to ``handle_openai_speech`` with this server's queue and
        HTTP client session.
        """
        return await handle_openai_speech(
            request,
            self.queue,
            session=self.session
        )

    async def _handle_queue_size(self, request):
        """Handler for queue size endpoint."""
        return await handle_queue_size(request, self.queue)

    async def start(self):
        """Start the TTS server.

        Creates the shared outbound ``aiohttp.ClientSession``, launches the
        queue-processing task and starts listening on ``host:port``.

        Raises:
            Exception: whatever the web runner/site raises while starting
                (for example when the port is already in use).  Resources
                created so far are released before the error is re-raised.
        """
        # Connector settings shared by both SSL modes.
        connector_options = dict(
            limit=10,  # Limit concurrent connections
            ttl_dns_cache=300,  # Cache DNS results for 5 minutes
            use_dns_cache=True,
            enable_cleanup_closed=True,
        )
        if not self.verify_ssl:
            logger.warning("SSL certificate verification disabled. This is insecure and should only be used for testing.")
            # ssl=False is aiohttp's switch for skipping certificate
            # validation; no custom SSLContext is required.
            connector_options['ssl'] = False
        connector = TCPConnector(**connector_options)

        # Create session with better timeout settings
        timeout = aiohttp.ClientTimeout(
            total=30,
            connect=10,
            sock_read=20
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                "Accept": "*/*",
                "Accept-Language": "en-US,en;q=0.9",
                "Origin": "https://www.openai.fm",
                "Referer": "https://www.openai.fm/",
                "Content-Type": "application/x-www-form-urlencoded"
            }
        )
        logger.info("Created aiohttp session with optimized settings")

        try:
            # Start the task processor.  Keep a strong reference: the event
            # loop only holds weak references to tasks, and stop() needs the
            # handle to cancel it.
            self._processor_task = asyncio.create_task(self.process_queue())

            self._runner = web.AppRunner(self.app)
            await self._runner.setup()
            site = web.TCPSite(self._runner, self.host, self.port)
            await site.start()
        except Exception:
            # Do not leak the session / worker task if the listener fails.
            await self.stop()
            raise

        logger.info("TTS server running at http://%s:%s", self.host, self.port)
        if not self.verify_ssl:
            logger.warning("Running with SSL verification disabled. Not recommended for production use.")

    async def stop(self):
        """Stop the TTS server.

        Shuts down the HTTP listener, cancels the queue-processing task and
        closes the client session.  Safe to call more than once and safe to
        call when ``start()`` was never (fully) executed.
        """
        # Stop accepting requests first so in-flight handlers can still be
        # answered by the queue processor while the runner shuts down.
        runner, self._runner = self._runner, None
        if runner is not None:
            await runner.cleanup()

        processor, self._processor_task = self._processor_task, None
        if processor is not None:
            processor.cancel()
            # return_exceptions=True absorbs the worker's CancelledError
            # without hiding a cancellation aimed at stop() itself.
            await asyncio.gather(processor, return_exceptions=True)

        session, self.session = self.session, None
        if session:
            await session.close()

    async def process_queue(self):
        """Background task to process the queue with rate limiting.

        Runs until cancelled.  Each queued item is expected to be a mapping
        with a ``'response_future'`` entry; the outcome of
        ``process_tts_request`` is delivered through that future.
        """
        loop = asyncio.get_running_loop()
        while True:
            try:
                # Get next task from queue
                task_data = await self.queue.get()
                try:
                    await self._run_task(task_data, loop)
                finally:
                    # Always balance queue.get(), even for malformed items.
                    self.queue.task_done()
            except Exception as e:
                logger.exception("Error processing queue: %s", e)
                await asyncio.sleep(1)  # Prevent tight loop on persistent errors

    async def _run_task(self, task_data, loop):
        """Process one queued item and resolve its response future."""
        future = task_data['response_future']
        if future.done():
            # The waiter already gave up (e.g. it was cancelled), so nobody
            # would read the result; skip the upstream call.
            return

        # Implement rate limiting
        remaining = self.min_request_interval - (loop.time() - self.last_request_time)
        if remaining > 0:
            await asyncio.sleep(remaining)

        async with self.processing_lock:
            self.current_task = task_data
            try:
                response = await process_tts_request(task_data, self.session)
            except asyncio.CancelledError:
                # Shutdown while a request is in flight: release the waiter.
                if not future.done():
                    future.cancel()
                raise
            except Exception as e:
                # The future may have been cancelled while we were working;
                # setting an outcome on a done future raises InvalidStateError.
                if not future.done():
                    future.set_exception(e)
            else:
                if not future.done():
                    future.set_result(response)
            finally:
                self.current_task = None
                # Count failed attempts too, so errors cannot bypass the
                # minimum interval between requests.
                self.last_request_time = loop.time()