jupiter0913 commited on
Commit
c9987a0
·
1 Parent(s): e540ba4

feature(#98): implement read and send function to manage email

Browse files
Brain/src/rising_plugin/gmail/manage_gmail.py CHANGED
@@ -1,45 +1,298 @@
1
- from langchain import OpenAI
2
- from langchain.agents import initialize_agent, AgentType
3
- from langchain.tools.gmail.utils import build_resource_service, get_gmail_credentials
4
- from langchain.agents.agent_toolkits import GmailToolkit
5
- from langchain.agents.agent import AgentExecutor
6
-
7
-
8
- def get_agent() -> AgentExecutor:
9
- # Can review scopes here https://developers.google.com/gmail/api/auth/scopes
10
- # For instance, readonly scope is 'https://www.googleapis.com/auth/gmail.readonly'
11
- credentials = get_gmail_credentials(
12
- token_file="token.json",
13
- scopes=["https://mail.google.com/"],
14
- client_secrets_file="credentials.json",
15
- )
16
- api_resource = build_resource_service(credentials=credentials)
17
- toolkit = GmailToolkit(api_resource=api_resource)
18
-
19
- llm = OpenAI(temperature=0)
20
- agent = initialize_agent(
21
- tools=toolkit.get_tools(),
22
- llm=llm,
23
- agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
24
- )
25
- return agent
26
 
 
 
27
 
28
- def read_emails() -> str:
29
- agent = get_agent()
30
- result = agent.run(
31
- "Could you search in my inbox for the latest email?"
 
 
32
  )
33
- return result
34
 
35
 
36
- def write_email(query: str) -> str:
37
- agent = get_agent()
38
- result = agent.run(query)
39
- return result
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
 
 
 
41
 
42
- def send_email(query: str) -> str:
43
- agent = get_agent()
44
- result = agent.run(query)
45
- return result
 
1
+ import email
2
+ import imaplib
3
+ import json
4
+ import mimetypes
5
+ import os
6
+ import re
7
+ import smtplib
8
+ import time
9
+ from email.header import decode_header
10
+ from email.message import EmailMessage
11
+
12
+ from bs4 import BeautifulSoup
13
+
 
 
 
 
 
 
 
 
 
 
 
 
14
 
15
+ def send_email(sender: str, pwd: str, to: str, subject: str, body: str, to_send: bool) -> str:
16
+ return send_email_with_attachment_internal(sender, pwd, to, subject, body, None, None, to_send)
17
 
18
+
19
+ def send_email_with_attachment(sender: str, pwd: str, to: str, subject: str, body: str, filename: str, to_send: bool) -> str:
20
+ attachment_path = filename
21
+ attachment = os.path.basename(filename)
22
+ return send_email_with_attachment_internal(
23
+ sender, pwd, to, subject, body, attachment_path, attachment, to_send
24
  )
 
25
 
26
 
27
+ def send_email_with_attachment_internal(
28
+ sender: str, pwd: str, to: str, title: str, message: str, attachment_path: str, attachment: str, to_send: bool
29
+ ) -> str:
30
+ """Send an email
31
+
32
+ Args:
33
+ sender (str): The email of the sender
34
+ pwd (str): The password of the sender
35
+ to (str): The email of the recipient
36
+ title (str): The title of the email
37
+ message (str): The message content of the email
38
+
39
+ Returns:
40
+ str: Any error messages
41
+ """
42
+ email_sender = sender
43
+ email_password = pwd
44
+
45
+ msg = EmailMessage()
46
+ msg["Subject"] = title
47
+ msg["From"] = email_sender
48
+ msg["To"] = to
49
+
50
+ signature = os.getenv("EMAIL_SIGNATURE")
51
+ if signature:
52
+ message += f"\n{signature}"
53
+
54
+ msg.set_content(message)
55
+
56
+ if attachment_path:
57
+ ctype, encoding = mimetypes.guess_type(attachment_path)
58
+ if ctype is None or encoding is not None:
59
+ # No guess could be made, or the file is encoded (compressed)
60
+ ctype = "application/octet-stream"
61
+ maintype, subtype = ctype.split("/", 1)
62
+ with open(attachment_path, "rb") as fp:
63
+ msg.add_attachment(
64
+ fp.read(), maintype=maintype, subtype=subtype, filename=attachment
65
+ )
66
+
67
+ if to_send:
68
+ smtp_host = os.getenv("EMAIL_SMTP_HOST")
69
+ smtp_port = os.getenv("EMAIL_SMTP_PORT")
70
+ # send email
71
+ with smtplib.SMTP(smtp_host, smtp_port) as smtp:
72
+ smtp.ehlo()
73
+ smtp.starttls()
74
+ smtp.login(email_sender, email_password)
75
+ smtp.send_message(msg)
76
+ smtp.quit()
77
+ return f"Email was sent to {to}!"
78
+ else:
79
+ conn = imap_open("[Gmail]/Drafts", email_sender, email_password)
80
+ conn.append(
81
+ "[Gmail]/Drafts",
82
+ "",
83
+ imaplib.Time2Internaldate(time.time()),
84
+ str(msg).encode("UTF-8"),
85
+ )
86
+ return f"Email went to [Gmail]/Drafts!"
87
+
88
+
89
+ def read_emails(
90
+ sender: str, pwd: str, imap_folder: str = "inbox", imap_search_command: str = "UNSEEN", limit: int = 5,
91
+ page: int = 1) -> str:
92
+ """Read emails from an IMAP mailbox.
93
+
94
+ This function reads emails from a specified IMAP folder, using a given IMAP search command, limits, and page numbers.
95
+ It returns a list of emails with their details, including the sender, recipient, date, CC, subject, and message body.
96
+
97
+ Args:
98
+ sender (str): The email of the sender
99
+ pwd (str): The password of the sender
100
+ imap_folder (str, optional): The name of the IMAP folder to read emails from. Defaults to "inbox".
101
+ imap_search_command (str, optional): The IMAP search command to filter emails. Defaults to "UNSEEN".
102
+ limit (int, optional): Number of email's the function should return. Defaults to 5 emails.
103
+ page (int, optional): The index of the page result the function should resturn. Defaults to 0, the first page.
104
+
105
+ Returns:
106
+ str: A list of dictionaries containing email details if there are any matching emails. Otherwise, returns
107
+ a string indicating that no matching emails were found.
108
+ """
109
+ email_sender = sender
110
+ imap_folder = adjust_imap_folder_for_gmail(imap_folder, email_sender)
111
+ imap_folder = enclose_with_quotes(imap_folder)
112
+ imap_search_ar = split_imap_search_command(imap_search_command)
113
+ email_password = pwd
114
+
115
+ mark_as_seen = "False"
116
+ if isinstance(mark_as_seen, str):
117
+ mark_as_seen = json.loads(mark_as_seen.lower())
118
+
119
+ conn = imap_open(imap_folder, email_sender, email_password)
120
+
121
+ imap_keyword = imap_search_ar[0]
122
+ if len(imap_search_ar) == 1:
123
+ _, search_data = conn.search(None, imap_keyword)
124
+ else:
125
+ argument = enclose_with_quotes(imap_search_ar[1])
126
+ _, search_data = conn.search(None, imap_keyword, argument)
127
+
128
+ messages = []
129
+ for num in search_data[0].split():
130
+ if mark_as_seen:
131
+ message_parts = "(RFC822)"
132
+ else:
133
+ message_parts = "(BODY.PEEK[])"
134
+ _, msg_data = conn.fetch(num, message_parts)
135
+ for response_part in msg_data:
136
+ if isinstance(response_part, tuple):
137
+ msg = email.message_from_bytes(response_part[1])
138
+
139
+ # If the subject has unknown encoding, return blank
140
+ if msg["Subject"] is not None:
141
+ subject, encoding = decode_header(msg["Subject"])[0]
142
+ else:
143
+ subject = ""
144
+ encoding = ""
145
+
146
+ if isinstance(subject, bytes):
147
+ try:
148
+ # If the subject has unknown encoding, return blank
149
+ if encoding is not None:
150
+ subject = subject.decode(encoding)
151
+ else:
152
+ subject = ""
153
+ except [LookupError] as e:
154
+ pass
155
+
156
+ body = get_email_body(msg)
157
+ # Clean email body
158
+ body = clean_email_body(body)
159
+
160
+ from_address = msg["From"]
161
+ to_address = msg["To"]
162
+ date = msg["Date"]
163
+ cc = msg["CC"] if msg["CC"] else ""
164
+
165
+ messages.append(
166
+ {
167
+ "From": from_address,
168
+ "To": to_address,
169
+ "Date": date,
170
+ "CC": cc,
171
+ "Subject": subject,
172
+ "Message Body": body,
173
+ }
174
+ )
175
+
176
+ conn.logout()
177
+ if not messages:
178
+ return (
179
+ f"There are no Emails in your folder `{imap_folder}` "
180
+ f"when searching with imap command `{imap_search_command}`"
181
+ )
182
+
183
+ # Confirm that integer parameters are the right type
184
+ limit = int(limit)
185
+ page = int(page)
186
+
187
+ # Validate parameter values
188
+ if limit < 1:
189
+ raise ValueError("Error: The message limit should be 1 or greater")
190
+
191
+ page_count = len(messages) // limit + (len(messages) % limit > 0)
192
+
193
+ if page < 1 or page > page_count:
194
+ raise ValueError("Error: The page value references a page that is not part of the results")
195
+
196
+ # Calculate paginated indexes
197
+ start_index = len(messages) - (page * limit + 1)
198
+ end_index = start_index + limit
199
+ start_index = max(start_index, 0)
200
+
201
+ # Return paginated indexes
202
+ if start_index == end_index:
203
+ return [messages[start_index]]
204
+ else:
205
+ return messages[start_index:end_index]
206
+
207
+
208
+ def adjust_imap_folder_for_gmail(imap_folder: str, email_sender: str) -> str:
209
+ if "@gmail" in email_sender.lower() or "@googlemail" in email_sender.lower():
210
+ if "sent" in imap_folder.lower():
211
+ return '"[Gmail]/Sent Mail"'
212
+ if "draft" in imap_folder.lower():
213
+ return "[Gmail]/Drafts"
214
+ return imap_folder
215
+
216
+
217
+ def imap_open(
218
+ imap_folder: str, email_sender: str, email_password: str
219
+ ) -> imaplib.IMAP4_SSL:
220
+ imap_server = os.getenv("EMAIL_IMAP_SERVER")
221
+ conn = imaplib.IMAP4_SSL(imap_server)
222
+ conn.login(email_sender, email_password)
223
+ conn.select(imap_folder)
224
+ return conn
225
+
226
+
227
+ def get_email_body(msg: email.message.Message) -> str:
228
+ if msg.is_multipart():
229
+ for part in msg.walk():
230
+ content_type = part.get_content_type()
231
+ content_disposition = str(part.get("Content-Disposition"))
232
+ if content_type == "text/plain" and "attachment" not in content_disposition:
233
+ # If the email body has unknown encoding, return null
234
+ try:
235
+ return part.get_payload(decode=True).decode()
236
+ except UnicodeDecodeError as e:
237
+ pass
238
+ else:
239
+ try:
240
+ # If the email body has unknown encoding, return null
241
+ return msg.get_payload(decode=True).decode()
242
+ except UnicodeDecodeError as e:
243
+ pass
244
+
245
+
246
+ def enclose_with_quotes(s):
247
+ # Check if string contains whitespace
248
+ has_whitespace = bool(re.search(r"\s", s))
249
+
250
+ # Check if string is already enclosed by quotes
251
+ is_enclosed = s.startswith(("'", '"')) and s.endswith(("'", '"'))
252
+
253
+ # If string has whitespace and is not enclosed by quotes, enclose it with double quotes
254
+ if has_whitespace and not is_enclosed:
255
+ return f'"{s}"'
256
+ else:
257
+ return s
258
+
259
+
260
+ def split_imap_search_command(input_string):
261
+ input_string = input_string.strip()
262
+ parts = input_string.split(maxsplit=1)
263
+ parts = [part.strip() for part in parts]
264
+
265
+ return parts
266
+
267
+
268
+ def clean_email_body(email_body):
269
+ """Remove formating and URL's from an email's body
270
+
271
+ Args:
272
+ email_body (str, optional): The email's body
273
+
274
+ Returns:
275
+ str: The email's body without any formating or URL's
276
+ """
277
+
278
+ # If body is None, return an empty string
279
+ if email_body is None: email_body = ""
280
+
281
+ # Remove any HTML tags
282
+ email_body = BeautifulSoup(email_body, "html.parser")
283
+ email_body = email_body.get_text()
284
+
285
+ # Remove return characters
286
+ email_body = "".join(email_body.splitlines())
287
+
288
+ # Remove extra spaces
289
+ email_body = " ".join(email_body.split())
290
+
291
+ # Remove unicode characters
292
+ email_body = email_body.encode("ascii", "ignore")
293
+ email_body = email_body.decode("utf-8", "ignore")
294
 
295
+ # Remove any remaining URL's
296
+ email_body = re.sub(r"http\S+", "", email_body)
297
 
298
+ return email_body
 
 
 
Brain/src/rising_plugin/gmail/token.json DELETED
@@ -1 +0,0 @@
1
- {}