Spaces:
Running
Running
| import imaplib | |
| import email | |
| import os | |
| __all__ = ['GetEmails'] | |
| class GetEmails: | |
| dependencies = ['os'] | |
| inputSchema = { | |
| "name": "GetEmails", | |
| "description": "Reads emails from an IMAP server.", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "num_emails": { | |
| "type": "integer", | |
| "description": "Number of emails to retrieve (default: 5).", | |
| "default": 5 | |
| } | |
| }, | |
| "required": [] | |
| } | |
| } | |
| def __init__(self): | |
| # Replace with your actual email provider's IMAP server address | |
| self.imap_server = "imap.gmail.com" # e.g., imap.gmail.com | |
| self.email_address = os.environ.get("EMAIL_ADDRESS") # Replace with your email address | |
| self.password = os.environ.get("EMAIL_PASSWORD") # Replace with your password or app password | |
| def run(self, **kwargs): | |
| num_emails = kwargs.get("num_emails", 5) | |
| try: | |
| # Connect to the IMAP server | |
| mail = imaplib.IMAP4_SSL(self.imap_server) | |
| mail.login(self.email_address, self.password) | |
| mail.select("inbox") # You can select other mailboxes like "sent" | |
| # Search for emails (ALL = all emails, or use specific criteria) | |
| result, data = mail.search(None, "ALL") | |
| mail_ids = data[0].split() | |
| # Get the most recent emails | |
| recent_emails = mail_ids[-num_emails:] | |
| email_messages = [] | |
| for num in recent_emails: | |
| result, msg_data = mail.fetch(num, '(RFC822)') | |
| raw_email = msg_data[0][1] | |
| msg = email.message_from_bytes(raw_email) | |
| # Decode email headers and body | |
| try: | |
| subject = email.header.decode_header(msg["Subject"])[0][0] | |
| if isinstance(subject, bytes): | |
| subject = subject.decode() | |
| except: | |
| subject = "No Subject" | |
| try: | |
| from_ = email.header.decode_header(msg.get("From"))[0][0] | |
| if isinstance(from_, bytes): | |
| from_ = from_.decode() | |
| except: | |
| from_ = "Unknown Sender" | |
| # Extract the body of the email | |
| body = "" # Initialize body to an empty string | |
| if msg.is_multipart(): | |
| for part in msg.walk(): | |
| ctype = part.get_content_type() | |
| cdispo = str(part.get("Content-Disposition")) | |
| if ctype == "text/plain" and "attachment" not in cdispo: | |
| body = part.get_payload(decode=True).decode() | |
| break | |
| else: | |
| body = msg.get_payload(decode=True).decode() | |
| email_messages.append({ | |
| "subject": subject, | |
| "from": from_, | |
| "body": body | |
| }) | |
| mail.close() | |
| mail.logout() | |
| return { | |
| "status": "success", | |
| "message": "Emails retrieved successfully.", | |
| "output": email_messages | |
| } | |
| except Exception as e: | |
| return { | |
| "status": "error", | |
| "message": f"An error occurred: {str(e)}", | |
| "output": None | |
| } | |