Emailing_Agent / app.py
chburhan64's picture
Update app.py
b464319 verified
import os
import smtplib
import whisper
from email.mime.text import MIMEText
from groq import Groq
from gtts import gTTS
import tempfile
import streamlit as st
from dotenv import load_dotenv
from pydub import AudioSegment
# ==========================
# 🔐 Secrets (stored in .env)
# ==========================
# Load variables from a local .env file (if any) into the process environment,
# then read the credentials the agents below depend on.
load_dotenv()
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
EMAIL_ADDRESS = os.getenv("EMAIL_ADDRESS")
EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD")

# Groq client shared by the generator and translator agents.
client = Groq(api_key=GROQ_API_KEY)


@st.cache_resource(show_spinner=False)
def _load_whisper_model():
    """Load the Whisper "base" model once and reuse it across script reruns.

    Streamlit re-executes this script on every widget interaction; without
    caching, the model would be loaded again on each rerun. The spinner is
    disabled so the cached call does not render a UI element ahead of the
    ``st.set_page_config`` call later in the script.
    """
    return whisper.load_model("base")


whisper_model = _load_whisper_model()
# ==========================
# 📄 MCP Context
# ==========================
def init_context(user_input, recipients, tone="formal"):
    """Create a fresh context dictionary for the email pipeline.

    Args:
        user_input: Text describing what the email should say.
        recipients: Recipient addresses; stored unchanged under
            ``"recipient_list"``.
        tone: Stored under ``"email_type"``. Defaults to ``"formal"``.

    Returns:
        A new dict whose ``"email_content"`` and ``"status"`` entries start
        out as empty strings.
    """
    context = {}
    context["user_input"] = user_input
    context["recipient_list"] = recipients
    context["email_type"] = tone
    # Filled in later by the generator and sender agents.
    context["email_content"] = ""
    context["status"] = ""
    return context
# ==========================
# ✍️ Email Generator Agent
# ==========================
def email_generator_agent(context):
    """Draft the email with the Groq chat model and store it in *context*.

    The prompt is built from ``context["email_type"]`` and
    ``context["user_input"]``. The stripped model reply is written to
    ``context["email_content"]``; if the request raises, an
    ``"Error generating email: ..."`` message is stored there instead.

    Args:
        context: Pipeline context dict (mutated in place).

    Returns:
        The same ``context`` object.
    """
    tone = context["email_type"]
    details = context["user_input"]
    prompt = f"Write a {tone} email with the following details:\n\n{details}"
    try:
        completion = client.chat.completions.create(
            model="llama3-8b-8192",
            messages=[{"role": "user", "content": prompt}],
        )
        generated = completion.choices[0].message.content.strip()
    except Exception as exc:
        # Failures are surfaced as the email text rather than raised.
        generated = f"Error generating email: {exc}"
    context["email_content"] = generated
    return context
# ==========================
# 🌐 Translator Agent
# ==========================
def translate_email(content, target_language):
    """Translate *content* into *target_language* using the Groq chat model.

    Args:
        content: Email text to translate.
        target_language: Language name inserted verbatim into the prompt.

    Returns:
        The stripped model reply, or a ``"Translation failed: ..."`` string
        if the request raises.
    """
    request = [
        {
            "role": "user",
            "content": f"Translate the following email into {target_language}:\n\n{content}",
        }
    ]
    try:
        reply = client.chat.completions.create(
            model="llama3-8b-8192",
            messages=request,
        )
        translated = reply.choices[0].message.content
        return translated.strip()
    except Exception as err:
        # Errors are returned as text so the caller can display them.
        return f"Translation failed: {err}"
# ==========================
# 📀 Email Sender Agent
# ==========================
def email_sender_agent(context):
    """Send ``context["email_content"]`` to each address in ``context["recipient_list"]``.

    Each recipient gets their own message, sent over a Gmail SMTP-over-SSL
    connection authenticated with ``EMAIL_ADDRESS`` / ``EMAIL_PASSWORD``.
    A failure for one recipient is reported with ``st.error`` (and printed)
    and does not stop the remaining sends.

    Args:
        context: Pipeline context dict (mutated in place).

    Returns:
        The same ``context`` object, with ``context["status"]`` set to a
        summary of how many sends succeeded.
    """
    body = context["email_content"]
    success = 0
    for recipient in context["recipient_list"]:
        # Build a fresh message for every recipient. Assigning msg["To"] on
        # an existing message adds another "To" header instead of replacing
        # the previous one, so reusing a single message would address later
        # sends to all earlier recipients as well.
        message = MIMEText(body)
        message["Subject"] = "AI Agent Email"
        message["From"] = EMAIL_ADDRESS
        message["To"] = recipient
        try:
            with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
                server.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
                server.send_message(message)
        except Exception as e:
            st.error(f"❌ Failed to send to {recipient}: {e}")
            print(f"[DEBUG] Email send error for {recipient}: {e}")  # Debug print
        else:
            success += 1
    context["status"] = f"✅ Email sent to {success} recipient(s)."
    return context
# ==========================
# 🧠 TTS and Playback
# ==========================
def speak_and_download(text, lang="en"):
    """Synthesize *text* with gTTS and save it to a temporary MP3 file.

    Args:
        text: Text to read aloud.
        lang: Language code passed to gTTS. Defaults to ``"en"``.

    Returns:
        Path of the saved ``.mp3`` file, or ``None`` if anything raises
        (the error is shown with ``st.error``). The file is created with
        ``delete=False``, so it is not removed automatically.
    """
    try:
        speech = gTTS(text=text, lang=lang)
        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
        with tmp_file:
            mp3_path = tmp_file.name
            speech.save(mp3_path)
        return mp3_path
    except Exception as err:
        st.error(f"TTS error: {err}")
        return None
# ==========================
# 📜 Whisper Transcriber
# ==========================
def transcribe_audio(file):
    """Transcribe an audio file to text with the module-level Whisper model.

    The audio is decoded with pydub, re-exported as a temporary WAV file,
    transcribed, and the temporary file is removed afterwards.

    Args:
        file: Either a filesystem path (``str``) or a file-like object with
            a ``name`` attribute ending in ``.mp3`` or ``.wav`` (any case).

    Returns:
        The transcribed text, or ``""`` when the file type is unsupported
        or any step raises (the problem is shown with ``st.error``).
    """
    wav_path = None
    try:
        if isinstance(file, str):
            audio = AudioSegment.from_file(file)
        else:
            # Compare the extension case-insensitively so "CLIP.MP3" works.
            filename = file.name.lower()
            if filename.endswith(".mp3"):
                audio = AudioSegment.from_mp3(file)
            elif filename.endswith(".wav"):
                audio = AudioSegment.from_wav(file)
            else:
                st.error("Unsupported file type.")
                return ""
        # Reserve a temp path and close our handle before pydub writes to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
            wav_path = tmp.name
        # export() hands back the file object it wrote to; close it so the
        # file can be removed afterwards.
        exported = audio.export(wav_path, format="wav")
        exported.close()
        result = whisper_model.transcribe(wav_path)
        return result["text"]
    except Exception as e:
        st.error(f"❌ Transcription error: {e}")
        return ""
    finally:
        # The temp file is created with delete=False, so remove it ourselves.
        if wav_path is not None:
            try:
                os.remove(wav_path)
            except OSError:
                # Best-effort cleanup; a leftover temp file must not mask
                # the transcription result.
                pass
# ==========================
# 🚀 Streamlit Interface
# ==========================
# Page configuration and title.
st.set_page_config(page_title="Email Generator AI", layout="centered")
st.title("📧 AI Email Generator with Voice Input")

# Session state init: "context" holds the current email draft across reruns.
if "context" not in st.session_state:
    st.session_state.context = None

# Inputs shared by every mode.
mode = st.radio("Choose Input Mode", ["Text Input", "Upload Audio File", "Paste Full Email"])
recipient_input = st.text_input("Recipient Emails (comma-separated)", "")
tone = st.text_input("Email Tone (Formal, Friendly, Apologetic, etc.)", "Formal")

# Split on commas, trim surrounding whitespace and drop empty entries.
recipient_list = [r.strip() for r in recipient_input.split(",") if r.strip()]
# Each mode fills st.session_state.context with a draft when its button is
# pressed. Empty input is rejected with a warning instead of producing a
# draft from nothing.
if mode == "Text Input":
    user_request = st.text_area("What should the email say?")
    if st.button("Generate Email"):
        if user_request.strip():
            st.session_state.context = email_generator_agent(
                init_context(user_request, recipient_list, tone)
            )
        else:
            st.warning("Please describe what the email should say.")
elif mode == "Paste Full Email":
    full_email = st.text_area("Paste your complete email")
    if st.button("Use Email"):
        pasted_email = full_email.strip()
        if pasted_email:
            # Same context shape as the generated modes; the pasted text is
            # used as the email body without calling the model.
            manual_context = init_context("", recipient_list, "manual")
            manual_context["email_content"] = pasted_email
            st.session_state.context = manual_context
        else:
            st.warning("Please paste the email text first.")
elif mode == "Upload Audio File":
    uploaded_audio = st.file_uploader("Upload an audio file (.wav or .mp3)", type=["wav", "mp3"])
    if uploaded_audio and st.button("Transcribe and Generate Email"):
        transcribed = transcribe_audio(uploaded_audio)
        # transcribe_audio returns "" on failure (after showing the error),
        # so only continue when there is text to work with.
        if transcribed.strip():
            st.success(f"Transcribed Text: {transcribed}")
            st.session_state.context = email_generator_agent(
                init_context(transcribed, recipient_list, tone)
            )
        else:
            st.warning("No speech could be transcribed from the uploaded audio.")
# ========== Show Email Output ========== #
if st.session_state.context:
    context = st.session_state.context
    st.subheader("📨 Generated Email")
    # Write edits made in the text area back into the context so the text
    # that is read aloud, translated and sent is the text the user sees.
    context["email_content"] = st.text_area(
        "Email Content", context["email_content"], height=200
    )

    # The script reruns on every interaction; reuse the audio already
    # synthesized for this exact text instead of calling gTTS (and creating
    # another temp file) each time.
    tts_cache = st.session_state.get("tts_cache")
    if tts_cache and tts_cache[0] == context["email_content"]:
        audio_path = tts_cache[1]
    else:
        audio_path = speak_and_download(context["email_content"])
        if audio_path:
            st.session_state["tts_cache"] = (context["email_content"], audio_path)
    if audio_path:
        st.audio(audio_path)

    # Translation
    if st.checkbox("🌐 Translate Email"):
        lang = st.text_input("Translate to (e.g., Urdu, Spanish, German)").strip()
        lang_map = {
            "german": "de",
            "spanish": "es",
            "urdu": "ur",
            "french": "fr",
            "english": "en"
        }
        # Language code for the spoken version; names not in the map fall
        # back to English.
        target_lang_code = lang_map.get(lang.lower(), "en")
        if lang and st.button("Translate"):
            translated = translate_email(context["email_content"], lang)
            st.success("Translated Email:")
            st.text_area("Translated Email", translated, height=200)
            translated_audio_path = speak_and_download(translated, lang=target_lang_code)
            if translated_audio_path:
                st.audio(translated_audio_path)
                # Keep the audio made with the matching language code so the
                # main player reuses it on the next rerun.
                st.session_state["tts_cache"] = (translated, translated_audio_path)
            # The translation becomes the draft that will be sent.
            st.session_state.context["email_content"] = translated

    # Email Sending
    if st.button("✉️ Send Email"):
        # Use the recipients currently entered in the form; they may have
        # been typed or changed after the draft was created.
        context["recipient_list"] = recipient_list
        if recipient_list:
            st.session_state.context = email_sender_agent(context)
            st.success(st.session_state.context["status"])
        else:
            st.warning("Please enter at least one recipient email address.")