stmp_server / smtp_server.py
kpinquan's picture
Upload smtp_server.py
74240ef verified
import asyncio
import logging
import email
import httpx
from aiosmtpd.controller import Controller
from aiosmtpd.smtp import SMTP, Session, Envelope, AuthResult, LoginPassword
from config import settings
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.INFO)
class CustomSMTPHandler:
def authenticator(self, server, session, envelope, mechanism, auth_data):
fail_nothandled = AuthResult(success=False, handled=False)
if mechanism not in ("LOGIN", "PLAIN"):
_logger.warning(f"Unsupported mechanism {mechanism}")
return fail_nothandled
if not isinstance(auth_data, LoginPassword):
_logger.warning(f"Invalid auth data {auth_data}")
return fail_nothandled
return AuthResult(success=True, auth_data=auth_data)
async def handle_DATA(self, server: SMTP, session: Session, envelope: Envelope) -> str:
_logger.info(
f"handle_DATA from {envelope.mail_from} to {envelope.rcpt_tos}"
)
if not isinstance(session.auth_data, LoginPassword):
return '530 Authentication required'
if len(envelope.rcpt_tos) != 1:
return '500 Only one recipient allowed'
# Only one recipient allowed
to_mail = envelope.rcpt_tos[0]
# Parse email
msg = email.message_from_string(envelope.content)
content_list = []
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
charset = part.get_content_charset()
cte = str(part.get('content-transfer-encoding', '')).lower()
if content_type not in ["text/plain", "text/html"]:
_logger.warning(f"Skipping {content_type}")
continue
if cte == "8bit":
value = part.get_payload(decode=False)
else:
payload = part.get_payload(decode=True)
value = payload.decode(charset) if charset else payload
if not value:
continue
content_list.append({
"type": content_type,
"value": value
})
elif msg.get_content_type() in ["text/plain", "text/html"] and msg.get_payload(decode=True):
cte = str(msg.get('content-transfer-encoding', '')).lower()
charset = msg.get_content_charset()
if cte == "8bit":
value = msg.get_payload(decode=False)
else:
payload = msg.get_payload(decode=True)
value = payload.decode(charset) if charset else payload
_logger.info(f"Payload {msg._payload} charset {charset}")
content_list.append({
"type": msg.get_content_type(),
"value": value
})
if not content_list:
return '500 Invalid content'
body = max(
content_list,
key=lambda x: (x["type"] == "text/html", len(x["value"]))
)
from_name, _ = email.utils.parseaddr(
str(email.header.make_header(
email.header.decode_header(msg['From'])
))
)
to_mail_map = {}
for to in str(email.header.make_header(
email.header.decode_header(msg['To'])
)).split(","):
tmp_to_name, tmp_to_mail = email.utils.parseaddr(to)
to_mail_map[tmp_to_mail] = tmp_to_name
_logger.info(f"Parsed mail from {from_name} to {to_mail_map}")
# Send mail
send_body = {
"token": session.auth_data.password.decode(),
"from_name": from_name,
"to_name": to_mail_map.get(to_mail),
"to_mail": to_mail,
"subject": str(email.header.make_header(
email.header.decode_header(msg['Subject'])
)),
"is_html": body["type"] == "text/html",
"content": body["value"],
}
_logger.info(f"Send mail {dict(send_body, token='***')}")
try:
res = httpx.post(
f"{settings.proxy_url}/external/api/send_mail",
json=send_body, headers={
"Content-Type": "application/json"
}
)
if res.status_code != 200:
_logger.error(
"Failed to send mail "
f"code=[{res.status_code}] text=[{res.text}]"
)
return f'500 Internal server error code=[{res.status_code}] text=[{res.text}]'
except Exception as e:
_logger.error(e)
return '500 Internal server error'
return '250 OK'
handler = CustomSMTPHandler()
server = Controller(
handler,
hostname="",
port=settings.port,
auth_require_tls=False,
decode_data=True,
authenticator=handler.authenticator,
auth_exclude_mechanism=["DONT"]
)
async def start():
_logger.info(f"Starting server on port {settings.port}")
server.start()
def start_smtp_server():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
task = loop.create_task(start())
try:
loop.run_forever()
except KeyboardInterrupt:
_logger.info("Got KeyboardInterrupt, stopping")
server.stop()
if __name__ == "__main__":
_logger.info(f"Starting server settings[{settings}]")
start_smtp_server()