"""Scrape emails from a Gmail mailbox over IMAP and export them as JSON."""

import email
import imaplib
import json
import logging
import os
from email.header import decode_header
from email.message import Message
from typing import Iterator, List, Optional

try:
    from dotenv import load_dotenv
except ImportError:
    # python-dotenv is only needed to read a .env file; credentials passed as
    # arguments or set as real environment variables work without it.
    def load_dotenv(*args, **kwargs):
        """Fallback used when python-dotenv is not installed; loads nothing."""
        logging.warning("python-dotenv is not installed; skipping .env loading.")
        return False


# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load environment variables from .env file
load_dotenv()


def clean(text):
    """Return *text* with every non-alphanumeric character replaced by ``_``."""
    return "".join(c if c.isalnum() else "_" for c in text)


class EmailScraper:
    """Read messages from a Gmail mailbox over IMAP.

    Credentials come from the constructor arguments or, when omitted, from
    the ``GMAIL_USERNAME`` / ``GMAIL_PASSWORD`` environment variables.

    NOTE: messages are fetched with ``(RFC822)`` from a mailbox selected
    read-write, which normally makes the server mark them as read.
    """

    IMAP_HOST = "imap.gmail.com"

    def __init__(self, username=None, password=None):
        """
        Initialize the EmailScraper with optional username and password.
        If not provided, credentials are fetched from environment variables.

        Raises:
            ValueError: if either credential is still missing.
        """
        self.username = username or os.getenv("GMAIL_USERNAME")
        self.password = password or os.getenv("GMAIL_PASSWORD")

        if not self.username or not self.password:
            raise ValueError("Gmail credentials are not provided or set in environment variables.")

    # ------------------------------------------------------------------
    # Connection handling
    # ------------------------------------------------------------------
    def _connect(self):
        """Connect to the Gmail IMAP server and login."""
        imap = imaplib.IMAP4_SSL(self.IMAP_HOST)
        try:
            imap.login(self.username, self.password)
        except Exception:
            # Do not leave the SSL socket open when authentication fails.
            imap.shutdown()
            raise
        return imap

    @staticmethod
    def _disconnect(imap) -> None:
        """Close the selected mailbox (if any) and log out, best effort."""
        for action in (imap.close, imap.logout):
            try:
                action()
            except (imaplib.IMAP4.error, OSError) as exc:
                # e.g. close() is illegal when no mailbox was ever selected.
                logging.debug("Ignoring IMAP cleanup error: %s", exc)

    @staticmethod
    def _search_all(imap, folder: str) -> List[bytes]:
        """Select *folder* and return the ids of all its messages, oldest first.

        Raises:
            imaplib.IMAP4.error: if the folder cannot be selected or searched.
        """
        status, _ = imap.select(folder)
        if status != "OK":
            raise imaplib.IMAP4.error(f"Could not select folder {folder!r}")
        status, data = imap.search(None, "ALL")
        if status != "OK":
            raise imaplib.IMAP4.error(f"Search failed in folder {folder!r}")
        # An empty mailbox may be reported as [b""] or [None].
        return (data[0] or b"").split() if data else []

    @staticmethod
    def _fetch_messages(imap, mail_id: bytes) -> Iterator[Message]:
        """Yield the parsed message(s) returned for *mail_id*."""
        status, data = imap.fetch(mail_id, "(RFC822)")
        if status != "OK":
            logging.warning("Could not fetch email ID %s (status %s)", mail_id.decode(), status)
            return
        for item in data or []:
            # Only tuple items carry message bytes; the rest is protocol framing.
            if isinstance(item, tuple):
                yield email.message_from_bytes(item[1])

    # ------------------------------------------------------------------
    # Message parsing
    # ------------------------------------------------------------------
    @staticmethod
    def _decode_bytes(data: bytes, charset: Optional[str] = None) -> str:
        """Decode *data* with *charset* (default UTF-8), never raising.

        Unknown charsets and undecodable bytes fall back to UTF-8 with
        replacement characters so one bad message cannot abort a run.
        """
        try:
            return data.decode(charset or "utf-8")
        except (LookupError, UnicodeDecodeError):
            return data.decode("utf-8", errors="replace")

    @classmethod
    def _decode_header_value(cls, raw) -> Optional[str]:
        """Decode an RFC 2047 header value; return None if the header is absent."""
        if raw is None:
            return None
        # A header may consist of several differently-encoded fragments;
        # decode and join all of them instead of keeping only the first.
        return "".join(
            cls._decode_bytes(fragment, charset) if isinstance(fragment, bytes) else fragment
            for fragment, charset in decode_header(raw)
        )

    @classmethod
    def _part_text(cls, part: Message) -> Optional[str]:
        """Return the decoded text of a leaf part, or None if it has no payload."""
        payload = part.get_payload(decode=True)
        if payload is None:
            return None
        return cls._decode_bytes(payload, part.get_content_charset())

    @classmethod
    def _iter_text_parts(cls, message: Message):
        """Yield ``(content_type, text)`` for each readable text part.

        A non-multipart message yields its single payload. For multipart
        messages only ``text/*`` leaf parts that are not attachments are
        yielded.
        """
        if not message.is_multipart():
            text = cls._part_text(message)
            if text is not None:
                yield message.get_content_type(), text
            return
        for part in message.walk():
            if part.is_multipart() or part.get_content_maintype() != "text":
                continue
            if part.get_content_disposition() == "attachment":
                continue
            text = cls._part_text(part)
            if text is not None:
                yield part.get_content_type(), text

    @classmethod
    def _extract_body(cls, message: Message) -> str:
        """Return the message body, preferring ``text/plain`` over other text."""
        fallback = None
        for content_type, text in cls._iter_text_parts(message):
            if content_type == "text/plain" or not message.is_multipart():
                return text
            if fallback is None:
                fallback = text
        return fallback or ""

    @classmethod
    def _summarize(cls, message: Message) -> dict:
        """Return the subject, sender, date and body of *message* as a dict."""
        return {
            "subject": cls._decode_header_value(message["Subject"]),
            "from": message.get("From"),
            "date": message.get("Date"),
            "body": cls._extract_body(message),
        }

    @staticmethod
    def _is_blocked(subject, sender, blocklist) -> bool:
        """Return True if any blocklist keyword occurs in the subject or sender.

        Matching is a case-sensitive substring test.
        """
        haystacks = (subject or "", sender or "")
        return any(keyword in text for keyword in blocklist for text in haystacks)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def scrape_emails(self, folder="INBOX"):
        """Scrape all emails from the specified folder.

        Prints the subject, sender and every readable text part of each
        message to stdout. Errors are logged, not raised.
        """
        imap = None
        try:
            imap = self._connect()
            for mail_id in self._search_all(imap, folder):
                for message in self._fetch_messages(imap, mail_id):
                    print("Subject:", self._decode_header_value(message["Subject"]))
                    print("From:", message.get("From"))
                    for _, text in self._iter_text_parts(message):
                        print("Body:", text)
        except Exception as e:
            logging.error("An error occurred: %s", e)
        finally:
            # Always release the connection, even after a failure.
            if imap is not None:
                self._disconnect(imap)

    def scrape_latest_emails(self, folder="INBOX", count=5, blocklist=None):
        """Scrape the latest emails with optional blocklist filtering.

        Args:
            folder: Mailbox to read.
            count: Maximum number of most recent messages to fetch. Values
                of zero or less yield an empty result.
            blocklist: Keywords; a message whose subject or sender contains
                any of them (case-sensitive substring) is skipped.

        Returns:
            A dict keyed by message id (newest first) whose values hold
            ``subject``, ``from``, ``body``, ``date`` and a ``metadata``
            dict. An empty dict is returned if connecting, selecting or
            searching fails; a failure on a single message is logged and
            that message is skipped.
        """
        if count <= 0:
            # messages[-0:] would select *every* message, not none.
            return {}

        blocklist = list(blocklist or [])
        email_data = {}
        imap = None
        try:
            imap = self._connect()
            mail_ids = self._search_all(imap, folder)
            for mail_id in reversed(mail_ids[-count:]):
                key = mail_id.decode()
                try:
                    for message in self._fetch_messages(imap, mail_id):
                        info = self._summarize(message)
                        subject, sender, date = info["subject"], info["from"], info["date"]
                        if self._is_blocked(subject, sender, blocklist):
                            logging.info("Blocked email from: %s, subject: %s", sender, subject)
                            continue
                        email_data[key] = {
                            "subject": subject,
                            "from": sender,
                            "body": info["body"],
                            "date": date,
                            "metadata": {
                                "subject": subject,
                                "from": sender,
                                "date": date,
                            },
                        }
                except Exception as e:
                    # One malformed message must not abort the whole run.
                    logging.error("Error processing email ID %s: %s", key, e)
        except Exception as e:
            logging.error("An error occurred: %s", e)
            return {}
        finally:
            if imap is not None:
                self._disconnect(imap)
        return email_data


def main():
    """Fetch the latest emails and save them to ``latest_emails.json``."""
    blocklist = [
        "no-reply@accounts.google.com",
        "Security alert",
        "unstop",
        "linkedin",
        "kaggle",
        "Team Unstop",
        "Canva",
        "noreply@github.com",
        "noreply",
        "feed",
    ]

    logging.info("Fetching the latest 10,000 emails...")
    scraper = EmailScraper()
    emails = scraper.scrape_latest_emails(count=10000, blocklist=blocklist)

    # Save the emails to a JSON file
    output_file = "latest_emails.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(emails, f, ensure_ascii=False, indent=4)

    logging.info("Fetched emails saved to %s", output_file)


if __name__ == "__main__":
    main()