import email import imaplib import json import mimetypes import os import re import smtplib import time import base64 from email.header import decode_header from email.message import EmailMessage from socket import socket from bs4 import BeautifulSoup from Brain.src.common.utils import PROXY_IP, PROXY_PORT # email variables EMAIL_SMTP_HOST = "smtp.gmail.com" EMAIL_SMTP_PORT = 587 EMAIL_IMAP_SERVER = "imap.gmail.com" EMAIL_SIGNATURE = "This was sent by Rising Brain" class EmailPlugin: def send_email( self, sender: str, pwd: str, to: str, subject: str, body: str, to_send: bool ) -> str: return self.send_email_with_attachment_internal( sender=sender, pwd=pwd, to=to, title=subject, message=body, attachment=None, attachment_path=None, to_send=to_send, ) def send_email_with_attachment( self, sender: str, pwd: str, to: str, subject: str, body: str, filename: str, to_send: bool, ) -> str: attachment_path = filename attachment = os.path.basename(filename) return self.send_email_with_attachment_internal( sender=sender, pwd=pwd, to=to, title=subject, message=body, attachment_path=attachment_path, attachment=attachment, to_send=to_send, ) def send_email_with_attachment_internal( self, sender: str, pwd: str, to: str, title: str, message: str, attachment_path: str | None, attachment: str | None, to_send: bool, ) -> str: """Send an email Args: sender (str): The email of the sender pwd (str): The password of the sender to (str): The email of the recipient title (str): The title of the email message (str): The message content of the email Returns: str: Any error messages """ email_sender = sender email_password = pwd msg = EmailMessage() msg["Subject"] = title msg["From"] = email_sender msg["To"] = to signature = EMAIL_SIGNATURE if signature: message += f"\n{signature}" msg.set_content(message) if attachment_path: ctype, encoding = mimetypes.guess_type(attachment_path) if ctype is None or encoding is not None: # No guess could be made, or the file is encoded (compressed) ctype = "application/octet-stream" maintype, subtype = ctype.split("/", 1) with open(file=attachment_path, mode="rb") as fp: msg.add_attachment( fp.read(), maintype=maintype, subtype=subtype, filename=attachment ) if to_send: smtp_host = EMAIL_SMTP_HOST smtp_port = EMAIL_SMTP_PORT # send email with smtplib.SMTP(host=smtp_host, port=smtp_port) as smtp: smtp.ehlo() smtp.starttls() smtp.login(user=email_sender, password=email_password) smtp.send_message(msg) smtp.quit() return f"Email was sent to {to}!" else: conn = self.imap_open( imap_folder="[Gmail]/Drafts", email_sender=email_sender, email_password=email_password, ) conn.append( mailbox="[Gmail]/Drafts", flags="", date_time=imaplib.Time2Internaldate(time.time()), message=str(msg).encode("UTF-8"), ) return f"Email went to [Gmail]/Drafts!" def read_emails( self, sender: str, pwd: str, imap_folder: str = "inbox", imap_search_command: str = "UNSEEN", limit: int = 5, page: int = 1, ) -> str: """Read emails from an IMAP mailbox. This function reads emails from a specified IMAP folder, using a given IMAP search command, limits, and page numbers. It returns a list of emails with their details, including the sender, recipient, date, CC, subject, and message body. Args: sender (str): The email of the sender pwd (str): The password of the sender imap_folder (str, optional): The name of the IMAP folder to read emails from. Defaults to "inbox". imap_search_command (str, optional): The IMAP search command to filter emails. Defaults to "UNSEEN". limit (int, optional): Number of email's the function should return. Defaults to 5 emails. page (int, optional): The index of the page result the function should resturn. Defaults to 0, the first page. Returns: str: A list of dictionaries containing email details if there are any matching emails. """ email_sender = sender imap_folder = self.adjust_imap_folder_for_gmail( imap_folder=imap_folder, email_sender=email_sender ) imap_folder = self.enclose_with_quotes(imap_folder) imap_search_ar = self.split_imap_search_command(imap_search_command) email_password = pwd mark_as_seen = "False" if isinstance(mark_as_seen, str): mark_as_seen = json.loads(mark_as_seen.lower()) conn = self.imap_open( imap_folder=imap_folder, email_sender=email_sender, email_password=email_password, ) imap_keyword = imap_search_ar[0] if len(imap_search_ar) == 1: _, search_data = conn.search(None, imap_keyword) else: argument = self.enclose_with_quotes(imap_search_ar[1]) _, search_data = conn.search(None, imap_keyword, argument) messages = [] for num in search_data[0].split(): if mark_as_seen: message_parts = "(RFC822)" else: message_parts = "(BODY.PEEK[])" _, msg_data = conn.fetch(message_set=num, message_parts=message_parts) for response_part in msg_data: if isinstance(response_part, tuple): msg = email.message_from_bytes(response_part[1]) # If the subject has unknown encoding, return blank if msg["Subject"] is not None: subject, encoding = decode_header(msg["Subject"])[0] else: subject = "" encoding = "" if isinstance(subject, bytes): try: # If the subject has unknown encoding, return blank if encoding is not None: subject = subject.decode(encoding) else: subject = "" except [LookupError] as e: pass body = self.get_email_body(msg) # Clean email body body = self.clean_email_body(body) from_address = msg["From"] to_address = msg["To"] date = msg["Date"] cc = msg["CC"] if msg["CC"] else "" messages.append( { "from": from_address, "to": to_address, "date": date, "cc": cc, "subject": subject, "body": body, } ) conn.logout() if not messages: messages.append( { "from": "", "to": "", "date": "", "cc": "", "subject": "", "body": "There are no Emails", } ) return json.dumps(messages) # Confirm that integer parameters are the right type limit = int(limit) page = int(page) # Validate parameter values if limit < 1: raise ValueError("Error: The message limit should be 1 or greater") page_count = len(messages) // limit + (len(messages) % limit > 0) if page < 1 or page > page_count: raise ValueError( "Error: The page value references a page that is not part of the results" ) # Calculate paginated indexes start_index = len(messages) - (page * limit + 1) end_index = start_index + limit start_index = max(start_index, 0) # Return paginated indexes if start_index == end_index: return json.dumps([messages[start_index]]) else: return json.dumps(messages[start_index:end_index]) def adjust_imap_folder_for_gmail(self, imap_folder: str, email_sender: str) -> str: if "@gmail" in email_sender.lower() or "@googlemail" in email_sender.lower(): if "sent" in imap_folder.lower(): return '"[Gmail]/Sent Mail"' if "draft" in imap_folder.lower(): return "[Gmail]/Drafts" return imap_folder def imap_open( self, imap_folder: str, email_sender: str, email_password: str ) -> imaplib.IMAP4_SSL: # Create a new socket object for later connections as a proxy # IMAP Server Connect imap_server = EMAIL_IMAP_SERVER conn = imaplib.IMAP4_SSL(imap_server) conn.login(user=email_sender, password=email_password) conn.select(imap_folder) return conn def get_email_body(self, msg: email.message.Message) -> str: if msg.is_multipart(): for part in msg.walk(): content_type = part.get_content_type() content_disposition = str(part.get("Content-Disposition")) if ( content_type == "text/plain" and "attachment" not in content_disposition ): # If the email body has unknown encoding, return null try: return part.get_payload(decode=True).decode() except UnicodeDecodeError as e: pass else: try: # If the email body has unknown encoding, return null return msg.get_payload(decode=True).decode() except UnicodeDecodeError as e: pass def enclose_with_quotes(self, s): # Check if string contains whitespace has_whitespace = bool(re.search(r"\s", s)) # Check if string is already enclosed by quotes is_enclosed = s.startswith(("'", '"')) and s.endswith(("'", '"')) # If string has whitespace and is not enclosed by quotes, enclose it with double quotes if has_whitespace and not is_enclosed: return f'"{s}"' else: return s def split_imap_search_command(self, input_string): input_string = input_string.strip() parts = input_string.split(maxsplit=1) parts = [part.strip() for part in parts] return parts def clean_email_body(self, email_body): """Remove formating and URL's from an email's body Args: email_body (str, optional): The email's body Returns: str: The email's body without any formating or URL's """ # If body is None, return an empty string if email_body is None: email_body = "" # Remove any HTML tags email_body = BeautifulSoup(email_body, "html.parser") email_body = email_body.get_text() # Remove return characters email_body = "".join(email_body.splitlines()) # Remove extra spaces email_body = " ".join(email_body.split()) # Remove unicode characters email_body = email_body.encode("ascii", "ignore") email_body = email_body.decode("utf-8", "ignore") # Remove any remaining URL's email_body = re.sub(r"http\S+", "", email_body) return email_body def write_attachment(self, filename: str, file_content: str) -> (str, str): # create folder for temporarily saving attached file milliseconds = int(time.time() * 1000) file_path = f"Brain/assets/{milliseconds}/{filename}" file_directory = f"Brain/assets/{milliseconds}" os.mkdir(file_directory) # file write file_content = base64.b64decode(file_content).decode("utf-8") file = open(file_path, "w") file.write(file_content) file.close() return file_path, file_directory