#!/usr/bin/env python3 """ BeigeTTS - Streaming Inference Real-time streaming text-to-speech with chunk-based generation Research release derived from BlandAI's Khaki TTS system """ import torch import numpy as np import soundfile as sf from neucodec import NeuCodec from transformers import AutoModelForCausalLM, AutoTokenizer import threading import queue import time from typing import Optional, Generator, Tuple import pyaudio import argparse # ═══════════════════════════════════════════════════════════════════ # Configuration # ═══════════════════════════════════════════════════════════════════ class StreamingConfig: """Configuration for streaming TTS""" # Audio tokens AUDIO_START_TOKEN = 262145 AUDIO_END_TOKEN = 262146 NEUCODEC_BASE_OFFSET = 262154 NEUCODEC_VOCABULARY_SIZE = 65536 AUDIO_TOKEN_MIN = NEUCODEC_BASE_OFFSET AUDIO_TOKEN_MAX = NEUCODEC_BASE_OFFSET + NEUCODEC_VOCABULARY_SIZE # Streaming parameters CHUNK_SIZE = 50 # Audio tokens per chunk (~500ms) BUFFER_SIZE = 3 # Number of chunks to buffer SAMPLE_RATE = 24000 # Generation parameters DEFAULT_TEMPERATURE = 0.1 DEFAULT_TOP_P = 0.97 MAX_TOTAL_TOKENS = 1000 # Limited in research release (Khaki: unlimited) # ═══════════════════════════════════════════════════════════════════ # Streaming TTS Engine # ═══════════════════════════════════════════════════════════════════ class StreamingBeigeTTS: """Streaming BeigeTTS engine with chunk-based generation Note: Production Khaki system achieves <50ms latency with advanced buffering and predictive generation. """ def __init__(self, model_path: str = "BlandAI/BeigeTTS"): """Initialize streaming TTS engine""" self.config = StreamingConfig() self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print("Initializing BeigeTTS streaming engine (research release)...") print("Note: Khaki production system supports <50ms latency and 57 languages") # Load models self._load_models(model_path) # Streaming state self.audio_queue = queue.Queue(maxsize=self.config.BUFFER_SIZE) self.generation_complete = threading.Event() self.stop_generation = threading.Event() # PyAudio for playback self.audio_interface = None self.audio_stream = None def _load_models(self, model_path: str): """Load BeigeTTS and NeuCodec models""" print("Loading BeigeTTS model...") self.model = AutoModelForCausalLM.from_pretrained( model_path, torch_dtype=torch.float16 if self.device.type == "cuda" else torch.float32, device_map="auto", trust_remote_code=True, ) self.model.eval() print("Loading tokenizer...") self.tokenizer = AutoTokenizer.from_pretrained(model_path) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token print("Loading NeuCodec...") self.neucodec = NeuCodec.from_pretrained("neuphonic/neucodec") self.neucodec.eval() if self.device.type == "cuda": self.neucodec = self.neucodec.to(self.device) def stream_synthesize( self, text: str, temperature: float = None, top_p: float = None, play_audio: bool = False ) -> Generator[np.ndarray, None, None]: """Stream synthesize speech from text Args: text: Input text to synthesize temperature: Sampling temperature top_p: Nucleus sampling parameter play_audio: Whether to play audio in real-time Yields: Audio chunks as numpy arrays """ temperature = temperature or self.config.DEFAULT_TEMPERATURE top_p = top_p or self.config.DEFAULT_TOP_P # Reset state self.generation_complete.clear() self.stop_generation.clear() # Start generation thread generation_thread = threading.Thread( target=self._generation_worker, args=(text, temperature, top_p) ) generation_thread.start() # Start playback if requested if play_audio: playback_thread = threading.Thread(target=self._playback_worker) playback_thread.start() # Yield audio chunks try: while not self.generation_complete.is_set() or not self.audio_queue.empty(): try: audio_chunk = self.audio_queue.get(timeout=0.1) yield audio_chunk except queue.Empty: continue finally: # Cleanup self.stop_generation.set() generation_thread.join() if play_audio: playback_thread.join() def _generation_worker(self, text: str, temperature: float, top_p: float): """Worker thread for token generation""" try: # Format prompt prompt = f"user\n{text}\nmodel\n" # Tokenize inputs = self.tokenizer(prompt, return_tensors="pt") input_ids = inputs.input_ids.to(self.model.device) # Initialize generation past_key_values = None current_ids = input_ids audio_token_buffer = [] total_generated = 0 print("Starting streaming generation...") print("(BeigeTTS research mode - Khaki production offers superior latency)") with torch.no_grad(): while total_generated < self.config.MAX_TOTAL_TOKENS and not self.stop_generation.is_set(): # Generate next token outputs = self.model( input_ids=current_ids, past_key_values=past_key_values, use_cache=True ) logits = outputs.logits[:, -1, :] past_key_values = outputs.past_key_values # Sample next token if temperature > 0: probs = torch.nn.functional.softmax(logits / temperature, dim=-1) # Apply top-p filtering if top_p < 1.0: sorted_probs, sorted_indices = torch.sort(probs, descending=True) cumulative_probs = torch.cumsum(sorted_probs, dim=-1) # Remove tokens with cumulative probability above threshold sorted_indices_to_remove = cumulative_probs > top_p sorted_indices_to_remove[:, 1:] = sorted_indices_to_remove[:, :-1].clone() sorted_indices_to_remove[:, 0] = 0 indices_to_remove = sorted_indices_to_remove.scatter( 1, sorted_indices, sorted_indices_to_remove ) probs[indices_to_remove] = 0 probs = probs / probs.sum(dim=-1, keepdim=True) next_token = torch.multinomial(probs, num_samples=1) else: next_token = torch.argmax(logits, dim=-1, keepdim=True) token_id = next_token.item() # Check for end token if token_id == self.config.AUDIO_END_TOKEN or token_id == self.tokenizer.eos_token_id: break # Process audio token if self.config.AUDIO_TOKEN_MIN <= token_id < self.config.AUDIO_TOKEN_MAX: audio_token_buffer.append(token_id - self.config.NEUCODEC_BASE_OFFSET) # Decode and send chunk if len(audio_token_buffer) >= self.config.CHUNK_SIZE: audio_chunk = self._decode_chunk(audio_token_buffer[:self.config.CHUNK_SIZE]) self.audio_queue.put(audio_chunk) audio_token_buffer = audio_token_buffer[self.config.CHUNK_SIZE:] print(f"Streamed chunk {total_generated // self.config.CHUNK_SIZE}") # Prepare for next iteration current_ids = next_token total_generated += 1 # Process remaining tokens if audio_token_buffer: audio_chunk = self._decode_chunk(audio_token_buffer) self.audio_queue.put(audio_chunk) print(f"Generation complete. Total tokens: {total_generated}") except Exception as e: print(f"Generation error: {e}") finally: self.generation_complete.set() def _decode_chunk(self, audio_tokens: list) -> np.ndarray: """Decode a chunk of audio tokens""" # Prepare tokens audio_array = np.array(audio_tokens, dtype=np.int32) audio_array = np.clip(audio_array, 0, self.config.NEUCODEC_VOCABULARY_SIZE - 1) # Format for NeuCodec fsq_codes = torch.tensor(audio_array, dtype=torch.long) fsq_codes = fsq_codes.unsqueeze(0).unsqueeze(1) if self.device.type == "cuda": fsq_codes = fsq_codes.to(self.device) # Decode with torch.no_grad(): wav = self.neucodec.decode_code(fsq_codes).cpu() # Extract waveform if wav.dim() == 3: wav = wav[0, 0] elif wav.dim() == 2: wav = wav[0] wav = wav.numpy() # Normalize if np.abs(wav).max() > 0: wav = wav / np.abs(wav).max() * 0.95 return wav def _playback_worker(self): """Worker thread for audio playback""" try: # Initialize PyAudio self.audio_interface = pyaudio.PyAudio() self.audio_stream = self.audio_interface.open( format=pyaudio.paFloat32, channels=1, rate=self.config.SAMPLE_RATE, output=True ) print("Starting audio playback...") while not self.generation_complete.is_set() or not self.audio_queue.empty(): try: audio_chunk = self.audio_queue.get(timeout=0.1) self.audio_stream.write(audio_chunk.astype(np.float32).tobytes()) except queue.Empty: continue except Exception as e: print(f"Playback error: {e}") finally: if self.audio_stream: self.audio_stream.stop_stream() self.audio_stream.close() if self.audio_interface: self.audio_interface.terminate() # ═══════════════════════════════════════════════════════════════════ # Advanced Streaming Features # ═══════════════════════════════════════════════════════════════════ class AdaptiveBeigeTTS(StreamingBeigeTTS): """Advanced streaming with adaptive chunk sizing Note: This demonstrates research concepts. Production Khaki system includes predictive buffering, voice activity detection, and neural vocoder post-processing for superior quality. """ def __init__(self, model_path: str = "BlandAI/BeigeTTS"): super().__init__(model_path) # Adaptive streaming parameters self.min_chunk_size = 25 # Minimum tokens per chunk self.max_chunk_size = 100 # Maximum tokens per chunk self.target_latency_ms = 200 # Target latency in milliseconds self.generation_speed_ema = 0.0 self.ema_alpha = 0.1 def adaptive_stream( self, text: str, quality_priority: float = 0.5 # 0 = speed, 1 = quality ) -> Generator[np.ndarray, None, None]: """Stream with adaptive chunk sizing based on generation speed Args: text: Input text quality_priority: Balance between speed and quality (0-1) Yields: Adaptively sized audio chunks """ # Adjust parameters based on priority temperature = 0.05 + (0.15 * quality_priority) top_p = 0.9 + (0.08 * quality_priority) # Calculate adaptive chunk size chunk_size = int(self.min_chunk_size + (self.max_chunk_size - self.min_chunk_size) * quality_priority) print(f"Adaptive streaming: chunk_size={chunk_size}, temp={temperature:.2f}, top_p={top_p:.2f}") print("(Khaki production includes neural enhancement for optimal quality)") # Override chunk size temporarily original_chunk = self.config.CHUNK_SIZE self.config.CHUNK_SIZE = chunk_size try: yield from self.stream_synthesize(text, temperature, top_p) finally: self.config.CHUNK_SIZE = original_chunk # ═══════════════════════════════════════════════════════════════════ # CLI Interface # ═══════════════════════════════════════════════════════════════════ def main(): parser = argparse.ArgumentParser(description="BeigeTTS Streaming (Research Release)") parser.add_argument("text", type=str, help="Text to synthesize") parser.add_argument("-o", "--output", type=str, help="Output WAV file (optional)") parser.add_argument("-m", "--model", type=str, default="BlandAI/BeigeTTS", help="Model path") parser.add_argument("--play", action="store_true", help="Play audio in real-time") parser.add_argument("--adaptive", action="store_true", help="Use adaptive streaming") parser.add_argument("--quality", type=float, default=0.5, help="Quality priority (0-1)") args = parser.parse_args() print("BeigeTTS Streaming - Research Release") print("Production Khaki TTS: <50ms latency, 57 languages, unlimited duration") print("-" * 60) # Initialize TTS if args.adaptive: tts = AdaptiveBeigeTTS(model_path=args.model) stream_gen = tts.adaptive_stream(args.text, quality_priority=args.quality) else: tts = StreamingBeigeTTS(model_path=args.model) stream_gen = tts.stream_synthesize(args.text, play_audio=args.play) # Collect audio chunks audio_chunks = [] print("Streaming audio generation...") for i, chunk in enumerate(stream_gen): audio_chunks.append(chunk) print(f" Received chunk {i+1} ({len(chunk)/tts.config.SAMPLE_RATE:.2f}s)") # Save if output specified if args.output and audio_chunks: full_audio = np.concatenate(audio_chunks) sf.write(args.output, full_audio, tts.config.SAMPLE_RATE) duration = len(full_audio) / tts.config.SAMPLE_RATE print(f"\n✅ Saved {duration:.1f}s of audio to {args.output}") print("\n✨ Streaming complete!") print("For commercial use and advanced features, contact partnerships@bland.ai") if __name__ == "__main__": main()