Spaces:
Sleeping
Sleeping
| import os | |
| import smtplib | |
| import whisper | |
| from email.mime.text import MIMEText | |
| from groq import Groq | |
| from gtts import gTTS | |
| import tempfile | |
| import streamlit as st | |
| from dotenv import load_dotenv | |
| from pydub import AudioSegment | |
| # ========================== | |
| # Secrets (stored in .env) | |
| # ========================== | |
# Read credentials from a local .env file (if present) into the environment.
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
EMAIL_ADDRESS = os.getenv("EMAIL_ADDRESS")
EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD")

# Shared Groq client used by the generator and translator agents.
client = Groq(api_key=GROQ_API_KEY)


# Streamlit re-executes this script on every widget interaction, so an
# uncached whisper.load_model() call would reload the model each time.
# show_spinner=False keeps this call from emitting any UI element before
# st.set_page_config() runs further down the script.
@st.cache_resource(show_spinner=False)
def _load_whisper_model():
    """Load the "base" Whisper model once and reuse it across reruns."""
    return whisper.load_model("base")


whisper_model = _load_whisper_model()
| # ========================== | |
| # MCP Context | |
| # ========================== | |
def init_context(user_input, recipients, tone="formal"):
    """Build the context dictionary that is handed from agent to agent.

    Args:
        user_input: Text describing what the email should say.
        recipients: Recipient addresses; stored unchanged under
            ``"recipient_list"``.
        tone: Requested email tone; stored under ``"email_type"``.

    Returns:
        A new dict with the keys ``user_input``, ``recipient_list``,
        ``email_type``, ``email_content`` and ``status``. The last two
        start out as empty strings.
    """
    context = dict(
        user_input=user_input,
        recipient_list=recipients,
        email_type=tone,
    )
    # Placeholders: nothing has been generated or sent yet.
    context["email_content"] = ""
    context["status"] = ""
    return context
| # ========================== | |
| # Email Generator Agent | |
| # ========================== | |
def email_generator_agent(context):
    """Draft an email with the Groq chat model and store it in the context.

    Reads ``context["email_type"]`` and ``context["user_input"]`` to build
    the prompt. The stripped model reply is written to
    ``context["email_content"]``; if anything in the request raises, an
    ``"Error generating email: ..."`` message is written there instead.

    Args:
        context: Context dict; it is modified in place.

    Returns:
        The same ``context`` object.
    """
    instruction = f"Write a {context['email_type']} email with the following details:\n\n{context['user_input']}"
    try:
        completion = client.chat.completions.create(
            model="llama3-8b-8192",
            messages=[{"role": "user", "content": instruction}],
        )
        draft = completion.choices[0].message.content.strip()
    except Exception as e:
        # Surface the failure through the same field the UI displays.
        draft = f"Error generating email: {e}"
    context["email_content"] = draft
    return context
| # ========================== | |
| # Translator Agent | |
| # ========================== | |
def translate_email(content, target_language):
    """Ask the Groq chat model to translate an email.

    Args:
        content: Email text to translate.
        target_language: Language name inserted verbatim into the prompt.

    Returns:
        The stripped model reply, or a ``"Translation failed: ..."`` string
        if the request raised.
    """
    instruction = f"Translate the following email into {target_language}:\n\n{content}"
    try:
        completion = client.chat.completions.create(
            model="llama3-8b-8192",
            messages=[{"role": "user", "content": instruction}],
        )
        translated = completion.choices[0].message.content.strip()
    except Exception as e:
        translated = f"Translation failed: {e}"
    return translated
| # ========================== | |
| # Email Sender Agent | |
| # ========================== | |
def email_sender_agent(context):
    """Send the email in ``context`` to each recipient individually.

    One plain-text message is built from ``context["email_content"]`` and
    sent once per address in ``context["recipient_list"]`` over an SSL
    connection to ``smtp.gmail.com:465``, logging in with EMAIL_ADDRESS and
    EMAIL_PASSWORD. A failure for one recipient is reported with
    ``st.error`` (and printed) and does not stop the remaining sends.

    Args:
        context: Context dict; ``context["status"]`` is overwritten with a
            summary of how many sends succeeded.

    Returns:
        The same ``context`` object.
    """
    message = MIMEText(context["email_content"])
    message["Subject"] = "AI Agent Email"
    message["From"] = EMAIL_ADDRESS
    success = 0
    for recipient in context["recipient_list"]:
        # Header assignment on an email message appends; it never replaces.
        # Without this delete the message would accumulate one To header
        # per iteration and the "To" lookup would keep returning the first
        # recipient instead of the current one.
        del message["To"]
        message["To"] = recipient
        try:
            with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
                server.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
                server.send_message(message)
        except Exception as e:
            st.error(f"β Failed to send to {recipient}: {e}")
            print(f"[DEBUG] Email send error for {recipient}: {e}")  # Debug print
        else:
            success += 1
    context["status"] = f"β Email sent to {success} recipient(s)."
    return context
| # ========================== | |
| # TTS and Playback | |
| # ========================== | |
def speak_and_download(text, lang="en"):
    """Synthesize ``text`` to an MP3 file with gTTS and return its path.

    The file is created with ``delete=False`` so it outlives this call;
    the caller receives the path and is responsible for the file from then
    on.

    Args:
        text: Text to speak.
        lang: Language code passed to gTTS.

    Returns:
        Path of the saved ``.mp3`` file, or ``None`` if synthesis failed
        (the failure is reported with ``st.error``).
    """
    path = None
    try:
        tts = gTTS(text=text, lang=lang)
        # Only reserve a unique file name here; the handle is closed before
        # gTTS writes to the path.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp:
            path = fp.name
        tts.save(path)
        return path
    except Exception as e:
        # If saving failed after the file was created, remove the orphan so
        # failed attempts do not accumulate temp files.
        if path is not None:
            try:
                os.remove(path)
            except OSError:
                pass  # best-effort cleanup; the TTS error below is what matters
        st.error(f"TTS error: {e}")
        return None
| # ========================== | |
| # Whisper Transcriber | |
| # ========================== | |
def transcribe_audio(file):
    """Transcribe an audio file to text with the loaded Whisper model.

    The audio is decoded with pydub, re-exported as WAV into a temporary
    directory, and that WAV path is passed to ``whisper_model.transcribe``.
    The temporary directory is removed before returning.

    Args:
        file: Either a path string (decoded with ``AudioSegment.from_file``)
            or a file-like object with a ``name`` attribute ending in
            ``.mp3`` or ``.wav`` (matched case-insensitively).

    Returns:
        The ``"text"`` field of the transcription result, or ``""`` if the
        file type is unsupported or any step raised (both cases are
        reported with ``st.error``).
    """
    try:
        if isinstance(file, str):
            audio = AudioSegment.from_file(file)
        else:
            # Compare in lower case so names such as "NOTE.MP3" are accepted.
            filename = file.name.lower()
            if filename.endswith(".mp3"):
                audio = AudioSegment.from_mp3(file)
            elif filename.endswith(".wav"):
                audio = AudioSegment.from_wav(file)
            else:
                st.error("Unsupported file type.")
                return ""
        # A temporary directory guarantees the intermediate WAV is deleted,
        # whether transcription succeeds or raises.
        with tempfile.TemporaryDirectory() as tmp_dir:
            wav_path = os.path.join(tmp_dir, "audio.wav")
            # export() hands back the open output file; close it right away
            # so the file is fully released before it is read and deleted.
            audio.export(wav_path, format="wav").close()
            result = whisper_model.transcribe(wav_path)
        return result["text"]
    except Exception as e:
        st.error(f"β Transcription error: {e}")
        return ""
| # ========================== | |
| # Streamlit Interface | |
| # ========================== | |
st.set_page_config(page_title="Email Generator AI", layout="centered")
st.title("π§ AI Email Generator with Voice Input")

# The context dict must survive Streamlit reruns; None means "no email yet".
if "context" not in st.session_state:
    st.session_state["context"] = None

# Widgets shared by every input mode, rendered in this order.
_input_modes = ["Text Input", "Upload Audio File", "Paste Full Email"]
mode = st.radio("Choose Input Mode", _input_modes)
recipient_input = st.text_input("Recipient Emails (comma-separated)", "")
tone = st.text_input("Email Tone (Formal, Friendly, Apologetic, etc.)", "Formal")

# Split the comma-separated field, trim each entry, and drop empty entries.
recipient_list = [
    address
    for address in (part.strip() for part in recipient_input.split(","))
    if address
]
# Each input mode ends by storing a context dict in st.session_state.context;
# the output section below renders whatever is stored there.
if mode == "Text Input":
    user_request = st.text_area("What should the email say?")
    if st.button("Generate Email"):
        if user_request.strip():
            st.session_state.context = email_generator_agent(
                init_context(user_request, recipient_list, tone)
            )
        else:
            # Do not spend an LLM call on an empty request.
            st.warning("Please describe what the email should say.")
elif mode == "Paste Full Email":
    full_email = st.text_area("Paste your complete email")
    if st.button("Use Email"):
        # Reuse init_context so the pasted-email context always has the same
        # keys as a generated one; no LLM call is made in this mode.
        pasted_context = init_context("", recipient_list, "manual")
        pasted_context["email_content"] = full_email.strip()
        st.session_state.context = pasted_context
elif mode == "Upload Audio File":
    uploaded_audio = st.file_uploader("Upload an audio file (.wav or .mp3)", type=["wav", "mp3"])
    if uploaded_audio and st.button("Transcribe and Generate Email"):
        transcribed = transcribe_audio(uploaded_audio)
        # transcribe_audio returns "" on failure (after reporting the error),
        # so only claim success and generate an email when there is text.
        if transcribed.strip():
            st.success(f"Transcribed Text: {transcribed}")
            st.session_state.context = email_generator_agent(
                init_context(transcribed, recipient_list, tone)
            )
        else:
            st.warning("No speech could be transcribed from the uploaded audio.")
| # ========== Show Email Output ========== # | |
# Render the current email with audio playback, optional translation, and
# the send button. Runs on every rerun once a context exists.
if st.session_state.context:
    context = st.session_state.context
    st.subheader("π¨ Generated Email")
    st.text_area("Email Content", context["email_content"], height=200)

    # Reuse the audio already synthesized for this exact text and language.
    # Without the cache every rerun (any widget interaction) would make a new
    # TTS request and leave another temp file behind.
    email_lang = context.get("tts_lang", "en")
    audio_key = (context["email_content"], email_lang)
    cached_audio = st.session_state.get("email_audio")
    if cached_audio and cached_audio[0] == audio_key and os.path.exists(cached_audio[1]):
        audio_path = cached_audio[1]
    else:
        audio_path = speak_and_download(context["email_content"], lang=email_lang)
        # Cache successes only, so a failed synthesis is retried next rerun.
        if audio_path:
            st.session_state["email_audio"] = (audio_key, audio_path)
    if audio_path:
        st.audio(audio_path)

    # Translation
    if st.checkbox("π Translate Email"):
        lang = st.text_input("Translate to (e.g., Urdu, Spanish, German)")
        lang_map = {
            "german": "de",
            "spanish": "es",
            "urdu": "ur",
            "french": "fr",
            "english": "en",
        }
        # Unknown languages fall back to the English voice for playback.
        target_lang_code = lang_map.get(lang.strip().lower(), "en")
        if lang.strip() and st.button("Translate"):
            translated = translate_email(context["email_content"], lang)
            if translated.startswith("Translation failed:"):
                # translate_email reports errors through its return value;
                # keep the original email instead of replacing it with the
                # error text.
                st.error(translated)
            else:
                st.success("Translated Email:")
                st.text_area("Translated Email", translated, height=200)
                translated_audio_path = speak_and_download(translated, lang=target_lang_code)
                if translated_audio_path:
                    st.audio(translated_audio_path)
                    # The translation becomes the current email below, so
                    # this audio is also the main playback on the next rerun.
                    st.session_state["email_audio"] = (
                        (translated, target_lang_code),
                        translated_audio_path,
                    )
                context["email_content"] = translated
                # Remember the voice so later reruns do not read the
                # translated text with the English voice.
                context["tts_lang"] = target_lang_code

    # Email Sending
    if st.button("βοΈ Send Email"):
        if not recipient_list:
            st.warning("Please enter at least one recipient email address.")
        else:
            # Send to the addresses currently in the recipient field, not the
            # list captured when the email was generated.
            context["recipient_list"] = recipient_list
            st.session_state.context = email_sender_agent(context)
            st.success(st.session_state.context["status"])