Spaces:
Paused
Paused
| import logging | |
| import smtplib | |
| import re | |
| from email.mime.text import MIMEText | |
| from flask import Flask, request, jsonify | |
| from dotenv import load_dotenv | |
| import os | |
# Populate the process environment (python-dotenv) before any setting below
# is looked up.
load_dotenv()

# Root logging is configured at INFO level; this module logs under the
# "bot_brain" name.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("bot_brain")
# SMTP settings. Each value is read from the environment variable of the same
# name; the second argument is the fallback used when that variable is unset.
SMTP_HOST = os.environ.get("SMTP_HOST", "0.0.0.0")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
CLIENT_SMTP_HOST = os.environ.get("CLIENT_SMTP_HOST", "localhost")
SMTP_USER = os.environ.get("SMTP_USER", "noreply@yourdomain.com")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD", "password")

# In-memory conversation state, keyed by the sender's WhatsApp number; each
# value is that sender's conversation data.
conversations = {}
# Pattern for a plausible email address: a local part, "@", and a domain that
# contains at least one dot. Whitespace (including CR/LF) is excluded from
# every part so that a value destined for a mail header can never span lines,
# and "\Z" (rather than "$") refuses a trailing newline.
EMAIL_REGEX = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+\Z")


def validate_email(email: str) -> bool:
    """Return True if *email* looks like a single well-formed address.

    This is a shape check only (``local@domain.tld`` with exactly one "@" and
    no whitespace); it does not verify that the address exists.

    Args:
        email: Candidate address, normally taken from user input.

    Returns:
        True when the whole string matches ``EMAIL_REGEX``; False otherwise,
        including when *email* is not a string.
    """
    if not isinstance(email, str):
        return False
    return EMAIL_REGEX.fullmatch(email) is not None
def send_email(from_addr: str, to_addr: str, subject: str, body: str,
               timeout: float = 30.0) -> bool:
    """Send a plain-text email through the configured SMTP server.

    Connects to ``CLIENT_SMTP_HOST`` on ``SMTP_PORT``, upgrades the connection
    with STARTTLS, logs in as ``SMTP_USER`` and submits the message.

    Note: Some SMTP servers restrict the 'From' address.

    Args:
        from_addr: Address used as the envelope sender and the ``From`` header.
        to_addr: The single recipient address (also the ``To`` header).
        subject: Subject line; each line break in it is replaced by a space.
        body: Plain-text message body.
        timeout: Seconds to wait on blocking SMTP socket operations, so that
            an unresponsive server cannot stall the caller indefinitely.

    Returns:
        True if the message was submitted, False on any failure. Failures are
        logged and never raised to the caller.
    """
    # Both addresses are written into message headers and SMTP commands; a
    # CR or LF inside them could smuggle in extra headers, so refuse them.
    if any(ch in addr for addr in (from_addr, to_addr) for ch in "\r\n"):
        logger.error("Refusing to send email: address contains a line break")
        return False

    msg = MIMEText(body)
    # A header value must stay on one logical line; the subject comes from a
    # free-text chat message and may contain line breaks.
    msg['Subject'] = " ".join(subject.splitlines())
    msg['From'] = from_addr
    msg['To'] = to_addr

    try:
        with smtplib.SMTP(CLIENT_SMTP_HOST, SMTP_PORT, timeout=timeout) as server:
            server.starttls()  # Upgrade to TLS before sending credentials
            server.login(SMTP_USER, SMTP_PASSWORD)
            server.sendmail(from_addr, [to_addr], msg.as_string())
    except Exception as e:
        # Boundary handler: callers only need a success flag, so record the
        # failure (with traceback) and report False.
        logger.exception(
            "Error sending email from %s to %s: %s", from_addr, to_addr, e
        )
        return False

    logger.info("Email sent from %s to %s", from_addr, to_addr)
    return True
# Flask application object serving as the Bot Brain API.
app = Flask(__name__)
def bot():
    """Advance one sender's email-composition conversation by one message.

    Receives a message from the Node.js server as the JSON body of the current
    Flask request. The sender is read from ``data["from"]`` (default
    ``"unknown"``) and the text from ``data["message"]["text"]["body"]``
    (default empty, surrounding whitespace stripped).

    The bot implements a conversation flow, tracked per sender in
    ``conversations``:

    1. Ask for the 'From' email address.
    2. Ask for the recipient ('To') email address.
    3. Ask for the email subject.
    4. Ask for the email body.

    When all details are collected, the email is sent and the sender's state
    is discarded, whether or not sending succeeded. The first message from a
    sender with no stored state only starts the conversation; its text is not
    used.

    Returns:
        A JSON response ``{"status": "success", "message": <reply text>}``,
        or ``{"status": "error", "message": "Invalid payload"}`` with HTTP
        status 400 when the payload cannot be read.

    NOTE(review): no route registration (``@app.route`` / ``add_url_rule``)
    for this handler is visible in this part of the file -- confirm it is
    wired to the URL the Node.js server posts to.
    """
    try:
        data = request.json
        sender_number = data.get("from", "unknown")
        message_body = data.get("message", {}).get("text", {}).get("body", "").strip()
        # The sender is used as a dictionary key below. A JSON array/object
        # is unhashable and would otherwise fail outside this handler with an
        # unhandled TypeError instead of a 400 response.
        if isinstance(sender_number, (list, dict)):
            raise TypeError("'from' must be a scalar value")
    except Exception as e:
        logger.error("Invalid JSON payload: %s", e)
        return jsonify({"status": "error", "message": "Invalid payload"}), 400

    # `state` is the very dict stored in `conversations`, so updates made to
    # it below persist across requests.
    state = conversations.get(sender_number, {})
    step = state.get("step")

    if not state:
        # Start a new conversation
        conversations[sender_number] = {"step": "from"}
        response_msg = "Welcome to SMTP Assistant! Please provide your 'From' email address."
    elif step == "from":
        if validate_email(message_body):
            state["from"] = message_body
            state["step"] = "to"
            response_msg = "Thanks! Now, please provide the recipient's email address."
        else:
            response_msg = "Invalid email format. Please provide a valid 'From' email address."
    elif step == "to":
        if validate_email(message_body):
            state["to"] = message_body
            state["step"] = "subject"
            response_msg = "Great! Please provide the email subject."
        else:
            response_msg = "Invalid email format. Please provide a valid recipient email address."
    elif step == "subject":
        state["subject"] = message_body
        state["step"] = "body"
        response_msg = "Almost done! Please provide the email body."
    elif step == "body":
        state["body"] = message_body
        from_addr = state.get("from")
        to_addr = state.get("to")
        if send_email(from_addr, to_addr, state.get("subject"), message_body):
            response_msg = f"Email sent successfully from {from_addr} to {to_addr}."
        else:
            response_msg = "Failed to send email. Please try again later."
        # Clear the conversation state after the send attempt. pop() instead
        # of del: the entry may already be gone if another request for the
        # same sender finished first.
        conversations.pop(sender_number, None)
    else:
        response_msg = "Unrecognized step. Let's start over. Please provide your 'From' email address."
        conversations[sender_number] = {"step": "from"}

    return jsonify({"status": "success", "message": response_msg})
# When executed as a script, start the Flask server bound to 0.0.0.0 on
# port 5000.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)