|
|
import csv |
|
|
from tempfile import NamedTemporaryFile |
|
|
import shutil |
|
|
import pandas as pd |
|
|
import source_code as sc |
|
|
from suggestion import suggest_email_domain |
|
|
import whois |
|
|
from popular_domains import emailDomains |
|
|
import streamlit as st |
|
|
from streamlit_extras.metric_cards import style_metric_cards |
|
|
|
|
|
st.set_page_config( |
|
|
page_title="Email verification", |
|
|
page_icon="✅", |
|
|
layout="centered", |
|
|
) |
|
|
|
|
|
def label_email(email): |
|
|
if not sc.is_valid_email(email): |
|
|
return "Invalid" |
|
|
if not sc.has_valid_mx_record(email.split('@')[1]): |
|
|
return "Invalid" |
|
|
if not sc.verify_email(email): |
|
|
return "Unknown" |
|
|
if sc.is_disposable(email.split('@')[1]): |
|
|
return "Risky" |
|
|
return "Valid" |
|
|
|
|
|
def label_emails(input_file): |
|
|
file_extension = input_file.name.split('.')[-1].lower() |
|
|
|
|
|
if file_extension == 'csv': |
|
|
df = process_csv(input_file) |
|
|
elif file_extension == 'xlsx': |
|
|
df = process_xlsx(input_file) |
|
|
elif file_extension == 'txt': |
|
|
df = process_txt(input_file) |
|
|
else: |
|
|
st.warning("Unsupported file format. Please provide a CSV, XLSX, or TXT file.") |
|
|
|
|
|
|
|
|
def process_csv(input_file): |
|
|
|
|
|
if input_file: |
|
|
if isinstance(input_file, str): |
|
|
df = pd.read_csv(input_file, header=None) |
|
|
else: |
|
|
df = pd.read_csv(input_file, header=None) |
|
|
|
|
|
|
|
|
results = [] |
|
|
|
|
|
|
|
|
for index, row in df.iterrows(): |
|
|
email = row[0].strip() |
|
|
label = label_email(email) |
|
|
results.append([email, label]) |
|
|
|
|
|
|
|
|
result_df = pd.DataFrame(results, columns=['Email', 'Label']) |
|
|
result_df.index = range(1, len(result_df) + 1) |
|
|
return result_df |
|
|
else: |
|
|
return pd.DataFrame(columns=['Email', 'Label']) |
|
|
|
|
|
def process_xlsx(input_file): |
|
|
df = pd.read_excel(input_file, header=None) |
|
|
results = [] |
|
|
|
|
|
for index, row in df.iterrows(): |
|
|
email = row[0].strip() |
|
|
label = label_email(email) |
|
|
results.append([email, label]) |
|
|
|
|
|
result_df = pd.DataFrame(results, columns=['Email', 'Label']) |
|
|
result_df.index = range(1, len(result_df) + 1) |
|
|
|
|
|
|
|
|
st.dataframe(result_df) |
|
|
|
|
|
|
|
|
def process_txt(input_file): |
|
|
input_text = input_file.read().decode("utf-8").splitlines() |
|
|
|
|
|
|
|
|
results = [] |
|
|
|
|
|
for line in input_text: |
|
|
email = line.strip() |
|
|
label = label_email(email) |
|
|
results.append([email, label]) |
|
|
|
|
|
|
|
|
result_df = pd.DataFrame(results, columns=['Email', 'Label']) |
|
|
result_df.index = range(1, len(result_df) + 1) |
|
|
|
|
|
|
|
|
st.dataframe(result_df) |
|
|
|
|
|
def main(): |
|
|
with open('style.css') as f: |
|
|
st.markdown(f'<style>{f.read()}</style>', unsafe_allow_html=True) |
|
|
|
|
|
st.title("Email Verification Tool", help="This tool verifies the validity of an email address.") |
|
|
st.info("The result may not be accurate. However, it has 90% accuracy.") |
|
|
|
|
|
t1, t2= st.tabs(["Single Email", "Bulk Email Processing"]) |
|
|
|
|
|
with t1: |
|
|
|
|
|
|
|
|
email = st.text_input("Enter an email address:") |
|
|
|
|
|
if st.button("Verify"): |
|
|
with st.spinner('Verifying...'): |
|
|
result = {} |
|
|
|
|
|
|
|
|
result['syntaxValidation'] = sc.is_valid_email(email) |
|
|
|
|
|
if result['syntaxValidation']: |
|
|
domain_part = email.split('@')[1] if '@' in email else '' |
|
|
|
|
|
if not domain_part: |
|
|
st.error("Invalid email format. Please enter a valid email address.") |
|
|
else: |
|
|
|
|
|
if not sc.has_valid_mx_record(domain_part): |
|
|
st.warning("Not valid: MX record not found.") |
|
|
suggested_domains = suggest_email_domain(domain_part, emailDomains) |
|
|
if suggested_domains: |
|
|
st.info("Suggested Domains:") |
|
|
for suggested_domain in suggested_domains: |
|
|
st.write(suggested_domain) |
|
|
else: |
|
|
st.warning("No suggested domains found.") |
|
|
else: |
|
|
|
|
|
result['MXRecord'] = sc.has_valid_mx_record(domain_part) |
|
|
|
|
|
|
|
|
if result['MXRecord']: |
|
|
result['smtpConnection'] = sc.verify_email(email) |
|
|
else: |
|
|
result['smtpConnection'] = False |
|
|
|
|
|
|
|
|
result['is Temporary'] = sc.is_disposable(domain_part) |
|
|
|
|
|
|
|
|
is_valid = ( |
|
|
result['syntaxValidation'] |
|
|
and result['MXRecord'] |
|
|
and result['smtpConnection'] |
|
|
and not result['is Temporary'] |
|
|
) |
|
|
|
|
|
st.markdown("**Result:**") |
|
|
|
|
|
|
|
|
col1, col2, col3 = st.columns(3) |
|
|
col1.metric(label="Syntax", value=result['syntaxValidation']) |
|
|
col2.metric(label="MxRecord", value=result['MXRecord']) |
|
|
col3.metric(label="Is Temporary", value=result['is Temporary']) |
|
|
|
|
|
|
|
|
|
|
|
if not result['smtpConnection']: |
|
|
st.warning("SMTP connection not established.") |
|
|
|
|
|
|
|
|
with st.expander("See Domain Information"): |
|
|
try: |
|
|
dm_info = whois.whois(domain_part) |
|
|
st.write("Registrar:", dm_info.registrar) |
|
|
st.write("Server:", dm_info.whois_server) |
|
|
st.write("Country:", dm_info.country) |
|
|
except: |
|
|
st.error("Domain information retrieval failed.") |
|
|
|
|
|
|
|
|
if is_valid: |
|
|
st.success(f"{email} is a Valid email") |
|
|
else: |
|
|
st.error(f"{email} is a Invalid email") |
|
|
if result['is Temporary']: |
|
|
st.text("It is a disposable email") |
|
|
|
|
|
with t2: |
|
|
|
|
|
st.header("Bulk Email Processing") |
|
|
input_file = st.file_uploader("Upload a CSV, XLSX, or TXT file", type=["csv", "xlsx", "txt"]) |
|
|
if input_file: |
|
|
st.write("Processing...") |
|
|
if input_file.type == 'text/plain': |
|
|
process_txt(input_file) |
|
|
else: |
|
|
df = process_csv(input_file) |
|
|
st.success("Processing completed. Displaying results:") |
|
|
st.dataframe(df) |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|