Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""
Enhanced Email Scraper with Intelligent Caching
"""
import os
import imaplib
import json
from email import message_from_bytes
from bs4 import BeautifulSoup
from datetime import datetime, timedelta
from dotenv import load_dotenv
from zoneinfo import ZoneInfo
from email.utils import parsedate_to_datetime
from typing import List, Dict

# Pull credentials from a local .env file into the process environment.
load_dotenv()

# Email credentials (plausibility-checked later by validate_email_setup()).
APP_PASSWORD = os.getenv("APP_PASSWORD")
EMAIL_ID = os.getenv("EMAIL_ID")
# NOTE(review): removed the import-time `print("EMAIL_ID: ", EMAIL_ID)` —
# it printed the account address to stdout every time this module was imported.

# JSON file used as the on-disk cache of previously scraped emails.
EMAIL_DB_FILE = "email_db.json"
def validate_email_setup():
    """Validate that a .env file exists and the loaded credentials look plausible.

    Checks performed (details printed to stdout):
      * .env file present
      * EMAIL_ID set, contains '@', ends with '@gmail.com'
      * APP_PASSWORD set, exactly 16 chars, no spaces
      * OPENAI_API_KEY set

    Returns:
        bool: True when every check passes, False otherwise.
    """
    # NOTE(review): the status markers below were mojibake ('β') in the
    # original source; restored to ✅/❌ based on surrounding context.
    print("=== Email Setup Validation ===")

    # Check .env file existence
    env_file_exists = os.path.exists('.env')
    print(f".env file exists: {'✅ Yes' if env_file_exists else '❌ No'}")

    if not env_file_exists:
        print("❌ No .env file found! Create one with:")
        print("   EMAIL_ID=your_email@gmail.com")
        print("   APP_PASSWORD=your_16_char_app_password")
        print("   OPENAI_API_KEY=your_openai_key")
        return False

    # Check environment variables
    issues = []
    if not EMAIL_ID:
        issues.append("EMAIL_ID not set or empty")
    elif '@' not in EMAIL_ID:
        issues.append("EMAIL_ID doesn't look like an email address")
    elif not EMAIL_ID.endswith('@gmail.com'):
        issues.append("EMAIL_ID should be a Gmail address (@gmail.com)")

    if not APP_PASSWORD:
        issues.append("APP_PASSWORD not set or empty")
    elif len(APP_PASSWORD) != 16:
        issues.append(f"APP_PASSWORD should be 16 characters, got {len(APP_PASSWORD)}")
    elif ' ' in APP_PASSWORD:
        issues.append("APP_PASSWORD should not contain spaces (remove spaces from app password)")

    if not os.getenv("OPENAI_API_KEY"):
        issues.append("OPENAI_API_KEY not set (needed for query processing)")

    if issues:
        print("❌ Issues found:")
        for issue in issues:
            print(f"   - {issue}")
        return False
    else:
        print("✅ All credentials look good!")
        return True
def _imap_connect():
    """Connect and log in to Gmail IMAP, then select "[Gmail]/All Mail".

    Returns:
        imaplib.IMAP4_SSL: authenticated connection with the mailbox selected.

    Raises:
        Exception: when credentials are missing.
        imaplib.IMAP4.error: when login/select fails (diagnostics printed first).
    """
    # NOTE(review): status markers in this function were mojibake ('β', 'π',
    # 'π‘') in the original source; restored to ✅/❌/💡 where the intent was
    # clear and dropped on the "attempting..." lines where it was not.
    print("=== IMAP Connection Debug ===")

    # Report whether credentials were loaded, without printing the secret.
    print(f"EMAIL_ID loaded: {'✅ Yes' if EMAIL_ID else '❌ No (None/Empty)'}")
    print(f"APP_PASSWORD loaded: {'✅ Yes' if APP_PASSWORD else '❌ No (None/Empty)'}")

    if EMAIL_ID:
        # Show only a masked prefix of the address.
        print(f"Email ID: {EMAIL_ID[:5]}...@{EMAIL_ID.split('@')[1] if '@' in EMAIL_ID else 'INVALID'}")
    if APP_PASSWORD:
        print(f"App Password length: {len(APP_PASSWORD)} characters")
        print(f"App Password format: {'✅ Looks correct (16 chars)' if len(APP_PASSWORD) == 16 else f'❌ Expected 16 chars, got {len(APP_PASSWORD)}'}")

    if not EMAIL_ID or not APP_PASSWORD:
        error_msg = "Missing credentials in environment variables!"
        print(f"❌ {error_msg}")
        raise Exception(error_msg)

    try:
        print("Attempting IMAP SSL connection to imap.gmail.com:993...")
        mail = imaplib.IMAP4_SSL("imap.gmail.com")
        print("✅ SSL connection established")

        print("Attempting login...")
        result = mail.login(EMAIL_ID, APP_PASSWORD)
        print(f"✅ Login successful: {result}")

        print("Selecting mailbox: [Gmail]/All Mail...")
        result = mail.select('"[Gmail]/All Mail"')
        print(f"✅ Mailbox selected: {result}")

        print("=== IMAP Connection Successful ===")
        return mail
    except imaplib.IMAP4.error as e:
        print(f"❌ IMAP Error: {e}")
        print("💡 Possible causes:")
        print("   - App Password is incorrect or expired")
        print("   - 2FA not enabled on Gmail account")
        print("   - IMAP access not enabled in Gmail settings")
        print("   - Gmail account locked or requires security verification")
        raise
    except Exception as e:
        print(f"❌ Connection Error: {e}")
        print("💡 Possible causes:")
        print("   - Network connectivity issues")
        print("   - Gmail IMAP server temporarily unavailable")
        print("   - Firewall blocking IMAP port 993")
        raise
| def _email_to_clean_text(msg): | |
| """Extract clean text from email message""" | |
| # Try HTML first | |
| html_content = None | |
| text_content = None | |
| if msg.is_multipart(): | |
| for part in msg.walk(): | |
| content_type = part.get_content_type() | |
| if content_type == "text/html": | |
| try: | |
| html_content = part.get_payload(decode=True).decode(errors="ignore") | |
| except: | |
| continue | |
| elif content_type == "text/plain": | |
| try: | |
| text_content = part.get_payload(decode=True).decode(errors="ignore") | |
| except: | |
| continue | |
| else: | |
| # Non-multipart message | |
| content_type = msg.get_content_type() | |
| try: | |
| content = msg.get_payload(decode=True).decode(errors="ignore") | |
| if content_type == "text/html": | |
| html_content = content | |
| else: | |
| text_content = content | |
| except: | |
| pass | |
| # Clean HTML content | |
| if html_content: | |
| soup = BeautifulSoup(html_content, "html.parser") | |
| # Remove script and style elements | |
| for script in soup(["script", "style"]): | |
| script.decompose() | |
| return soup.get_text(separator=' ', strip=True) | |
| elif text_content: | |
| return text_content.strip() | |
| else: | |
| return "" | |
def _load_email_db() -> Dict:
    """Load the cached email database, returning {} when absent or unreadable."""
    if os.path.exists(EMAIL_DB_FILE):
        try:
            with open(EMAIL_DB_FILE, "r") as handle:
                return json.load(handle)
        except (json.JSONDecodeError, IOError):
            print(f"Warning: Could not load {EMAIL_DB_FILE}, starting with empty database")
    return {}
def _save_email_db(db: Dict):
    """Write the email database to EMAIL_DB_FILE as indented JSON.

    Raises:
        IOError: re-raised after logging when the file cannot be written.
    """
    try:
        serialized = json.dumps(db, indent=2)
        with open(EMAIL_DB_FILE, "w") as handle:
            handle.write(serialized)
    except IOError as e:
        print(f"Error saving database: {e}")
        raise
| def _date_to_imap_format(date_str: str) -> str: | |
| """Convert DD-MMM-YYYY to IMAP date format""" | |
| try: | |
| dt = datetime.strptime(date_str, "%d-%b-%Y") | |
| return dt.strftime("%d-%b-%Y") | |
| except ValueError: | |
| raise ValueError(f"Invalid date format: {date_str}. Expected DD-MMM-YYYY") | |
| def _is_date_in_range(email_date: str, start_date: str, end_date: str) -> bool: | |
| """Check if email date is within the specified range""" | |
| try: | |
| email_dt = datetime.strptime(email_date, "%d-%b-%Y") | |
| start_dt = datetime.strptime(start_date, "%d-%b-%Y") | |
| end_dt = datetime.strptime(end_date, "%d-%b-%Y") | |
| return start_dt <= email_dt <= end_dt | |
| except ValueError: | |
| return False | |
def scrape_emails_from_sender(sender_email: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails from a specific sender within a date range (DD-MMM-YYYY).

    Results are cached per sender in EMAIL_DB_FILE; when the cache was
    already refreshed today and holds hits for the range, no IMAP
    round-trip is performed.

    Returns:
        List[Dict]: emails with keys date/time/subject/content/message_id,
        restricted to the requested date range.

    Raises:
        Exception: propagated from IMAP connection/search failures.
    """
    print(f"Scraping emails from {sender_email} between {start_date} and {end_date}")

    # Load existing database
    db = _load_email_db()
    sender_email = sender_email.lower().strip()

    # Serve from cache when it is fresh (scraped today) and non-empty.
    if sender_email in db:
        cached_emails = db[sender_email].get("emails", [])
        filtered_emails = [
            email for email in cached_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]
        last_scraped = db[sender_email].get("last_scraped", "01-Jan-2020")
        today = datetime.today().strftime("%d-%b-%Y")
        if last_scraped == today and filtered_emails:
            print(f"Using cached emails (last scraped: {last_scraped})")
            return filtered_emails

    # Need to scrape emails
    try:
        mail = _imap_connect()
        try:
            # Prepare IMAP search criteria
            start_imap = _date_to_imap_format(start_date)
            # Add one day to end_date for BEFORE criteria (IMAP BEFORE is exclusive)
            end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
            end_imap = end_dt.strftime("%d-%b-%Y")
            search_criteria = f'(FROM "{sender_email}") SINCE "{start_imap}" BEFORE "{end_imap}"'
            print(f"IMAP search: {search_criteria}")

            # Search for emails
            status, data = mail.search(None, search_criteria)
            if status != 'OK':
                raise Exception(f"IMAP search failed: {status}")

            email_ids = data[0].split()
            print(f"Found {len(email_ids)} emails")

            scraped_emails = []
            # Process each email; per-email failures are logged and skipped.
            for i, email_id in enumerate(email_ids):
                try:
                    print(f"Processing email {i+1}/{len(email_ids)}")
                    status, msg_data = mail.fetch(email_id, "(RFC822)")
                    if status != 'OK':
                        continue

                    msg = message_from_bytes(msg_data[0][1])
                    subject = msg.get("Subject", "No Subject")
                    content = _email_to_clean_text(msg)

                    # Parse the Date header into IST date/time strings;
                    # fall back to "today / 00:00:00" when missing or bad.
                    date_header = msg.get("Date", "")
                    if date_header:
                        try:
                            dt_obj = parsedate_to_datetime(date_header)
                            ist_dt = dt_obj.astimezone(ZoneInfo("Asia/Kolkata"))
                            email_date = ist_dt.strftime("%d-%b-%Y")
                            email_time = ist_dt.strftime("%H:%M:%S")
                        except Exception:  # was a bare `except:`; narrowed
                            email_date = datetime.today().strftime("%d-%b-%Y")
                            email_time = "00:00:00"
                    else:
                        email_date = datetime.today().strftime("%d-%b-%Y")
                        email_time = "00:00:00"

                    # Message-ID used for deduplication against the cache.
                    message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")

                    scraped_emails.append({
                        "date": email_date,
                        "time": email_time,
                        "subject": subject,
                        "content": content[:2000],  # Limit content length
                        "message_id": message_id
                    })
                except Exception as e:
                    print(f"Error processing email {email_id}: {e}")
                    continue
        finally:
            # BUGFIX: logout previously ran only on full success, leaking the
            # IMAP connection whenever search/processing raised.
            try:
                mail.logout()
            except Exception:
                pass

        # Merge with existing cached emails, deduplicating on message_id.
        if sender_email not in db:
            db[sender_email] = {"emails": [], "last_scraped": ""}
        existing_emails = db[sender_email].get("emails", [])
        existing_ids = {email.get("message_id") for email in existing_emails}
        new_emails = [
            email for email in scraped_emails
            if email["message_id"] not in existing_ids
        ]

        # Update and persist the database.
        db[sender_email]["emails"] = existing_emails + new_emails
        db[sender_email]["last_scraped"] = datetime.today().strftime("%d-%b-%Y")
        _save_email_db(db)

        # Return only the cached emails inside the requested range.
        all_emails = db[sender_email]["emails"]
        filtered_emails = [
            email for email in all_emails
            if _is_date_in_range(email["date"], start_date, end_date)
        ]
        print(f"Scraped {len(new_emails)} new emails, returning {len(filtered_emails)} in date range")
        return filtered_emails
    except Exception as e:
        print(f"Email scraping failed: {e}")
        raise
def scrape_emails_by_text_search(keyword: str, start_date: str, end_date: str) -> List[Dict]:
    """
    Scrape emails containing *keyword* (e.g. a company name) within a date range.

    Runs three IMAP searches (FROM / SUBJECT / BODY), unions the hits, then
    re-verifies keyword presence and date range client-side. Unlike
    scrape_emails_from_sender, results are NOT cached. Dates are DD-MMM-YYYY.

    Returns:
        List[Dict]: emails sorted newest-first with keys
        date/time/subject/from/content/message_id.

    Raises:
        Exception: when credential validation or the IMAP session fails.
    """
    print(f"Searching emails containing '{keyword}' between {start_date} and {end_date}")

    # Validate setup first
    if not validate_email_setup():
        raise Exception("Email setup validation failed. Please check your .env file and credentials.")

    try:
        mail = _imap_connect()
        try:
            # Prepare IMAP search criteria with text search
            start_imap = _date_to_imap_format(start_date)
            # Add one day to end_date for BEFORE criteria (IMAP BEFORE is exclusive)
            end_dt = datetime.strptime(end_date, "%d-%b-%Y") + timedelta(days=1)
            end_imap = end_dt.strftime("%d-%b-%Y")

            # Search FROM, SUBJECT and BODY separately and union the ids.
            search_criteria_list = [
                f'FROM "{keyword}" SINCE "{start_imap}" BEFORE "{end_imap}"',
                f'SUBJECT "{keyword}" SINCE "{start_imap}" BEFORE "{end_imap}"',
                f'BODY "{keyword}" SINCE "{start_imap}" BEFORE "{end_imap}"'
            ]
            all_email_ids = set()
            for search_criteria in search_criteria_list:
                try:
                    print(f"IMAP search: {search_criteria}")
                    status, data = mail.search(None, search_criteria)
                    if status == 'OK' and data[0]:
                        email_ids = data[0].split()
                        all_email_ids.update(email_ids)
                        print(f"Found {len(email_ids)} emails with this criteria")
                except Exception as e:
                    print(f"Search criteria failed: {search_criteria}, error: {e}")
                    continue

            print(f"Total unique emails found: {len(all_email_ids)}")
            scraped_emails = []
            # Process each email; per-email failures are logged and skipped.
            for i, email_id in enumerate(all_email_ids):
                try:
                    print(f"Processing email {i+1}/{len(all_email_ids)}")
                    status, msg_data = mail.fetch(email_id, "(RFC822)")
                    if status != 'OK':
                        continue

                    msg = message_from_bytes(msg_data[0][1])
                    subject = msg.get("Subject", "No Subject")
                    from_header = msg.get("From", "Unknown Sender")
                    content = _email_to_clean_text(msg)

                    # Server-side matching can over-report; re-check the
                    # keyword case-insensitively to drop false positives.
                    keyword_lower = keyword.lower()
                    if not any(keyword_lower in text.lower() for text in [subject, from_header, content]):
                        continue

                    # Parse the Date header into IST date/time strings;
                    # fall back to "today / 00:00:00" when missing or bad.
                    date_header = msg.get("Date", "")
                    if date_header:
                        try:
                            dt_obj = parsedate_to_datetime(date_header)
                            ist_dt = dt_obj.astimezone(ZoneInfo("Asia/Kolkata"))
                            email_date = ist_dt.strftime("%d-%b-%Y")
                            email_time = ist_dt.strftime("%H:%M:%S")
                        except Exception:  # was a bare `except:`; narrowed
                            email_date = datetime.today().strftime("%d-%b-%Y")
                            email_time = "00:00:00"
                    else:
                        email_date = datetime.today().strftime("%d-%b-%Y")
                        email_time = "00:00:00"

                    # Double-check date range
                    if not _is_date_in_range(email_date, start_date, end_date):
                        continue

                    # Get message ID for deduplication
                    message_id = msg.get("Message-ID", f"missing-{email_id.decode()}")
                    scraped_emails.append({
                        "date": email_date,
                        "time": email_time,
                        "subject": subject,
                        "from": from_header,
                        "content": content[:2000],  # Limit content length
                        "message_id": message_id
                    })
                except Exception as e:
                    print(f"Error processing email {email_id}: {e}")
                    continue
        finally:
            # BUGFIX: logout previously ran only on full success, leaking the
            # IMAP connection whenever any step above raised.
            try:
                mail.logout()
            except Exception:
                pass

        # Sort by date (newest first)
        scraped_emails.sort(key=lambda x: datetime.strptime(f"{x['date']} {x['time']}", "%d-%b-%Y %H:%M:%S"), reverse=True)
        print(f"Successfully processed {len(scraped_emails)} emails containing '{keyword}'")
        return scraped_emails
    except Exception as e:
        print(f"Email text search failed: {e}")
        raise
# Test the scraper
if __name__ == "__main__":
    # Ad-hoc smoke test: scrape a sample sender over a one-week window.
    try:
        results = scrape_emails_from_sender(
            "noreply@example.com",
            "01-Jun-2025",
            "07-Jun-2025",
        )
        print(f"\nFound {len(results)} emails:")
        # Show at most the first three results.
        for item in results[:3]:
            print(f"- {item['date']} {item['time']}: {item['subject']}")
    except Exception as e:
        print(f"Test failed: {e}")