Spaces:
Paused
Paused
| import datetime | |
| import json | |
| import logging | |
| import email | |
| from email.message import Message | |
| from email.mime.multipart import MIMEMultipart | |
| from email.mime.text import MIMEText | |
| from models import EmailModel | |
| _logger = logging.getLogger(__name__) | |
| _logger.setLevel(logging.INFO) | |
| def get_email_model(msg: Message): | |
| subparts = [ | |
| get_email_model(subpart) | |
| for subpart in msg.get_payload() | |
| ] if msg.is_multipart() else [] | |
| body = "" if msg.is_multipart() else msg._payload | |
| return EmailModel( | |
| headers={k: v for k, v in msg.items()}, | |
| body=body, | |
| content_type=msg.get_content_type(), | |
| size=len(body) + sum(subpart.size for subpart in subparts), | |
| subparts=subparts, | |
| ) | |
| def parse_email(raw: str) -> EmailModel: | |
| try: | |
| msg = email.message_from_string(raw) | |
| return get_email_model(msg) | |
| except Exception as e: | |
| _logger.error(f"Could not parse email: {e}") | |
| return EmailModel( | |
| headers={}, | |
| body="could not parse email", | |
| content_type="text/plain", | |
| size=len("could not parse email"), | |
| subparts=[], | |
| ) | |
| def generate_email_model(item: dict) -> EmailModel: | |
| email_json = json.loads(item["raw"]) | |
| message = MIMEMultipart() | |
| if email_json.get("version") == "v2": | |
| message['From'] = f'{email_json["from_name"]} <{item["address"]}>' if email_json.get("from_name") else item["address"] | |
| message['To'] = f'{email_json["to_name"]} <{email_json["to_mail"]}>' if email_json.get("to_name") else email_json["to_mail"] | |
| message.attach(MIMEText( | |
| email_json["content"], | |
| "html" if email_json.get("is_html") else "plain" | |
| )) | |
| else: | |
| message['From'] = f'{email_json["from"]["name"]} <{email_json["from"]["email"]}>' | |
| message['To'] = ", ".join( | |
| [f"{to['name']} <{to['email']}>" for to in email_json["personalizations"][0]["to"]]) | |
| message.attach(MIMEText( | |
| email_json["content"][0]["value"], | |
| "html" if "html" in email_json["content"][0]["type"] else "plain" | |
| )) | |
| message['Subject'] = email_json["subject"] | |
| message["Date"] = datetime.datetime.strptime( | |
| item["created_at"], "%Y-%m-%d %H:%M:%S" | |
| ).strftime("%a, %d %b %Y %H:%M:%S +0000") | |
| return parse_email(message.as_string()) | |