Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Gmail API-based Email Scraper with OAuth Authentication | |
| """ | |
| import base64 | |
| import re | |
| from datetime import datetime, timedelta | |
| from typing import List, Dict, Optional | |
| from email.mime.text import MIMEText | |
| import googleapiclient.errors | |
| from oauth_manager import oauth_manager | |
| from logger import logger | |
class GmailAPIScraper:
    """Gmail API-based email scraper using OAuth authentication.

    Every Gmail call goes through the service object returned by
    ``self.oauth_manager.get_gmail_service()``. Dates exchanged with callers
    use the ``DD-MMM-YYYY`` format (for example ``05-Jan-2024``) and parsed
    message timestamps are reported in Indian Standard Time (UTC+05:30).
    """

    # Pattern behind the "simple HTML tag removal" applied to text/html parts.
    _HTML_TAG_RE = re.compile(r'<[^>]+>')
    # Parsed emails keep at most this many characters of body text.
    _MAX_CONTENT_CHARS = 2000
    # Value passed as maxResults to messages.list when searching.
    _MAX_SEARCH_RESULTS = 500

    def __init__(self):
        """Initialize the scraper with the module-level OAuth manager."""
        self.oauth_manager = oauth_manager

    def _parse_date_string(self, date_str: str) -> datetime:
        """Parse a ``DD-MMM-YYYY`` string into a naive ``datetime`` at midnight.

        Raises:
            ValueError: If ``date_str`` does not match ``%d-%b-%Y``.
        """
        try:
            return datetime.strptime(date_str, "%d-%b-%Y")
        except ValueError as err:
            raise ValueError(
                f"Invalid date format: {date_str}. Expected DD-MMM-YYYY"
            ) from err

    def _format_date_for_query(self, date_obj: datetime) -> str:
        """Format a datetime as ``YYYY/MM/DD`` for Gmail ``after:``/``before:``."""
        return date_obj.strftime("%Y/%m/%d")

    def _decode_message_part(self, part: Dict) -> str:
        """Decode the base64url ``body.data`` of a message part to text.

        Returns an empty string when the part carries no inline data. Bytes
        that are not valid UTF-8 are dropped rather than raising.
        """
        data = part.get('body', {}).get('data', '')
        if not data:
            return ''
        # Restore any missing '=' padding. ``-len % 4`` yields 0-3 characters,
        # so already-aligned input is left untouched.
        data += '=' * (-len(data) % 4)
        decoded_bytes = base64.urlsafe_b64decode(data)
        return decoded_bytes.decode('utf-8', errors='ignore')

    def _collect_text_parts(self, part: Dict, plain_chunks: List[str],
                            html_chunks: List[str]) -> None:
        """Walk ``part`` depth-first, appending decoded text to the two lists.

        ``text/plain`` leaves go to ``plain_chunks``; ``text/html`` leaves go
        to ``html_chunks`` with tags stripped. Other leaf types are ignored.
        """
        children = part.get('parts')
        if children:
            # Container part: its text lives in the children, at any depth.
            for child in children:
                self._collect_text_parts(child, plain_chunks, html_chunks)
            return

        mime_type = part.get('mimeType', '')
        if mime_type == 'text/plain':
            plain_chunks.append(self._decode_message_part(part))
        elif mime_type == 'text/html':
            raw_html = self._decode_message_part(part)
            html_chunks.append(self._HTML_TAG_RE.sub('', raw_html))

    def _extract_email_content(self, message: Dict) -> str:
        """Extract readable body text from a Gmail API message.

        Multipart payloads are searched recursively. Plain-text parts are
        preferred; tag-stripped HTML is used only when no plain text is
        found, so a message that ships both alternatives is not returned
        twice.

        Returns:
            The stripped body text, or ``''`` when there is no payload or no
            textual part.
        """
        payload = message.get('payload')
        if not payload:
            return ""

        plain_chunks: List[str] = []
        html_chunks: List[str] = []
        self._collect_text_parts(payload, plain_chunks, html_chunks)

        plain_text = ''.join(plain_chunks).strip()
        if plain_text:
            return plain_text
        return ''.join(html_chunks).strip()

    def _get_header_value(self, headers: List[Dict], name: str) -> str:
        """Return the value of the first header matching ``name``.

        The comparison is case-insensitive. Returns ``''`` when no header
        matches.
        """
        wanted = name.lower()
        for header in headers:
            if header.get('name', '').lower() == wanted:
                return header.get('value', '')
        return ''

    def _resolve_sent_time(self, message: Dict, date_header: str) -> Optional[datetime]:
        """Return the message timestamp as an aware datetime in IST, or None.

        The ``Date`` header is tried first. If it is missing or unparsable,
        the message's ``internalDate`` field (epoch milliseconds) is used.
        """
        from datetime import timezone
        from email.utils import parsedate_to_datetime

        sent_at = None
        if date_header:
            try:
                # Parse RFC 2822 date format
                sent_at = parsedate_to_datetime(date_header)
            except (TypeError, ValueError) as e:
                logger.warning(f"Failed to parse date {date_header}: {e}")

        if sent_at is None:
            internal_ms = message.get('internalDate')
            if internal_ms is not None:
                try:
                    sent_at = datetime.fromtimestamp(
                        int(internal_ms) / 1000, tz=timezone.utc
                    )
                except (TypeError, ValueError, OverflowError, OSError) as e:
                    logger.warning(f"Failed to parse internalDate {internal_ms}: {e}")

        if sent_at is None:
            return None

        # A "-0000" zone makes parsedate_to_datetime return a naive datetime
        # that represents UTC. Without this, astimezone() would silently
        # interpret it in the machine's local timezone.
        if sent_at.tzinfo is None:
            sent_at = sent_at.replace(tzinfo=timezone.utc)

        # IST is a fixed +05:30 offset with no daylight saving, so a fixed
        # timezone avoids depending on an installed tz database.
        ist = timezone(timedelta(hours=5, minutes=30))
        try:
            return sent_at.astimezone(ist)
        except OverflowError as e:
            logger.warning(f"Failed to parse date {date_header}: {e}")
            return None

    def _parse_email_message(self, message: Dict) -> Dict:
        """Parse a Gmail API message into a flat dictionary.

        Returns:
            A dict with keys ``date`` (``DD-MMM-YYYY``, IST), ``time``
            (``HH:MM:SS``, IST), ``subject``, ``from``, ``content`` (body
            text truncated to 2000 characters), ``message_id`` (the
            ``Message-ID`` header, else the Gmail id) and ``gmail_id``.
            When no timestamp can be determined, ``date`` is today's local
            date and ``time`` is ``00:00:00``.
        """
        headers = message.get('payload', {}).get('headers', [])

        subject = self._get_header_value(headers, 'Subject') or 'No Subject'
        from_header = self._get_header_value(headers, 'From') or 'Unknown Sender'
        date_header = self._get_header_value(headers, 'Date')
        message_id = (
            self._get_header_value(headers, 'Message-ID') or message.get('id', '')
        )

        sent_at = self._resolve_sent_time(message, date_header)
        if sent_at is not None:
            email_date = sent_at.strftime("%d-%b-%Y")
            email_time = sent_at.strftime("%H:%M:%S")
        else:
            # Last-resort placeholder when the message carries no usable time.
            email_date = datetime.now().strftime("%d-%b-%Y")
            email_time = "00:00:00"

        content = self._extract_email_content(message)

        return {
            "date": email_date,
            "time": email_time,
            "subject": subject,
            "from": from_header,
            "content": content[:self._MAX_CONTENT_CHARS],  # Limit content length
            "message_id": message_id,
            "gmail_id": message.get('id', ''),
        }

    def _build_search_query(self, keyword: str, start_dt: datetime,
                            end_dt: datetime) -> str:
        """Build the Gmail search query for ``keyword`` between two dates.

        Gmail's search documentation states that date operators are
        interpreted as midnight PST, which is later than midnight IST. The
        window therefore starts one day before ``start_dt`` and ends one day
        after ``end_dt``. The caller is expected to apply the exact date
        filter to the fetched messages.
        """
        after_date = self._format_date_for_query(start_dt - timedelta(days=1))
        before_date = self._format_date_for_query(end_dt + timedelta(days=1))
        # Gmail API search syntax: https://developers.google.com/gmail/api/guides/filtering
        query_parts = [
            f'after:{after_date}',
            f'before:{before_date}',
            f'({keyword})',  # Search in all fields
        ]
        return ' '.join(query_parts)

    def _fetch_full_message(self, service, gmail_id: str) -> Dict:
        """Fetch one message in ``full`` format by its Gmail id."""
        return service.users().messages().get(
            userId='me',
            id=gmail_id,
            format='full'
        ).execute()

    def search_emails(self, keyword: str, start_date: str, end_date: str) -> List[Dict]:
        """Search emails containing keyword within date range using Gmail API.

        Only the first page of results (at most 500 message references) is
        requested. Each message is then fetched, parsed, and kept only if
        its IST date lies within ``[start_date, end_date]`` and ``keyword``
        occurs (case-insensitively) in its subject, sender or content.
        Messages that fail to fetch or parse are logged and skipped.

        Args:
            keyword: Keyword to search for in emails
            start_date: Start date in DD-MMM-YYYY format
            end_date: End date in DD-MMM-YYYY format

        Returns:
            List of email dictionaries, newest first

        Raises:
            ValueError: If either date is not in DD-MMM-YYYY format.
            Exception: If not authenticated, or if the list call fails.
        """
        logger.info(f"Searching emails containing '{keyword}' between {start_date} and {end_date}")

        service = self.oauth_manager.get_gmail_service()
        if not service:
            raise Exception("Not authenticated. Please authenticate first using the setup tool.")

        try:
            start_dt = self._parse_date_string(start_date)
            end_dt = self._parse_date_string(end_date)

            query = self._build_search_query(keyword, start_dt, end_dt)
            logger.info(f"Gmail API query: {query}")

            results = service.users().messages().list(
                userId='me',
                q=query,
                maxResults=self._MAX_SEARCH_RESULTS  # Limit to 500 results
            ).execute()
            messages = results.get('messages', [])
            logger.info(f"Found {len(messages)} messages")
            if not messages:
                return []

            keyword_lower = keyword.lower()
            scraped_emails = []
            for i, msg_ref in enumerate(messages):
                try:
                    logger.info(f"Processing email {i+1}/{len(messages)}")
                    message = self._fetch_full_message(service, msg_ref['id'])
                    parsed_email = self._parse_email_message(message)

                    # Exact range filter: the server-side window is wider
                    # than requested (see _build_search_query).
                    email_dt = self._parse_date_string(parsed_email['date'])
                    if not (start_dt <= email_dt <= end_dt):
                        continue

                    # Verify keyword presence (case-insensitive)
                    searchable = (
                        parsed_email['subject'],
                        parsed_email['from'],
                        parsed_email['content'],
                    )
                    if any(keyword_lower in text.lower() for text in searchable):
                        scraped_emails.append(parsed_email)
                except googleapiclient.errors.HttpError as e:
                    logger.error(f"Error fetching message {msg_ref['id']}: {e}")
                    continue
                except Exception as e:
                    logger.error(f"Error processing message {msg_ref['id']}: {e}")
                    continue

            # Sort by date (newest first)
            scraped_emails.sort(
                key=lambda x: datetime.strptime(f"{x['date']} {x['time']}", "%d-%b-%Y %H:%M:%S"),
                reverse=True
            )
            logger.info(f"Successfully processed {len(scraped_emails)} emails containing '{keyword}'")
            return scraped_emails
        except googleapiclient.errors.HttpError as e:
            logger.error(f"Gmail API error: {e}")
            # Chain the original error so the HTTP details stay in the traceback.
            raise Exception(f"Gmail API error: {e}") from e
        except Exception as e:
            logger.error(f"Email search failed: {e}")
            raise

    def get_email_by_id(self, message_id: str) -> Optional[Dict]:
        """Get email details by message ID or Gmail ID.

        The value is first tried as a Gmail message id. If that lookup
        raises an ``HttpError`` of any kind, the value is searched for as an
        RFC 822 ``Message-ID`` header instead.

        Args:
            message_id: Either the Message-ID header or Gmail message ID

        Returns:
            Email dictionary or None if not found or if the lookup failed

        Raises:
            Exception: If not authenticated.
        """
        service = self.oauth_manager.get_gmail_service()
        if not service:
            raise Exception("Not authenticated. Please authenticate first using the setup tool.")

        try:
            # Try to get message directly by Gmail ID first
            try:
                message = self._fetch_full_message(service, message_id)
                return self._parse_email_message(message)
            except googleapiclient.errors.HttpError:
                # If direct ID lookup fails, search by Message-ID header
                pass

            query = f'rfc822msgid:{message_id}'
            results = service.users().messages().list(
                userId='me',
                q=query,
                maxResults=1
            ).execute()
            messages = results.get('messages', [])
            if not messages:
                return None

            message = self._fetch_full_message(service, messages[0]['id'])
            return self._parse_email_message(message)
        except Exception as e:
            logger.error(f"Failed to get email {message_id}: {e}")
            return None

    def is_authenticated(self) -> bool:
        """Check if user is authenticated (delegates to the OAuth manager)."""
        return self.oauth_manager.is_authenticated()

    def get_user_email(self) -> Optional[str]:
        """Get authenticated user's email address from the OAuth manager."""
        return self.oauth_manager.get_user_email()

    def authenticate(self) -> bool:
        """Trigger interactive authentication through the OAuth manager."""
        return self.oauth_manager.authenticate_interactive()
# Module-level scraper instance, created once when this module is imported.
gmail_scraper = GmailAPIScraper()