SMTP / app.py
Fred808's picture
Update app.py
bebee92 verified
raw
history blame
4.56 kB
import logging
import smtplib
import re
from email.mime.text import MIMEText
from flask import Flask, request, jsonify
from dotenv import load_dotenv
import os
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("bot_brain")
# SMTP Server Configuration
SMTP_HOST = os.getenv("SMTP_HOST", "0.0.0.0")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
CLIENT_SMTP_HOST = os.getenv("CLIENT_SMTP_HOST", "localhost")
SMTP_USER = os.getenv("SMTP_USER", "noreply@yourdomain.com")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD", "password")
# Global conversation state: maps sender's WhatsApp number to conversation data.
conversations = {}
# Regex for validating email addresses
EMAIL_REGEX = re.compile(r"^[^@]+@[^@]+\.[^@]+$")
def validate_email(email: str) -> bool:
"""Validate an email address using regex."""
return re.match(EMAIL_REGEX, email) is not None
def send_email(from_addr: str, to_addr: str, subject: str, body: str) -> bool:
"""
Sends an email using the SMTP client.
Note: Some SMTP servers restrict the 'From' address.
"""
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = from_addr
msg['To'] = to_addr
try:
with smtplib.SMTP(CLIENT_SMTP_HOST, SMTP_PORT) as server:
server.starttls() # Upgrade to TLS
server.login(SMTP_USER, SMTP_PASSWORD)
server.sendmail(from_addr, [to_addr], msg.as_string())
logger.info(f"Email sent from {from_addr} to {to_addr}")
return True
except Exception as e:
logger.error(f"Error sending email from {from_addr} to {to_addr}: {e}")
return False
# Flask app acting as the Bot Brain API
app = Flask(__name__)
@app.route("/bot", methods=["POST"])
def bot():
"""
Receives a message from the Node.js server.
The bot implements a conversation flow:
1. Ask for the 'From' email address.
2. Ask for the recipient ('To') email address.
3. Ask for the email subject.
4. Ask for the email body.
When all details are collected, the email is sent.
"""
try:
data = request.json
sender_number = data.get("from", "unknown")
message_body = data.get("message", {}).get("text", {}).get("body", "").strip()
except Exception as e:
logger.error("Invalid JSON payload: %s", e)
return jsonify({"status": "error", "message": "Invalid payload"}), 400
state = conversations.get(sender_number, {})
if not state:
# Start a new conversation
conversations[sender_number] = {"step": "from"}
response_msg = "Welcome to SMTP Assistant! Please provide your 'From' email address."
else:
step = state.get("step")
if step == "from":
if validate_email(message_body):
state["from"] = message_body
state["step"] = "to"
response_msg = "Thanks! Now, please provide the recipient's email address."
else:
response_msg = "Invalid email format. Please provide a valid 'From' email address."
elif step == "to":
if validate_email(message_body):
state["to"] = message_body
state["step"] = "subject"
response_msg = "Great! Please provide the email subject."
else:
response_msg = "Invalid email format. Please provide a valid recipient email address."
elif step == "subject":
state["subject"] = message_body
state["step"] = "body"
response_msg = "Almost done! Please provide the email body."
elif step == "body":
state["body"] = message_body
from_addr = state.get("from")
to_addr = state.get("to")
subject = state.get("subject")
body = state.get("body")
if send_email(from_addr, to_addr, subject, body):
response_msg = f"Email sent successfully from {from_addr} to {to_addr}."
else:
response_msg = "Failed to send email. Please try again later."
# Clear the conversation state after sending
del conversations[sender_number]
else:
response_msg = "Unrecognized step. Let's start over. Please provide your 'From' email address."
conversations[sender_number] = {"step": "from"}
return jsonify({"status": "success", "message": response_msg})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)