Spaces:
Paused
Paused
| import logging | |
| import ssl | |
| from aiosmtpd.controller import Controller | |
| from aiosmtpd.handlers import Sink | |
| from aiosmtpd.smtp import SMTP, AuthResult, LoginPassword | |
| from email.policy import default | |
| from typing import Optional | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger("smtp_server") | |
| # SMTP Server Configuration | |
| SMTP_HOST = "0.0.0.0" # Bind to all interfaces | |
| SMTP_PORT = 587 # Standard SMTP submission port | |
| TLS_CERTFILE = "cert.pem" # Path to your SSL certificate | |
| TLS_KEYFILE = "key.pem" # Path to your SSL private key | |
| # Authentication Credentials (for demo purposes; use a secure vault in production) | |
| VALID_USERS = { | |
| "user1": "password1", | |
| "user2": "password2", | |
| } | |
| class AuthenticatedSMTPHandler(Sink): | |
| async def handle_AUTH(self, server: SMTP, args: str, mechanism: str) -> Optional[AuthResult]: | |
| if mechanism.upper() != "LOGIN": | |
| return AuthResult(success=False, handled=False) | |
| return None | |
| async def handle_AUTH_LOGIN(self, server: SMTP, args: str) -> Optional[AuthResult]: | |
| try: | |
| username, password = args.split() | |
| if username in VALID_USERS and VALID_USERS[username] == password: | |
| return AuthResult(success=True, auth_data=LoginPassword(username, password)) | |
| else: | |
| return AuthResult(success=False, message="Invalid credentials") | |
| except Exception as e: | |
| logger.error(f"Authentication error: {e}") | |
| return AuthResult(success=False, message="Authentication failed") | |
| async def handle_DATA(self, server: SMTP, session, envelope): | |
| # Log the received email | |
| logger.info(f"Received email from: {envelope.mail_from}") | |
| logger.info(f"Recipients: {envelope.rcpt_tos}") | |
| logger.info(f"Message data:\n{envelope.content.decode('utf-8')}") | |
| return "250 Message accepted for delivery" | |
| def start_smtp_server(): | |
| # Create SSL context for TLS | |
| ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) | |
| ssl_context.load_cert_chain(TLS_CERTFILE, TLS_KEYFILE) | |
| # Create the SMTP handler | |
| handler = AuthenticatedSMTPHandler() | |
| # Start the SMTP server with TLS and authentication | |
| controller = Controller( | |
| handler, | |
| hostname=SMTP_HOST, | |
| port=SMTP_PORT, | |
| enable_SMTPUTF8=True, | |
| ssl_context=ssl_context, | |
| auth_required=True, | |
| ) | |
| logger.info(f"Starting SMTP server on {SMTP_HOST}:{SMTP_PORT} with TLS and authentication...") | |
| controller.start() | |
| try: | |
| # Keep the server running | |
| while True: | |
| pass | |
| except KeyboardInterrupt: | |
| logger.info("Shutting down SMTP server...") | |
| controller.stop() | |
| if __name__ == "__main__": | |
| start_smtp_server() |