Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Gradio Email Client App - Web interface for email fetching and management | |
| Supports IMAP and POP3 protocols with secure authentication | |
| Requirements (requirements.txt): | |
| gradio | |
| pandas | |
| """ | |
| import gradio as gr | |
| import imaplib # Built-in Python module | |
| import poplib # Built-in Python module | |
| import email # Built-in Python module | |
| from email.header import decode_header | |
| from email.utils import parsedate_to_datetime | |
| import json | |
| import logging | |
| from datetime import datetime | |
| import pandas as pd | |
| from typing import List, Dict, Tuple, Optional | |
| import re | |
| import ssl | |
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("gradio-email-app")


class EmailClient:
    """Email client supporting both IMAP and POP3.

    At most one connection is kept at a time. ``protocol`` is ``"IMAP"``,
    ``"POP3"`` or ``None`` (not connected) and ``connected_email`` is the
    address used for the current login.

    The public methods never raise for server/network problems; they return
    a ``(result, message)`` pair where ``message`` is a user-facing status.
    """

    # One line of an IMAP LIST response: (flags) "delimiter"|NIL mailbox-name
    _LIST_LINE = re.compile(
        r'\((?P<flags>[^)]*)\)\s+(?P<delim>"(?:[^"\\]|\\.)*"|NIL)\s+(?P<name>.+)$',
        re.DOTALL,
    )

    def __init__(self):
        self.imap_conn = None
        self.pop_conn = None
        self.protocol = None
        self.connected_email = None

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _quiet(action) -> None:
        """Call *action()* as best-effort cleanup, logging instead of raising."""
        try:
            action()
        except Exception as e:
            logger.debug(f"Ignored error during cleanup: {e}")

    @staticmethod
    def _quote_mailbox(folder: str) -> str:
        """Return *folder* as an IMAP quoted string.

        imaplib sends mailbox arguments verbatim, so a name containing a
        space (e.g. ``[Gmail]/Sent Mail``) must be quoted by the caller.
        """
        escaped = folder.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    @staticmethod
    def _parse_folder_name(line) -> Optional[str]:
        """Extract the mailbox name from one item of a LIST response.

        Handles both quoted (``"/" "Sent Mail"``) and unquoted
        (``"/" INBOX``) names. Returns ``None`` when the item cannot be
        parsed.
        """
        if isinstance(line, tuple):
            # NOTE(review): imaplib appears to deliver names sent as IMAP
            # literals as (header, literal) tuples - confirm against a
            # server that actually uses literals in LIST.
            if len(line) >= 2 and isinstance(line[1], bytes):
                return line[1].decode('utf-8', errors='replace') or None
            return None
        if not isinstance(line, bytes):
            return None
        match = EmailClient._LIST_LINE.match(line.decode('utf-8', errors='replace').strip())
        if not match:
            return None
        name = match.group('name').strip()
        if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
            name = name[1:-1].replace('\\"', '"').replace('\\\\', '\\')
        return name or None

    @staticmethod
    def _decode_mime_words(value) -> str:
        """Decode an RFC 2047 encoded header value into a plain string.

        Every fragment is decoded (not only the first). An unknown or wrong
        charset falls back to UTF-8 instead of raising, so one odd header
        does not cause the whole message to be dropped.
        """
        if value is None:
            return ""
        pieces = []
        for fragment, charset in decode_header(value):
            if not isinstance(fragment, bytes):
                pieces.append(fragment)
                continue
            if charset:
                try:
                    pieces.append(fragment.decode(charset, errors='replace'))
                    continue
                except LookupError:
                    # Charset name Python does not know (e.g. "unknown-8bit").
                    pass
            pieces.append(fragment.decode('utf-8', errors='ignore'))
        return "".join(pieces)

    @staticmethod
    def _decode_payload(part) -> str:
        """Return the text of a non-multipart MIME part.

        Uses the charset declared on the part (defaulting to UTF-8) rather
        than assuming UTF-8 for everything.
        """
        raw = part.get_payload(decode=True)
        if raw is None:
            return str(part.get_payload())
        charset = part.get_content_charset() or 'utf-8'
        try:
            return raw.decode(charset, errors='replace')
        except LookupError:
            return raw.decode('utf-8', errors='ignore')

    # ------------------------------------------------------------------
    # Connecting
    # ------------------------------------------------------------------
    def connect_imap(self, server: str, port: int, email_addr: str, password: str, use_ssl: bool = True):
        """Connect and log in to an IMAP server.

        Returns ``(success, message)``. On failure the half-open socket is
        closed and any previously established connection is left untouched;
        on success a previous connection is closed and replaced.
        """
        conn = None
        logged_in = False
        try:
            port = int(port)  # UI widgets may hand over a float such as 993.0
            if use_ssl:
                # Default context verifies the server certificate and hostname
                context = ssl.create_default_context()
                conn = imaplib.IMAP4_SSL(server, port, ssl_context=context)
            else:
                conn = imaplib.IMAP4(server, port)
            conn.login(email_addr, password)
            logged_in = True
        except imaplib.IMAP4.error as e:
            logger.error(f"IMAP authentication failed: {e}")
            return False, "β IMAP authentication failed. Check your credentials."
        except Exception as e:
            logger.error(f"IMAP connection failed: {e}")
            return False, f"β IMAP connection failed: {str(e)}"
        finally:
            if conn is not None and not logged_in:
                self._quiet(conn.shutdown)

        if self.imap_conn or self.pop_conn:
            self.disconnect()
        self.imap_conn = conn
        self.protocol = "IMAP"
        self.connected_email = email_addr
        logger.info(f"Connected to IMAP server: {server}")
        return True, f"β Connected to IMAP server: {server}"

    def connect_pop3(self, server: str, port: int, email_addr: str, password: str, use_ssl: bool = True):
        """Connect and log in to a POP3 server.

        Returns ``(success, message)`` with the same connection-replacement
        rules as :meth:`connect_imap`.
        """
        conn = None
        logged_in = False
        try:
            port = int(port)  # UI widgets may hand over a float such as 995.0
            if use_ssl:
                context = ssl.create_default_context()
                conn = poplib.POP3_SSL(server, port, context=context)
            else:
                conn = poplib.POP3(server, port)
            conn.user(email_addr)
            conn.pass_(password)
            logged_in = True
        except poplib.error_proto as e:
            logger.error(f"POP3 authentication failed: {e}")
            return False, "β POP3 authentication failed. Check your credentials."
        except Exception as e:
            logger.error(f"POP3 connection failed: {e}")
            return False, f"β POP3 connection failed: {str(e)}"
        finally:
            if conn is not None and not logged_in:
                self._quiet(conn.close)

        if self.imap_conn or self.pop_conn:
            self.disconnect()
        self.pop_conn = conn
        self.protocol = "POP3"
        self.connected_email = email_addr
        logger.info(f"Connected to POP3 server: {server}")
        return True, f"β Connected to POP3 server: {server}"

    # ------------------------------------------------------------------
    # Folders and fetching
    # ------------------------------------------------------------------
    def get_folders(self) -> Tuple[List[str], str]:
        """Get available folders (IMAP only).

        Returns ``(folder_names, message)``; names are unquoted, exactly as
        the server reported them.
        """
        if self.protocol != "IMAP" or not self.imap_conn:
            return [], "β Folders are only available with IMAP connections"
        try:
            status, folders = self.imap_conn.list()
            if status != 'OK':
                return [], f"β Failed to get folders: {status}"
            folder_list = [
                name
                for name in (self._parse_folder_name(item) for item in folders or [])
                if name
            ]
            return folder_list, f"β Found {len(folder_list)} folders"
        except Exception as e:
            logger.error(f"Failed to get folders: {e}")
            return [], f"β Failed to get folders: {str(e)}"

    def fetch_emails(self, folder: str = "INBOX", limit: int = 10, search_criteria: str = "ALL") -> Tuple[List[Dict], str]:
        """Fetch up to *limit* of the newest emails via the active protocol.

        *folder* and *search_criteria* are only used for IMAP. Returns
        ``(emails, message)`` with the newest email first.
        """
        if self.protocol == "IMAP":
            return self._fetch_emails_imap(folder, limit, search_criteria)
        elif self.protocol == "POP3":
            return self._fetch_emails_pop3(limit)
        else:
            return [], "β Not connected to any email server"

    def _fetch_emails_imap(self, folder: str, limit: int, search_criteria: str) -> Tuple[List[Dict], str]:
        """Fetch emails using IMAP."""
        if not self.imap_conn:
            return [], "β No IMAP connection"
        try:
            limit = int(limit)  # sliders may deliver floats; slicing needs an int
            # Read-only so that merely listing mail does not mark it as read.
            status, _ = self.imap_conn.select(self._quote_mailbox(folder), readonly=True)
            if status != 'OK':
                return [], f"β Could not open folder {folder}"
            status, messages = self.imap_conn.search(None, search_criteria)
            if status != 'OK':
                return [], f"β Search failed in folder {folder}"
            email_ids = (messages[0] or b"").split() if messages else []
            # Ids come back oldest-first; keep only the newest `limit`.
            email_ids = email_ids[-limit:] if limit > 0 else []
            emails = []
            for email_id in reversed(email_ids):
                try:
                    status, msg_data = self.imap_conn.fetch(email_id, '(RFC822)')
                    if status != 'OK' or not msg_data or not isinstance(msg_data[0], tuple):
                        continue
                    email_message = email.message_from_bytes(msg_data[0][1])
                    parsed_email = self._parse_email(email_message)
                    parsed_email['id'] = email_id.decode()
                    emails.append(parsed_email)
                except Exception as e:
                    # One unreadable message must not abort the whole listing.
                    logger.error(f"Failed to fetch email {email_id}: {e}")
                    continue
            return emails, f"β Fetched {len(emails)} emails from {folder}"
        except Exception as e:
            logger.error(f"IMAP fetch failed: {e}")
            return [], f"β IMAP fetch failed: {str(e)}"

    def _fetch_emails_pop3(self, limit: int) -> Tuple[List[Dict], str]:
        """Fetch emails using POP3."""
        if not self.pop_conn:
            return [], "β No POP3 connection"
        try:
            limit = max(0, int(limit))  # range() below needs a non-negative int
            num_messages = len(self.pop_conn.list()[1])
            # POP3 message numbers are 1-based; take the last `limit` of them.
            start_index = max(1, num_messages - limit + 1)
            emails = []
            for i in range(start_index, num_messages + 1):
                try:
                    raw_email = b'\n'.join(self.pop_conn.retr(i)[1])
                    email_message = email.message_from_bytes(raw_email)
                    parsed_email = self._parse_email(email_message)
                    parsed_email['id'] = str(i)
                    emails.append(parsed_email)
                except Exception as e:
                    logger.error(f"Failed to fetch email {i}: {e}")
                    continue
            return list(reversed(emails)), f"β Fetched {len(emails)} emails"
        except Exception as e:
            logger.error(f"POP3 fetch failed: {e}")
            return [], f"β POP3 fetch failed: {str(e)}"

    # ------------------------------------------------------------------
    # Parsing
    # ------------------------------------------------------------------
    def _parse_email(self, email_message) -> Dict:
        """Parse an ``email.message.Message`` into a plain dictionary.

        Keys: ``subject``, ``from``, ``to``, ``date``, ``body_text``,
        ``body_html``, ``attachments`` (file names) and ``attachment_count``.
        """
        subject = self._decode_mime_words(email_message.get('Subject', ''))
        from_addr = self._decode_mime_words(email_message.get('From', ''))
        to_addr = self._decode_mime_words(email_message.get('To', ''))
        date_str = email_message.get('Date', '')

        # Fall back to the raw header when the date is missing or malformed.
        try:
            formatted_date = parsedate_to_datetime(date_str).strftime("%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError, OverflowError):
            formatted_date = date_str

        body = self._extract_body(email_message)
        attachments = self._extract_attachments_info(email_message)
        return {
            'subject': subject,
            'from': from_addr,
            'to': to_addr,
            'date': formatted_date,
            'body_text': body.get('text', ''),
            'body_html': body.get('html', ''),
            'attachments': attachments,
            'attachment_count': len(attachments)
        }

    def _extract_body(self, email_message) -> Dict[str, str]:
        """Extract email body as ``{'text': ..., 'html': ...}``.

        For multipart messages, attachment parts are skipped and a later
        text/plain or text/html part overwrites an earlier one.
        """
        body = {'text': '', 'html': ''}
        if email_message.is_multipart():
            for part in email_message.walk():
                disposition = str(part.get('Content-Disposition', '')).lower()
                if 'attachment' in disposition:
                    continue
                content_type = part.get_content_type()
                if content_type == 'text/plain':
                    body['text'] = self._decode_payload(part)
                elif content_type == 'text/html':
                    body['html'] = self._decode_payload(part)
        else:
            payload = self._decode_payload(email_message)
            if email_message.get_content_type() == 'text/html':
                body['html'] = payload
            else:
                # text/plain and anything unrecognised are shown as text
                body['text'] = payload
        return body

    def _extract_attachments_info(self, email_message) -> List[str]:
        """Return the decoded file names of all attachment parts."""
        attachments = []
        if email_message.is_multipart():
            for part in email_message.walk():
                disposition = str(part.get('Content-Disposition', '')).lower()
                if 'attachment' not in disposition:
                    continue
                filename = part.get_filename()
                if filename:
                    attachments.append(self._decode_mime_words(filename))
        return attachments

    # ------------------------------------------------------------------
    # Disconnecting
    # ------------------------------------------------------------------
    def disconnect(self):
        """Disconnect from the email server and reset connection state.

        State is always reset, even when the server reports an error while
        logging out, so the client never stays half-connected. Returns a
        user-facing status message.
        """
        problems = []
        if self.imap_conn:
            try:
                # CLOSE is only legal while a mailbox is selected; sending it
                # right after login raises.
                if getattr(self.imap_conn, 'state', None) == 'SELECTED':
                    self.imap_conn.close()
                self.imap_conn.logout()
            except Exception as e:
                problems.append(str(e))
        if self.pop_conn:
            try:
                self.pop_conn.quit()
            except Exception as e:
                problems.append(str(e))

        self.imap_conn = None
        self.pop_conn = None
        self.protocol = None
        self.connected_email = None

        if problems:
            detail = "; ".join(problems)
            logger.error(f"Disconnect error: {detail}")
            return f"β Disconnect error: {detail}"
        logger.info("Disconnected from email server")
        return "β Disconnected from email server"
# Global email client instance
# NOTE(review): this single module-level client is used by every UI handler
# below, so all browser sessions served by this process share one mailbox
# connection - presumably per-session state (e.g. gr.State) is what is wanted
# for a multi-user deployment; confirm before exposing publicly.
email_client = EmailClient()

# Email server configurations
# Maps provider display name -> protocol key ("imap" / "pop3") -> host and port.
# All listed ports (993 for IMAP, 995 for POP3) are used with SSL by the
# connect helpers, which default to use_ssl=True.
EMAIL_SERVERS = {
    'Gmail': {
        'imap': {'server': 'imap.gmail.com', 'port': 993},
        'pop3': {'server': 'pop.gmail.com', 'port': 995}
    },
    'Outlook/Hotmail': {
        'imap': {'server': 'outlook.office365.com', 'port': 993},
        'pop3': {'server': 'outlook.office365.com', 'port': 995}
    },
    'Yahoo': {
        'imap': {'server': 'imap.mail.yahoo.com', 'port': 993},
        'pop3': {'server': 'pop.mail.yahoo.com', 'port': 995}
    },
    'iCloud': {
        'imap': {'server': 'imap.mail.me.com', 'port': 993},
        'pop3': {'server': 'pop.mail.me.com', 'port': 995}
    }
}
def connect_to_email(email_addr, password, provider, protocol, custom_server="", custom_port=993):
    """Connect to an email server from the UI.

    Args:
        email_addr: Login address.
        password: Password or app password.
        provider: A key of ``EMAIL_SERVERS`` or ``"Custom"``.
        protocol: ``"IMAP"`` or ``"POP3"`` (case-insensitive).
        custom_server: Host name, required when *provider* is ``"Custom"``.
        custom_port: Port for a custom server; may arrive as a float from
            the number widget and is converted to ``int``.

    Returns:
        ``(connection message, status text, folder dropdown update)``.
        On any failure the folder dropdown is left unchanged.
    """
    keep_folders = gr.update()  # no-op update: leave the dropdown as it is

    if not email_addr or not password:
        return "β Email and password are required", "", keep_folders

    protocol_key = protocol.lower()

    # Get server configuration
    if provider == "Custom":
        if not custom_server or not custom_server.strip():
            return "β Custom server address is required", "", keep_folders
        try:
            port = int(custom_port)
        except (TypeError, ValueError):
            return "β Custom port must be a whole number", "", keep_folders
        server_addr = custom_server.strip()
    else:
        if provider not in EMAIL_SERVERS:
            return f"β Unsupported provider: {provider}", "", keep_folders
        server_config = EMAIL_SERVERS[provider][protocol_key]
        server_addr = server_config["server"]
        port = server_config["port"]

    # Connect to email server
    if protocol_key == "imap":
        success, message = email_client.connect_imap(server_addr, port, email_addr, password)
    else:
        success, message = email_client.connect_pop3(server_addr, port, email_addr, password)

    if not success:
        return message, "", keep_folders

    if protocol_key == "imap":
        folders, _ = email_client.get_folders()
        if not folders:
            # Listing failed or returned nothing; INBOX always exists in IMAP.
            folders = ["INBOX"]
    else:
        # POP3 has no folder concept; expose a single pseudo-folder.
        folders = ["INBOX"]

    # Only preselect a value that is actually among the choices.
    selected = "INBOX" if "INBOX" in folders else folders[0]
    return message, f"Connected as: {email_addr}", gr.update(choices=folders, value=selected)
def _truncate(text, max_len):
    """Return *text* cut to *max_len* characters, adding "..." only if it was cut."""
    return text[:max_len] + "..." if len(text) > max_len else text


def _emails_to_dataframe(emails):
    """Build the summary table shown in the UI from parsed email dicts."""
    rows = [
        {
            'Subject': _truncate(item['subject'], 50),
            'From': item['from'],
            'Date': item['date'],
            'Attachments': item['attachment_count'],
            'Preview': _truncate(item['body_text'], 100) if item['body_text'] else "No text content",
        }
        for item in emails
    ]
    return pd.DataFrame(rows)


def _imap_search_criteria(query):
    """Return an IMAP SEARCH expression matching *query* in subject, sender or body.

    IMAP's ``OR`` takes exactly two search keys, so three alternatives need
    two nested ``OR``s. Backslashes and double quotes are escaped so the
    query cannot break out of its quoted string.
    """
    escaped = query.replace("\\", "\\\\").replace('"', '\\"')
    return f'OR OR SUBJECT "{escaped}" FROM "{escaped}" BODY "{escaped}"'


def fetch_emails_ui(folder, limit, search_criteria):
    """Fetch emails and return ``(DataFrame or None, status message)``."""
    if not email_client.protocol:
        return None, "β Not connected to email server"
    emails, message = email_client.fetch_emails(folder, limit, search_criteria)
    if not emails:
        return None, message
    return _emails_to_dataframe(emails), message


def search_emails_ui(query, folder, limit):
    """Search emails and return ``(DataFrame or None, status message)``.

    With IMAP an ASCII query is searched on the server. POP3, and non-ASCII
    queries (which the ASCII-only search command cannot carry), fall back to
    fetching the newest *limit* emails. In every case the fetched emails are
    then filtered locally on subject, sender and text body.
    """
    if not email_client.protocol:
        return None, "β Not connected to email server"

    # Collapse whitespace; this also removes line breaks that must never
    # reach the IMAP command line.
    query = " ".join((query or "").split())
    if not query:
        return None, "β Search query is required"

    # Create search criteria
    if email_client.protocol == "IMAP" and query.isascii():
        search_criteria = _imap_search_criteria(query)
    else:
        search_criteria = "ALL"  # POP3 doesn't support server-side search

    emails, message = email_client.fetch_emails(folder, limit, search_criteria)
    if not emails:
        return None, message

    # Additional client-side filtering for POP3 or better results
    query_lower = query.lower()
    filtered_emails = [
        item
        for item in emails
        if query_lower in item.get('subject', '').lower()
        or query_lower in item.get('from', '').lower()
        or query_lower in item.get('body_text', '').lower()
    ]
    if not filtered_emails:
        return None, f"β No emails found matching query: {query}"

    return _emails_to_dataframe(filtered_emails), f"β Found {len(filtered_emails)} emails matching '{query}'"
def disconnect_email():
    """Disconnect from the email server.

    Returns ``(message, status text, folder dropdown update)``. The folder
    dropdown is reset to its initial single ``INBOX`` choice rather than
    being handed an empty list as its value.
    """
    message = email_client.disconnect()
    return message, "", gr.update(choices=["INBOX"], value="INBOX")
def get_connection_status():
    """Return a one-line description of the shared client's connection state."""
    active_protocol = email_client.protocol
    account = email_client.connected_email
    # Both pieces must be present for the client to count as connected.
    if not (active_protocol and account):
        return "π΄ Not connected"
    return f"π’ Connected to {account} via {active_protocol}"
def update_custom_server_visibility(provider):
    """Return visibility updates for the custom server and port fields.

    Both fields are shown only when *provider* is ``"Custom"``.
    """
    show_custom_fields = provider == "Custom"
    server_update = gr.update(visible=show_custom_fields)
    port_update = gr.update(visible=show_custom_fields)
    return server_update, port_update
# Create Gradio interface
# Layout: a status row on top, then four tabs (Connect, Emails, Search, Help).
# Event handlers are registered at the bottom, still inside the Blocks context.
with gr.Blocks(title="Email Client", theme=gr.themes.Soft()) as app:
    gr.Markdown("# π§ Email Client")
    gr.Markdown("Connect to your email account and manage your emails with a user-friendly interface.")

    # Connection status
    with gr.Row():
        status_display = gr.Textbox(
            label="Connection Status",
            value=get_connection_status(),
            interactive=False,
            scale=4
        )
        refresh_status_btn = gr.Button("π Refresh Status", scale=1)

    with gr.Tabs():
        # Connection Tab
        with gr.TabItem("π Connect"):
            gr.Markdown("### Email Connection Settings")
            with gr.Row():
                email_input = gr.Textbox(
                    label="Email Address",
                    placeholder="your-email@example.com",
                    scale=3
                )
                password_input = gr.Textbox(
                    label="Password",
                    placeholder="Your password or app password",
                    type="password",
                    scale=3
                )
            with gr.Row():
                provider_dropdown = gr.Dropdown(
                    choices=list(EMAIL_SERVERS.keys()) + ["Custom"],
                    label="Email Provider",
                    value="Gmail",
                    scale=2
                )
                protocol_dropdown = gr.Dropdown(
                    choices=["IMAP", "POP3"],
                    label="Protocol",
                    value="IMAP",
                    scale=1
                )
            # Custom server fields (initially hidden)
            with gr.Row():
                custom_server_input = gr.Textbox(
                    label="Custom Server",
                    placeholder="mail.example.com",
                    visible=False,
                    scale=3
                )
                custom_port_input = gr.Number(
                    label="Port",
                    value=993,
                    precision=0,  # deliver an int; the socket layer rejects a float port
                    visible=False,
                    scale=1
                )
            with gr.Row():
                connect_btn = gr.Button("π Connect", variant="primary", scale=1)
                disconnect_btn = gr.Button("β Disconnect", scale=1)
            connection_message = gr.Textbox(
                label="Connection Message",
                interactive=False
            )
            gr.Markdown("""
            ### π‘ Connection Tips:
            - **Gmail**: Use app passwords instead of your regular password
            - **IMAP**: Recommended for full folder access and search capabilities
            - **POP3**: Downloads emails to local client, limited folder support
            """)

        # Email Management Tab
        with gr.TabItem("π¬ Emails"):
            with gr.Row():
                folder_dropdown = gr.Dropdown(
                    label="Folder",
                    choices=["INBOX"],
                    value="INBOX",
                    scale=2
                )
                limit_slider = gr.Slider(
                    minimum=1,
                    maximum=100,
                    value=10,
                    step=1,
                    label="Email Limit",
                    scale=1
                )
            with gr.Row():
                search_criteria_input = gr.Textbox(
                    label="Search Criteria (IMAP only)",
                    placeholder="ALL, UNSEEN, FROM sender@example.com",
                    value="ALL",
                    scale=3
                )
                fetch_btn = gr.Button("π₯ Fetch Emails", variant="primary", scale=1)
            fetch_message = gr.Textbox(
                label="Fetch Status",
                interactive=False
            )
            emails_dataframe = gr.Dataframe(
                label="Emails",
                headers=["Subject", "From", "Date", "Attachments", "Preview"],
                interactive=False
            )

        # Search Tab
        with gr.TabItem("π Search"):
            with gr.Row():
                search_query_input = gr.Textbox(
                    label="Search Query",
                    placeholder="Enter keywords to search in subject, sender, or body",
                    scale=3
                )
                search_btn = gr.Button("π Search", variant="primary", scale=1)
            with gr.Row():
                search_folder_dropdown = gr.Dropdown(
                    label="Search in Folder",
                    choices=["INBOX"],
                    value="INBOX",
                    # This dropdown's choices are never refreshed after
                    # connecting; it only mirrors the value picked in the
                    # Emails tab, so values outside `choices` must be accepted.
                    allow_custom_value=True,
                    scale=2
                )
                search_limit_slider = gr.Slider(
                    minimum=1,
                    maximum=100,
                    value=20,
                    step=1,
                    label="Max Results",
                    scale=1
                )
            search_message = gr.Textbox(
                label="Search Status",
                interactive=False
            )
            search_results_dataframe = gr.Dataframe(
                label="Search Results",
                headers=["Subject", "From", "Date", "Attachments", "Preview"],
                interactive=False
            )

        # Help Tab
        with gr.TabItem("β Help"):
            gr.Markdown("""
            ## How to Use This Email Client
            ### 1. **Connect to Your Email**
            - Enter your email address and password
            - For **Gmail**: You need to use an "App Password" instead of your regular password
              - Go to your Google Account settings
              - Enable 2-Step Verification
              - Generate an App Password for "Mail"
            - Choose your email provider or use "Custom" for other providers
            - Select IMAP (recommended) or POP3 protocol
            ### 2. **Fetch Emails**
            - Select a folder (INBOX is default)
            - Set the number of emails to fetch
            - Use search criteria for IMAP (e.g., "UNSEEN" for unread emails)
            ### 3. **Search Emails**
            - Enter keywords to search in subject, sender, or email body
            - Choose the folder to search in
            - Set maximum number of results to return
            ### **Supported Email Providers:**
            - Gmail (imap.gmail.com, pop.gmail.com)
            - Outlook/Hotmail (outlook.office365.com)
            - Yahoo (imap.mail.yahoo.com, pop.mail.yahoo.com)
            - iCloud (imap.mail.me.com, pop.mail.me.com)
            - Custom servers
            ### **Security Notes:**
            - Your credentials are only used for the current session
            - Use app-specific passwords when available
            - All connections use SSL/TLS encryption
            """)

    # Event handlers
    provider_dropdown.change(
        fn=update_custom_server_visibility,
        inputs=[provider_dropdown],
        outputs=[custom_server_input, custom_port_input]
    )
    connect_btn.click(
        fn=connect_to_email,
        inputs=[email_input, password_input, provider_dropdown, protocol_dropdown, custom_server_input, custom_port_input],
        outputs=[connection_message, status_display, folder_dropdown]
    )
    disconnect_btn.click(
        fn=disconnect_email,
        outputs=[connection_message, status_display, folder_dropdown]
    )
    refresh_status_btn.click(
        fn=get_connection_status,
        outputs=[status_display]
    )
    fetch_btn.click(
        fn=fetch_emails_ui,
        inputs=[folder_dropdown, limit_slider, search_criteria_input],
        outputs=[emails_dataframe, fetch_message]
    )
    search_btn.click(
        fn=search_emails_ui,
        inputs=[search_query_input, search_folder_dropdown, search_limit_slider],
        outputs=[search_results_dataframe, search_message]
    )
    # Update search folder dropdown when main folder dropdown changes
    folder_dropdown.change(
        fn=lambda x: gr.update(value=x),
        inputs=[folder_dropdown],
        outputs=[search_folder_dropdown]
    )
# Launch the app
if __name__ == "__main__":
    import os

    # Hosting platforms typically inject the listening port via $PORT;
    # 7860 is the fallback used when it is not set.
    port = int(os.environ.get("PORT", 7860))
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": port,
        "share": False,
        "show_error": True,
    }
    app.launch(**launch_options)