Rajhuggingface4253 commited on
Commit
a6aca15
·
verified ·
1 Parent(s): b662d71

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +85 -32
app.py CHANGED
@@ -299,41 +299,81 @@ async def text_to_speech(
299
  output_format: str = Form("wav", pattern="^(wav|mp3|flac)$"),
300
  reference_audio: UploadFile = File(...)):
301
  """
302
- Standard blocking TTS endpoint with in-memory processing and caching.
303
  """
304
  if not hasattr(app.state, 'tts_wrapper'):
305
  raise HTTPException(status_code=503, detail="Service unavailable: Model not loaded")
306
 
307
  start_time = time.time()
308
  try:
309
- # 1. Convert the uploaded file to WAV directly in memory
310
- converted_wav_buffer = await convert_to_wav_in_memory(reference_audio)
 
 
 
 
 
 
 
 
311
  ref_audio_bytes = converted_wav_buffer.getvalue()
312
-
313
- # 2. Offload the blocking AI process (now faster with caching)
314
- audio_data = await run_blocking_task_async(
315
- app.state.tts_wrapper.generate_speech_blocking,
316
- text,
317
- ref_audio_bytes, # Pass bytes, not a path
318
- reference_text
 
 
319
  )
320
-
321
- # 3. Convert to requested output format
322
- audio_bytes = await run_blocking_task_async(
323
- app.state.tts_wrapper._convert_to_streamable_format,
324
- audio_data,
325
- output_format
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
326
  )
327
 
328
  processing_time = time.time() - start_time
329
- audio_duration = len(audio_data) / SAMPLE_RATE
 
 
330
  return Response(
331
  content=audio_bytes,
332
  media_type=f"audio/{'mpeg' if output_format == 'mp3' else output_format}",
333
  headers={
334
  "Content-Disposition": f"attachment; filename=tts_output.{output_format}",
335
  "X-Processing-Time": f"{processing_time:.2f}s",
336
- "X-Audio-Duration": f"{audio_duration:.2f}s"
 
 
337
  }
338
  )
339
  except Exception as e:
@@ -341,7 +381,6 @@ async def text_to_speech(
341
  if isinstance(e, HTTPException):
342
  raise
343
  raise HTTPException(status_code=500, detail=f"Synthesis failed: {e}")
344
-
345
  @app.post("/synthesize/stream")
346
  async def stream_text_to_speech_cloning(
347
  text: str = Form(..., min_length=1, max_length=5000),
@@ -350,15 +389,16 @@ async def stream_text_to_speech_cloning(
350
  output_format: str = Form("mp3", pattern="^(wav|mp3|flac)$"),
351
  reference_audio: UploadFile = File(...)):
352
  """
353
- Sentence-by-Sentence Streaming using a high-performance, asyncio-native
354
- producer-consumer pipeline.
355
  """
356
  if not hasattr(app.state, 'tts_wrapper'):
357
  raise HTTPException(status_code=503, detail="Service unavailable: Model not loaded")
358
 
359
  async def stream_generator():
360
  loop = asyncio.get_event_loop()
361
- q = asyncio.Queue(maxsize=2)
 
 
362
 
363
  async def producer():
364
  try:
@@ -366,7 +406,7 @@ async def stream_text_to_speech_cloning(
366
  ref_audio_bytes = converted_wav_buffer.getvalue()
367
  audio_hash = hashlib.sha256(ref_audio_bytes).hexdigest()
368
 
369
- # Use LRU cache like blocking endpoint
370
  ref_s = await loop.run_in_executor(
371
  tts_executor,
372
  app.state.tts_wrapper._get_or_create_reference_encoding,
@@ -375,39 +415,52 @@ async def stream_text_to_speech_cloning(
375
  )
376
 
377
  sentences = app.state.tts_wrapper._split_text_into_chunks(text)
 
378
 
379
  def process_chunk(sentence_text):
380
  with torch.no_grad():
381
  audio_chunk = app.state.tts_wrapper.tts_model.infer(sentence_text, ref_s, reference_text)
382
  return app.state.tts_wrapper._convert_to_streamable_format(audio_chunk, output_format)
383
 
384
- # Schedule all chunks to be processed in the background.
385
  for sentence in sentences:
386
  task = loop.run_in_executor(tts_executor, process_chunk, sentence)
387
- await q.put(task) # Put the FUTURE, not the result, in the queue.
388
-
389
  except Exception as e:
390
  logger.error(f"Error in producer task: {e}")
391
  await q.put(e)
392
  finally:
393
- await q.put(None) # Signal that all tasks have been scheduled.
394
 
395
  producer_task = asyncio.create_task(producer())
396
 
397
- # The CONSUMER's job is to wait for each result and yield it.
398
- while True:
 
 
 
 
 
399
  result = await q.get()
400
- if result is None:
401
- break
402
 
403
  if isinstance(result, Exception):
404
  logger.error(f"Terminating stream due to producer error: {result}")
405
  raise result
406
 
407
- # Await the result of the background task
 
 
 
408
  chunk_bytes = await result
409
  yield chunk_bytes
 
410
 
 
 
 
 
 
411
  await producer_task
412
 
413
  return StreamingResponse(
 
299
  output_format: str = Form("wav", pattern="^(wav|mp3|flac)$"),
300
  reference_audio: UploadFile = File(...)):
301
  """
302
+ MAXIMUM SPEED TTS endpoint with full parallel processing.
303
  """
304
  if not hasattr(app.state, 'tts_wrapper'):
305
  raise HTTPException(status_code=503, detail="Service unavailable: Model not loaded")
306
 
307
  start_time = time.time()
308
  try:
309
+ # PARALLEL STEP 1: Convert audio AND split text concurrently
310
+ converted_wav_buffer, sentences = await asyncio.gather(
311
+ convert_to_wav_in_memory(reference_audio),
312
+ asyncio.get_event_loop().run_in_executor(
313
+ tts_executor,
314
+ app.state.tts_wrapper._split_text_into_chunks,
315
+ text
316
+ )
317
+ )
318
+
319
  ref_audio_bytes = converted_wav_buffer.getvalue()
320
+ audio_hash = hashlib.sha256(ref_audio_bytes).hexdigest()
321
+ logger.info(f"🚀 MAX PARALLEL: Processing {len(sentences)} chunks")
322
+
323
+ # ✅ PARALLEL STEP 2: Get reference encoding
324
+ ref_s = await asyncio.get_event_loop().run_in_executor(
325
+ tts_executor,
326
+ app.state.tts_wrapper._get_or_create_reference_encoding,
327
+ audio_hash,
328
+ ref_audio_bytes
329
  )
330
+
331
+ # ✅ MAX PARALLEL STEP 3: Process ALL chunks simultaneously
332
+ loop = asyncio.get_event_loop()
333
+
334
+ def process_single_chunk(sentence_text):
335
+ with torch.no_grad():
336
+ return app.state.tts_wrapper.tts_model.infer(sentence_text, ref_s, reference_text)
337
+
338
+ # Schedule ALL chunks in parallel (limited by MAX_WORKERS)
339
+ tasks = []
340
+ for sentence in sentences:
341
+ task = loop.run_in_executor(tts_executor, process_single_chunk, sentence)
342
+ tasks.append(task)
343
+
344
+ # Wait for ALL chunks to complete
345
+ chunk_audios = await asyncio.gather(*tasks)
346
+
347
+ # ✅ Combine all audio chunks (fast numpy concatenation)
348
+ combined_audio = np.concatenate(chunk_audios) if chunk_audios else np.array([])
349
+
350
+ # ✅ PARALLEL STEP 4: Convert format while calculating stats
351
+ audio_bytes, audio_duration = await asyncio.gather(
352
+ asyncio.get_event_loop().run_in_executor(
353
+ tts_executor,
354
+ app.state.tts_wrapper._convert_to_streamable_format,
355
+ combined_audio,
356
+ output_format
357
+ ),
358
+ asyncio.get_event_loop().run_in_executor(
359
+ tts_executor,
360
+ lambda: len(combined_audio) / SAMPLE_RATE
361
+ )
362
  )
363
 
364
  processing_time = time.time() - start_time
365
+
366
+ logger.info(f"✅ MAX SPEED Synthesis: {processing_time:.2f}s for {audio_duration:.2f}s audio ({len(sentences)} chunks)")
367
+
368
  return Response(
369
  content=audio_bytes,
370
  media_type=f"audio/{'mpeg' if output_format == 'mp3' else output_format}",
371
  headers={
372
  "Content-Disposition": f"attachment; filename=tts_output.{output_format}",
373
  "X-Processing-Time": f"{processing_time:.2f}s",
374
+ "X-Audio-Duration": f"{audio_duration:.2f}s",
375
+ "X-Parallel-Chunks": str(len(sentences)),
376
+ "X-Speed-Ratio": f"{audio_duration/processing_time:.2f}x" # Real-time factor
377
  }
378
  )
379
  except Exception as e:
 
381
  if isinstance(e, HTTPException):
382
  raise
383
  raise HTTPException(status_code=500, detail=f"Synthesis failed: {e}")
 
384
  @app.post("/synthesize/stream")
385
  async def stream_text_to_speech_cloning(
386
  text: str = Form(..., min_length=1, max_length=5000),
 
389
  output_format: str = Form("mp3", pattern="^(wav|mp3|flac)$"),
390
  reference_audio: UploadFile = File(...)):
391
  """
392
+ TRUE Real-Time Streaming with 2 workers: Optimized for continuous audio.
 
393
  """
394
  if not hasattr(app.state, 'tts_wrapper'):
395
  raise HTTPException(status_code=503, detail="Service unavailable: Model not loaded")
396
 
397
  async def stream_generator():
398
  loop = asyncio.get_event_loop()
399
+
400
+ # ✅ Perfect queue size for 2 workers
401
+ q = asyncio.Queue(maxsize=3) # Store 3 ready chunks for smooth streaming
402
 
403
  async def producer():
404
  try:
 
406
  ref_audio_bytes = converted_wav_buffer.getvalue()
407
  audio_hash = hashlib.sha256(ref_audio_bytes).hexdigest()
408
 
409
+ # Get reference encoding (uses 1 worker temporarily)
410
  ref_s = await loop.run_in_executor(
411
  tts_executor,
412
  app.state.tts_wrapper._get_or_create_reference_encoding,
 
415
  )
416
 
417
  sentences = app.state.tts_wrapper._split_text_into_chunks(text)
418
+ logger.info(f"Streaming {len(sentences)} chunks with 2 workers")
419
 
420
  def process_chunk(sentence_text):
421
  with torch.no_grad():
422
  audio_chunk = app.state.tts_wrapper.tts_model.infer(sentence_text, ref_s, reference_text)
423
  return app.state.tts_wrapper._convert_to_streamable_format(audio_chunk, output_format)
424
 
425
+ # SCHEDULE ALL TASKS IMMEDIATELY
426
  for sentence in sentences:
427
  task = loop.run_in_executor(tts_executor, process_chunk, sentence)
428
+ await q.put(task) # Queue futures immediately
429
+
430
  except Exception as e:
431
  logger.error(f"Error in producer task: {e}")
432
  await q.put(e)
433
  finally:
434
+ await q.put(None) # Signal end of tasks
435
 
436
  producer_task = asyncio.create_task(producer())
437
 
438
+ # EFFICIENT CONSUMER for 2 workers
439
+ pending_tasks = set()
440
+ completed_count = 0
441
+ total_chunks = len(app.state.tts_wrapper._split_text_into_chunks(text))
442
+
443
+ while completed_count < total_chunks:
444
+ # Get next item from queue
445
  result = await q.get()
 
 
446
 
447
  if isinstance(result, Exception):
448
  logger.error(f"Terminating stream due to producer error: {result}")
449
  raise result
450
 
451
+ if result is None:
452
+ break # No more tasks coming
453
+
454
+ # ✅ Process this chunk immediately
455
  chunk_bytes = await result
456
  yield chunk_bytes
457
+ completed_count += 1
458
 
459
+ # ✅ Check if we can process next chunk without waiting
460
+ # This ensures continuous streaming
461
+ if completed_count < total_chunks and not q.empty():
462
+ continue # Immediately process next ready chunk
463
+
464
  await producer_task
465
 
466
  return StreamingResponse(