| import random |
| import string |
| import os |
| import time |
| import json |
| import requests |
| import pandas as pd |
| import mariadb |
| import gradio as gr |
| from sqlalchemy import create_engine, text, inspect |
| from typing import Optional |
|
|
| class temp_mail(): |
| def __init__(self, adress: str = None, password: str = None, token: str = None): |
| if adress is None or password is None: |
| adress, password = self.create_random_username_and_password() |
| self.adress = adress |
| self.password = password |
| self.token = token |
| |
| self.create_database() |
| |
| |
| if self.token is not None: |
| self.save_to_db() |
|
|
| def get_domains(self): |
| url = "https://api.mail.tm/domains" |
| response = requests.get(url=url) |
| if response.status_code == 200: |
| return json.loads(response.text) |
| else: |
| return "server error" |
|
|
| def create_random_username_and_password(self, domain=None, length=4): |
| if domain is None: |
| domain = self.get_domains() |
| domain = domain['hydra:member'][0]['domain'] |
| characters = string.ascii_letters + string.digits |
| characters_adress = string.ascii_letters |
| username = ''.join(random.choices(characters_adress, k=length)) |
| username = username.lower() |
| adress = username + '@' + domain |
| password = ''.join(random.choices(characters, k=length + 4)) |
| return (adress, password) |
|
|
| def get_account(self): |
| url = "https://api.mail.tm/accounts" |
| body = { |
| "address": self.adress, |
| "password": self.password |
| } |
| response = requests.post(url=url, json=body) |
| if response.status_code == 200: |
| return response.json() |
| else: |
| return "This username is already taken. Regenerate another one using create_random_username_and_password() and retry." |
|
|
| def get_token(self): |
| url = "https://api.mail.tm/token" |
| body = { |
| "address": self.adress, |
| "password": self.password |
| } |
| response = requests.post(url=url, json=body) |
| if response.status_code == 200: |
| self.token = response.json().get('token', None) |
| if self.token: |
| self.save_to_db() |
| return self.token |
| return "Unrecognized address and password." |
|
|
| def get_messages(self): |
| if self.token is None: |
| self.get_token() |
| url = "https://api.mail.tm/messages" |
| headers = {"Authorization": f"Bearer {self.token}"} |
| response = requests.get(url=url, headers=headers) |
| if response.status_code == 200: |
| return response.json() |
| else: |
| return "Unrecognized token." |
|
|
| def get_messages_more_precise(self): |
| messages_data = self.get_messages() |
| if isinstance(messages_data, str): |
| return [] |
| messages = messages_data.get('hydra:member', []) |
| return [(i.get('from', {}).get('address', 'not found'), |
| i.get('subject', 'not found'), |
| i.get('size', 'not found'), |
| i.get('intro', 'not found'), |
| i.get('id', 'not found')) for i in messages] |
|
|
| def download_message(self, message_id, base_path=os.getcwd()): |
| """Download a single message by ID into a folder named after the email address""" |
| if self.token is None: |
| self.get_token() |
| if isinstance(self.token, str) and self.token.startswith("Unrecognized"): |
| return f"Failed to get token: {self.token}" |
| |
| |
| email_folder = self.adress.replace('@', '_at_').replace('.', '_dot_') |
| folder_path = os.path.join(base_path, email_folder) |
| os.makedirs(folder_path, exist_ok=True) |
| |
| download_url = f"https://api.mail.tm/messages/{message_id}/download" |
| headers = {"Authorization": f"Bearer {self.token}"} |
| response = requests.get(url=download_url, headers=headers) |
| |
| if response.status_code == 200: |
| |
| filename = f"email_{time.strftime('%Y%m%d_%H%M%S')}_{message_id}.eml" |
| filepath = os.path.join(folder_path, filename) |
| |
| with open(filepath, 'wb') as f: |
| f.write(response.content) |
| return filepath |
| else: |
| return f"Failed to download message {message_id}: HTTP {response.status_code}" |
|
|
| def download_messages(self, base_path=os.getcwd()): |
| """Download all messages for the current account into a folder named after the email address""" |
| if self.token is None: |
| token_result = self.get_token() |
| if isinstance(token_result, str) and token_result.startswith("Unrecognized"): |
| return f"Failed to get token: {token_result}" |
| |
| |
| messages_data = self.get_messages() |
| if isinstance(messages_data, str): |
| return f"Failed to get messages: {messages_data}" |
| |
| messages = messages_data.get('hydra:member', []) |
| if not messages: |
| return "No messages found to download" |
| |
| |
| email_folder = self.adress.replace('@', '_at_').replace('.', '_dot_') |
| folder_path = os.path.join(base_path, email_folder) |
| os.makedirs(folder_path, exist_ok=True) |
| |
| |
| downloaded_files = [] |
| failed_downloads = [] |
| |
| for msg in messages: |
| msg_id = msg.get('id') |
| if not msg_id: |
| continue |
| |
| result = self.download_message(msg_id, base_path) |
| if result.startswith("Failed"): |
| failed_downloads.append(result) |
| else: |
| downloaded_files.append(result) |
| |
| |
| if failed_downloads: |
| return f"Downloaded {len(downloaded_files)} of {len(messages)} messages to {folder_path}. Errors: {'; '.join(failed_downloads)}" |
| else: |
| return f"Successfully downloaded {len(downloaded_files)} messages to {folder_path}" |
| |
| def create_database(self): |
| try: |
| conn = mariadb.connect( |
| host='localhost', |
| user='root', |
| password='anas' |
| ) |
| conn.cursor().execute("CREATE DATABASE IF NOT EXISTS temp_mail") |
| conn.close() |
|
|
| engine = self.get_engine() |
| inspector = inspect(engine) |
| |
| if not inspector.has_table('accounts'): |
| with engine.connect() as conn: |
| conn.execute(text(""" |
| CREATE TABLE accounts ( |
| address VARCHAR(255) PRIMARY KEY, |
| password VARCHAR(255) NOT NULL, |
| token TEXT NOT NULL, |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP |
| ) |
| """)) |
| conn.commit() |
| except Exception as e: |
| print(f"Database setup error: {e}") |
|
|
| def get_engine(self): |
| return create_engine( |
| "mariadb+mariadbconnector://root:anas@localhost/temp_mail", |
| pool_pre_ping=True |
| ) |
|
|
| def save_to_db(self): |
| if not self.token: |
| return |
|
|
| try: |
| engine = self.get_engine() |
| with engine.begin() as connection: |
| connection.execute( |
| text("DELETE FROM accounts WHERE address = :address"), |
| {"address": self.adress} |
| ) |
| connection.execute( |
| text(""" |
| INSERT INTO accounts (address, password, token) |
| VALUES (:address, :password, :token) |
| """), |
| { |
| "address": self.adress, |
| "password": self.password, |
| "token": self.token |
| } |
| ) |
| return f"Saved account: {self.adress}" |
| except Exception as e: |
| return f"Database save failed: {e}" |
|
|
| def retrieve_random_user(self, regex: Optional[str] = None) -> Optional[dict]: |
| try: |
| engine = self.get_engine() |
| with engine.connect() as conn: |
| query = """ |
| SELECT address, token, password |
| FROM accounts |
| """ |
| |
| params = {} |
| if regex: |
| query += " WHERE address REGEXP :regex" |
| params["regex"] = regex |
| |
| query += " ORDER BY RAND() LIMIT 1" |
| |
| result = conn.execute(text(query), params).fetchone() |
| |
| if result: |
| address, token, password = result |
| return { |
| "address": address, |
| "password": password, |
| "token": token |
| } |
| return None |
| except Exception as e: |
| print(f"Database retrieval error: {e}") |
| return None |
|
|
| def get_all_users(self): |
| try: |
| engine = self.get_engine() |
| with engine.connect() as conn: |
| query = "SELECT address, password FROM accounts ORDER BY created_at DESC" |
| results = conn.execute(text(query)).fetchall() |
| return [{"address": addr, "password": pwd} for addr, pwd in results] |
| except Exception as e: |
| print(f"Error retrieving users: {e}") |
| return [] |
|
|
| |
| def create_random_account(): |
| mail = temp_mail() |
| mail.get_account() |
| token = mail.get_token() |
| return mail.adress, mail.password, token, "Account created successfully!" |
|
|
| def use_existing_account(address, password): |
| if not address or not password: |
| return "", "", "", "Please provide both email address and password" |
| |
| mail = temp_mail(address, password) |
| token = mail.get_token() |
| |
| if token.startswith("Unrecognized"): |
| return address, password, "", "Failed to get token: Invalid credentials" |
| |
| return address, password, token, "Account authenticated successfully!" |
|
|
| def load_random_account(): |
| mail = temp_mail() |
| user = mail.retrieve_random_user() |
| if user: |
| return user["address"], user["password"], user["token"], "Loaded account from database" |
| else: |
| return "", "", "", "No accounts found in database" |
|
|
| def check_messages(address, password, token): |
| if not address or not password or not token: |
| return [], "Please provide address, password, and token" |
| |
| mail = temp_mail(address, password, token) |
| messages = mail.get_messages_more_precise() |
| |
| if not messages: |
| return [], "No messages found" |
| |
| |
| formatted_messages = [] |
| for msg in messages: |
| sender, subject, size, intro, msg_id = msg |
| formatted_messages.append([sender, subject, intro]) |
| |
| return formatted_messages, f"Found {len(formatted_messages)} messages" |
| |
| def download_all_messages(address, password, token, download_path): |
| if not address or not password: |
| return "Please provide email address and password" |
| |
| if not download_path: |
| |
| download_path = os.path.join(os.path.expanduser("~"), "Documents", "TempMail") |
| |
| |
| os.makedirs(download_path, exist_ok=True) |
| |
| mail = temp_mail(address, password, token) |
| result = mail.download_messages(download_path) |
| |
| |
| email_folder = address.replace('@', '_at_').replace('.', '_dot_') |
| folder_path = os.path.join(download_path, email_folder) |
| |
| return f"{result}\n\nFiles are organized in a folder named after your email address:\n{os.path.abspath(folder_path)}" |
|
|
| def load_accounts_list(): |
| mail = temp_mail() |
| try: |
| engine = mail.get_engine() |
| with engine.connect() as conn: |
| |
| query = "SELECT address, password, token FROM accounts ORDER BY created_at DESC" |
| results = conn.execute(text(query)).fetchall() |
| |
| if not results: |
| return [], "No accounts found in database" |
| |
| |
| formatted_accounts = [] |
| for row in results: |
| address, password, token = row |
| formatted_accounts.append([address, password, token]) |
| |
| return formatted_accounts, f"Found {len(formatted_accounts)} accounts" |
| except Exception as e: |
| return [], f"Error retrieving accounts: {e}" |
| |
| |
|
|
| |
| with gr.Blocks(title="Temporary Email Client") as app: |
| gr.Markdown("# Temporary Email Client") |
| gr.Markdown("Create temporary email accounts, check messages, and download emails.") |
| |
| with gr.Tab("Create/Use Account"): |
| with gr.Row(): |
| with gr.Column(): |
| gr.Markdown("### Create Random Account") |
| create_btn = gr.Button("Create New Random Account") |
| |
| with gr.Column(): |
| gr.Markdown("### Use Existing Account") |
| with gr.Row(): |
| input_address = gr.Textbox(label="Email Address") |
| input_password = gr.Textbox(label="Password") |
| use_existing_btn = gr.Button("Authenticate") |
| |
| with gr.Column(): |
| gr.Markdown("### Load From Database") |
| load_random_btn = gr.Button("Load Random Account") |
| |
| with gr.Row(): |
| output_address = gr.Textbox(label="Email Address") |
| output_password = gr.Textbox(label="Password") |
| output_token = gr.Textbox(label="Token") |
| |
| status_msg = gr.Textbox(label="Status") |
| |
| with gr.Tab("Check Messages"): |
| with gr.Row(): |
| msg_address = gr.Textbox(label="Email Address") |
| msg_password = gr.Textbox(label="Password") |
| msg_token = gr.Textbox(label="Token") |
| |
| check_msg_btn = gr.Button("Check Messages") |
| |
| messages_output = messages_output = gr.Dataframe( |
| headers=["From", "Subject", "Preview"], |
| label="Messages" |
| ) |
| |
| msg_status = gr.Textbox(label="Status") |
| |
| with gr.Tab("Download Messages"): |
| with gr.Row(): |
| dl_address = gr.Textbox(label="Email Address") |
| dl_password = gr.Textbox(label="Password") |
| dl_token = gr.Textbox(label="Token") |
| |
| dl_path = gr.Textbox( |
| label="Download Path", |
| placeholder="Leave empty to use Documents/TempMail folder", |
| info="Specify a directory where emails will be saved" |
| ) |
| |
| download_btn = gr.Button("Download All Messages") |
| dl_status = gr.Textbox(label="Status") |
| |
| with gr.Tab("Saved Accounts"): |
| list_accounts_btn = gr.Button("List Saved Accounts") |
| accounts_output = gr.Dataframe( |
| headers=["Address", "Password", "Token"], |
| label="Saved Accounts", |
| row_count=10 |
| ) |
| accounts_status = gr.Textbox(label="Status") |
| |
| |
| create_btn.click( |
| create_random_account, |
| outputs=[output_address, output_password, output_token, status_msg] |
| ) |
| |
| use_existing_btn.click( |
| use_existing_account, |
| inputs=[input_address, input_password], |
| outputs=[output_address, output_password, output_token, status_msg] |
| ) |
| |
| load_random_btn.click( |
| load_random_account, |
| outputs=[output_address, output_password, output_token, status_msg] |
| ) |
| |
| check_msg_btn.click( |
| check_messages, |
| inputs=[msg_address, msg_password, msg_token], |
| outputs=[messages_output, msg_status] |
| ) |
| |
| download_btn.click( |
| download_all_messages, |
| inputs=[dl_address, dl_password, dl_token, dl_path], |
| outputs=[dl_status] |
| ) |
| |
| list_accounts_btn.click( |
| load_accounts_list, |
| outputs=[accounts_output, accounts_status] |
| ) |
| |
| |
| def copy_to_msg_tab(address, password, token): |
| return address, password, token |
| |
| def copy_to_dl_tab(address, password, token): |
| return address, password, token |
|
|
| output_address.change( |
| copy_to_msg_tab, |
| inputs=[output_address, output_password, output_token], |
| outputs=[msg_address, msg_password, msg_token] |
| ) |
| |
| output_address.change( |
| copy_to_dl_tab, |
| inputs=[output_address, output_password, output_token], |
| outputs=[dl_address, dl_password, dl_token] |
| ) |
|
|
| |
| if __name__ == "__main__": |
| app.launch() |
|
|