import streamlit as st import requests import json import time import re import random import string from datetime import datetime import html # Set page configuration st.set_page_config( page_title="Temp Mail Client", page_icon="✉️", layout="wide", initial_sidebar_state="expanded" ) # Custom CSS for styling st.markdown(""" """, unsafe_allow_html=True) # Initialize session state variables if 'provider' not in st.session_state: st.session_state.provider = "mail.tm" if 'domains' not in st.session_state: st.session_state.domains = [] if 'email' not in st.session_state: st.session_state.email = "" if 'password' not in st.session_state: st.session_state.password = "" if 'token' not in st.session_state: st.session_state.token = "" if 'account_id' not in st.session_state: st.session_state.account_id = "" if 'messages' not in st.session_state: st.session_state.messages = [] if 'selected_message' not in st.session_state: st.session_state.selected_message = None if 'message_content' not in st.session_state: st.session_state.message_content = None if 'last_refresh' not in st.session_state: st.session_state.last_refresh = datetime.now() # API configuration API_ENDPOINTS = { "mail.tm": { "base_url": "https://api.mail.tm", "mercure_url": "https://mercure.mail.tm/.well-known/mercure" }, "mail.gw": { "base_url": "https://api.mail.gw", "mercure_url": "https://api.mail.gw/.well-known/mercure" } } def get_base_url(): return API_ENDPOINTS[st.session_state.provider]["base_url"] def get_mercure_url(): return API_ENDPOINTS[st.session_state.provider]["mercure_url"] def get_domains(): """Fetch available domains from the API""" try: response = requests.get(f"{get_base_url()}/domains") if response.status_code == 200: data = response.json() st.session_state.domains = [domain["domain"] for domain in data["hydra:member"]] return st.session_state.domains else: st.error(f"Failed to fetch domains: {response.status_code}") return [] except Exception as e: st.error(f"Error fetching domains: {str(e)}") return [] def create_account(email, password): """Create a new email account""" try: payload = { "address": email, "password": password } response = requests.post( f"{get_base_url()}/accounts", json=payload ) if response.status_code == 201: data = response.json() st.session_state.account_id = data["id"] return True else: st.error(f"Failed to create account: {response.status_code} - {response.text}") return False except Exception as e: st.error(f"Error creating account: {str(e)}") return False def get_token(email, password): """Get authentication token""" try: payload = { "address": email, "password": password } response = requests.post( f"{get_base_url()}/token", json=payload ) if response.status_code == 200: data = response.json() st.session_state.token = data["token"] st.session_state.account_id = data["id"] return True else: st.error(f"Failed to get token: {response.status_code} - {response.text}") return False except Exception as e: st.error(f"Error getting token: {str(e)}") return False def get_account_info(): """Get account information""" if not st.session_state.token: return None try: headers = {"Authorization": f"Bearer {st.session_state.token}"} response = requests.get( f"{get_base_url()}/me", headers=headers ) if response.status_code == 200: return response.json() else: st.error(f"Failed to get account info: {response.status_code}") return None except Exception as e: st.error(f"Error getting account info: {str(e)}") return None def get_messages(): """Get messages for the current account""" if not st.session_state.token: return [] try: headers = {"Authorization": f"Bearer {st.session_state.token}"} response = requests.get( f"{get_base_url()}/messages", headers=headers ) if response.status_code == 200: data = response.json() st.session_state.messages = data["hydra:member"] return st.session_state.messages else: st.error(f"Failed to get messages: {response.status_code}") return [] except Exception as e: st.error(f"Error getting messages: {str(e)}") return [] def get_message_content(message_id): """Get detailed content of a specific message""" if not st.session_state.token: return None try: headers = {"Authorization": f"Bearer {st.session_state.token}"} response = requests.get( f"{get_base_url()}/messages/{message_id}", headers=headers ) if response.status_code == 200: return response.json() else: st.error(f"Failed to get message content: {response.status_code}") return None except Exception as e: st.error(f"Error getting message content: {str(e)}") return None def mark_as_read(message_id): """Mark a message as read""" if not st.session_state.token: return False try: headers = {"Authorization": f"Bearer {st.session_state.token}"} response = requests.patch( f"{get_base_url()}/messages/{message_id}", headers=headers ) if response.status_code == 200: # Update the message in the session state for i, msg in enumerate(st.session_state.messages): if msg["id"] == message_id: st.session_state.messages[i]["seen"] = True return True else: st.error(f"Failed to mark message as read: {response.status_code}") return False except Exception as e: st.error(f"Error marking message as read: {str(e)}") return False def delete_message(message_id): """Delete a message""" if not st.session_state.token: return False try: headers = {"Authorization": f"Bearer {st.session_state.token}"} response = requests.delete( f"{get_base_url()}/messages/{message_id}", headers=headers ) if response.status_code == 204: # Remove the message from the session state st.session_state.messages = [msg for msg in st.session_state.messages if msg["id"] != message_id] if st.session_state.selected_message == message_id: st.session_state.selected_message = None st.session_state.message_content = None return True else: st.error(f"Failed to delete message: {response.status_code}") return False except Exception as e: st.error(f"Error deleting message: {str(e)}") return False def delete_account(): """Delete the current account""" if not st.session_state.token or not st.session_state.account_id: return False try: headers = {"Authorization": f"Bearer {st.session_state.token}"} response = requests.delete( f"{get_base_url()}/accounts/{st.session_state.account_id}", headers=headers ) if response.status_code == 204: # Clear session state st.session_state.email = "" st.session_state.password = "" st.session_state.token = "" st.session_state.account_id = "" st.session_state.messages = [] st.session_state.selected_message = None st.session_state.message_content = None return True else: st.error(f"Failed to delete account: {response.status_code}") return False except Exception as e: st.error(f"Error deleting account: {str(e)}") return False def format_date(date_str): """Format date string to a more readable format""" try: dt = datetime.fromisoformat(date_str.replace('Z', '+00:00')) now = datetime.now() # If today, show only time if dt.date() == now.date(): return dt.strftime("%H:%M") # If this year, show month and day elif dt.year == now.year: return dt.strftime("%b %d") # Otherwise show full date else: return dt.strftime("%Y-%m-%d") except: return date_str def generate_random_username(length=10): """Generate a random username for email""" letters = string.ascii_lowercase + string.digits return ''.join(random.choice(letters) for i in range(length)) def format_size(size_bytes): """Format file size in bytes to human-readable format""" if size_bytes < 1024: return f"{size_bytes} B" elif size_bytes < 1024 * 1024: return f"{size_bytes/1024:.1f} KB" else: return f"{size_bytes/(1024*1024):.1f} MB" def sanitize_html(html_content): """Basic sanitization of HTML content""" # Remove potentially dangerous tags and attributes # This is a very basic implementation - in production, use a proper HTML sanitizer dangerous_tags = ['script', 'iframe', 'object', 'embed'] for tag in dangerous_tags: html_content = re.sub(f'<{tag}.*?{tag}>', '', html_content, flags=re.DOTALL) html_content = re.sub(f'<{tag}.*?>', '', html_content, flags=re.DOTALL) # Remove on* attributes html_content = re.sub(r'on\w+=".*?"', '', html_content) html_content = re.sub(r"on\w+='.*?'", '', html_content) return html_content # Main application layout def main(): # Sidebar for account management with st.sidebar: st.markdown("
", unsafe_allow_html=True) # Main content area st.markdown("From: {display_from}
", unsafe_allow_html=True) # To to_addresses = [] for to in message.get("to", []): name = to.get("name", "") address = to.get("address", "") if name: to_addresses.append(f"{name} <{address}>") else: to_addresses.append(address) st.markdown(f"To: {', '.join(to_addresses)}
", unsafe_allow_html=True) # Date date_str = message.get("createdAt", "") if date_str: try: dt = datetime.fromisoformat(date_str.replace('Z', '+00:00')) formatted_date = dt.strftime("%a, %d %b %Y %H:%M:%S") st.markdown(f"Date: {formatted_date}
", unsafe_allow_html=True) except: st.markdown(f"Date: {date_str}
", unsafe_allow_html=True) # Attachments attachments = message.get("attachments", []) if attachments: st.markdown("{html.escape(text_content)}", unsafe_allow_html=True)
else:
st.markdown("(No content)
", unsafe_allow_html=True) st.markdown("Create a temporary email address to protect your privacy.