jupiter0913 commited on
Commit
bd7ae30
·
1 Parent(s): f965d61

feature(#98): code formatting in gmail manage module

Browse files
Brain/src/rising_plugin/gmail/manage_gmail.py CHANGED
@@ -12,320 +12,342 @@ from email.message import EmailMessage
12
  from bs4 import BeautifulSoup
13
 
14
 
15
- def send_email(
16
- sender: str, pwd: str, to: str, subject: str, body: str, to_send: bool
17
- ) -> str:
18
- return send_email_with_attachment_internal(
19
- sender, pwd, to, subject, body, None, None, to_send
20
- )
21
-
22
-
23
- def send_email_with_attachment(
24
- sender: str,
25
- pwd: str,
26
- to: str,
27
- subject: str,
28
- body: str,
29
- filename: str,
30
- to_send: bool,
31
- ) -> str:
32
- attachment_path = filename
33
- attachment = os.path.basename(filename)
34
- return send_email_with_attachment_internal(
35
- sender, pwd, to, subject, body, attachment_path, attachment, to_send
36
- )
37
-
38
-
39
- def send_email_with_attachment_internal(
40
- sender: str,
41
- pwd: str,
42
- to: str,
43
- title: str,
44
- message: str,
45
- attachment_path: str,
46
- attachment: str,
47
- to_send: bool,
48
- ) -> str:
49
- """Send an email
50
-
51
- Args:
52
- sender (str): The email of the sender
53
- pwd (str): The password of the sender
54
- to (str): The email of the recipient
55
- title (str): The title of the email
56
- message (str): The message content of the email
57
-
58
- Returns:
59
- str: Any error messages
60
- """
61
- email_sender = sender
62
- email_password = pwd
63
-
64
- msg = EmailMessage()
65
- msg["Subject"] = title
66
- msg["From"] = email_sender
67
- msg["To"] = to
68
-
69
- signature = os.getenv("EMAIL_SIGNATURE")
70
- if signature:
71
- message += f"\n{signature}"
72
-
73
- msg.set_content(message)
74
-
75
- if attachment_path:
76
- ctype, encoding = mimetypes.guess_type(attachment_path)
77
- if ctype is None or encoding is not None:
78
- # No guess could be made, or the file is encoded (compressed)
79
- ctype = "application/octet-stream"
80
- maintype, subtype = ctype.split("/", 1)
81
- with open(attachment_path, "rb") as fp:
82
- msg.add_attachment(
83
- fp.read(), maintype=maintype, subtype=subtype, filename=attachment
84
- )
85
 
86
- if to_send:
87
- smtp_host = os.getenv("EMAIL_SMTP_HOST")
88
- smtp_port = os.getenv("EMAIL_SMTP_PORT")
89
- # send email
90
- with smtplib.SMTP(smtp_host, smtp_port) as smtp:
91
- smtp.ehlo()
92
- smtp.starttls()
93
- smtp.login(email_sender, email_password)
94
- smtp.send_message(msg)
95
- smtp.quit()
96
- return f"Email was sent to {to}!"
97
- else:
98
- conn = imap_open("[Gmail]/Drafts", email_sender, email_password)
99
- conn.append(
100
- "[Gmail]/Drafts",
101
- "",
102
- imaplib.Time2Internaldate(time.time()),
103
- str(msg).encode("UTF-8"),
 
 
 
104
  )
105
- return f"Email went to [Gmail]/Drafts!"
106
-
107
-
108
- def read_emails(
109
- sender: str,
110
- pwd: str,
111
- imap_folder: str = "inbox",
112
- imap_search_command: str = "UNSEEN",
113
- limit: int = 5,
114
- page: int = 1,
115
- ) -> str:
116
- """Read emails from an IMAP mailbox.
117
-
118
- This function reads emails from a specified IMAP folder, using a given IMAP search command, limits, and page numbers.
119
- It returns a list of emails with their details, including the sender, recipient, date, CC, subject, and message body.
120
-
121
- Args:
122
- sender (str): The email of the sender
123
- pwd (str): The password of the sender
124
- imap_folder (str, optional): The name of the IMAP folder to read emails from. Defaults to "inbox".
125
- imap_search_command (str, optional): The IMAP search command to filter emails. Defaults to "UNSEEN".
126
- limit (int, optional): Number of email's the function should return. Defaults to 5 emails.
127
- page (int, optional): The index of the page result the function should resturn. Defaults to 0, the first page.
128
-
129
- Returns:
130
- str: A list of dictionaries containing email details if there are any matching emails.
131
- """
132
- email_sender = sender
133
- imap_folder = adjust_imap_folder_for_gmail(imap_folder, email_sender)
134
- imap_folder = enclose_with_quotes(imap_folder)
135
- imap_search_ar = split_imap_search_command(imap_search_command)
136
- email_password = pwd
137
-
138
- mark_as_seen = "False"
139
- if isinstance(mark_as_seen, str):
140
- mark_as_seen = json.loads(mark_as_seen.lower())
141
-
142
- conn = imap_open(imap_folder, email_sender, email_password)
143
-
144
- imap_keyword = imap_search_ar[0]
145
- if len(imap_search_ar) == 1:
146
- _, search_data = conn.search(None, imap_keyword)
147
- else:
148
- argument = enclose_with_quotes(imap_search_ar[1])
149
- _, search_data = conn.search(None, imap_keyword, argument)
150
-
151
- messages = []
152
- for num in search_data[0].split():
153
- if mark_as_seen:
154
- message_parts = "(RFC822)"
155
- else:
156
- message_parts = "(BODY.PEEK[])"
157
- _, msg_data = conn.fetch(num, message_parts)
158
- for response_part in msg_data:
159
- if isinstance(response_part, tuple):
160
- msg = email.message_from_bytes(response_part[1])
161
-
162
- # If the subject has unknown encoding, return blank
163
- if msg["Subject"] is not None:
164
- subject, encoding = decode_header(msg["Subject"])[0]
165
- else:
166
- subject = ""
167
- encoding = ""
168
-
169
- if isinstance(subject, bytes):
170
- try:
171
- # If the subject has unknown encoding, return blank
172
- if encoding is not None:
173
- subject = subject.decode(encoding)
174
- else:
175
- subject = ""
176
- except [LookupError] as e:
177
- pass
178
 
179
- body = get_email_body(msg)
180
- # Clean email body
181
- body = clean_email_body(body)
182
-
183
- from_address = msg["From"]
184
- to_address = msg["To"]
185
- date = msg["Date"]
186
- cc = msg["CC"] if msg["CC"] else ""
187
-
188
- messages.append(
189
- {
190
- "From": from_address,
191
- "To": to_address,
192
- "Date": date,
193
- "CC": cc,
194
- "Subject": subject,
195
- "Message Body": body,
196
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
197
  )
198
 
199
- conn.logout()
200
- if not messages:
201
- messages.append(
202
- {
203
- "From": "",
204
- "To": "",
205
- "Date": "",
206
- "CC": "",
207
- "Subject": "",
208
- "Message Body": "There are no Emails",
209
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
210
  )
211
- return json.dumps(messages)
212
-
213
- # Confirm that integer parameters are the right type
214
- limit = int(limit)
215
- page = int(page)
216
 
217
- # Validate parameter values
218
- if limit < 1:
219
- raise ValueError("Error: The message limit should be 1 or greater")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
220
 
221
- page_count = len(messages) // limit + (len(messages) % limit > 0)
 
 
222
 
223
- if page < 1 or page > page_count:
224
- raise ValueError(
225
- "Error: The page value references a page that is not part of the results"
226
- )
227
 
228
- # Calculate paginated indexes
229
- start_index = len(messages) - (page * limit + 1)
230
- end_index = start_index + limit
231
- start_index = max(start_index, 0)
232
-
233
- # Return paginated indexes
234
- if start_index == end_index:
235
- return json.dumps([messages[start_index]])
236
- else:
237
- return json.dumps(messages[start_index:end_index])
238
-
239
-
240
- def adjust_imap_folder_for_gmail(imap_folder: str, email_sender: str) -> str:
241
- if "@gmail" in email_sender.lower() or "@googlemail" in email_sender.lower():
242
- if "sent" in imap_folder.lower():
243
- return '"[Gmail]/Sent Mail"'
244
- if "draft" in imap_folder.lower():
245
- return "[Gmail]/Drafts"
246
- return imap_folder
247
-
248
-
249
- def imap_open(
250
- imap_folder: str, email_sender: str, email_password: str
251
- ) -> imaplib.IMAP4_SSL:
252
- imap_server = os.getenv("EMAIL_IMAP_SERVER")
253
- conn = imaplib.IMAP4_SSL(imap_server)
254
- conn.login(email_sender, email_password)
255
- conn.select(imap_folder)
256
- return conn
257
-
258
-
259
- def get_email_body(msg: email.message.Message) -> str:
260
- if msg.is_multipart():
261
- for part in msg.walk():
262
- content_type = part.get_content_type()
263
- content_disposition = str(part.get("Content-Disposition"))
264
- if content_type == "text/plain" and "attachment" not in content_disposition:
265
- # If the email body has unknown encoding, return null
266
- try:
267
- return part.get_payload(decode=True).decode()
268
- except UnicodeDecodeError as e:
269
- pass
270
- else:
271
- try:
272
- # If the email body has unknown encoding, return null
273
- return msg.get_payload(decode=True).decode()
274
- except UnicodeDecodeError as e:
275
- pass
276
 
 
 
 
 
277
 
278
- def enclose_with_quotes(s):
279
- # Check if string contains whitespace
280
- has_whitespace = bool(re.search(r"\s", s))
 
281
 
282
- # Check if string is already enclosed by quotes
283
- is_enclosed = s.startswith(("'", '"')) and s.endswith(("'", '"'))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
284
 
285
- # If string has whitespace and is not enclosed by quotes, enclose it with double quotes
286
- if has_whitespace and not is_enclosed:
287
- return f'"{s}"'
288
- else:
289
- return s
290
 
 
 
291
 
292
- def split_imap_search_command(input_string):
293
- input_string = input_string.strip()
294
- parts = input_string.split(maxsplit=1)
295
- parts = [part.strip() for part in parts]
 
296
 
297
- return parts
 
 
 
298
 
 
299
 
300
- def clean_email_body(email_body):
301
- """Remove formating and URL's from an email's body
302
 
303
- Args:
304
- email_body (str, optional): The email's body
305
 
306
- Returns:
307
- str: The email's body without any formating or URL's
308
- """
309
 
310
- # If body is None, return an empty string
311
- if email_body is None:
312
- email_body = ""
313
 
314
- # Remove any HTML tags
315
- email_body = BeautifulSoup(email_body, "html.parser")
316
- email_body = email_body.get_text()
317
 
318
- # Remove return characters
319
- email_body = "".join(email_body.splitlines())
320
 
321
- # Remove extra spaces
322
- email_body = " ".join(email_body.split())
323
 
324
- # Remove unicode characters
325
- email_body = email_body.encode("ascii", "ignore")
326
- email_body = email_body.decode("utf-8", "ignore")
327
 
328
- # Remove any remaining URL's
329
- email_body = re.sub(r"http\S+", "", email_body)
330
 
331
- return email_body
 
12
  from bs4 import BeautifulSoup
13
 
14
 
15
+ class EmailManager:
16
+ def send_email(
17
+ self, sender: str, pwd: str, to: str, subject: str, body: str, to_send: bool
18
+ ) -> str:
19
+ return self.send_email_with_attachment_internal(
20
+ sender=sender,
21
+ pwd=pwd,
22
+ to=to,
23
+ title=subject,
24
+ message=body,
25
+ attachment=None,
26
+ attachment_path=None,
27
+ to_send=to_send,
28
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
29
 
30
+ def send_email_with_attachment(
31
+ self,
32
+ sender: str,
33
+ pwd: str,
34
+ to: str,
35
+ subject: str,
36
+ body: str,
37
+ filename: str,
38
+ to_send: bool,
39
+ ) -> str:
40
+ attachment_path = filename
41
+ attachment = os.path.basename(filename)
42
+ return self.send_email_with_attachment_internal(
43
+ sender=sender,
44
+ pwd=pwd,
45
+ to=to,
46
+ title=subject,
47
+ message=body,
48
+ attachment_path=attachment_path,
49
+ attachment=attachment,
50
+ to_send=to_send,
51
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52
 
53
+ def send_email_with_attachment_internal(
54
+ self,
55
+ sender: str,
56
+ pwd: str,
57
+ to: str,
58
+ title: str,
59
+ message: str,
60
+ attachment_path: str | None,
61
+ attachment: str | None,
62
+ to_send: bool,
63
+ ) -> str:
64
+ """Send an email
65
+
66
+ Args:
67
+ sender (str): The email of the sender
68
+ pwd (str): The password of the sender
69
+ to (str): The email of the recipient
70
+ title (str): The title of the email
71
+ message (str): The message content of the email
72
+
73
+ Returns:
74
+ str: Any error messages
75
+ """
76
+ email_sender = sender
77
+ email_password = pwd
78
+
79
+ msg = EmailMessage()
80
+ msg["Subject"] = title
81
+ msg["From"] = email_sender
82
+ msg["To"] = to
83
+
84
+ signature = os.getenv("EMAIL_SIGNATURE")
85
+ if signature:
86
+ message += f"\n{signature}"
87
+
88
+ msg.set_content(message)
89
+
90
+ if attachment_path:
91
+ ctype, encoding = mimetypes.guess_type(attachment_path)
92
+ if ctype is None or encoding is not None:
93
+ # No guess could be made, or the file is encoded (compressed)
94
+ ctype = "application/octet-stream"
95
+ maintype, subtype = ctype.split("/", 1)
96
+ with open(file=attachment_path, mode="rb") as fp:
97
+ msg.add_attachment(
98
+ fp.read(), maintype=maintype, subtype=subtype, filename=attachment
99
  )
100
 
101
+ if to_send:
102
+ smtp_host = os.getenv("EMAIL_SMTP_HOST")
103
+ smtp_port = os.getenv("EMAIL_SMTP_PORT")
104
+ # send email
105
+ with smtplib.SMTP(host=smtp_host, port=smtp_port) as smtp:
106
+ smtp.ehlo()
107
+ smtp.starttls()
108
+ smtp.login(user=email_sender, password=email_password)
109
+ smtp.send_message(msg)
110
+ smtp.quit()
111
+ return f"Email was sent to {to}!"
112
+ else:
113
+ conn = self.imap_open(
114
+ imap_folder="[Gmail]/Drafts",
115
+ email_sender=email_sender,
116
+ email_password=email_password,
117
+ )
118
+ conn.append(
119
+ mailbox="[Gmail]/Drafts",
120
+ flags="",
121
+ date_time=imaplib.Time2Internaldate(time.time()),
122
+ message=str(msg).encode("UTF-8"),
123
+ )
124
+ return f"Email went to [Gmail]/Drafts!"
125
+
126
+ def read_emails(
127
+ self,
128
+ sender: str,
129
+ pwd: str,
130
+ imap_folder: str = "inbox",
131
+ imap_search_command: str = "UNSEEN",
132
+ limit: int = 5,
133
+ page: int = 1,
134
+ ) -> str:
135
+ """Read emails from an IMAP mailbox.
136
+
137
+ This function reads emails from a specified IMAP folder, using a given IMAP search command, limits, and page numbers.
138
+ It returns a list of emails with their details, including the sender, recipient, date, CC, subject, and message body.
139
+
140
+ Args:
141
+ sender (str): The email of the sender
142
+ pwd (str): The password of the sender
143
+ imap_folder (str, optional): The name of the IMAP folder to read emails from. Defaults to "inbox".
144
+ imap_search_command (str, optional): The IMAP search command to filter emails. Defaults to "UNSEEN".
145
+ limit (int, optional): Number of email's the function should return. Defaults to 5 emails.
146
+ page (int, optional): The index of the page result the function should resturn. Defaults to 0, the first page.
147
+
148
+ Returns:
149
+ str: A list of dictionaries containing email details if there are any matching emails.
150
+ """
151
+ email_sender = sender
152
+ imap_folder = self.adjust_imap_folder_for_gmail(
153
+ imap_folder=imap_folder, email_sender=email_sender
154
+ )
155
+ imap_folder = self.enclose_with_quotes(imap_folder)
156
+ imap_search_ar = self.split_imap_search_command(imap_search_command)
157
+ email_password = pwd
158
+
159
+ mark_as_seen = "False"
160
+ if isinstance(mark_as_seen, str):
161
+ mark_as_seen = json.loads(mark_as_seen.lower())
162
+
163
+ conn = self.imap_open(
164
+ imap_folder=imap_folder,
165
+ email_sender=email_sender,
166
+ email_password=email_password,
167
  )
 
 
 
 
 
168
 
169
+ imap_keyword = imap_search_ar[0]
170
+ if len(imap_search_ar) == 1:
171
+ _, search_data = conn.search(None, imap_keyword)
172
+ else:
173
+ argument = self.enclose_with_quotes(imap_search_ar[1])
174
+ _, search_data = conn.search(None, imap_keyword, argument)
175
+
176
+ messages = []
177
+ for num in search_data[0].split():
178
+ if mark_as_seen:
179
+ message_parts = "(RFC822)"
180
+ else:
181
+ message_parts = "(BODY.PEEK[])"
182
+ _, msg_data = conn.fetch(message_set=num, message_parts=message_parts)
183
+ for response_part in msg_data:
184
+ if isinstance(response_part, tuple):
185
+ msg = email.message_from_bytes(response_part[1])
186
+
187
+ # If the subject has unknown encoding, return blank
188
+ if msg["Subject"] is not None:
189
+ subject, encoding = decode_header(msg["Subject"])[0]
190
+ else:
191
+ subject = ""
192
+ encoding = ""
193
+
194
+ if isinstance(subject, bytes):
195
+ try:
196
+ # If the subject has unknown encoding, return blank
197
+ if encoding is not None:
198
+ subject = subject.decode(encoding)
199
+ else:
200
+ subject = ""
201
+ except [LookupError] as e:
202
+ pass
203
+
204
+ body = self.get_email_body(msg)
205
+ # Clean email body
206
+ body = self.clean_email_body(body)
207
+
208
+ from_address = msg["From"]
209
+ to_address = msg["To"]
210
+ date = msg["Date"]
211
+ cc = msg["CC"] if msg["CC"] else ""
212
+
213
+ messages.append(
214
+ {
215
+ "From": from_address,
216
+ "To": to_address,
217
+ "Date": date,
218
+ "CC": cc,
219
+ "Subject": subject,
220
+ "Message Body": body,
221
+ }
222
+ )
223
+
224
+ conn.logout()
225
+ if not messages:
226
+ messages.append(
227
+ {
228
+ "From": "",
229
+ "To": "",
230
+ "Date": "",
231
+ "CC": "",
232
+ "Subject": "",
233
+ "Message Body": "There are no Emails",
234
+ }
235
+ )
236
+ return json.dumps(messages)
237
 
238
+ # Confirm that integer parameters are the right type
239
+ limit = int(limit)
240
+ page = int(page)
241
 
242
+ # Validate parameter values
243
+ if limit < 1:
244
+ raise ValueError("Error: The message limit should be 1 or greater")
 
245
 
246
+ page_count = len(messages) // limit + (len(messages) % limit > 0)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
247
 
248
+ if page < 1 or page > page_count:
249
+ raise ValueError(
250
+ "Error: The page value references a page that is not part of the results"
251
+ )
252
 
253
+ # Calculate paginated indexes
254
+ start_index = len(messages) - (page * limit + 1)
255
+ end_index = start_index + limit
256
+ start_index = max(start_index, 0)
257
 
258
+ # Return paginated indexes
259
+ if start_index == end_index:
260
+ return json.dumps([messages[start_index]])
261
+ else:
262
+ return json.dumps(messages[start_index:end_index])
263
+
264
+ def adjust_imap_folder_for_gmail(self, imap_folder: str, email_sender: str) -> str:
265
+ if "@gmail" in email_sender.lower() or "@googlemail" in email_sender.lower():
266
+ if "sent" in imap_folder.lower():
267
+ return '"[Gmail]/Sent Mail"'
268
+ if "draft" in imap_folder.lower():
269
+ return "[Gmail]/Drafts"
270
+ return imap_folder
271
+
272
+ def imap_open(
273
+ self, imap_folder: str, email_sender: str, email_password: str
274
+ ) -> imaplib.IMAP4_SSL:
275
+ imap_server = os.getenv("EMAIL_IMAP_SERVER")
276
+ conn = imaplib.IMAP4_SSL(imap_server)
277
+ conn.login(user=email_sender, password=email_password)
278
+ conn.select(imap_folder)
279
+ return conn
280
+
281
+ def get_email_body(self, msg: email.message.Message) -> str:
282
+ if msg.is_multipart():
283
+ for part in msg.walk():
284
+ content_type = part.get_content_type()
285
+ content_disposition = str(part.get("Content-Disposition"))
286
+ if (
287
+ content_type == "text/plain"
288
+ and "attachment" not in content_disposition
289
+ ):
290
+ # If the email body has unknown encoding, return null
291
+ try:
292
+ return part.get_payload(decode=True).decode()
293
+ except UnicodeDecodeError as e:
294
+ pass
295
+ else:
296
+ try:
297
+ # If the email body has unknown encoding, return null
298
+ return msg.get_payload(decode=True).decode()
299
+ except UnicodeDecodeError as e:
300
+ pass
301
 
302
+ def enclose_with_quotes(self, s):
303
+ # Check if string contains whitespace
304
+ has_whitespace = bool(re.search(r"\s", s))
 
 
305
 
306
+ # Check if string is already enclosed by quotes
307
+ is_enclosed = s.startswith(("'", '"')) and s.endswith(("'", '"'))
308
 
309
+ # If string has whitespace and is not enclosed by quotes, enclose it with double quotes
310
+ if has_whitespace and not is_enclosed:
311
+ return f'"{s}"'
312
+ else:
313
+ return s
314
 
315
+ def split_imap_search_command(self, input_string):
316
+ input_string = input_string.strip()
317
+ parts = input_string.split(maxsplit=1)
318
+ parts = [part.strip() for part in parts]
319
 
320
+ return parts
321
 
322
+ def clean_email_body(self, email_body):
323
+ """Remove formating and URL's from an email's body
324
 
325
+ Args:
326
+ email_body (str, optional): The email's body
327
 
328
+ Returns:
329
+ str: The email's body without any formating or URL's
330
+ """
331
 
332
+ # If body is None, return an empty string
333
+ if email_body is None:
334
+ email_body = ""
335
 
336
+ # Remove any HTML tags
337
+ email_body = BeautifulSoup(email_body, "html.parser")
338
+ email_body = email_body.get_text()
339
 
340
+ # Remove return characters
341
+ email_body = "".join(email_body.splitlines())
342
 
343
+ # Remove extra spaces
344
+ email_body = " ".join(email_body.split())
345
 
346
+ # Remove unicode characters
347
+ email_body = email_body.encode("ascii", "ignore")
348
+ email_body = email_body.decode("utf-8", "ignore")
349
 
350
+ # Remove any remaining URL's
351
+ email_body = re.sub(r"http\S+", "", email_body)
352
 
353
+ return email_body