Spaces:
Build error
Build error
| import streamlit as st | |
| import time | |
| import re | |
| import os | |
| import tempfile | |
| import pypdf | |
| from pydub import AudioSegment, effects | |
| import difflib | |
| #CORRECTED IMPORT | |
| from utils import ( | |
| generate_script, | |
| generate_audio_mp3, | |
| mix_with_bg_music, | |
| DialogueItem, | |
| run_research_agent, | |
| generate_report | |
| ) | |
| from prompts import SYSTEM_PROMPT | |
| from qa import transcribe_audio_deepgram, handle_qa_exchange | |
| MAX_QA_QUESTIONS = 5 # up to 5 voice/text questions | |
def parse_user_edited_transcript(edited_text: str, host_name: str, guest_name: str):
    """Parse a user-edited transcript back into DialogueItem objects.

    Expected format is one speaking turn per line: ``**Name**: spoken text``.
    Display names are mapped onto the two TTS voices: the guest name maps to
    "John"; the host name — and any unrecognized name — maps to "Jane".

    Args:
        edited_text: The (possibly edited) transcript text.
        host_name: Display name used for the host in the transcript.
        guest_name: Display name used for the guest in the transcript.

    Returns:
        A list of DialogueItem, one per matched line. If no line matches the
        expected format, the entire text becomes a single item attributed to
        the host.
    """

    def _make_item(raw_name: str, text_line: str):
        # Resolve the display name to one of the two supported TTS voices.
        # Guest -> "John"; host or anything unrecognized -> "Jane".
        if raw_name.lower() == guest_name.lower():
            speaker = "John"
        else:
            speaker = "Jane"
        return DialogueItem(
            speaker=speaker,
            display_speaker=raw_name,
            text=text_line
        )

    pattern = r"\*\*(.+?)\*\*:\s*(.+)"
    matches = re.findall(pattern, edited_text)
    if not matches:
        # No "**Name**: text" lines found — treat the whole text as a single
        # utterance spoken by the host.
        raw_name = host_name or "Jane"
        return [_make_item(raw_name, edited_text.strip())]
    return [_make_item(raw_name, text_line) for (raw_name, text_line) in matches]
def regenerate_audio_from_dialogue(dialogue_items, custom_bg_music_path=None):
    """Re-synthesize podcast audio from (possibly edited) dialogue items.

    Each item is voiced via TTS, the segments are joined with a short
    crossfade, and background music is mixed underneath the speech.

    Args:
        dialogue_items: Iterable of DialogueItem objects to voice.
        custom_bg_music_path: Optional path to a user-supplied background
            music file; None falls back to the default handled by
            mix_with_bg_music.

    Returns:
        tuple: (mp3 bytes, markdown transcript) on success, or
        (None, error message) when no audio was generated.
    """
    audio_segments = []
    transcript = ""
    crossfade_duration = 50  # ms of overlap between consecutive turns

    for item in dialogue_items:
        audio_file = generate_audio_mp3(item.text, item.speaker)
        try:
            seg = AudioSegment.from_file(audio_file, format="mp3")
        finally:
            # Always remove the per-line temp file, even if decoding fails,
            # so failed runs don't leak temp MP3s.
            os.remove(audio_file)
        audio_segments.append(seg)
        transcript += f"**{item.display_speaker}**: {item.text}\n\n"

    if not audio_segments:
        return None, "No audio segments were generated."

    # Stitch the spoken segments together with a short crossfade.
    combined_spoken = audio_segments[0]
    for seg in audio_segments[1:]:
        combined_spoken = combined_spoken.append(seg, crossfade=crossfade_duration)

    final_mix = mix_with_bg_music(combined_spoken, custom_bg_music_path)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
        final_mix.export(temp_audio.name, format="mp3")
        final_mp3_path = temp_audio.name
    try:
        with open(final_mp3_path, "rb") as f:
            audio_bytes = f.read()
    finally:
        # Clean up the exported mix even if reading it back fails.
        os.remove(final_mp3_path)

    return audio_bytes, transcript
def generate_podcast(
    research_topic_input,
    tone,
    length_minutes,
    host_name,
    host_desc,
    guest_name,
    guest_desc,
    user_specs,
    sponsor_content,
    sponsor_style,
    custom_bg_music_path
):
    """Generate a full podcast (script -> TTS audio -> mixed MP3) from a report.

    Reads the previously generated research report from
    ``st.session_state["report_content"]`` and the language choice from
    ``st.session_state["language_selection"]``.

    Args:
        research_topic_input: Topic string entered by the user (required).
        tone: Desired tone for the script (e.g. "Casual", "Formal").
        length_minutes: Target podcast length in minutes.
        host_name / host_desc: Optional host display name and description.
        guest_name / guest_desc: Optional guest display name and description.
        user_specs: Optional free-form instructions for the script.
        sponsor_content: Optional sponsored-content text to weave in.
        sponsor_style: "Separate Break" or "Blended" sponsor integration.
        custom_bg_music_path: Optional path to background music.

    Returns:
        tuple: (mp3 bytes, markdown transcript) on success, or
        (None, error message) on any failure or missing input.
    """
    if not research_topic_input:
        return None, "Please enter a topic to research for the podcast."
    text = st.session_state.get("report_content", "")  # Get report content
    if not text:
        # The report is the source material; the podcast cannot be scripted
        # without it.
        return None, "Please generate a research report first, or enter a topic."

    # Build optional extra instructions (speakers, user specs, sponsor copy)
    # to append to the base system prompt.
    extra_instructions = []
    if host_name or guest_name:
        host_line = f"Host: {host_name or 'Jane'} - {host_desc or 'a curious host'}."
        guest_line = f"Guest: {guest_name or 'John'} - {guest_desc or 'an expert'}."
        extra_instructions.append(f"{host_line}\n{guest_line}")
    if user_specs.strip():
        extra_instructions.append(f"Additional User Instructions: {user_specs}")
    if sponsor_content.strip():
        extra_instructions.append(
            f"Sponsor Content Provided (should be under ~30 seconds):\n{sponsor_content}"
        )
    combined_instructions = "\n\n".join(extra_instructions).strip()
    full_prompt = SYSTEM_PROMPT
    if combined_instructions:
        full_prompt += f"\n\n# Additional Instructions\n{combined_instructions}\n"
    # Add language-specific instructions
    if st.session_state.get("language_selection") == "Hinglish":
        full_prompt += "\n\nPlease generate the script in Romanized Hindi.\n"
    # Add similar instruction here for Hindi

    try:
        script = generate_script(
            full_prompt,
            text,
            tone,
            f"{length_minutes} Mins",
            host_name=host_name or "Jane",
            guest_name=guest_name or "John",
            sponsor_style=sponsor_style,
            sponsor_provided=bool(sponsor_content.strip())
        )
        # If language is Hinglish, transliterate script dialogues to IAST
        # (Romanized form) so the TTS engine reads Latin script.
        if st.session_state.get("language_selection") == "Hinglish":
            from indic_transliteration.sanscript import transliterate, DEVANAGARI, IAST
            for dialogue_item in script.dialogue:
                dialogue_item.text = transliterate(dialogue_item.text, DEVANAGARI, IAST)
    except Exception as e:
        return None, f"Error generating script: {str(e)}"

    audio_segments = []
    transcript = ""
    crossfade_duration = 50  # ms of overlap between consecutive turns
    try:
        for item in script.dialogue:
            language = st.session_state.get("language_selection", "English (American)")
            if language in ["English (Indian)", "Hinglish", "Hindi"]:
                # For these languages the TTS voice is chosen by comparing the
                # display name against the guest name, rather than trusting
                # the script's own speaker field.
                tts_speaker = "John" if item.display_speaker.lower() == (guest_name or "John").lower() else "Jane"
            else:
                tts_speaker = item.speaker
            audio_file = generate_audio_mp3(item.text, tts_speaker)
            seg = AudioSegment.from_file(audio_file, format="mp3")
            audio_segments.append(seg)
            transcript += f"**{item.display_speaker}**: {item.text}\n\n"
            os.remove(audio_file)
        if not audio_segments:
            return None, "No audio segments generated."
        # Join the spoken segments with a short crossfade, then mix in music.
        combined_spoken = audio_segments[0]
        for seg in audio_segments[1:]:
            combined_spoken = combined_spoken.append(seg, crossfade=crossfade_duration)
        final_mix = mix_with_bg_music(combined_spoken, custom_bg_music_path)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as temp_audio:
            final_mix.export(temp_audio.name, format="mp3")
            final_mp3_path = temp_audio.name
        with open(final_mp3_path, "rb") as f:
            audio_bytes = f.read()
        os.remove(final_mp3_path)
        return audio_bytes, transcript
    except Exception as e:
        return None, f"Error generating audio: {str(e)}"
def highlight_differences(original: str, edited: str) -> str:
    """Return *edited* as HTML with words changed or added vs. *original* in red.

    Texts are compared word-by-word (whitespace split). Replaced and inserted
    words are wrapped in a red ``<span>``; deleted words are simply omitted
    from the output (no strikethrough is rendered).

    Args:
        original: The original transcript text.
        edited: The user-edited transcript text.

    Returns:
        Space-joined words with HTML highlighting on changed words.
    """
    # Split once up front instead of re-splitting on every opcode iteration.
    original_words = original.split()
    edited_words = edited.split()
    matcher = difflib.SequenceMatcher(None, original_words, edited_words)
    highlighted = []
    for opcode, i1, i2, j1, j2 in matcher.get_opcodes():
        if opcode == 'equal':
            highlighted.extend(original_words[i1:i2])
        elif opcode in ('replace', 'insert'):
            highlighted.extend(
                f'<span style="color:red">{word}</span>'
                for word in edited_words[j1:j2]
            )
        # 'delete': removed words are intentionally dropped from the output.
    return ' '.join(highlighted)
def main():
    """Render the MyPod Streamlit UI.

    Two modes: "Generate Research Report" (research agent -> PDF download)
    and "Generate Podcast" (report/topic -> script -> TTS -> mixed MP3),
    followed by an editable transcript with regeneration and a post-podcast
    voice/text Q&A limited to MAX_QA_QUESTIONS questions. All state that
    must survive Streamlit reruns lives in st.session_state.
    """
    st.set_page_config(
        page_title="MyPod v3: AI-Powered Podcast & Research",
        layout="centered"
    )
    # Shrink the file-uploader widget slightly and style the page footer.
    st.markdown("""
    <style>
    .stFileUploader>div>div>div {
        transform: scale(0.9);
    }
    footer {
        text-align: center;
        padding: 1em 0;
        font-size: 0.8em;
        color: #888;
    }
    </style>
    """, unsafe_allow_html=True)

    # Header: logo next to the title.
    logo_col, title_col = st.columns([1, 10])
    with logo_col:
        st.image("logomypod.jpg", width=70)
    with title_col:
        st.markdown("## MyPod v3: AI-Powered Podcast & Research")

    st.markdown("""
    Welcome to **MyPod**, your go-to AI-powered podcast generator and research report tool! 🎉
    MyPod now offers two main functionalities:
    1. **Generate Research Reports:** Provide a research topic, and MyPod will use its AI-powered research agent to create a comprehensive, well-structured research report in PDF format.
    2. **Generate Podcasts:** Transform your research topic (or the generated report) into an engaging, human-sounding podcast.
    Select your desired mode below and let the magic happen!
    """)

    with st.expander("How to Use"):
        st.markdown("""
    **For Research Reports:**
    <ol style="font-size:18px;">
    <li>Select "Generate Research Report".</li>
    <li>Enter your research topic.</li>
    <li>Click 'Generate Report'.</li>
    <li>MyPod will use its AI agent to research the topic and create a PDF report.</li>
    <li>Once generated, you can view and download the report.</li>
    </ol>
    **For Podcasts:**
    <ol style="font-size:18px;">
    <li>Select "Generate Podcast".</li>
    <li>Enter the research topic (this will be used as the basis for the podcast). OR FIRST GENERATE A REPORT AND THEN SELECT PODCAST.</li>
    <li>Choose the tone, language, and target duration.</li>
    <li>Add custom names and descriptions for the speakers (optional).</li>
    <li>Add sponsored content (optional).</li>
    <li>Click 'Generate Podcast'.</li>
    </ol>
    """, unsafe_allow_html=True)

    # --- Main Mode Selection ---
    mode = st.radio("Choose a Mode:", ["Generate Research Report", "Generate Podcast"])

    # --- Research Report Section ---
    if mode == "Generate Research Report":
        st.markdown("### Generate Research Report")
        research_topic_input = st.text_input("Enter your research topic:")
        report_button = st.button("Generate Report")
        if report_button:
            if not research_topic_input:
                st.error("Please enter a research topic.")
            else:
                with st.spinner("Researching and generating report... This may take several minutes."):
                    try:
                        report_content = run_research_agent(research_topic_input)
                        # Stash the report so the podcast mode can reuse it.
                        st.session_state["report_content"] = report_content
                        # Display report (basic text for now)
                        st.markdown("### Generated Report Preview")
                        st.text_area("Report Content", value=report_content, height=300)
                        # Generate PDF and offer download
                        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmpfile:
                            pdf_path = tmpfile.name
                        generate_report(report_content, filename=pdf_path)  # Generate PDF
                        with open(pdf_path, "rb") as f:
                            pdf_bytes = f.read()
                        os.remove(pdf_path)  # Clean up temp file
                        st.download_button(
                            label="Download Report (PDF)",
                            data=pdf_bytes,
                            file_name=f"{research_topic_input}_report.pdf",
                            mime="application/pdf"
                        )
                        st.success("Report generated successfully!")
                    except Exception as e:
                        st.error(f"An error occurred: {e}")

    # --- Podcast Generation Section ---
    elif mode == "Generate Podcast":
        st.markdown("### Generate Podcast")
        research_topic_input = st.text_input("Enter research topic for the podcast (or use a generated report):")
        tone = st.radio("Tone", ["Casual", "Formal", "Humorous", "Youthful"], index=0)
        length_minutes = st.slider("Podcast Length (in minutes)", 1, 60, 3)
        language = st.selectbox(
            "Choose Language and Accent",
            ["English (American)", "English (Indian)", "Hinglish", "Hindi"],
            index=0
        )
        # Persist the language choice so generate_podcast() can read it.
        st.session_state["language_selection"] = language

        st.markdown("### Customize Your Podcast (Optional)")
        with st.expander("Set Host & Guest Names/Descriptions (Optional)"):
            host_name = st.text_input("Female Host Name (leave blank for 'Jane')")
            host_desc = st.text_input("Female Host Description (Optional)")
            guest_name = st.text_input("Male Guest Name (leave blank for 'John')")
            guest_desc = st.text_input("Male Guest Description (Optional)")
        user_specs = st.text_area("Any special instructions or prompts for the script? (Optional)", "")
        sponsor_content = st.text_area("Sponsored Content / Ad (Optional)", "")
        sponsor_style = st.selectbox("Sponsor Integration Style", ["Separate Break", "Blended"])
        custom_bg_music_file = st.file_uploader("Upload Custom Background Music (Optional)", type=["mp3", "wav"])
        custom_bg_music_path = None
        if custom_bg_music_file:
            # Persist the upload to disk so pydub can read it by path.
            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(custom_bg_music_file.name)[1]) as tmp:
                tmp.write(custom_bg_music_file.read())
                custom_bg_music_path = tmp.name

        # Initialize session state that must survive Streamlit reruns.
        if "audio_bytes" not in st.session_state:
            st.session_state["audio_bytes"] = None
        if "transcript" not in st.session_state:
            st.session_state["transcript"] = None
        if "transcript_original" not in st.session_state:
            st.session_state["transcript_original"] = None
        if "qa_count" not in st.session_state:
            st.session_state["qa_count"] = 0
        if "conversation_history" not in st.session_state:
            st.session_state["conversation_history"] = ""

        generate_button = st.button("Generate Podcast")
        if generate_button:
            # The staged progress messages are cosmetic; the real work
            # happens in generate_podcast() below.
            progress_bar = st.progress(0)
            progress_text = st.empty()
            progress_messages = [
                "🔍 Analyzing your input...",
                "📝 Crafting the perfect script...",
                "🎙️ Generating high-quality audio...",
                "🎶 Adding the finishing touches..."
            ]
            progress_text.write(progress_messages[0])
            progress_bar.progress(0)
            time.sleep(1.0)
            progress_text.write(progress_messages[1])
            progress_bar.progress(25)
            time.sleep(1.0)
            progress_text.write(progress_messages[2])
            progress_bar.progress(50)
            time.sleep(1.0)
            progress_text.write(progress_messages[3])
            progress_bar.progress(75)
            time.sleep(1.0)
            audio_bytes, transcript = generate_podcast(
                research_topic_input,
                tone,
                length_minutes,
                host_name,
                host_desc,
                guest_name,
                guest_desc,
                user_specs,
                sponsor_content,
                sponsor_style,
                custom_bg_music_path
            )
            progress_bar.progress(100)
            progress_text.write("✅ Done!")
            if audio_bytes is None:
                # generate_podcast() returns (None, error_message) on failure.
                st.error(transcript)
                st.session_state["audio_bytes"] = None
                st.session_state["transcript"] = None
                st.session_state["transcript_original"] = None
            else:
                st.success("Podcast generated successfully!")
                st.session_state["audio_bytes"] = audio_bytes
                st.session_state["transcript"] = transcript
                st.session_state["transcript_original"] = transcript
                # Reset the Q&A allowance for the new episode.
                st.session_state["qa_count"] = 0
                st.session_state["conversation_history"] = ""

        if st.session_state.get("audio_bytes"):
            st.audio(st.session_state["audio_bytes"], format='audio/mp3')
            st.download_button(
                label="Download Podcast (MP3)",
                data=st.session_state["audio_bytes"],
                file_name="my_podcast.mp3",
                mime="audio/mpeg"
            )

            st.markdown("### Generated Transcript (Editable)")
            edited_text = st.text_area(
                "Feel free to tweak lines, fix errors, or reword anything.",
                value=st.session_state["transcript"],
                height=300
            )
            if st.session_state.get("transcript_original"):
                # Show the user's edits against the original, word-diffed in red.
                highlighted_transcript = highlight_differences(
                    st.session_state["transcript_original"],
                    edited_text
                )
                st.markdown("### **Edited Transcript Highlights**", unsafe_allow_html=True)
                st.markdown(highlighted_transcript, unsafe_allow_html=True)

            if st.button("Regenerate Audio From Edited Text"):
                regen_bar = st.progress(0)
                regen_text = st.empty()
                regen_text.write("🔄 Regenerating your podcast with the edits...")
                regen_bar.progress(25)
                time.sleep(1.0)
                regen_text.write("🔧 Adjusting the script based on your changes...")
                regen_bar.progress(50)
                time.sleep(1.0)
                dialogue_items = parse_user_edited_transcript(
                    edited_text,
                    host_name or "Jane",
                    guest_name or "John"
                )
                new_audio_bytes, new_transcript = regenerate_audio_from_dialogue(dialogue_items, custom_bg_music_path)
                regen_bar.progress(75)
                time.sleep(1.0)
                if new_audio_bytes is None:
                    regen_bar.progress(100)
                    st.error(new_transcript)
                else:
                    regen_bar.progress(100)
                    regen_text.write("✅ Regeneration complete!")
                    st.success("Regenerated audio below:")
                    st.session_state["audio_bytes"] = new_audio_bytes
                    st.session_state["transcript"] = new_transcript
                    st.session_state["transcript_original"] = new_transcript
                    st.audio(new_audio_bytes, format='audio/mp3')
                    st.download_button(
                        label="Download Edited Podcast (MP3)",
                        data=new_audio_bytes,
                        file_name="my_podcast_edited.mp3",
                        mime="audio/mpeg"
                    )
                    st.markdown("### Updated Transcript")
                    st.markdown(new_transcript)

            # NOTE(review): Q&A is nested under the "podcast exists" check,
            # which matches the source order — confirm against the original
            # (un-mangled) indentation.
            st.markdown("## Post-Podcast Q&A")
            used_questions = st.session_state.get("qa_count", 0)
            remaining = MAX_QA_QUESTIONS - used_questions
            if remaining > 0:
                st.write(f"You can ask up to {remaining} more question(s).")
                typed_q = st.text_input("Type your follow-up question:")
                audio_q = st.audio_input("Or record an audio question (WAV)")
                if st.button("Submit Q&A"):
                    if used_questions >= MAX_QA_QUESTIONS:
                        st.warning("You have reached the Q&A limit.")
                    else:
                        question_text = typed_q.strip()
                        # A recorded audio question takes precedence over the
                        # typed one once transcribed.
                        if audio_q is not None:
                            suffix = ".wav"
                            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                                tmp.write(audio_q.read())
                                local_audio_path = tmp.name
                            st.write("Transcribing your audio question...")
                            audio_transcript = transcribe_audio_deepgram(local_audio_path)
                            if audio_transcript:
                                question_text = audio_transcript
                        if not question_text:
                            st.warning("No question found (text or audio).")
                        else:
                            st.write("Generating an answer...")
                            ans_audio, ans_text = handle_qa_exchange(question_text)
                            if ans_audio:
                                st.audio(ans_audio, format='audio/mp3')
                                st.markdown(f"**John**: {ans_text}")
                                st.session_state["qa_count"] = used_questions + 1
                            else:
                                st.warning("No response could be generated.")
            else:
                st.write("You have used all 5 Q&A opportunities.")

    st.markdown("<footer>©2025 MyPod. All rights reserved.</footer>", unsafe_allow_html=True)
# Standard script entry point: launch the Streamlit app.
if __name__ == "__main__":
    main()