File size: 13,076 Bytes
c9987a0
 
 
 
 
 
 
 
ddf1138
c9987a0
 
cc2210e
c9987a0
 
 
cc2210e
 
b78efaf
70ac6ad
7b3e5b8
70ac6ad
ddf1138
70ac6ad
aea9671
47c3bc2
bd7ae30
 
 
 
 
 
 
 
 
 
 
 
 
c9987a0
bd7ae30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c9987a0
 
bd7ae30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
70ac6ad
bd7ae30
 
 
 
 
 
 
 
 
 
 
 
 
 
c9987a0
 
bd7ae30
70ac6ad
 
bd7ae30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c9987a0
 
bd7ae30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78967a7
 
 
 
 
 
bd7ae30
 
 
 
 
 
 
78967a7
 
 
 
 
 
bd7ae30
 
 
c9987a0
bd7ae30
 
 
c9987a0
bd7ae30
 
 
c9987a0
bd7ae30
c9987a0
bd7ae30
 
 
 
c9987a0
bd7ae30
 
 
 
c9987a0
bd7ae30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cc2210e
 
 
70ac6ad
bd7ae30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c9987a0
bd7ae30
 
 
c9987a0
bd7ae30
 
c9987a0
bd7ae30
 
 
 
 
c9987a0
bd7ae30
 
 
 
c9987a0
bd7ae30
c9987a0
bd7ae30
 
c9987a0
bd7ae30
 
c9987a0
bd7ae30
 
 
c9987a0
bd7ae30
 
 
c9987a0
bd7ae30
 
 
c9987a0
bd7ae30
 
c9987a0
bd7ae30
 
c9987a0
bd7ae30
 
 
aea9671
bd7ae30
 
aea9671
bd7ae30
ddf1138
1a24c7c
 
 
 
 
 
 
 
ddf1138
1a24c7c
ddf1138
 
1a24c7c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
import email
import imaplib
import json
import mimetypes
import os
import re
import smtplib
import time
import base64
from email.header import decode_header
from email.message import EmailMessage
from socket import socket

from bs4 import BeautifulSoup

from Brain.src.common.utils import PROXY_IP, PROXY_PORT

# Email settings used by EmailPlugin: Gmail's SMTP host/port for sending and
# its IMAP host for reading mail and storing drafts.
EMAIL_SMTP_HOST = "smtp.gmail.com"
EMAIL_SMTP_PORT = 587  # the SMTP session is upgraded with STARTTLS before login
EMAIL_IMAP_SERVER = "imap.gmail.com"
# Appended on a new line to the body of every message the plugin builds.
EMAIL_SIGNATURE = "This was sent by Rising Brain"


class EmailPlugin:
    def send_email(
        self, sender: str, pwd: str, to: str, subject: str, body: str, to_send: bool
    ) -> str:
        return self.send_email_with_attachment_internal(
            sender=sender,
            pwd=pwd,
            to=to,
            title=subject,
            message=body,
            attachment=None,
            attachment_path=None,
            to_send=to_send,
        )

    def send_email_with_attachment(
        self,
        sender: str,
        pwd: str,
        to: str,
        subject: str,
        body: str,
        filename: str,
        to_send: bool,
    ) -> str:
        attachment_path = filename
        attachment = os.path.basename(filename)
        return self.send_email_with_attachment_internal(
            sender=sender,
            pwd=pwd,
            to=to,
            title=subject,
            message=body,
            attachment_path=attachment_path,
            attachment=attachment,
            to_send=to_send,
        )

    def send_email_with_attachment_internal(
        self,
        sender: str,
        pwd: str,
        to: str,
        title: str,
        message: str,
        attachment_path: str | None,
        attachment: str | None,
        to_send: bool,
    ) -> str:
        """Build an email and either send it over SMTP or save it as a draft.

        Args:
            sender (str): The email of the sender
            pwd (str): The password of the sender
            to (str): The email of the recipient
            title (str): The title of the email
            message (str): The message content of the email
            attachment_path (str | None): Path of a file to attach, or None
                for no attachment
            attachment (str | None): File name shown for the attachment;
                falls back to the base name of ``attachment_path``
            to_send (bool): True sends the email via SMTP; False appends it
                to the "[Gmail]/Drafts" mailbox over IMAP instead

        Returns:
            str: A short status message describing what was done
        """
        msg = EmailMessage()
        msg["Subject"] = title
        msg["From"] = sender
        msg["To"] = to

        if EMAIL_SIGNATURE:
            message += f"\n{EMAIL_SIGNATURE}"
        msg.set_content(message)

        if attachment_path:
            ctype, encoding = mimetypes.guess_type(attachment_path)
            if ctype is None or encoding is not None:
                # No guess could be made, or the file is encoded (compressed)
                ctype = "application/octet-stream"
            maintype, subtype = ctype.split("/", 1)
            with open(file=attachment_path, mode="rb") as fp:
                msg.add_attachment(
                    fp.read(),
                    maintype=maintype,
                    subtype=subtype,
                    filename=attachment or os.path.basename(attachment_path),
                )

        if to_send:
            # The context manager issues QUIT and closes the socket on exit,
            # so no explicit quit() is needed.
            with smtplib.SMTP(host=EMAIL_SMTP_HOST, port=EMAIL_SMTP_PORT) as smtp:
                smtp.ehlo()
                smtp.starttls()
                smtp.login(user=sender, password=pwd)
                smtp.send_message(msg)
            return f"Email was sent to {to}!"

        conn = self.imap_open(
            imap_folder="[Gmail]/Drafts",
            email_sender=sender,
            email_password=pwd,
        )
        try:
            conn.append(
                mailbox="[Gmail]/Drafts",
                flags="",
                date_time=imaplib.Time2Internaldate(time.time()),
                # as_bytes() serialises with proper header encoding instead of
                # raw UTF-8 text.
                message=msg.as_bytes(),
            )
        finally:
            # Always release the IMAP session, even if the append fails.
            conn.logout()
        return "Email went to [Gmail]/Drafts!"

    def read_emails(
        self,
        sender: str,
        pwd: str,
        imap_folder: str = "inbox",
        imap_search_command: str = "UNSEEN",
        limit: int = 5,
        page: int = 1,
    ) -> str:
        """Read emails from an IMAP mailbox.

        This function reads emails from a specified IMAP folder, using a given IMAP search command, limits, and page numbers.
        It returns a list of emails with their details, including the sender, recipient, date, CC, subject, and message body.

        Args:
            sender (str): The email of the sender
            pwd (str): The password of the sender
            imap_folder (str, optional): The name of the IMAP folder to read emails from. Defaults to "inbox".
            imap_search_command (str, optional): The IMAP search command to filter emails. Defaults to "UNSEEN".
            limit (int, optional): Number of email's the function should return. Defaults to 5 emails.
            page (int, optional): The index of the page result the function should resturn. Defaults to 0, the first page.

        Returns:
            str: A list of dictionaries containing email details if there are any matching emails.
        """
        email_sender = sender
        imap_folder = self.adjust_imap_folder_for_gmail(
            imap_folder=imap_folder, email_sender=email_sender
        )
        imap_folder = self.enclose_with_quotes(imap_folder)
        imap_search_ar = self.split_imap_search_command(imap_search_command)
        email_password = pwd

        mark_as_seen = "False"
        if isinstance(mark_as_seen, str):
            mark_as_seen = json.loads(mark_as_seen.lower())

        conn = self.imap_open(
            imap_folder=imap_folder,
            email_sender=email_sender,
            email_password=email_password,
        )

        imap_keyword = imap_search_ar[0]
        if len(imap_search_ar) == 1:
            _, search_data = conn.search(None, imap_keyword)
        else:
            argument = self.enclose_with_quotes(imap_search_ar[1])
            _, search_data = conn.search(None, imap_keyword, argument)

        messages = []
        for num in search_data[0].split():
            if mark_as_seen:
                message_parts = "(RFC822)"
            else:
                message_parts = "(BODY.PEEK[])"
            _, msg_data = conn.fetch(message_set=num, message_parts=message_parts)
            for response_part in msg_data:
                if isinstance(response_part, tuple):
                    msg = email.message_from_bytes(response_part[1])

                    # If the subject has unknown encoding, return blank
                    if msg["Subject"] is not None:
                        subject, encoding = decode_header(msg["Subject"])[0]
                    else:
                        subject = ""
                        encoding = ""

                    if isinstance(subject, bytes):
                        try:
                            # If the subject has unknown encoding, return blank
                            if encoding is not None:
                                subject = subject.decode(encoding)
                            else:
                                subject = ""
                        except [LookupError] as e:
                            pass

                    body = self.get_email_body(msg)
                    # Clean email body
                    body = self.clean_email_body(body)

                    from_address = msg["From"]
                    to_address = msg["To"]
                    date = msg["Date"]
                    cc = msg["CC"] if msg["CC"] else ""

                    messages.append(
                        {
                            "from": from_address,
                            "to": to_address,
                            "date": date,
                            "cc": cc,
                            "subject": subject,
                            "body": body,
                        }
                    )

        conn.logout()
        if not messages:
            messages.append(
                {
                    "from": "",
                    "to": "",
                    "date": "",
                    "cc": "",
                    "subject": "",
                    "body": "There are no Emails",
                }
            )
            return json.dumps(messages)

        # Confirm that integer parameters are the right type
        limit = int(limit)
        page = int(page)

        # Validate parameter values
        if limit < 1:
            raise ValueError("Error: The message limit should be 1 or greater")

        page_count = len(messages) // limit + (len(messages) % limit > 0)

        if page < 1 or page > page_count:
            raise ValueError(
                "Error: The page value references a page that is not part of the results"
            )

        # Calculate paginated indexes
        start_index = len(messages) - (page * limit + 1)
        end_index = start_index + limit
        start_index = max(start_index, 0)

        # Return paginated indexes
        if start_index == end_index:
            return json.dumps([messages[start_index]])
        else:
            return json.dumps(messages[start_index:end_index])

    def adjust_imap_folder_for_gmail(self, imap_folder: str, email_sender: str) -> str:
        if "@gmail" in email_sender.lower() or "@googlemail" in email_sender.lower():
            if "sent" in imap_folder.lower():
                return '"[Gmail]/Sent Mail"'
            if "draft" in imap_folder.lower():
                return "[Gmail]/Drafts"
        return imap_folder

    def imap_open(
        self, imap_folder: str, email_sender: str, email_password: str
    ) -> imaplib.IMAP4_SSL:
        """Open an authenticated IMAP-over-SSL session with a folder selected.

        Args:
            imap_folder (str): The mailbox name passed to ``select``
            email_sender (str): The account used to log in
            email_password (str): The password used to log in

        Returns:
            imaplib.IMAP4_SSL: The connection; the caller is responsible for
                logging out of it.
        """
        # IMAP Server Connect
        conn = imaplib.IMAP4_SSL(EMAIL_IMAP_SERVER)
        try:
            conn.login(user=email_sender, password=email_password)
            conn.select(imap_folder)
        except Exception:
            # Do not leave the socket open when login/select fails; the
            # original error is re-raised unchanged.
            conn.shutdown()
            raise
        return conn

    def get_email_body(self, msg: email.message.Message) -> str:
        if msg.is_multipart():
            for part in msg.walk():
                content_type = part.get_content_type()
                content_disposition = str(part.get("Content-Disposition"))
                if (
                    content_type == "text/plain"
                    and "attachment" not in content_disposition
                ):
                    # If the email body has unknown encoding, return null
                    try:
                        return part.get_payload(decode=True).decode()
                    except UnicodeDecodeError as e:
                        pass
        else:
            try:
                # If the email body has unknown encoding, return null
                return msg.get_payload(decode=True).decode()
            except UnicodeDecodeError as e:
                pass

    def enclose_with_quotes(self, s):
        # Check if string contains whitespace
        has_whitespace = bool(re.search(r"\s", s))

        # Check if string is already enclosed by quotes
        is_enclosed = s.startswith(("'", '"')) and s.endswith(("'", '"'))

        # If string has whitespace and is not enclosed by quotes, enclose it with double quotes
        if has_whitespace and not is_enclosed:
            return f'"{s}"'
        else:
            return s

    def split_imap_search_command(self, input_string):
        input_string = input_string.strip()
        parts = input_string.split(maxsplit=1)
        parts = [part.strip() for part in parts]

        return parts

    def clean_email_body(self, email_body):
        """Remove formatting and URLs from an email's body

        HTML tags are stripped, every run of whitespace (including line
        breaks) is collapsed to a single space, non-ASCII characters are
        dropped, and tokens starting with "http" are removed.

        Args:
            email_body (str, optional): The email's body; None is treated as
                an empty body

        Returns:
            str: The email's body without any formatting or URLs
        """
        # If body is None, return an empty string
        if email_body is None:
            email_body = ""

        # Remove any HTML tags
        text = BeautifulSoup(email_body, "html.parser").get_text()

        # Collapse whitespace first: line breaks become word separators, so
        # the last word of one line is not fused with the first of the next.
        text = " ".join(text.split())

        # Remove non-ASCII characters
        text = text.encode("ascii", "ignore").decode("ascii")

        # Remove any remaining URLs
        text = re.sub(r"http\S+", "", text)

        # Removing URLs can leave doubled or trailing spaces behind.
        return " ".join(text.split())

    def write_attachment(self, filename: str, file_content: str) -> (str, str):
        # create folder for temporarily saving attached file
        milliseconds = int(time.time() * 1000)
        file_path = f"Brain/assets/{milliseconds}/{filename}"
        file_directory = f"Brain/assets/{milliseconds}"
        os.mkdir(file_directory)

        # file write
        file_content = base64.b64decode(file_content).decode("utf-8")
        file = open(file_path, "w")
        file.write(file_content)
        file.close()
        return file_path, file_directory