Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""
Enhanced Email Scraper with Intelligent Caching
"""
import os
import imaplib
import json
from email import message_from_bytes
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
from dotenv import load_dotenv
from zoneinfo import ZoneInfo
from email.utils import parsedate_to_datetime
from typing import List, Dict

# Load EMAIL_ID / APP_PASSWORD from a local .env file into the environment.
load_dotenv()

# Email credentials — presumably a Gmail app password, not the account
# password (IMAP login below requires one when 2FA is on); TODO confirm.
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")
# JSON file acting as the on-disk cache of previously scraped emails.
EMAIL_DB_FILE = "email_db.json"
def _imap_connect():
    """Open an authenticated IMAP session on Gmail's "All Mail" folder.

    Returns the ready-to-search IMAP4_SSL connection; logs and re-raises
    any failure during connect/login/select.
    """
    try:
        conn = imaplib.IMAP4_SSL("imap.gmail.com")
        conn.login(EMAIL_ID, APP_PASSWORD)
        conn.select('"[Gmail]/All Mail"')
    except Exception as exc:
        print(f"IMAP connection failed: {exc}")
        raise
    return conn
| def _email_to_clean_text(msg): | |
| """Extract clean text from email message""" | |
| # Try HTML first | |
| html_content = None | |
| text_content = None | |
| if msg.is_multipart(): | |
| for part in msg.walk(): | |
| content_type = part.get_content_type() | |
| if content_type == "text/html": | |
| try: | |
| html_content = part.get_payload(decode=True).decode(errors="ignore") | |
| except: | |
| continue | |
| elif content_type == "text/plain": | |
| try: | |
| text_content = part.get_payload(decode=True).decode(errors="ignore") | |
| except: | |
| continue | |
| else: | |
| # Non-multipart message | |
| content_type = msg.get_content_type() | |
| try: | |
| content = msg.get_payload(decode=True).decode(errors="ignore") | |
| if content_type == "text/html": | |
| html_content = content | |
| else: | |
| text_content = content | |
| except: | |
| pass | |
| # Clean HTML content | |
| if html_content: | |
| soup = BeautifulSoup(html_content, "html.parser") | |
| # Remove script and style elements | |
| for script in soup(["script", "style"]): | |
| script.decompose() | |
| return soup.get_text(separator=' ', strip=True) | |
| elif text_content: | |
| return text_content.strip() | |
| else: | |
| return "" | |
def _load_email_db() -> Dict:
    """Read the cached email database from disk.

    Returns an empty dict when the file does not exist yet, or (after
    printing a warning) when it exists but cannot be opened or parsed.
    """
    if not os.path.exists(EMAIL_DB_FILE):
        return {}
    try:
        with open(EMAIL_DB_FILE, "r") as handle:
            database = json.load(handle)
    except (json.JSONDecodeError, IOError):
        print(f"Warning: Could not load {EMAIL_DB_FILE}, starting with empty database")
        database = {}
    return database
def _save_email_db(db: Dict):
    """Persist the email database as pretty-printed JSON.

    Logs and re-raises on write failure so callers see the error.
    """
    try:
        with open(EMAIL_DB_FILE, "w") as handle:
            json.dump(db, handle, indent=2)
    except IOError as exc:
        print(f"Error saving database: {exc}")
        raise
| def _date_to_imap_format(date_str: str) -> str: | |
| """Convert DD-MMM-YYYY to IMAP date format""" | |
| try: | |
| dt = datetime.strptime(date_str, "%d-%b-%Y") | |
| return dt.strftime("%d-%b-%Y") | |
| except ValueError: | |
| raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY") | |
| def _is_date_in_range(email_date: str, start_date: str, end_date: str) -> bool: | |
| """Check if email date is within the specified range""" | |
| try: | |
| email_dt = datetime.strptime(email_date, "%d-%b-%Y") | |
| start_dt = datetime.strptime(start_date, "%d-%b-%Y") | |
| end_dt = datetime.strptime(end_date, "%d-%b-%Y") | |
| return start_dt <= email_dt <= end_dt | |
| except ValueError: | |
| return False | |
def scrape_emails_from_sender(sender_email: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails from specific sender within date range
    Uses intelligent caching to avoid re-scraping

    Args:
        sender_email: address to match with IMAP FROM (lower-cased/trimmed here).
        start_date: inclusive lower bound, DD-MMM-YYYY.
        end_date: inclusive upper bound, DD-MMM-YYYY.

    Returns:
        List of dicts with keys "date", "time", "subject", "content"
        (truncated to 2000 chars) and "message_id", filtered to the range.

    Raises:
        ValueError for bad date strings; any IMAP/connection error is
        printed and re-raised.
    """
    print(f"Scraping emails from {sender_email} between {start_date} and {end_date}")
    # Load existing database
    db = _load_email_db()
    sender_email = sender_email.lower().strip()
    # Check if we have cached emails for this sender
    if sender_email in db:
        cached_emails = db[sender_email].get("emails", [])
        # Filter cached emails by date range
        filtered_emails = [
            email for email in cached_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]
        # Check if we need to scrape more recent emails: the cache is only
        # trusted when it was refreshed today AND has hits for this range;
        # otherwise we fall through and re-query the server.
        last_scraped = db[sender_email].get("last_scraped", "01-Jan-2020")
        today = datetime.today().strftime("%d-%b-%Y")
        if last_scraped == today and filtered_emails:
            print(f"Using cached emails (last scraped: {last_scraped})")
            return filtered_emails
    # Need to scrape emails
    try:
        mail = _imap_connect()
        # Prepare IMAP search criteria
        start_imap = _date_to_imap_format(start_date)
        # Add one day to end_date for BEFORE criteria (IMAP BEFORE is exclusive)
        end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
        end_imap = end_dt.strftime("%d-%b-%Y")
        # NOTE(review): SINCE/BEFORE match the server's internal date, while
        # the filtering below uses the Date header converted to IST — edge
        # days can differ; confirm this is acceptable.
        search_criteria = f'(FROM "{sender_email}") SINCE "{start_imap}" BEFORE "{end_imap}"'
        print(f"IMAP search: {search_criteria}")
        # Search for emails
        status, data = mail.search(None, search_criteria)
        if status != 'OK':
            raise Exception(f"IMAP search failed: {status}")
        email_ids = data[0].split()
        print(f"Found {len(email_ids)} emails")
        scraped_emails = []
        # Process each email
        for i, email_id in enumerate(email_ids):
            try:
                print(f"Processing email {i+1}/{len(email_ids)}")
                # Fetch email
                status, msg_data = mail.fetch(email_id, "(RFC822)")
                if status != 'OK':
                    continue
                # Parse email
                msg = message_from_bytes(msg_data[0][1])
                # Extract information
                # NOTE(review): Subject is stored raw — RFC 2047 encoded
                # subjects (=?utf-8?...?=) are not decoded here.
                subject = msg.get("Subject", "No Subject")
                content = _email_to_clean_text(msg)
                # Parse date; fall back to today/midnight when the Date
                # header is missing or unparseable.
                date_header = msg.get("Date", "")
                if date_header:
                    try:
                        dt_obj = parsedate_to_datetime(date_header)
                        # Convert to IST
                        ist_dt = dt_obj.astimezone(ZoneInfo("Asia/Kolkata"))
                        email_date = ist_dt.strftime("%d-%b-%Y")
                        email_time = ist_dt.strftime("%H:%M:%S")
                    except:
                        email_date = datetime.today().strftime("%d-%b-%Y")
                        email_time = "00:00:00"
                else:
                    email_date = datetime.today().strftime("%d-%b-%Y")
                    email_time = "00:00:00"
                # Get message ID for deduplication; synthesize one from the
                # (mailbox-local) IMAP id when the header is absent.
                message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")
                scraped_emails.append({
                    "date": email_date,
                    "time": email_time,
                    "subject": subject,
                    "content": content[:2000],  # Limit content length
                    "message_id": message_id
                })
            except Exception as e:
                # Best-effort: a single bad message must not abort the run.
                print(f"Error processing email {email_id}: {e}")
                continue
        mail.logout()
        # Update database
        if sender_email not in db:
            db[sender_email] = {"emails": [], "last_scraped": ""}
        # Merge with existing emails (avoid duplicates)
        existing_emails = db[sender_email].get("emails", [])
        existing_ids = {email.get("message_id") for email in existing_emails}
        new_emails = [
            email for email in scraped_emails
            if email["message_id"] not in existing_ids
        ]
        # Update database
        db[sender_email]["emails"] = existing_emails + new_emails
        db[sender_email]["last_scraped"] = datetime.today().strftime("%d-%b-%Y")
        # Save database
        _save_email_db(db)
        # Return filtered results (filter the merged cache, not just this
        # run's finds, so older cached messages in range are included).
        all_emails = db[sender_email]["emails"]
        filtered_emails = [
            email for email in all_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]
        print(f"Scraped {len(new_emails)} new emails, returning {len(filtered_emails)} in date range")
        return filtered_emails
    except Exception as e:
        print(f"Email scraping failed: {e}")
        raise
# Test the scraper
if __name__ == "__main__":
    # Smoke-test: scrape a sample sender over a one-week window and show
    # the first few results.
    try:
        results = scrape_emails_from_sender(
            "noreply@example.com",
            "01-Jun-2025",
            "07-Jun-2025",
        )
    except Exception as exc:
        print(f"Test failed: {exc}")
    else:
        print(f"\nFound {len(results)} emails:")
        for item in results[:3]:  # Show first 3
            print(f"- {item['date']} {item['time']}: {item['subject']}")