Spaces:
Sleeping
Sleeping
| import imaplib | |
| import email | |
| from email.header import decode_header | |
| import os | |
| from dotenv import load_dotenv | |
| from bs4 import BeautifulSoup # A library for parsing HTML | |
| # Load environment variables from the .env file so that fetch_latest_emails can read GMAIL_USER and GMAIL_PASSWORD via os.getenv | |
| load_dotenv() | |
def _decode_subject(raw_subject):
    """Return the Subject header as text, or "" when the header is absent.

    All chunks returned by ``decode_header`` are decoded and joined, so a
    subject made of several encoded words is not truncated to its first one.
    """
    if raw_subject is None:
        return ""
    try:
        chunks = decode_header(raw_subject)
    except email.errors.HeaderParseError:
        # Malformed encoded word: fall back to the raw header text.
        return str(raw_subject)
    pieces = []
    for chunk, charset in chunks:
        if isinstance(chunk, bytes):
            try:
                chunk = chunk.decode(charset or "utf-8", errors="ignore")
            except LookupError:
                # Unknown charset label (e.g. "unknown-8bit"): best effort.
                chunk = chunk.decode("utf-8", errors="ignore")
        pieces.append(chunk)
    return "".join(pieces)


def _decode_text_part(part):
    """Decode a message part's payload to str using its declared charset."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # The declared charset is not known to Python; fall back to UTF-8.
        return payload.decode("utf-8", errors="replace")


def _extract_body(msg):
    """Return the message body as text.

    For multipart messages the first non-attachment ``text/plain`` part wins;
    if none exists, the text of the last non-attachment ``text/html`` part
    (stripped of markup with BeautifulSoup) is used. Non-multipart messages
    return their decoded payload as-is.
    """
    if not msg.is_multipart():
        return _decode_text_part(msg)

    body = ""
    for part in msg.walk():
        disposition = str(part.get("Content-Disposition", "")).lower()
        if "attachment" in disposition:
            continue
        content_type = part.get_content_type()
        if content_type == "text/plain":
            # Plain text is preferred over any HTML seen so far.
            return _decode_text_part(part)
        if content_type == "text/html":
            soup = BeautifulSoup(_decode_text_part(part), "html.parser")
            body = soup.get_text()
    return body


def fetch_latest_emails(num_emails=5):
    """Fetch the most recent emails from the Gmail inbox over IMAP.

    Credentials are read from the ``GMAIL_USER`` and ``GMAIL_PASSWORD``
    environment variables.

    Args:
        num_emails: Maximum number of messages to fetch. Values <= 0 yield
            an empty list (no connection is made).

    Returns:
        A list of dicts with the keys ``"From"``, ``"Subject"`` and
        ``"Body"``, newest message first. An empty list is returned when
        credentials are missing or when any error occurs (the error is
        printed rather than raised).
    """
    try:
        # Get credentials from the environment (populated from the .env file).
        username = os.getenv("GMAIL_USER")
        password = os.getenv("GMAIL_PASSWORD")
        if not username or not password:
            print("Error: GMAIL_USER and GMAIL_PASSWORD must be set in the .env file.")
            return []

        # mail_ids[-0:] would select *every* message, so guard explicitly.
        if num_emails <= 0:
            return []

        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        try:
            mail.login(username, password)
            mail.select("inbox")

            status, data = mail.search(None, "ALL")
            if status != "OK" or not data or not data[0]:
                return []
            mail_ids = data[0].split()

            emails = []
            for mail_id in mail_ids[-num_emails:]:
                status, msg_data = mail.fetch(mail_id, "(RFC822)")
                # A successful fetch yields a (envelope, raw_bytes) tuple first.
                if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                    continue
                msg = email.message_from_bytes(msg_data[0][1])
                emails.append({
                    "From": msg.get("From", ""),
                    "Subject": _decode_subject(msg["Subject"]),
                    "Body": _extract_body(msg).strip(),
                })
        finally:
            # Always release the connection, even if fetching failed midway.
            try:
                mail.logout()
            except (imaplib.IMAP4.error, OSError):
                pass

        # Return emails in reverse order to show the newest one first.
        return emails[::-1]
    except Exception as e:
        print(f"An error occurred while fetching emails: {e}")
        return []
# Script entry point: fetch the latest emails and print a short preview of each.
if __name__ == "__main__":
    inbox = fetch_latest_emails()
    if inbox:
        print(f"Successfully fetched {len(inbox)} emails.")
    for message in inbox:
        # Round-trip each field through UTF-8 with 'replace' so characters
        # that cannot be encoded do not crash the terminal output.
        sender, title, preview = (
            text.encode('utf-8', 'replace').decode('utf-8')
            for text in (message['From'], message['Subject'], message['Body'][:100])
        )
        print(f"From: {sender}\nSubject: {title}\nBody: {preview}...\n")