Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Email Validator - Hugging Face Spaces (Gradio Version) | |
| Works perfectly on Hugging Face because Gradio is native | |
| """ | |
| import gradio as gr | |
| import socket | |
| import time | |
| import uuid | |
| from typing import Dict, List, Tuple | |
| import dns.resolver | |
| import smtplib | |
| import logging | |
# Send INFO-level (and more severe) records through the root logger.
logging.basicConfig(level=logging.INFO)

# Envelope sender passed to MAIL FROM by every SMTP probe.
FROM_ADDRESS = 'verifier@yourdomain.com'

# Seconds handed to smtplib.SMTP as the connection timeout.
SMTP_TIMEOUT = 15

# Seconds slept around SMTP probes to throttle outgoing connections.
RATE_LIMIT_SECONDS = 1.0

# Domain cache: domain -> {'hosts': [...], 'catch_all': True/False/None}
domain_cache: Dict[str, Dict] = {}
def is_valid_format(email: str) -> bool:
    """Return True when *email* has the basic ``local@domain.tld`` shape.

    This is a cheap syntactic pre-check, not a full RFC 5322 parser.
    An address is accepted when it:

    * is a string containing no whitespace,
    * contains exactly one ``@``,
    * has a non-empty local part, and
    * has a domain made of at least two non-empty dot-separated labels
      (so ``user@localhost``, ``user@.`` and ``user@a..b`` are rejected).

    Any non-string input (including ``None``) yields False.
    """
    if not isinstance(email, str):
        return False
    # Whitespace (including CR/LF) is never sent unquoted in an SMTP
    # command, so refuse it before the address reaches a probe.
    if any(ch.isspace() for ch in email):
        return False

    local, separator, domain = email.partition('@')
    if not separator or '@' in domain:
        return False
    if not local:
        return False

    labels = domain.split('.')
    return len(labels) >= 2 and all(labels)
def resolve_mx_or_a(domain: str) -> List[str]:
    """Return the mail hosts for *domain*, best candidate first.

    MX records are looked up first and returned as host names ordered by
    ascending preference value. When the MX query raises NoAnswer,
    NXDOMAIN, NoNameservers or Timeout, the domain's A records are
    returned instead (as address strings).

    An MX record whose exchange is the root label ``.`` (the "null MX"
    of RFC 7505, meaning the domain accepts no mail) is dropped rather
    than returned as an empty host name.

    Returns:
        A list of host names / addresses; empty when nothing usable was
        found or the fallback A lookup failed for any reason.
    """
    try:
        answers = dns.resolver.resolve(domain, 'MX')
    except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN,
            dns.resolver.NoNameservers, dns.exception.Timeout):
        try:
            a_records = dns.resolver.resolve(domain, 'A')
        except Exception:
            return []
        return [str(record) for record in a_records]

    ranked = sorted(
        ((r.preference, str(r.exchange).rstrip('.')) for r in answers),
        key=lambda pair: pair[0],
    )
    # An exchange of "." strips down to "", which is not a connectable host.
    return [host for _, host in ranked if host]
def generate_random_localpart():
    """Build a throwaway local part used to test a domain for catch-all.

    The result is ``probe-`` followed by the first 12 hex digits of a
    freshly generated UUID4.
    """
    token = uuid.uuid4().hex
    return f"probe-{token[:12]}"
def smtp_probe_recipient(mx_host: str, from_addr: str, recipient: str):
    """Ask *mx_host* whether it would accept mail for *recipient*.

    Opens an SMTP session on port 25 (timeout ``SMTP_TIMEOUT``), upgrades
    to TLS when the server advertises STARTTLS, then issues MAIL FROM and
    a single RCPT TO. No DATA command is sent, so no message is delivered.

    Args:
        mx_host: Host name or address of the mail server to contact.
        from_addr: Envelope sender used in MAIL FROM.
        recipient: Address to test with RCPT TO.

    Returns:
        A 4-tuple ``(ok, code, message, error)``:

        * ``(True, <int code>, <str message>, None)`` when the server
          answered the RCPT TO command.
        * ``(False, None, None, 'MAIL_FROM_REJECTED')`` when the server
          refused the sender, so nothing can be concluded about the
          recipient.
        * ``(False, None, None, 'SMTPRecipientsRefused')``,
          ``(False, None, None, 'NETWORK_ERROR')`` or
          ``(False, None, None, 'ERROR')`` when the probe failed.
    """
    log = logging.getLogger(__name__)

    def as_text(payload) -> str:
        # smtplib hands replies back as bytes; tolerate undecodable bytes.
        if isinstance(payload, bytes):
            return payload.decode(errors='ignore')
        return str(payload)

    try:
        with smtplib.SMTP(host=mx_host, port=25, timeout=SMTP_TIMEOUT) as smtp:
            smtp.set_debuglevel(0)
            smtp.ehlo_or_helo_if_needed()
            if smtp.has_extn('starttls'):
                try:
                    smtp.starttls()
                    smtp.ehlo()
                except Exception:
                    # TLS is opportunistic: keep probing, but leave a trace
                    # instead of swallowing the failure silently.
                    log.debug("STARTTLS with %s failed", mx_host, exc_info=True)

            # If the sender is refused, the following RCPT TO would be
            # answered with a 5xx "bad sequence" reply that looks exactly
            # like a missing mailbox. Stop here instead.
            mail_code, _ = smtp.mail(from_addr)
            if not 200 <= mail_code <= 299:
                return False, None, None, 'MAIL_FROM_REJECTED'

            rcpt_code, rcpt_msg = smtp.rcpt(recipient)
            return True, int(rcpt_code), as_text(rcpt_msg), None
    except smtplib.SMTPRecipientsRefused as exc:
        try:
            refusal = exc.recipients.get(recipient)
            if refusal:
                code, resp = refusal
                return True, int(code), as_text(resp), None
        except Exception:
            log.debug("Malformed SMTPRecipientsRefused payload", exc_info=True)
        return False, None, None, 'SMTPRecipientsRefused'
    except OSError:
        # Covers socket timeouts and refused connections, and also every
        # other smtplib.SMTPException, because that class derives from
        # OSError.
        return False, None, None, 'NETWORK_ERROR'
    except Exception:
        return False, None, None, 'ERROR'
def decide_outcome(email, mx_host, rcpt_code, catch_all_flag, probe_error):
    """Map the probe results for *email* to a ``(status, reason)`` pair.

    Checks run in this order: address format, presence of a mail host,
    probe failure (or missing reply code), then the class of the RCPT
    reply code. Only a 2xx reply yields ``'VALID'``; its reason is
    ``'UNKNOWN_CATCHALL'`` when *catch_all_flag* is truthy, otherwise
    ``'MAILBOX_EXISTS'``. Everything else yields ``'INVALID'``.
    """
    if not is_valid_format(email):
        return 'INVALID', 'INVALID_FORMAT'
    if not mx_host:
        return 'INVALID', 'UNDELIVERABLE_NO_MX'
    if probe_error or rcpt_code is None:
        return 'INVALID', 'PROBE_FAILED_NETWORK'

    if 200 <= rcpt_code <= 299:
        accepted_reason = 'UNKNOWN_CATCHALL' if catch_all_flag else 'MAILBOX_EXISTS'
        return 'VALID', accepted_reason

    # Reply-code ranges that carry a specific INVALID reason.
    rejected_ranges = (
        (500, 599, 'MAILBOX_DOES_NOT_EXIST'),
        (400, 499, 'TEMPORARY_ERROR'),
    )
    for lowest, highest, reason in rejected_ranges:
        if lowest <= rcpt_code <= highest:
            return 'INVALID', reason
    return 'INVALID', 'UNKNOWN_STATUS'
def validate_email_function(email: str) -> Dict:
    """Validate a single email address via DNS lookup and SMTP probing.

    Steps:
      1. Normalise the input (strip, lower-case) and check its format.
      2. Resolve the domain's mail hosts, cached per domain in
         ``domain_cache``.
      3. Once per domain, probe a random address to detect a catch-all
         server. The verdict is cached only when that probe actually got
         an SMTP reply, so a transient failure is retried on a later call
         instead of being remembered as "not catch-all".
      4. Probe the real recipient on the first mail host and classify the
         reply with ``decide_outcome``.

    Args:
        email: Address to validate; ``None`` is treated as an empty string.

    Returns:
        A dict with the keys ``'Email'``, ``'Status'``, ``'Reason'`` and
        ``'Valid'`` (``'Yes'`` / ``'No'``).
    """
    email = (email or '').strip().lower()

    def build_result(valid: bool, reason: str) -> Dict:
        # Single place that shapes the result returned to callers.
        return {
            'Email': email,
            'Status': '✅ VALID' if valid else '❌ INVALID',
            'Reason': reason,
            'Valid': 'Yes' if valid else 'No',
        }

    if not is_valid_format(email):
        return build_result(False, 'Invalid format')

    # The address is already lower-cased, so the domain is too.
    domain = email.split('@')[1]

    # Resolve MX (cached per domain)
    entry = domain_cache.get(domain)
    if entry is None:
        entry = {'hosts': resolve_mx_or_a(domain), 'catch_all': None}
        domain_cache[domain] = entry

    hosts = entry['hosts']
    if not hosts:
        return build_result(False, 'No MX records')
    mx_host = hosts[0]
    time.sleep(RATE_LIMIT_SECONDS)

    # Catch-all detection (once per domain)
    if entry['catch_all'] is None:
        random_email = f"{generate_random_localpart()}@{domain}"
        ok, code, _msg, _err = smtp_probe_recipient(mx_host, FROM_ADDRESS, random_email)
        if ok and code is not None:
            entry['catch_all'] = 200 <= code <= 299
        time.sleep(RATE_LIMIT_SECONDS)
    # An undetermined verdict (None) is treated as "not catch-all" for now.
    catch_all_flag = bool(entry['catch_all'])

    # Probe actual recipient
    ok, code, _msg, err = smtp_probe_recipient(mx_host, FROM_ADDRESS, email)
    status, reason = decide_outcome(
        email=email,
        mx_host=mx_host,
        rcpt_code=code,
        catch_all_flag=catch_all_flag,
        probe_error=(not ok or err is not None),
    )
    return build_result(status == 'VALID', reason)
def validate_batch(emails_text: str) -> str:
    """Validate several addresses and render the results as Markdown.

    Args:
        emails_text: Addresses separated by line breaks. Blank lines and
            surrounding whitespace are ignored; ``None`` is treated as
            empty input.

    Returns:
        ``"Please enter at least one email"`` when no address was given;
        otherwise a Markdown string with a totals line followed by a
        table holding one row per address.
    """
    stripped = (line.strip() for line in (emails_text or '').splitlines())
    emails = [line for line in stripped if line]
    if not emails:
        return "Please enter at least one email"

    results = [validate_email_function(address) for address in emails]
    valid_count = sum(1 for result in results if result['Valid'] == 'Yes')

    def cell(value) -> str:
        # A raw "|" in user input would split the table cell; escape it.
        return str(value).replace('|', '\\|')

    # Format output
    lines = [
        f"**Total: {len(emails)} | Valid: {valid_count} | Invalid: {len(emails) - valid_count}**",
        "",
        "| Email | Status | Reason | Valid |",
        "|-------|--------|--------|-------|",
    ]
    lines.extend(
        f"| {cell(r['Email'])} | {cell(r['Status'])} | {cell(r['Reason'])} | {cell(r['Valid'])} |"
        for r in results
    )
    return "\n".join(lines) + "\n"
# Create Gradio Interface
def _validate_for_ui(email):
    """Adapt validate_email_function's dict to the four output textboxes.

    A handler wired to several output components has to return one value
    per component, in order; a dict keyed by plain strings is not mapped
    onto the outputs.
    """
    result = validate_email_function(email)
    return result['Email'], result['Status'], result['Reason'], result['Valid']


with gr.Blocks(title="Email Validator") as demo:
    gr.Markdown("# 📧 Email Validator API")
    gr.Markdown("Validate emails using DNS MX records and SMTP probing")

    with gr.Tab("Single Email"):
        with gr.Row():
            email_input = gr.Textbox(
                label="Email Address",
                placeholder="Enter email to validate",
                value="test@gmail.com"
            )
        validate_btn = gr.Button("Validate", variant="primary", size="lg")
        with gr.Row():
            email_output = gr.Textbox(label="Email", interactive=False)
            status_output = gr.Textbox(label="Status", interactive=False)
        with gr.Row():
            reason_output = gr.Textbox(label="Reason", interactive=False)
            valid_output = gr.Textbox(label="Valid", interactive=False)
        # api_name gives the endpoint the name used in the "API Docs" tab.
        validate_btn.click(
            fn=_validate_for_ui,
            inputs=email_input,
            outputs=[email_output, status_output, reason_output, valid_output],
            api_name="validate"
        )

    with gr.Tab("Batch Validation"):
        with gr.Row():
            batch_input = gr.Textbox(
                label="Emails (one per line)",
                placeholder="test1@gmail.com\ntest2@yahoo.com\ninvalid@test.com",
                lines=10
            )
        batch_btn = gr.Button("Validate Batch", variant="primary", size="lg")
        batch_output = gr.Markdown(label="Results")
        batch_btn.click(
            fn=validate_batch,
            inputs=batch_input,
            outputs=batch_output,
            api_name="batch"
        )

    # NOTE(review): the payload and response shapes described below are
    # hand-written; confirm them against the endpoints the deployed Gradio
    # version actually generates.
    with gr.Tab("API Docs"):
        gr.Markdown("""
        ## API Endpoints
        ### Single Email Validation
        - **URL:** `/api/validate`
        - **Method:** POST
        - **Payload:** `{"email": "test@gmail.com"}`
        ### Batch Validation
        - **URL:** `/api/batch`
        - **Method:** POST
        - **Payload:** `{"emails": ["test1@gmail.com", "test2@yahoo.com"]}`
        ## Response
        ```
        {
        "Email": "test@gmail.com",
        "Status": "✅ VALID",
        "Reason": "MAILBOX_EXISTS",
        "Valid": "Yes"
        }
        ```
        """)
if __name__ == "__main__":
    # Listen on every network interface at port 7860, without creating a
    # public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    demo.launch(**launch_options)