Spaces:
Sleeping
Sleeping
| from html.parser import HTMLParser | |
| from io import StringIO | |
| import email | |
| import imaplib | |
def set_credentials(username, password):
    """Check whether the given Gmail IMAP credentials are valid.

    Despite its name, this function stores nothing: it opens a connection
    to Gmail's IMAP server, attempts a login, and reports the outcome.

    Args:
        username: The Gmail username.
        password: The Gmail password.

    Returns:
        True if the login succeeded, False if the credentials are empty,
        were rejected, or the server could not be reached.
    """
    # Empty credentials can never authenticate; skip the network round-trip.
    if not username or not password:
        return False

    imap_server = 'imap.gmail.com'
    imap_port = 993
    try:
        # The context manager logs out on exit, so the socket is released
        # whether or not the login succeeds.
        with imaplib.IMAP4_SSL(imap_server, imap_port) as imap_connection:
            imap_connection.login(username, password)
    except (imaplib.IMAP4.error, OSError):
        # IMAP4.error: rejected login / protocol failure.
        # OSError: DNS, socket, timeout and TLS failures.
        return False
    return True
def fetch_emails_from_imap(username, password):
    """Return the IDs of unread Primary-category emails in the Gmail INBOX.

    Only message IDs are returned; the messages themselves are not
    downloaded here.

    Args:
        username: The Gmail username.
        password: The Gmail password.

    Returns:
        A list of message sequence numbers (as strings) in the reverse of
        the order the server reported them. Empty when nothing matches.

    Raises:
        imaplib.IMAP4.error: If login fails or the search is rejected.
        OSError: If the server cannot be reached.
    """
    imap_server = 'imap.gmail.com'
    imap_port = 993

    # The context manager guarantees a LOGOUT even if a command fails.
    with imaplib.IMAP4_SSL(imap_server, imap_port) as imap_connection:
        imap_connection.login(username, password)
        # Read-only so that searching never alters message flags.
        imap_connection.select('INBOX', readonly=True)

        status, data = imap_connection.search(
            None, 'X-GM-RAW "Category:Primary"', "UNSEEN")
        if status != 'OK':
            # On a NO response `data` holds the server's error text, which
            # must not be parsed as message IDs.
            raise imaplib.IMAP4.error(
                'SEARCH failed: %s %r' % (status, data))

        # split() with no argument returns [] for an empty result, whereas
        # split(' ') would return [''] and hand callers a bogus empty ID.
        email_ids = data[0].decode().split() if data and data[0] else []
        imap_connection.close()

    email_ids.reverse()
    return email_ids
def _decode_header_text(raw_value):
    """Return a header value as str, decoding every RFC 2047 encoded word."""
    # Imported here so the submodule is loaded explicitly rather than
    # relying on another email.* import pulling it in as a side effect.
    from email.header import decode_header

    if raw_value is None:
        return ""
    pieces = []
    for chunk, charset in decode_header(raw_value):
        if isinstance(chunk, bytes):
            try:
                chunk = chunk.decode(charset or 'ascii', errors='replace')
            except LookupError:
                # Unknown or bogus charset label: fall back to UTF-8.
                chunk = chunk.decode('utf-8', errors='replace')
        pieces.append(chunk)
    return "".join(pieces)


def _extract_plain_text(msg):
    """Return the body text: first text/plain part, or the sole payload."""
    if msg.is_multipart():
        part = next(
            (p for p in msg.walk() if p.get_content_type() == "text/plain"),
            None,
        )
    else:
        part = msg
    if part is None:
        return ""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    # Honour the charset the sender declared; default to UTF-8.
    charset = part.get_content_charset() or 'utf-8'
    try:
        return payload.decode(charset, errors='ignore')
    except LookupError:
        return payload.decode('utf-8', errors='ignore')


def decode_emails(email_ids, start_index, end_index, username, password):
    """Download and parse one page of emails from the Gmail INBOX.

    Args:
        email_ids: Message IDs, as returned by an IMAP search.
        start_index: Index of the first ID to fetch (inclusive).
        end_index: Index one past the last ID to fetch (exclusive).
        username: The Gmail username.
        password: The Gmail password.

    Returns:
        A list of dicts, one per successfully fetched message, with keys
        'Message ID', 'from', 'subject', 'content', 'IsReply',
        'InReplyTo' and 'StoreReplyThread'. IDs that are empty or whose
        fetch returns no message data are skipped.

    Raises:
        imaplib.IMAP4.error: If login or an IMAP command fails.
        OSError: If the server cannot be reached.
    """
    imap_server = 'imap.gmail.com'
    imap_port = 993
    email_messages = []

    # The context manager guarantees a LOGOUT even if a fetch fails midway.
    with imaplib.IMAP4_SSL(imap_server, imap_port) as imap_connection:
        imap_connection.login(username, password)
        # Read-only so that fetching does not mark messages as seen.
        imap_connection.select('INBOX', readonly=True)

        for email_id in email_ids[start_index:end_index]:
            if not email_id:
                continue
            status, data = imap_connection.fetch(email_id, '(RFC822)')
            # A vanished message yields data like [None]; skip it rather
            # than crash on the subscript.
            if status != 'OK' or not data or not isinstance(data[0], tuple):
                continue

            msg = email.message_from_bytes(data[0][1])
            in_reply_to = msg.get("In-Reply-To", "")

            email_messages.append({
                'Message ID': msg.get("Message-ID", ""),
                'from': msg['from'],
                'subject': _decode_header_text(msg['subject']),
                'content': _extract_plain_text(msg),
                'IsReply': bool(in_reply_to),  # Non-empty header => reply
                'InReplyTo': in_reply_to,  # ID of the parent message
                'StoreReplyThread': [],
            })

        imap_connection.close()

    return email_messages
class MLStripper(HTMLParser):
    """HTML parser that discards markup and accumulates only text content.

    Feed it HTML with ``feed()`` and read the collected text with
    ``get_data()``. Character references (``&amp;`` etc.) are converted to
    their characters before being collected.
    """

    def __init__(self):
        # HTMLParser.__init__ already calls reset(), so no explicit reset
        # is needed here.
        super().__init__(convert_charrefs=True)
        self.text = StringIO()

    def handle_data(self, d):
        """Append a run of text found between tags to the buffer."""
        self.text.write(d)

    def get_data(self):
        """Return all text collected so far.

        With ``convert_charrefs`` enabled the parser holds back trailing
        text that contains an unterminated ``&`` (e.g. "AT&T") in case more
        input completes a character reference. ``close()`` forces that
        pending text through ``handle_data`` so it is not silently lost.
        """
        self.close()
        return self.text.getvalue()
def strip_tags(html):
    """Return the text content of an HTML string with all tags removed.

    Args:
        html: The HTML markup to strip. ``None`` or an empty string yields
            an empty result.

    Returns:
        The concatenated text found between tags, with character
        references converted to their characters.
    """
    if not html:
        return ""
    s = MLStripper()
    s.feed(html)
    # feed() may buffer trailing text that contains an unterminated '&'
    # (e.g. "AT&T"); close() flushes it so nothing is dropped.
    s.close()
    return s.get_data()