# 🎙️ Multi-Speaker Podcast Generator — Gradio app (Hugging Face Space)
import asyncio
import os
import re
import tempfile
import time
from uuid import uuid4

import google.generativeai as genai
import gradio as gr
import pyttsx3
from gtts import gTTS
from pydub import AudioSegment
| try: | |
| import edge_tts | |
| EDGE_TTS_AVAILABLE = True | |
| except ImportError: | |
| EDGE_TTS_AVAILABLE = False | |
| print("Edge TTS not available, using fallback options") | |
# Voice configurations for different speakers.
# Keys are "<n>_speakers"; each value is an ordered list of speaker profiles.
# The position of a profile in its list is the speaker_idx used by the script
# parser and the audio generator, and "voice" is a Microsoft Edge TTS neural
# voice identifier. Note that Aria and Guy are reused across configurations.
VOICE_CONFIGS = {
    "2_speakers": [
        {"name": "Alex", "voice": "en-US-AriaNeural", "gender": "female"},
        {"name": "Brian", "voice": "en-US-GuyNeural", "gender": "male"}
    ],
    "3_speakers": [
        {"name": "Sarah", "voice": "en-US-JennyNeural", "gender": "female"},
        {"name": "Mike", "voice": "en-US-BrandonNeural", "gender": "male"},
        {"name": "Emma", "voice": "en-US-AriaNeural", "gender": "female"}
    ],
    "4_speakers": [
        {"name": "Sarah", "voice": "en-US-JennyNeural", "gender": "female"},
        {"name": "Mike", "voice": "en-US-BrandonNeural", "gender": "male"},
        {"name": "Emma", "voice": "en-US-AriaNeural", "gender": "female"},
        {"name": "David", "voice": "en-US-GuyNeural", "gender": "male"}
    ]
}
# Module-level Gemini model handle; populated by init_gemini().
client = None


def init_gemini(api_key):
    """Configure the Gemini client from an API key.

    Stores the model handle in the module-level ``client`` and returns a
    human-readable status string for the UI.
    """
    global client
    # Guard clause: no usable key means we stay in non-AI mode.
    if not api_key or not api_key.strip():
        return "ℹ️ Add Gemini API key for AI-powered conversations"
    try:
        genai.configure(api_key=api_key)
        client = genai.GenerativeModel('gemini-1.5-flash')
    except Exception as exc:
        return f"❌ Gemini API error: {str(exc)}"
    return "✅ Gemini API connected successfully!"
def generate_with_gtts(text, filename):
    """Generate speech using Google's gTTS.

    Returns ``(filename, None)`` on success or ``(None, message)`` when
    synthesis fails for any reason.
    """
    try:
        gTTS(text=text, lang='en', slow=False).save(filename)
    except Exception as exc:
        return None, f"gTTS Error: {str(exc)}"
    return filename, None
def generate_with_pyttsx3(text, filename):
    """Generate speech using the system's offline TTS engine (pyttsx3).

    Prefers the first voice whose name looks female (or Windows' "Zira").
    Returns ``(filename, None)`` on success or ``(None, message)`` on failure.
    """
    try:
        engine = pyttsx3.init()
        engine.setProperty('rate', 180)
        engine.setProperty('volume', 0.9)
        voices = engine.getProperty('voices')
        for candidate in voices or []:
            lowered = candidate.name.lower()
            if 'female' in lowered or 'zira' in lowered:
                engine.setProperty('voice', candidate.id)
                break
        engine.save_to_file(text, filename)
        engine.runAndWait()
    except Exception as exc:
        return None, f"pyttsx3 Error: {str(exc)}"
    return filename, None
async def generate_with_edge_tts(text, voice, filename):
    """Synthesize ``text`` with a specific Microsoft Edge TTS voice.

    Edge TTS emits MP3, so a ``.wav`` target name is rewritten to ``.mp3``
    instead of transcoding. Returns ``(saved_path, None)`` on success or
    ``(None, error_message)`` on failure.
    """
    if not EDGE_TTS_AVAILABLE:
        return None, "Edge TTS not available"
    try:
        # Rename rather than convert: MP3 is Edge TTS's native container.
        target = filename.replace('.wav', '.mp3')
        await edge_tts.Communicate(text, voice).save(target)
        return target, None
    except Exception as exc:
        return None, f"Edge TTS Error: {str(exc)}"
def generate_podcast_script(text, speaker_count, use_gemini):
    """Generate a podcast script with multiple speakers.

    When ``use_gemini`` is set and the module-level ``client`` is configured,
    asks Gemini to rewrite ``text`` as a conversation between the configured
    hosts. Otherwise — or when the API call fails — returns the original
    text truncated to 1500 characters.

    Args:
        text: Source text to transform.
        speaker_count: Number of hosts; selects a VOICE_CONFIGS entry.
        use_gemini: Whether to attempt AI conversation generation.

    Returns:
        The script text (AI conversation or truncated original).
    """
    if use_gemini and client:
        try:
            voice_config = VOICE_CONFIGS[f"{speaker_count}_speakers"]
            speaker_names = [config["name"] for config in voice_config]
            prompt = f"""Create an engaging podcast conversation between {speaker_count} hosts: {', '.join(speaker_names)}.
Transform this text into a natural conversation where each speaker contributes meaningfully.
Guidelines:
- Make it sound like a real podcast discussion
- Each speaker should have distinct perspectives
- Include natural transitions and interactions
- Keep it under 2000 characters total
- Use speaker names clearly (e.g., "Sarah: Hello everyone...")
Original text: {text[:2500]}
Format the output with clear speaker labels like:
{speaker_names[0]}: [text]
{speaker_names[1] if len(speaker_names) > 1 else speaker_names[0]}: [text]
etc."""
            response = client.generate_content(prompt)
            return response.text
        except Exception as e:
            # Bug fix: previously the error message itself was returned as the
            # "script" and would be read aloud by the TTS engine. Log it and
            # fall through to the plain-text fallback instead.
            print(f"AI generation failed: {str(e)}. Using original text.")
    # Fallback: truncated original text (speaker distribution happens later).
    return text[:1500] + ("..." if len(text) > 1500 else "")
def _extract_labeled_parts(script, speaker_names):
    """Collect (text, speaker_index) runs from lines starting with "Name:"."""
    parts = []
    current_speaker = 0
    current_text = ""
    for line in script.split('\n'):
        line = line.strip()
        if not line:
            continue
        for i, name in enumerate(speaker_names):
            if line.lower().startswith(f"{name.lower()}:"):
                # New speaker label: flush the accumulated text first.
                if current_text.strip():
                    parts.append((current_text.strip(), current_speaker))
                current_speaker = i
                current_text = line[len(name) + 1:].strip()
                break
        else:
            # Continuation line: belongs to the current speaker.
            current_text += " " + line
    if current_text.strip():
        parts.append((current_text.strip(), current_speaker))
    return parts


def _distribute_sentences(script, speaker_count, speaker_names):
    """Split script into sentences and spread them across the speakers."""
    # Bug fix: the old code split only on the *first* delimiter found among
    # '. ', '! ', '? ', so text mixing '.', '!' and '?' was segmented on just
    # one of them. Split on any sentence-ending punctuation, keeping it.
    sentences = [s.strip() for s in re.split(r'(?<=[.!?])\s+', script) if s.strip()]
    if not sentences:
        sentences = [script]
    parts = []
    if len(sentences) >= speaker_count:
        # Distribute sentences evenly; earlier speakers absorb the remainder.
        base, remainder = divmod(len(sentences), speaker_count)
        start_idx = 0
        for i in range(speaker_count):
            num_sentences = base + (1 if i < remainder else 0)
            if start_idx < len(sentences):
                chunk = sentences[start_idx:start_idx + num_sentences]
                if chunk:
                    parts.append((' '.join(chunk), i))
                    print(f"Speaker {speaker_names[i]}: {len(chunk)} sentences")
                start_idx += num_sentences
    else:
        # Fewer sentences than speakers: alternate between the first two voices.
        for i, sentence in enumerate(sentences):
            parts.append((sentence, i % min(speaker_count, 2)))
    return parts


def parse_script_for_speakers(script, speaker_count):
    """Parse the script to extract speaker parts.

    Tries explicit "Name:" labels first; if fewer than two labeled parts are
    found, distributes the text sentence-by-sentence among the configured
    speakers. On any error the whole script is assigned to speaker 0.

    Returns:
        List of (text, speaker_index) tuples; never empty.
    """
    try:
        voice_config = VOICE_CONFIGS[f"{speaker_count}_speakers"]
        speaker_names = [config["name"] for config in voice_config]
        parts = _extract_labeled_parts(script, speaker_names)
        # If no explicit speakers were found, intelligently distribute text.
        if len(parts) < 2:
            print(f"No explicit speakers found, distributing text among {speaker_count} speakers")
            parts = _distribute_sentences(script, speaker_count, speaker_names)
        if not parts:
            parts = [(script, 0)]
        # Debug trace of the final assignment.
        print(f"Generated {len(parts)} parts for {speaker_count} speakers:")
        for i, (text, speaker_idx) in enumerate(parts):
            print(f"  Part {i+1}: {speaker_names[speaker_idx]} - {text[:60]}...")
        return parts
    except Exception as e:
        print(f"Error parsing script: {e}")
        return [(script, 0)]
async def generate_multi_speaker_audio(script_parts, speaker_count):
    """Generate multi-speaker podcast audio.

    Renders each (text, speaker_idx) part with that speaker's Edge TTS voice,
    then concatenates the clips — with a 500 ms pause between speakers —
    into a single WAV file.

    Returns:
        (output_path, None) on success, or (None, error_message) on failure.
    """
    if not EDGE_TTS_AVAILABLE:
        return None, "Edge TTS not available for multi-speaker"
    try:
        voice_config = VOICE_CONFIGS[f"{speaker_count}_speakers"]
        audio_files = []
        print(f"Generating audio for {len(script_parts)} parts with {speaker_count} speakers")
        for i, (speaker_text, speaker_idx) in enumerate(script_parts):
            voice = voice_config[speaker_idx]["voice"]
            speaker_name = voice_config[speaker_idx]["name"]
            # Unique temp name per part; .mp3 because Edge TTS emits MP3.
            temp_filename = f"temp_speaker_{i}_{speaker_name}_{uuid4().hex[:8]}.mp3"
            print(f"Part {i+1}: {speaker_name} ({voice}) says: {speaker_text[:50]}...")
            result, error = await generate_with_edge_tts(speaker_text, voice, temp_filename)
            if result:
                audio_files.append(temp_filename)
                print(f"✅ Generated audio for {speaker_name}")
            else:
                # One failed part aborts the whole job: remove every clip
                # produced so far and surface the error.
                print(f"❌ Error generating voice for {speaker_name}: {error}")
                for f in audio_files:
                    try:
                        os.unlink(f)
                    except:
                        pass
                return None, f"Error generating voice for {speaker_name}: {error}"
        # Combine all audio files into one track.
        if len(audio_files) > 1:
            print(f"Combining {len(audio_files)} audio files...")
            combined_audio = AudioSegment.empty()
            for i, audio_file in enumerate(audio_files):
                try:
                    # Load the audio segment - pydub auto-detects the format.
                    segment = AudioSegment.from_file(audio_file)
                    combined_audio += segment
                    # Add a small pause between speakers (0.5 seconds)
                    if i < len(audio_files) - 1:  # Don't add pause after last segment
                        pause = AudioSegment.silent(duration=500)  # 500ms pause
                        combined_audio += pause
                    print(f"✅ Added segment {i+1}")
                except Exception as e:
                    # NOTE(review): a segment that fails to load is skipped
                    # silently, so the combined podcast may be missing a part.
                    print(f"❌ Error processing audio file {audio_file}: {e}")
            # Save combined audio in the current working directory.
            output_filename = f"combined_podcast_{uuid4().hex[:8]}.wav"
            combined_audio.export(output_filename, format="wav")
            # Cleanup temporary per-speaker clips.
            for f in audio_files:
                try:
                    os.unlink(f)
                    print(f"🗑️ Cleaned up {f}")
                except:
                    pass
            print(f"✅ Combined audio saved as {output_filename}")
            return output_filename, None
        elif len(audio_files) == 1:
            # Single clip: return it as-is (still MP3, not WAV).
            return audio_files[0], None
        else:
            return None, "No audio files generated"
    except Exception as e:
        print(f"❌ Multi-speaker generation error: {str(e)}")
        return None, f"Multi-speaker generation error: {str(e)}"
def create_podcast(text, use_gemini, tts_engine, speaker_count, progress=gr.Progress()):
    """Main function to create podcast from text with multiple speakers.

    Args:
        text: Raw input text to turn into a podcast.
        use_gemini: If True (and the API key is configured), rewrite the
            text as a multi-host conversation via Gemini first.
        tts_engine: "Multi-Speaker (Edge TTS)", "gTTS (Online)" or
            "pyttsx3 (Offline)".
        speaker_count: Number of voices (1-4).
        progress: Gradio progress reporter.

    Returns:
        Tuple of (audio_bytes_or_None, status_message, script_text).
    """
    temp_filename = None  # lazily-created temp .wav for single-voice engines
    try:
        progress(0.1, "Starting processing...")
        if not text.strip():
            return None, "❌ Please enter some text first!", ""
        progress(0.3, "Generating podcast script...")
        podcast_script = generate_podcast_script(text, speaker_count, use_gemini)
        progress(0.5, "Parsing script for speakers...")
        script_parts = parse_script_for_speakers(podcast_script, speaker_count)
        progress(0.7, "Generating audio...")
        # Generate audio based on engine choice.
        if tts_engine == "Multi-Speaker (Edge TTS)" and speaker_count > 1 and EDGE_TTS_AVAILABLE:
            # Edge TTS manages its own output files; run the async pipeline
            # on a dedicated event loop.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                audio_file, error = loop.run_until_complete(
                    generate_multi_speaker_audio(script_parts, speaker_count)
                )
            finally:
                loop.close()
        else:
            # Bug fix: the temp .wav used to be created unconditionally and
            # leaked in the multi-speaker branch; create it only when a
            # single-voice engine actually needs it.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
                temp_filename = tmp_file.name
            full_text = " ".join(part[0] for part in script_parts)
            if tts_engine == "gTTS (Online)":
                audio_file, error = generate_with_gtts(full_text, temp_filename)
            else:  # pyttsx3
                audio_file, error = generate_with_pyttsx3(full_text, temp_filename)
        if error:
            return None, f"❌ {error}", podcast_script
        progress(0.9, "Finalizing...")
        # Read the generated audio file into memory, then remove it.
        with open(audio_file, 'rb') as f:
            audio_data = f.read()
        try:
            os.unlink(audio_file)
        except OSError:
            pass
        progress(1.0, "Complete!")
        return audio_data, "✅ Podcast generated successfully!", podcast_script
    except Exception as e:
        return None, f"❌ Audio generation failed: {str(e)}", ""
    finally:
        # Remove the temp file if an engine failed before consuming it.
        if temp_filename and os.path.exists(temp_filename):
            try:
                os.unlink(temp_filename)
            except OSError:
                pass
def get_speaker_info(speaker_count):
    """Return a markdown description of the voices for ``speaker_count``.

    Args:
        speaker_count: Number of speakers selected in the UI (1-4).

    Returns:
        Markdown string listing each speaker's name and voice gender.
    """
    if speaker_count == 1:
        return "**Single Speaker Mode**: Solo narration with one voice"
    voice_config = VOICE_CONFIGS[f"{speaker_count}_speakers"]
    info = f"**{speaker_count} Speaker Mode**:\n"
    # Idiom fix: the old loop used enumerate() but never used the index.
    for config in voice_config:
        info += f"🎤 **{config['name']}** ({config['gender']} voice)\n"
    return info
# Create the Gradio interface
def create_interface():
    """Build the Gradio Blocks UI and wire up its event handlers.

    Layout: a wide left column with API key, input text, and generation
    settings; a narrow right column with speaker info, status, and outputs.
    Returns the Blocks app object (launched by the __main__ guard).
    """
    with gr.Blocks(title="🎙️ Multi-Speaker Podcast Generator", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🎙️ Multi-Speaker Podcast Generator")
        gr.Markdown("Transform your text into engaging podcast conversations with multiple realistic voices!")
        with gr.Row():
            with gr.Column(scale=2):
                # API Configuration
                gr.Markdown("## 🔑 API Configuration")
                api_key = gr.Textbox(
                    label="Gemini API Key (Optional)",
                    type="password",
                    placeholder="Enter your Google Gemini API key...",
                    info="Get a free key from https://aistudio.google.com/"
                )
                api_status = gr.Textbox(
                    label="API Status",
                    interactive=False,
                    value="ℹ️ Add Gemini API key for AI-powered conversations"
                )
                # Input Text
                gr.Markdown("## 📝 Input Text")
                input_text = gr.Textbox(
                    label="Your Content",
                    placeholder="Paste your article, blog post, or any text here...",
                    lines=6
                )
                # Configuration
                gr.Markdown("## ⚙️ Configuration")
                speaker_count = gr.Radio(
                    label="Number of Speakers",
                    choices=[1, 2, 3, 4],
                    value=2,
                    info="Choose how many voices for your podcast"
                )
                use_gemini = gr.Checkbox(
                    label="Use AI for conversation generation",
                    value=True,
                    info="Creates natural conversations (requires API key)"
                )
                tts_engine = gr.Radio(
                    label="Voice Engine",
                    choices=[
                        "Multi-Speaker (Edge TTS)",
                        "gTTS (Online)",
                        "pyttsx3 (Offline)"
                    ],
                    # Default to the richest engine available at import time.
                    value="Multi-Speaker (Edge TTS)" if EDGE_TTS_AVAILABLE else "gTTS (Online)",
                    info="Edge TTS provides the most realistic conversations"
                )
                # Generate Button
                generate_btn = gr.Button(
                    "🎙️ Generate Podcast",
                    variant="primary",
                    size="lg"
                )
            with gr.Column(scale=1):
                # Speaker Info
                speaker_info = gr.Markdown(
                    get_speaker_info(2),
                    label="Speaker Information"
                )
                # Status and Results
                status_msg = gr.HTML(
                    value="<div style='padding: 10px; background: #e3f2fd; border-radius: 5px; color: #1976d2;'>Ready to generate your podcast!</div>"
                )
                with gr.Row():
                    audio_output = gr.Audio(
                        label="Generated Podcast",
                        visible=False
                    )
                    download_btn = gr.DownloadButton(
                        "⬇️ Download Podcast",
                        visible=False
                    )
                script_output = gr.Textbox(
                    label="Generated Script",
                    lines=8,
                    visible=False
                )
        # Event handlers
        def update_status(message, success=True):
            # Blue info styling on success, red on failure.
            color = "#1976d2" if success else "#d32f2f"
            bg_color = "#e3f2fd" if success else "#ffebee"
            return f"<div style='padding: 10px; background: {bg_color}; border-radius: 5px; color: {color};'>{message}</div>"
        def generate_podcast_wrapper(text, use_gemini, tts_engine, speaker_count, progress=gr.Progress()):
            # Run the pipeline, then translate the result into component updates.
            audio_data, message, script = create_podcast(text, use_gemini, tts_engine, speaker_count, progress)
            status_html = update_status(message, success=audio_data is not None)
            if audio_data:
                # Save audio to a temporary file so Audio/Download can serve it.
                filename = f"podcast_{speaker_count}speakers_{uuid4().hex[:8]}.wav"
                filepath = os.path.join(tempfile.gettempdir(), filename)
                with open(filepath, 'wb') as f:
                    f.write(audio_data)
                return [
                    status_html,
                    gr.Audio(value=filepath, visible=True),
                    gr.DownloadButton(value=filepath, visible=True),
                    gr.Textbox(value=script, visible=True)
                ]
            else:
                # Failure: show only the status message, hide result widgets.
                return [
                    status_html,
                    gr.Audio(visible=False),
                    gr.DownloadButton(visible=False),
                    gr.Textbox(visible=False)
                ]
        # Connect events
        api_key.change(init_gemini, inputs=api_key, outputs=api_status)
        speaker_count.change(
            get_speaker_info,
            inputs=speaker_count,
            outputs=speaker_info
        )
        generate_btn.click(
            generate_podcast_wrapper,
            inputs=[input_text, use_gemini, tts_engine, speaker_count],
            outputs=[status_msg, audio_output, download_btn, script_output]
        )
    return demo
if __name__ == "__main__":
    # Launch the Gradio app. Binding to 0.0.0.0 on port 7860 exposes it on
    # all interfaces, as required by containerized hosts (e.g. HF Spaces).
    demo = create_interface()
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True
    )