yukee1992 committed on
Commit
05428d2
·
verified ·
1 Parent(s): 2efdbd8

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +151 -638
app.py CHANGED
@@ -14,12 +14,12 @@ from pydantic import BaseModel
14
  import torch
15
  import numpy as np
16
 
17
- # Configure environment
18
  os.makedirs("/tmp/voices", exist_ok=True)
19
  os.makedirs("/tmp/output", exist_ok=True)
20
 
21
  # Initialize FastAPI app
22
- app = FastAPI(title="Enhanced TTS API", description="API for text-to-speech with multiple voice styles and voice cloning")
23
 
24
  # Add CORS middleware
25
  app.add_middleware(
@@ -36,71 +36,35 @@ DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
36
 
37
  print(f"✅ Using device: {DEVICE}")
38
 
39
- # Available models with different voice styles
40
  AVAILABLE_MODELS = {
41
- "xtts-v2": {
42
- "name": "XTTS-v2",
43
- "model_name": "tts_models/multilingual/multi-dataset/xtts_v2",
44
- "description": "Multilingual model with voice cloning support",
45
- "languages": ["en", "es", "fr", "de", "it", "pt", "pl", "tr", "ru", "nl", "cs", "ar", "zh-cn", "ja", "hu", "ko"],
46
- "voice_cloning": True,
47
- "default_voice": "female_01"
48
- },
49
  "tacotron2-ddc": {
50
  "name": "Tacotron2-DDC",
51
  "model_name": "tts_models/en/ljspeech/tacotron2-DDC",
52
- "description": "High-quality English TTS (fast and reliable)",
53
- "languages": ["en"],
54
- "voice_cloning": False,
55
- "default_voice": "default"
56
- },
57
- "glow-tts": {
58
- "name": "Glow-TTS",
59
- "model_name": "tts_models/en/ljspeech/glow-tts",
60
- "description": "Fast and high-quality English TTS",
61
  "languages": ["en"],
62
  "voice_cloning": False,
63
- "default_voice": "default"
 
64
  }
65
  }
66
 
67
- # Built-in voice styles for XTTS-v2 with better descriptions
68
- BUILTIN_VOICES = {
69
- "female_01": {
70
- "name": "Female Voice 1",
71
- "gender": "female",
72
- "language": "multilingual",
73
- "description": "Clear and natural female voice"
74
- },
75
- "female_02": {
76
- "name": "Female Voice 2",
77
- "gender": "female",
78
- "language": "multilingual",
79
- "description": "Warm and friendly female voice"
80
- },
81
- "female_03": {
82
- "name": "Female Voice 3",
83
- "gender": "female",
84
- "language": "multilingual",
85
- "description": "Professional and articulate female voice"
86
- },
87
- "male_01": {
88
- "name": "Male Voice 1",
89
- "gender": "male",
90
- "language": "multilingual",
91
- "description": "Deep and clear male voice"
92
  },
93
- "male_02": {
94
- "name": "Male Voice 2",
95
- "gender": "male",
96
- "language": "multilingual",
97
- "description": "Friendly and approachable male voice"
98
  },
99
- "default": {
100
- "name": "Default Voice",
101
- "gender": "neutral",
102
- "language": "multilingual",
103
- "description": "Balanced and natural voice"
104
  }
105
  }
106
 
@@ -108,69 +72,89 @@ BUILTIN_VOICES = {
108
  tts = None
109
  model_loaded = False
110
  current_model = ""
111
- voice_cloning_supported = False
112
  model_loading = False
113
- model_load_attempts = 0
114
- active_model_config = None
115
 
116
  # Pydantic models
117
  class TTSRequest(BaseModel):
118
  text: str
119
  project_id: str
120
- voice_name: Optional[str] = "female_01"
121
- language: Optional[str] = "en"
122
- model_type: Optional[str] = "xtts-v2"
123
  speed: Optional[float] = 1.0
124
- temperature: Optional[float] = 0.75
125
 
126
  class BatchTTSRequest(BaseModel):
127
  texts: List[str]
128
  project_id: str
129
- voice_name: Optional[str] = "female_01"
130
- language: Optional[str] = "en"
131
- model_type: Optional[str] = "xtts-v2"
132
  speed: Optional[float] = 1.0
133
- temperature: Optional[float] = 0.75
134
 
135
- class VoiceCloneRequest(BaseModel):
136
- project_id: str
137
- voice_name: str
138
- description: Optional[str] = ""
139
- model_type: Optional[str] = "xtts-v2"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
140
 
141
- class VoiceStyleRequest(BaseModel):
142
- voice_name: str
143
- style: str
144
- intensity: Optional[float] = 1.0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
145
 
146
- # Enhanced helper functions
147
  def clean_text(text):
148
- """Clean text for TTS generation with better handling"""
149
  import re
150
 
151
  if not text or not isinstance(text, str):
152
  return "Hello"
153
 
154
- # Remove any problematic characters but keep basic punctuation and multilingual characters
155
- text = re.sub(r'[^\w\s\.\,\!\?\-\'\"\:\;\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff\uac00-\ud7af]', '', text)
156
-
157
- # Replace multiple spaces with single space
158
  text = re.sub(r'\s+', ' ', text)
159
 
160
- # Ensure text ends with punctuation if it's a sentence
161
  if len(text) > 10 and not re.search(r'[\.\!\?]$', text):
162
  text = text + '.'
163
 
164
  text = text.strip()
165
 
166
- # If text is empty after cleaning, use default
167
  if not text:
168
  text = "Hello world"
169
 
170
  return text
171
 
172
  def upload_to_oci(file_path: str, filename: str, project_id: str, file_type="voiceover"):
173
- """Upload file to OCI using your existing API with subfolder support"""
174
  try:
175
  if not OCI_UPLOAD_API_URL:
176
  return None, "OCI upload API URL not configured"
@@ -198,144 +182,24 @@ def upload_to_oci(file_path: str, filename: str, project_id: str, file_type="voi
198
  except Exception as e:
199
  return None, f"Upload error: {str(e)}"
200
 
201
- def upload_to_oci_with_retry(file_path: str, filename: str, project_id: str, file_type="voiceover", max_retries=3):
202
- """Upload file to OCI with retry logic"""
203
- for attempt in range(max_retries):
204
- try:
205
- print(f"🔄 Upload attempt {attempt + 1} of {max_retries} for {filename}")
206
- result, error = upload_to_oci(file_path, filename, project_id, file_type)
207
-
208
- if error:
209
- if attempt < max_retries - 1:
210
- wait_time = 2 ** attempt
211
- print(f"⏳ Upload failed, retrying in {wait_time}s: {error}")
212
- time.sleep(wait_time)
213
- continue
214
- else:
215
- return None, error
216
- else:
217
- return result, None
218
-
219
- except Exception as e:
220
- if attempt < max_retries - 1:
221
- wait_time = 2 ** attempt
222
- print(f"⏳ Upload exception, retrying in {wait_time}s: {str(e)}")
223
- time.sleep(wait_time)
224
- continue
225
- else:
226
- return None, f"Upload failed after {max_retries} attempts: {str(e)}"
227
-
228
- return None, "Upload failed: unexpected error"
229
-
230
- def get_voice_path(voice_name: str):
231
- """Get path to voice file with enhanced voice management"""
232
- if voice_name == "default":
233
- return None
234
-
235
- # Check if it's a built-in voice
236
- if voice_name in BUILTIN_VOICES:
237
- return None
238
-
239
- voice_path = Path(f"/tmp/voices/{voice_name}")
240
- if voice_path.is_dir():
241
- samples = list(voice_path.glob("sample_*.wav"))
242
- return str(samples[0]) if samples else None
243
- else:
244
- voice_file = Path(f"/tmp/voices/{voice_name}.wav")
245
- return str(voice_file) if voice_file.exists() else None
246
-
247
- def clone_voice(voice_name: str, audio_files: List[str], description: str = ""):
248
- """Enhanced voice cloning with better sample management"""
249
- try:
250
- print(f"🎙️ Cloning voice: {voice_name}")
251
-
252
- voice_dir = f"/tmp/voices/{voice_name}"
253
- os.makedirs(voice_dir, exist_ok=True)
254
-
255
- # Save metadata about the cloned voice
256
- metadata = {
257
- "name": voice_name,
258
- "description": description,
259
- "samples_count": len(audio_files),
260
- "created_at": datetime.now().isoformat(),
261
- "samples": []
262
- }
263
-
264
- for i, audio_file in enumerate(audio_files):
265
- dest_path = f"{voice_dir}/sample_{i+1:02d}.wav"
266
- shutil.copy2(audio_file, dest_path)
267
- metadata["samples"].append({
268
- "sample_id": i+1,
269
- "filename": f"sample_{i+1:02d}.wav",
270
- "file_size": os.path.getsize(dest_path)
271
- })
272
- print(f" Copied sample {i+1} to: {dest_path}")
273
-
274
- # Save metadata
275
- with open(f"{voice_dir}/metadata.json", "w") as f:
276
- import json
277
- json.dump(metadata, f, indent=2)
278
-
279
- print(f"✅ Voice cloning completed for {voice_name} with {len(audio_files)} samples")
280
- return True, f"Voice '{voice_name}' is ready for use with {len(audio_files)} samples"
281
-
282
- except Exception as e:
283
- return False, f"Voice cloning failed: {str(e)}"
284
-
285
- def supports_voice_cloning():
286
- """Check if the current model supports voice cloning"""
287
- return voice_cloning_supported
288
-
289
- def save_wav(audio, file_path, sample_rate=22050):
290
- """Save audio to WAV file manually"""
291
- try:
292
- # Try soundfile first
293
- try:
294
- import soundfile as sf
295
- sf.write(file_path, audio, sample_rate)
296
- return True
297
- except ImportError:
298
- print("⚠️ soundfile not available, using fallback method")
299
-
300
- # Fallback: use wave library
301
- import wave
302
- import numpy as np
303
-
304
- # Ensure audio is numpy array
305
- if isinstance(audio, list):
306
- audio = np.array(audio)
307
-
308
- # Convert to 16-bit PCM
309
- audio_int16 = (audio * 32767).astype(np.int16)
310
-
311
- with wave.open(file_path, 'wb') as wav_file:
312
- wav_file.setnchannels(1) # Mono
313
- wav_file.setsampwidth(2) # 16-bit
314
- wav_file.setframerate(sample_rate) # Sample rate
315
- wav_file.writeframes(audio_int16.tobytes())
316
-
317
- return True
318
-
319
- except Exception as e:
320
- print(f"❌ Failed to save WAV: {e}")
321
- return False
322
-
323
- def load_tts_model(model_type="xtts-v2"):
324
- """ROBUST MODEL LOADING: Proper XTTS-v2 handling"""
325
- global tts, model_loaded, current_model, voice_cloning_supported, model_loading, model_load_attempts, active_model_config
326
 
327
  if model_loading:
328
  print("⏳ Model is already being loaded...")
329
  return False
330
 
331
  if model_type not in AVAILABLE_MODELS:
332
- print(f"❌ Model type '{model_type}' not found. Available: {list(AVAILABLE_MODELS.keys())}")
333
  return False
334
 
335
  model_loading = True
336
- model_load_attempts += 1
337
 
338
  try:
 
 
 
339
  from TTS.api import TTS
340
 
341
  # Handle TOS acceptance automatically
@@ -352,47 +216,25 @@ def load_tts_model(model_type="xtts-v2"):
352
  # Load the selected model
353
  tts = TTS(model_config["model_name"]).to(DEVICE)
354
 
355
- # Mark as loaded immediately
 
 
 
 
 
 
 
356
  model_loaded = True
357
  current_model = model_config["model_name"]
358
- voice_cloning_supported = model_config["voice_cloning"]
359
- active_model_config = model_config
360
 
361
  print(f"✅ {model_config['name']} loaded successfully!")
362
- print(f" Voice cloning: {'✅ Supported' if voice_cloning_supported else '❌ Not supported'}")
363
- print(f" Languages: {', '.join(model_config['languages'])}")
364
-
365
- # Try a simple test but don't fail if it doesn't work
366
- try:
367
- test_path = "/tmp/test_output.wav"
368
- if model_config["voice_cloning"]:
369
- # For XTTS-v2, test without speaker_wav to use built-in voices
370
- tts.tts_to_file(
371
- text="This is a test of the voice system.",
372
- file_path=test_path,
373
- language="en"
374
- )
375
- else:
376
- # For non-voice-cloning models
377
- tts.tts_to_file(text="This is a test of the voice system.", file_path=test_path)
378
-
379
- if os.path.exists(test_path):
380
- os.remove(test_path)
381
- print("✅ Model test completed successfully!")
382
- else:
383
- print("⚠️ Test file not created, but model is loaded")
384
- except Exception as test_error:
385
- print(f"⚠️ Model test failed but model is loaded: {test_error}")
386
 
387
  return True
388
 
389
  except Exception as e:
390
- print(f"❌ {model_config['name']} model failed to load: {e}")
391
- # Fallback to Tacotron2 if XTTS fails
392
- if model_type == "xtts-v2":
393
- print("🔄 Falling back to Tacotron2...")
394
- model_loading = False # Reset loading state
395
- return load_tts_model("tacotron2-ddc")
396
  return False
397
 
398
  finally:
@@ -404,51 +246,29 @@ def load_tts_model(model_type="xtts-v2"):
404
  finally:
405
  model_loading = False
406
 
407
- def validate_language(language: str, model_type: str) -> bool:
408
- """Validate if language is supported by the current model"""
409
- if model_type not in AVAILABLE_MODELS:
410
- return False
411
- return language in AVAILABLE_MODELS[model_type]["languages"]
412
-
413
- # Enhanced API endpoints
414
  @app.post("/api/tts")
415
  async def generate_tts(request: TTSRequest):
416
- """ENHANCED TTS generation with better voice quality and naturalness"""
417
  try:
418
- # Lazy load model on first request
 
 
 
419
  if not model_loaded:
420
- if not load_tts_model(request.model_type):
421
  return {
422
  "status": "error",
423
- "message": f"TTS model '{request.model_type}' failed to load. Please check the logs.",
424
  "requires_tos_acceptance": True,
425
  "tos_url": "https://coqui.ai/cpml.txt"
426
  }
427
 
428
  print(f"📥 TTS request for project: {request.project_id}")
429
- print(f" Model: {request.model_type}")
430
  print(f" Text length: {len(request.text)} characters")
431
- print(f" Voice: {request.voice_name}")
432
- print(f" Language: {request.language}")
433
- print(f" Speed: {request.speed}")
434
-
435
- # Validate language
436
- if not validate_language(request.language, request.model_type):
437
- return {
438
- "status": "error",
439
- "message": f"Language '{request.language}' is not supported by {request.model_type}. Supported languages: {', '.join(active_model_config['languages'])}",
440
- "supported_languages": active_model_config['languages']
441
- }
442
 
443
- # Check if voice cloning is requested but not supported
444
- if request.voice_name != "default" and request.voice_name not in BUILTIN_VOICES and not supports_voice_cloning():
445
- return {
446
- "status": "error",
447
- "message": "Voice cloning is not supported with the current model. Please use 'xtts-v2' model for voice cloning.",
448
- "model": current_model
449
- }
450
-
451
- # Generate unique filename with sequential naming
452
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
453
  filename = f"voiceover_{timestamp}.wav"
454
  output_path = f"/tmp/output/{filename}"
@@ -456,91 +276,29 @@ async def generate_tts(request: TTSRequest):
456
  # Ensure output directory exists
457
  os.makedirs(os.path.dirname(output_path), exist_ok=True)
458
 
459
- # Get voice path - only for custom cloned voices
460
- speaker_wav = None
461
- if request.voice_name not in BUILTIN_VOICES and request.voice_name != "default":
462
- speaker_wav = get_voice_path(request.voice_name)
463
- if not speaker_wav:
464
- return {
465
- "status": "error",
466
- "message": f"Voice '{request.voice_name}' not found. Available voices: {list(BUILTIN_VOICES.keys()) + [v for v in await list_voices_internal()]}"
467
- }
468
-
469
- print(f"🔊 Generating TTS to: {output_path}")
470
- if speaker_wav:
471
- print(f"🎙️ Using custom voice: {request.voice_name}")
472
- else:
473
- print(f"🎙️ Using built-in voice: {request.voice_name}")
474
-
475
- # Clean the text before generation
476
  cleaned_text = clean_text(request.text)
477
- print(f"📝 Original text: '{request.text}'")
478
- print(f"📝 Cleaned text: '{cleaned_text}'")
479
 
480
- # Generate TTS based on model capabilities - WITH ERROR HANDLING
481
  try:
482
- if supports_voice_cloning():
483
- # XTTS model with voice cloning support
484
- if speaker_wav:
485
- # Custom voice with speaker file
486
- tts.tts_to_file(
487
- text=cleaned_text,
488
- speaker_wav=speaker_wav,
489
- language=request.language,
490
- file_path=output_path
491
- )
492
- else:
493
- # Built-in XTTS voice (no speaker_wav)
494
- tts.tts_to_file(
495
- text=cleaned_text,
496
- language=request.language,
497
- file_path=output_path
498
- )
499
- else:
500
- # Non-voice-cloning models
501
- tts.tts_to_file(
502
- text=cleaned_text,
503
- file_path=output_path
504
- )
505
  except Exception as tts_error:
506
  print(f"❌ TTS generation failed: {tts_error}")
507
- # Try alternative approach
508
- try:
509
- print("🔄 Trying alternative TTS generation method...")
510
- if supports_voice_cloning():
511
- if speaker_wav:
512
- audio = tts.tts(
513
- text=cleaned_text,
514
- speaker_wav=speaker_wav,
515
- language=request.language
516
- )
517
- else:
518
- audio = tts.tts(
519
- text=cleaned_text,
520
- language=request.language
521
- )
522
- else:
523
- audio = tts.tts(text=cleaned_text)
524
-
525
- # Save manually
526
- if not save_wav(audio, output_path):
527
- raise Exception("Failed to save audio file")
528
-
529
- except Exception as alt_error:
530
- print(f"❌ Alternative method also failed: {alt_error}")
531
- raise alt_error
532
 
533
  # Verify the file was created
534
  if not os.path.exists(output_path):
535
- raise Exception(f"TTS failed to create output file: {output_path}")
536
 
537
  file_size = os.path.getsize(output_path)
538
  print(f"✅ TTS generated: {output_path} ({file_size} bytes)")
539
 
540
  # Upload to OCI
541
- upload_result, error = upload_to_oci_with_retry(
542
- output_path, filename, request.project_id, "voiceover"
543
- )
544
 
545
  if error:
546
  print(f"❌ OCI upload failed: {error}")
@@ -554,7 +312,7 @@ async def generate_tts(request: TTSRequest):
554
 
555
  print(f"✅ Upload successful: {filename}")
556
 
557
- # Clean up local file
558
  try:
559
  os.remove(output_path)
560
  print(f"🧹 Cleaned up local file: {output_path}")
@@ -567,98 +325,38 @@ async def generate_tts(request: TTSRequest):
567
  "filename": filename,
568
  "oci_path": upload_result.get("path", f"{request.project_id}/voiceover/{filename}"),
569
  "model_used": current_model,
570
- "model_type": request.model_type,
571
- "voice_cloning_used": supports_voice_cloning() and speaker_wav is not None,
572
- "voice_style": request.voice_name
573
  }
574
 
575
  except Exception as e:
576
  print(f"❌ TTS generation error: {str(e)}")
577
  return {
578
  "status": "error",
579
- "message": f"TTS generation failed: {str(e)}",
580
- "model": current_model,
581
- "model_type": request.model_type if 'request' in locals() else "unknown",
582
- "voice_cloning_supported": supports_voice_cloning()
583
  }
584
 
585
- async def list_voices_internal():
586
- """Internal function to list available voices"""
587
- voices_dir = Path("/tmp/voices")
588
- voices = []
589
-
590
- for item in voices_dir.iterdir():
591
- if item.is_dir():
592
- samples = list(item.glob("sample_*.wav"))
593
- voices.append(item.name)
594
- elif item.is_file() and item.suffix == ".wav":
595
- voices.append(item.stem)
596
-
597
- return voices
598
-
599
- @app.get("/api/models")
600
- async def list_models():
601
- """List available TTS models"""
602
- return {
603
- "status": "success",
604
- "models": AVAILABLE_MODELS,
605
- "current_model": current_model if model_loaded else None,
606
- "model_loaded": model_loaded
607
- }
608
-
609
- @app.post("/api/set-model")
610
- async def set_model(model_type: str = Form(...)):
611
- """Switch between different TTS models"""
612
- if model_type not in AVAILABLE_MODELS:
613
- raise HTTPException(status_code=400, detail=f"Model type '{model_type}' not found. Available: {list(AVAILABLE_MODELS.keys())}")
614
-
615
- success = load_tts_model(model_type)
616
-
617
- if success:
618
- return {
619
- "status": "success",
620
- "message": f"Model switched to {AVAILABLE_MODELS[model_type]['name']}",
621
- "model": current_model,
622
- "voice_cloning_supported": voice_cloning_supported
623
- }
624
- else:
625
- raise HTTPException(status_code=500, detail=f"Failed to load model: {model_type}")
626
-
627
- @app.get("/api/builtin-voices")
628
- async def get_builtin_voices():
629
- """Get list of built-in voice styles"""
630
- return {
631
- "status": "success",
632
- "voices": BUILTIN_VOICES,
633
- "voice_cloning_supported": voice_cloning_supported
634
- }
635
-
636
  @app.post("/api/batch-tts")
637
  async def batch_generate_tts(request: BatchTTSRequest):
638
- """Enhanced batch TTS with model selection"""
639
  try:
640
- # Lazy load model
 
641
  if not model_loaded:
642
- if not load_tts_model(request.model_type):
643
- raise HTTPException(status_code=500, detail=f"TTS model '{request.model_type}' failed to load")
644
 
645
  print(f"📥 Batch TTS request for {len(request.texts)} texts")
646
 
647
  results = []
648
  for i, text in enumerate(request.texts):
649
  try:
650
- # Create individual TTS request
651
  single_request = TTSRequest(
652
  text=text,
653
  project_id=request.project_id,
654
- voice_name=request.voice_name,
655
- language=request.language,
656
- model_type=request.model_type,
657
- speed=request.speed,
658
- temperature=request.temperature
659
  )
660
 
661
- # Use the single TTS endpoint
662
  result = await generate_tts(single_request)
663
  results.append({
664
  "text_index": i,
@@ -679,246 +377,61 @@ async def batch_generate_tts(request: BatchTTSRequest):
679
  "status": "completed",
680
  "project_id": request.project_id,
681
  "results": results,
682
- "model_used": current_model,
683
- "model_type": request.model_type,
684
- "voice_cloning": supports_voice_cloning() and request.voice_name != "default"
685
  }
686
 
687
  except Exception as e:
688
  print(f"❌ Batch TTS generation error: {str(e)}")
689
  raise HTTPException(status_code=500, detail=f"Batch TTS generation failed: {str(e)}")
690
 
691
- @app.post("/api/clone-voice")
692
- async def api_clone_voice(
693
- project_id: str = Form(...),
694
- voice_name: str = Form(...),
695
- description: str = Form(""),
696
- files: List[UploadFile] = File(...),
697
- model_type: str = Form("xtts-v2")
698
- ):
699
- """Enhanced voice cloning with model validation"""
700
- try:
701
- # Ensure we're using a model that supports voice cloning
702
- if model_type != "xtts-v2":
703
- raise HTTPException(
704
- status_code=400,
705
- detail="Voice cloning is only supported with the 'xtts-v2' model. Please switch to XTTS-v2 for voice cloning."
706
- )
707
-
708
- # Load XTTS model if not already loaded
709
- if not model_loaded or current_model != AVAILABLE_MODELS["xtts-v2"]["model_name"]:
710
- if not load_tts_model("xtts-v2"):
711
- raise HTTPException(status_code=500, detail="XTTS-v2 model failed to load. Voice cloning requires XTTS-v2.")
712
-
713
- # Save uploaded files temporarily
714
- temp_files = []
715
- for i, file in enumerate(files):
716
- if not file.filename.lower().endswith(('.wav', '.mp3', '.ogg', '.flac')):
717
- raise HTTPException(status_code=400, detail="Only audio files are allowed")
718
-
719
- temp_path = f"/tmp/{uuid.uuid4()}_{file.filename}"
720
- with open(temp_path, "wb") as f:
721
- content = await file.read()
722
- f.write(content)
723
- temp_files.append(temp_path)
724
-
725
- success, message = clone_voice(voice_name, temp_files, description)
726
-
727
- # Clean up temporary files
728
- for temp_file in temp_files:
729
- try:
730
- os.remove(temp_file)
731
- except:
732
- pass
733
-
734
- if success:
735
- return {
736
- "status": "success",
737
- "message": message,
738
- "voice_name": voice_name,
739
- "model_used": current_model
740
- }
741
- else:
742
- raise HTTPException(status_code=500, detail=message)
743
-
744
- except Exception as e:
745
- print(f"❌ Voice cloning error: {str(e)}")
746
- raise HTTPException(status_code=500, detail=f"Voice cloning failed: {str(e)}")
747
-
748
- @app.post("/api/upload-voice")
749
- async def upload_voice_sample(
750
- project_id: str = Form(...),
751
- voice_name: str = Form(...),
752
- file: UploadFile = File(...)
753
- ):
754
- """Upload a voice sample for cloning"""
755
- try:
756
- print(f"📥 Voice upload request: {voice_name} for project {project_id}")
757
-
758
- # Check if voice cloning is supported
759
- if not supports_voice_cloning():
760
- raise HTTPException(
761
- status_code=400,
762
- detail="Voice cloning is not supported with the current model. Please use the XTTS model for voice cloning."
763
- )
764
-
765
- # Validate file type
766
- if not file.filename.lower().endswith(('.wav', '.mp3', '.ogg', '.flac')):
767
- raise HTTPException(status_code=400, detail="Only audio files are allowed")
768
-
769
- # Save voice sample
770
- voice_path = f"/tmp/voices/{voice_name}.wav"
771
- with open(voice_path, "wb") as f:
772
- content = await file.read()
773
- f.write(content)
774
-
775
- print(f"✅ Voice sample saved: {voice_path}")
776
-
777
- return {
778
- "status": "success",
779
- "message": "Voice sample uploaded successfully",
780
- "voice_name": voice_name,
781
- "local_path": voice_path
782
- }
783
-
784
- except Exception as e:
785
- print(f"❌ Voice upload error: {str(e)}")
786
- raise HTTPException(status_code=500, detail=f"Voice upload failed: {str(e)}")
787
-
788
- @app.get("/api/voices")
789
- async def list_voices():
790
- """List available voices with enhanced information"""
791
- try:
792
- voices_dir = Path("/tmp/voices")
793
- voices = []
794
-
795
- # Add built-in voices
796
- for voice_id, voice_info in BUILTIN_VOICES.items():
797
- voices.append({
798
- "name": voice_id,
799
- "display_name": voice_info["name"],
800
- "type": "builtin",
801
- "gender": voice_info["gender"],
802
- "language": voice_info["language"],
803
- "samples_count": 0,
804
- "created_at": "built-in"
805
- })
806
-
807
- # Add cloned voices
808
- for item in voices_dir.iterdir():
809
- if item.is_dir():
810
- samples = list(item.glob("sample_*.wav"))
811
- # Try to load metadata
812
- metadata_path = item / "metadata.json"
813
- metadata = {}
814
- if metadata_path.exists():
815
- try:
816
- with open(metadata_path, 'r') as f:
817
- import json
818
- metadata = json.load(f)
819
- except:
820
- pass
821
-
822
- voices.append({
823
- "name": item.name,
824
- "display_name": metadata.get("name", item.name),
825
- "type": "cloned",
826
- "gender": "custom",
827
- "language": "multilingual",
828
- "samples_count": len(samples),
829
- "description": metadata.get("description", ""),
830
- "created_at": metadata.get("created_at", datetime.fromtimestamp(item.stat().st_ctime).isoformat())
831
- })
832
- elif item.is_file() and item.suffix == ".wav":
833
- voices.append({
834
- "name": item.stem,
835
- "display_name": item.stem,
836
- "type": "uploaded",
837
- "gender": "custom",
838
- "language": "unknown",
839
- "samples_count": 1,
840
- "created_at": datetime.fromtimestamp(item.stat().st_ctime).isoformat()
841
- })
842
-
843
- return {
844
- "status": "success",
845
- "voices": voices,
846
- "voice_cloning_supported": supports_voice_cloning(),
847
- "current_model": current_model
848
- }
849
-
850
- except Exception as e:
851
- print(f"❌ List voices error: {str(e)}")
852
- raise HTTPException(status_code=500, detail=f"Failed to list voices: {str(e)}")
853
 
854
  @app.get("/api/health")
855
  async def health_check():
856
- """Enhanced health check with model information"""
 
 
857
  return {
858
- "status": "healthy" if model_loaded else "loading",
859
  "tts_loaded": model_loaded,
860
  "model": current_model,
861
- "model_config": active_model_config,
862
- "voice_cloning_supported": voice_cloning_supported,
863
- "device": DEVICE,
864
- "load_attempts": model_load_attempts,
865
- "timestamp": datetime.now().isoformat()
866
  }
867
 
868
- @app.post("/api/reload-model")
869
- async def reload_model(model_type: str = Form("xtts-v2")):
870
- """Enhanced model reload with model selection"""
871
- global tts, model_loaded, current_model, voice_cloning_supported
872
-
873
- if model_type not in AVAILABLE_MODELS:
874
- raise HTTPException(status_code=400, detail=f"Model type '{model_type}' not found")
875
-
876
- # Clear current model
877
- tts = None
878
- model_loaded = False
879
- current_model = ""
880
- voice_cloning_supported = False
881
-
882
- # Try to reload specified model
883
- success = load_tts_model(model_type)
884
-
885
- return {
886
- "status": "success" if success else "error",
887
- "message": f"Model {model_type} reloaded successfully" if success else f"Failed to reload model {model_type}",
888
- "model_loaded": model_loaded,
889
- "model": current_model,
890
- "voice_cloning_supported": voice_cloning_supported
891
- }
892
 
893
  @app.get("/")
894
  async def root():
895
- """Enhanced root endpoint with model information"""
896
  return {
897
- "message": "Enhanced TTS API with Multiple Voice Styles and Voice Cloning",
898
- "endpoints": {
899
- "POST /api/tts": "Generate TTS for a single text",
900
- "POST /api/batch-tts": "Generate TTS for multiple texts",
901
- "POST /api/upload-voice": "Upload a voice sample for cloning",
902
- "POST /api/clone-voice": "Clone a voice from multiple samples",
903
- "GET /api/voices": "List available voices",
904
- "GET /api/builtin-voices": "List built-in voice styles",
905
- "GET /api/models": "List available TTS models",
906
- "POST /api/set-model": "Switch between TTS models",
907
- "GET /api/health": "Health check",
908
- "POST /api/reload-model": "Reload TTS model"
909
- },
910
  "model_loaded": model_loaded,
911
- "model_name": current_model if model_loaded else "None",
912
- "model_type": list(AVAILABLE_MODELS.keys())[0] if active_model_config else "None",
913
- "voice_cloning_supported": supports_voice_cloning(),
914
- "builtin_voices_count": len(BUILTIN_VOICES)
915
  }
916
 
917
  if __name__ == "__main__":
918
  import uvicorn
919
- print("🚀 Starting Enhanced TTS API with Multiple Voice Styles and Voice Cloning...")
920
- print("📊 API endpoints available at: http://localhost:7860/")
921
- print("💡 Model will be loaded on first request to save memory")
922
- print("🎵 Available models:", list(AVAILABLE_MODELS.keys()))
923
- print("🗣️ Built-in voices:", list(BUILTIN_VOICES.keys()))
924
  uvicorn.run(app, host="0.0.0.0", port=7860)
 
14
  import torch
15
  import numpy as np
16
 
17
+ # Configure environment with storage limits
18
  os.makedirs("/tmp/voices", exist_ok=True)
19
  os.makedirs("/tmp/output", exist_ok=True)
20
 
21
  # Initialize FastAPI app
22
+ app = FastAPI(title="Storage-Optimized TTS API", description="API for text-to-speech with storage management")
23
 
24
  # Add CORS middleware
25
  app.add_middleware(
 
36
 
37
  print(f"✅ Using device: {DEVICE}")
38
 
39
+ # STORAGE OPTIMIZATION: Use only ONE high-quality model to save space
40
  AVAILABLE_MODELS = {
 
 
 
 
 
 
 
 
41
  "tacotron2-ddc": {
42
  "name": "Tacotron2-DDC",
43
  "model_name": "tts_models/en/ljspeech/tacotron2-DDC",
44
+ "description": "High-quality English TTS (Excellent natural voice)",
 
 
 
 
 
 
 
 
45
  "languages": ["en"],
46
  "voice_cloning": False,
47
+ "size_mb": 150, # Approximate size
48
+ "quality": "excellent"
49
  }
50
  }
51
 
52
+ # Simple voice styles for the single model
53
+ VOICE_STYLES = {
54
+ "default": {
55
+ "name": "Default Voice",
56
+ "description": "Clear and natural English voice",
57
+ "gender": "neutral"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58
  },
59
+ "clear": {
60
+ "name": "Clear Voice",
61
+ "description": "Very clear and articulate voice",
62
+ "gender": "neutral"
 
63
  },
64
+ "professional": {
65
+ "name": "Professional Voice",
66
+ "description": "Professional and authoritative voice",
67
+ "gender": "neutral"
 
68
  }
69
  }
70
 
 
72
  tts = None
73
  model_loaded = False
74
  current_model = ""
 
75
  model_loading = False
 
 
76
 
77
  # Pydantic models
78
  class TTSRequest(BaseModel):
79
  text: str
80
  project_id: str
81
+ voice_style: Optional[str] = "default"
 
 
82
  speed: Optional[float] = 1.0
 
83
 
84
  class BatchTTSRequest(BaseModel):
85
  texts: List[str]
86
  project_id: str
87
+ voice_style: Optional[str] = "default"
 
 
88
  speed: Optional[float] = 1.0
 
89
 
90
+ # Storage management functions
def _remove_stale_wavs(directory, max_age_seconds, *, recursive=False, label="file"):
    """Delete ``*.wav`` files in *directory* whose mtime is older than *max_age_seconds*.

    A file that cannot be inspected or removed (already deleted by a
    concurrent request, permission problem, ...) is reported and skipped so
    the rest of the sweep still runs.

    Returns the number of files removed.
    """
    directory = Path(directory)
    if not directory.exists():
        return 0

    # One cutoff for the whole sweep instead of a fresh time.time() per file.
    cutoff = time.time() - max_age_seconds
    candidates = directory.rglob("*.wav") if recursive else directory.glob("*.wav")

    removed = 0
    for wav_file in candidates:
        try:
            if wav_file.stat().st_mtime < cutoff:
                wav_file.unlink()
                print(f"🧹 Cleaned up old {label}: {wav_file}")
                removed += 1
        except OSError as e:
            print(f"⚠️ Could not clean up {wav_file}: {e}")
    return removed


def cleanup_old_files(
    output_dir="/tmp/output",
    voices_dir="/tmp/voices",
    output_max_age=3600,
    voice_max_age=86400,
):
    """Clean up old files to free up space.

    Args:
        output_dir: Directory holding generated audio; only its direct
            ``*.wav`` children are considered.
        voices_dir: Directory holding voice samples; searched recursively
            for ``*.wav`` files.
        output_max_age: Age in seconds after which an output file is
            deleted (default: 1 hour).
        voice_max_age: Age in seconds after which a voice file is deleted
            (default: 24 hours).

    Never raises: this runs at the start of request handling, so any
    unexpected failure is logged rather than propagated.
    """
    try:
        # Clean output files older than 1 hour (by default)
        _remove_stale_wavs(output_dir, output_max_age, label="file")

        # Clean voice files older than 24 hours (by default)
        _remove_stale_wavs(voices_dir, voice_max_age, recursive=True, label="voice file")

        # Check storage usage
        check_storage_usage()

    except Exception as e:
        # Top-level boundary: cleanup is best-effort and must not fail a request.
        print(f"⚠️ Cleanup error: {e}")
115
 
def check_storage_usage(path="/tmp", min_free_bytes=2 * 2**30):
    """Check and log storage usage.

    Args:
        path: Filesystem location whose disk is inspected (default ``/tmp``).
        min_free_bytes: Free-space threshold in bytes; below it the check
            warns and fails (default 2 GiB).

    Returns:
        ``False`` when free space is below ``min_free_bytes``; ``True``
        otherwise, including when the usage query itself fails (the check
        is advisory, so an unreadable disk is not reported as "low").
    """
    # Imported here, as in the original, so the function does not depend on
    # the file's top-level imports.
    import shutil

    gib = 2**30

    try:
        total, _used, free = shutil.disk_usage(path)
    except OSError as e:
        print(f"⚠️ Storage check error: {e}")
        return True

    print(f"💾 Storage: {free // gib}GB free of {total // gib}GB total")

    # Warn if running low
    if free < min_free_bytes:
        print("🚨 WARNING: Low storage space!")
        return False
    return True
134
 
 
def clean_text(text):
    """Clean text for TTS generation.

    Steps: drop characters other than word characters, whitespace and basic
    punctuation; collapse whitespace runs; trim; and, for text longer than
    10 characters, make sure it ends with ``.``, ``!`` or ``?``.

    Args:
        text: Raw input; anything falsy or not a ``str`` is replaced.

    Returns:
        The cleaned text. ``"Hello"`` if the input is empty or not a string;
        ``"Hello world"`` if nothing remains after cleaning.
    """
    import re

    if not text or not isinstance(text, str):
        return "Hello"

    # Remove any problematic characters but keep basic punctuation
    text = re.sub(r'[^\w\s\.\,\!\?\-\'\"\:\;]', '', text)

    # Collapse whitespace and trim BEFORE the punctuation check. Checking
    # first made input with trailing whitespace come out as "sentence ." or
    # "sentence. ." because the trailing space hid the real last character.
    text = re.sub(r'\s+', ' ', text).strip()

    if not text:
        return "Hello world"

    if len(text) > 10 and not re.search(r'[\.\!\?]$', text):
        text += '.'

    return text
155
 
156
  def upload_to_oci(file_path: str, filename: str, project_id: str, file_type="voiceover"):
157
+ """Upload file to OCI"""
158
  try:
159
  if not OCI_UPLOAD_API_URL:
160
  return None, "OCI upload API URL not configured"
 
182
  except Exception as e:
183
  return None, f"Upload error: {str(e)}"
184
 
185
+ def load_tts_model(model_type="tacotron2-ddc"):
186
+ """Load TTS model with storage optimization"""
187
+ global tts, model_loaded, current_model, model_loading
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
188
 
189
  if model_loading:
190
  print("⏳ Model is already being loaded...")
191
  return False
192
 
193
  if model_type not in AVAILABLE_MODELS:
194
+ print(f"❌ Model type '{model_type}' not found.")
195
  return False
196
 
197
  model_loading = True
 
198
 
199
  try:
200
+ # Clean up before loading new model
201
+ cleanup_old_files()
202
+
203
  from TTS.api import TTS
204
 
205
  # Handle TOS acceptance automatically
 
216
  # Load the selected model
217
  tts = TTS(model_config["model_name"]).to(DEVICE)
218
 
219
+ # Test the model
220
+ test_path = "/tmp/test_output.wav"
221
+ tts.tts_to_file(text="Test", file_path=test_path)
222
+
223
+ if os.path.exists(test_path):
224
+ os.remove(test_path)
225
+ print("✅ Model tested successfully!")
226
+
227
  model_loaded = True
228
  current_model = model_config["model_name"]
 
 
229
 
230
  print(f"✅ {model_config['name']} loaded successfully!")
231
+ print(f" Size: ~{model_config['size_mb']}MB")
232
+ print(f" Quality: {model_config['quality']}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
233
 
234
  return True
235
 
236
  except Exception as e:
237
+ print(f"❌ Model failed to load: {e}")
 
 
 
 
 
238
  return False
239
 
240
  finally:
 
246
  finally:
247
  model_loading = False
248
 
249
+ # API endpoints
 
 
 
 
 
 
250
  @app.post("/api/tts")
251
  async def generate_tts(request: TTSRequest):
252
+ """Generate TTS with storage optimization"""
253
  try:
254
+ # Clean up before processing
255
+ cleanup_old_files()
256
+
257
+ # Lazy load model
258
  if not model_loaded:
259
+ if not load_tts_model("tacotron2-ddc"):
260
  return {
261
  "status": "error",
262
+ "message": "TTS model failed to load. Please check storage space.",
263
  "requires_tos_acceptance": True,
264
  "tos_url": "https://coqui.ai/cpml.txt"
265
  }
266
 
267
  print(f"📥 TTS request for project: {request.project_id}")
268
+ print(f" Voice Style: {request.voice_style}")
269
  print(f" Text length: {len(request.text)} characters")
 
 
 
 
 
 
 
 
 
 
 
270
 
271
+ # Generate unique filename
 
 
 
 
 
 
 
 
272
  timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
273
  filename = f"voiceover_{timestamp}.wav"
274
  output_path = f"/tmp/output/{filename}"
 
276
  # Ensure output directory exists
277
  os.makedirs(os.path.dirname(output_path), exist_ok=True)
278
 
279
+ # Clean the text
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
280
  cleaned_text = clean_text(request.text)
281
+ print(f"📝 Text: '{cleaned_text}'")
 
282
 
283
+ # Generate TTS
284
  try:
285
+ tts.tts_to_file(
286
+ text=cleaned_text,
287
+ file_path=output_path
288
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289
  except Exception as tts_error:
290
  print(f"❌ TTS generation failed: {tts_error}")
291
+ raise tts_error
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
292
 
293
  # Verify the file was created
294
  if not os.path.exists(output_path):
295
+ raise Exception(f"TTS failed to create output file")
296
 
297
  file_size = os.path.getsize(output_path)
298
  print(f"✅ TTS generated: {output_path} ({file_size} bytes)")
299
 
300
  # Upload to OCI
301
+ upload_result, error = upload_to_oci(output_path, filename, request.project_id)
 
 
302
 
303
  if error:
304
  print(f"❌ OCI upload failed: {error}")
 
312
 
313
  print(f"✅ Upload successful: {filename}")
314
 
315
+ # Clean up local file immediately after upload
316
  try:
317
  os.remove(output_path)
318
  print(f"🧹 Cleaned up local file: {output_path}")
 
325
  "filename": filename,
326
  "oci_path": upload_result.get("path", f"{request.project_id}/voiceover/{filename}"),
327
  "model_used": current_model,
328
+ "voice_style": request.voice_style
 
 
329
  }
330
 
331
  except Exception as e:
332
  print(f"❌ TTS generation error: {str(e)}")
333
  return {
334
  "status": "error",
335
+ "message": f"TTS generation failed: {str(e)}"
 
 
 
336
  }
337
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
338
  @app.post("/api/batch-tts")
339
  async def batch_generate_tts(request: BatchTTSRequest):
340
+ """Batch TTS with storage optimization"""
341
  try:
342
+ cleanup_old_files()
343
+
344
  if not model_loaded:
345
+ if not load_tts_model("tacotron2-ddc"):
346
+ raise HTTPException(status_code=500, detail="TTS model failed to load")
347
 
348
  print(f"📥 Batch TTS request for {len(request.texts)} texts")
349
 
350
  results = []
351
  for i, text in enumerate(request.texts):
352
  try:
 
353
  single_request = TTSRequest(
354
  text=text,
355
  project_id=request.project_id,
356
+ voice_style=request.voice_style,
357
+ speed=request.speed
 
 
 
358
  )
359
 
 
360
  result = await generate_tts(single_request)
361
  results.append({
362
  "text_index": i,
 
377
  "status": "completed",
378
  "project_id": request.project_id,
379
  "results": results,
380
+ "model_used": current_model
 
 
381
  }
382
 
383
  except Exception as e:
384
  print(f"❌ Batch TTS generation error: {str(e)}")
385
  raise HTTPException(status_code=500, detail=f"Batch TTS generation failed: {str(e)}")
386
 
@app.get("/api/voice-styles")
async def get_voice_styles():
    """Return the voice style catalogue plus the loaded model name.

    ``current_model`` is ``None`` in the response while no model is loaded.
    """
    active_model = None
    if model_loaded:
        active_model = current_model

    return dict(
        status="success",
        voice_styles=VOICE_STYLES,
        current_model=active_model,
    )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
395
 
@app.get("/api/health")
async def health_check():
    """Report service health, including the result of the storage check.

    ``status`` is "healthy" only when a model is loaded and the storage
    check passes; otherwise it is "warning".
    """
    storage_ok = check_storage_usage()

    overall_status = "warning"
    if model_loaded and storage_ok:
        overall_status = "healthy"

    return dict(
        status=overall_status,
        tts_loaded=model_loaded,
        model=current_model,
        storage_ok=storage_ok,
        device=DEVICE,
    )
408
 
@app.post("/api/cleanup")
async def manual_cleanup():
    """Run the file cleanup on demand and confirm completion.

    Raises:
        HTTPException: status 500 if the cleanup call raises.
    """
    try:
        cleanup_old_files()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Cleanup failed: {str(e)}")
    else:
        return dict(
            status="success",
            message="Cleanup completed successfully",
        )
 
 
 
 
 
 
 
 
 
 
 
 
 
420
 
@app.get("/")
async def root():
    """Describe the service and its current model state.

    ``model`` is the string "None" (not null) while no model is loaded.
    """
    model_label = "None"
    if model_loaded:
        model_label = current_model

    return dict(
        message="Storage-Optimized TTS API",
        model_loaded=model_loaded,
        model=model_label,
        storage_optimized=True,
    )
430
 
if __name__ == "__main__":
    # Script entry point: print the startup banner, log disk usage once,
    # then serve the app on all interfaces at port 7860.
    import uvicorn

    for banner_line in (
        "🚀 Starting Storage-Optimized TTS API...",
        "💾 Storage management enabled",
        "🔊 Using Tacotron2-DDC for best quality/size ratio",
    ):
        print(banner_line)
    check_storage_usage()

    uvicorn.run(app, host="0.0.0.0", port=7860)