import asyncio
import struct
import time

import numpy as np
from kokoro import KPipeline


async def main():
    """Synthesize one test sentence with Kokoro and dump it to a WAV file.

    Builds a ``KPipeline``, runs it over a fixed sentence with the
    ``am_adam`` voice, and appends every non-``None`` audio chunk the
    pipeline yields to ``test_stream.wav`` as 16-bit little-endian PCM.
    The number of samples in each chunk is printed as it arrives.
    """
    tts = KPipeline(lang_code='a')
    segments = tts(
        "Hello, this is a test of streaming audio from Kokoro.",
        voice="am_adam",
        speed=1,
        split_pattern=r'\n+',
    )

    # 44-byte RIFF/WAVE header. Both size fields are written as 0xFFFFFFFF
    # instead of real lengths (the RIFF chunk size would otherwise be
    # 36 + data size); they are never patched afterwards.
    # NOTE(review): presumably a streaming-style "length unknown" placeholder
    # -- confirm that the intended consumers accept it.
    wav_header = struct.pack(
        '<4sI4s4sIHHIIHH4sI',
        b'RIFF',
        0xFFFFFFFF,  # RIFF chunk size (placeholder)
        b'WAVE',
        b'fmt ',
        16,          # fmt sub-chunk size
        1,           # audio format: PCM
        1,           # channels: mono
        24000,       # sample rate in Hz
        48000,       # byte rate = 24000 * 1 channel * 2 bytes
        2,           # block align: bytes per sample frame
        16,          # bits per sample
        b'data',
        0xFFFFFFFF,  # data chunk size (placeholder)
    )

    with open("test_stream.wav", "wb") as wav_file:
        wav_file.write(wav_header)

        for _gs, _ps, audio in segments:
            if audio is None:
                continue

            print("Got chunk:", len(audio))

            # Clamp to [-1.0, 1.0], then scale to the int16 range.
            clipped = np.clip(audio, -1.0, 1.0)
            samples = (clipped * 32767).astype(np.int16)
            wav_file.write(samples.tobytes())


if __name__ == "__main__":
    asyncio.run(main())