Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import imaplib | |
| import email | |
| from email.header import decode_header | |
| import torch | |
| from transformers import BertTokenizer, BertForSequenceClassification | |
| import re | |
class EmailProcessor:
    """Stateless helpers for reading a mailbox over IMAP and normalising text.

    The class is used purely as a namespace: every method is a static method
    and is normally invoked as ``EmailProcessor.method(...)``.
    """

    # Compiled once; used for every message body and every prediction input.
    _TAG_PATTERN = re.compile(r'<[^>]+>')
    _WHITESPACE_PATTERN = re.compile(r'\s+')

    @staticmethod
    def decode_email_content(content, default_charset='utf-8'):
        """Turn a raw payload into ``str`` without ever raising on bad bytes.

        Args:
            content: The payload. ``bytes`` are decoded; any other object is
                converted with ``str()``.
            default_charset: Charset to try first (usually the one declared
                by the message part). ``None``/empty means UTF-8.

        Returns:
            The decoded text. If ``default_charset`` cannot decode the bytes
            the result is the ISO-8859-1 interpretation; if the charset name
            itself is unknown, UTF-8 is tried before ISO-8859-1.
        """
        if not isinstance(content, bytes):
            return str(content)
        try:
            return content.decode(default_charset or 'utf-8')
        except LookupError:
            # The message declared a charset Python has no codec for
            # (e.g. "unknown-8bit"); retry with UTF-8 before the catch-all.
            try:
                return content.decode('utf-8')
            except UnicodeDecodeError:
                pass
        except UnicodeDecodeError:
            pass
        # ISO-8859-1 maps every byte value to a code point, so this final
        # fallback cannot fail.
        return content.decode('iso-8859-1')

    @staticmethod
    def clean_text(text):
        """Strip ``<...>`` tags, collapse whitespace runs to one space, trim.

        Args:
            text: The string to clean.

        Returns:
            The cleaned string.
        """
        without_tags = EmailProcessor._TAG_PATTERN.sub('', text)
        return EmailProcessor._WHITESPACE_PATTERN.sub(' ', without_tags).strip()

    @staticmethod
    def parse_message(raw_message):
        """Parse raw RFC 822 bytes into the dict shape returned by get_emails.

        Args:
            raw_message: The complete message as ``bytes``.

        Returns:
            ``{'subject': str, 'content': str}`` where ``content`` has been
            passed through :meth:`clean_text`.
        """
        message = email.message_from_bytes(raw_message)
        return {
            'subject': EmailProcessor._decode_subject(message['subject']),
            'content': EmailProcessor.clean_text(
                EmailProcessor._extract_body(message)
            ),
        }

    @staticmethod
    def get_emails(email_address, password, imap_server, imap_port, max_emails=5):
        """Fetch the most recent messages from INBOX over IMAP-over-SSL.

        Args:
            email_address: Login name for the IMAP server.
            password: Login password (an app password for Gmail).
            imap_server: Host name of the IMAP server.
            imap_port: TCP port of the IMAP server; coerced to ``int``.
            max_emails: How many of the newest messages to return.

        Returns:
            ``(emails, None)`` on success, where ``emails`` is a list of
            ``{'subject', 'content'}`` dicts, oldest first; or
            ``(None, error_message)`` on failure. This method never raises:
            callers rely on the tuple to report problems in the UI.
        """
        try:
            imap = imaplib.IMAP4_SSL(imap_server, int(imap_port))
            try:
                imap.login(email_address, password)
                status, _ = imap.select('INBOX')
                if status != 'OK':
                    return None, "Could not open INBOX"
                status, message_numbers = imap.search(None, 'ALL')
                if status != 'OK':
                    return None, "Could not list messages in INBOX"

                ids = (message_numbers[0] or b'').split() if message_numbers else []
                # ids[-0:] would select every message, so a non-positive
                # limit is handled explicitly.
                recent_ids = ids[-max_emails:] if max_emails > 0 else []

                emails = []
                for num in recent_ids:
                    status, msg_data = imap.fetch(num, '(RFC822)')
                    # A fetch reply mixes (envelope, body) tuples with bare
                    # bytes / None; the message body is the tuple's 2nd item.
                    raw_message = next(
                        (
                            item[1]
                            for item in msg_data or []
                            if isinstance(item, tuple) and len(item) > 1
                        ),
                        None,
                    )
                    if status != 'OK' or raw_message is None:
                        # Message vanished (e.g. expunged) between search and
                        # fetch; skip it rather than fail the whole scan.
                        continue
                    emails.append(EmailProcessor.parse_message(raw_message))
                return emails, None
            finally:
                # Always release the connection, including on login or
                # parsing failures.
                EmailProcessor._disconnect(imap)
        except Exception as e:
            # Boundary of the (emails, error) contract: report, never raise.
            return None, str(e)

    @staticmethod
    def _decode_subject(raw_subject):
        """Decode an RFC 2047 subject header, joining all encoded fragments."""
        if raw_subject is None:
            return "(no subject)"
        try:
            fragments = decode_header(raw_subject)
        except email.errors.HeaderParseError:
            # Malformed encoded-word: show the raw header instead of failing.
            return str(raw_subject)
        return ''.join(
            EmailProcessor.decode_email_content(fragment, charset)
            if isinstance(fragment, bytes)
            else fragment
            for fragment, charset in fragments
        )

    @staticmethod
    def _part_text(part):
        """Decode one message part's payload using its declared charset."""
        payload = part.get_payload(decode=True)
        if not payload:
            return ''
        return EmailProcessor.decode_email_content(
            payload, part.get_content_charset() or 'utf-8'
        )

    @staticmethod
    def _extract_body(message):
        """Return the message text: text/plain parts, else text/html parts."""
        if not message.is_multipart():
            return EmailProcessor._part_text(message)
        plain_parts = []
        html_parts = []
        for part in message.walk():
            content_type = part.get_content_type()
            if content_type == 'text/plain':
                plain_parts.append(part)
            elif content_type == 'text/html':
                html_parts.append(part)
        plain_text = ''.join(EmailProcessor._part_text(p) for p in plain_parts)
        if plain_text:
            return plain_text
        # HTML-only mail would otherwise yield an empty body; the caller's
        # clean_text() strips the markup.
        return ''.join(EmailProcessor._part_text(p) for p in html_parts)

    @staticmethod
    def _disconnect(imap):
        """Close the selected mailbox and log out, ignoring cleanup errors."""
        for action in (imap.close, imap.logout):
            try:
                action()
            except (imaplib.IMAP4.error, OSError):
                # Best-effort cleanup: close() is illegal before select() and
                # the socket may already be gone; neither should mask the
                # real result or error.
                pass
class PhishingDetector:
    """Score text for phishing with a BERT sequence-classification model."""

    def __init__(self, model_path="./phishing_model"):
        """Load the tokenizer and model and put the model in eval mode.

        Args:
            model_path: Location passed to ``from_pretrained`` for both the
                tokenizer and the two-label classification model.
        """
        # Prefer the GPU when one is visible to torch, otherwise run on CPU.
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = BertTokenizer.from_pretrained(model_path)
        self.model = BertForSequenceClassification.from_pretrained(
            model_path,
            num_labels=2
        ).to(self.device)
        # Inference only: disable dropout and other training-time behaviour.
        self.model.eval()

    def predict(self, text):
        """Return the model's probability for class index 1 as a float.

        The text is cleaned with ``EmailProcessor.clean_text`` and truncated
        to 512 tokens before scoring. Callers treat the result as the
        phishing probability (assumes label 1 is the phishing class in the
        trained model -- confirm against the training setup).

        Args:
            text: The raw text to score.

        Returns:
            A float in ``[0, 1]``.
        """
        cleaned_text = EmailProcessor.clean_text(text)
        inputs = self.tokenizer(
            cleaned_text,
            return_tensors="pt",
            truncation=True,
            max_length=512,
            padding=True
        )
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        # No gradients are needed for scoring; skipping autograd avoids
        # building and retaining a graph on every prediction.
        with torch.no_grad():
            outputs = self.model(**inputs)
        probabilities = torch.nn.functional.softmax(outputs.logits, dim=1)
        return probabilities[0][1].item()
# Page header shown at the top of the main column.
st.title("📧 Email Phishing Detector")
st.write("Connect your email account to analyze messages for potential phishing attempts.")

# IMAP connection settings are collected in the sidebar; the values are read
# later when the mailbox scan is triggered.
st.sidebar.header("Email Settings")
email_address = st.sidebar.text_input("Email Address", key="email_address_input")
password = st.sidebar.text_input("Password", type="password", key="password_input")
imap_server = st.sidebar.text_input("IMAP Server", value="imap.gmail.com", key="imap_server_input")
imap_port = st.sidebar.number_input("IMAP Port", value=993, key="imap_port_input")
# Initialize the model using st.cache_resource so the loaded detector is
# reused across Streamlit reruns instead of being rebuilt on every widget
# interaction.
@st.cache_resource
def load_detector():
    """Construct the PhishingDetector with its default model path."""
    return PhishingDetector()


try:
    detector = load_detector()
    model_loaded = True
except Exception as e:
    # Surface the failure in the page and record it in model_loaded; detector
    # is set to None so the name is always defined.
    st.error(f"Error loading model: {str(e)}")
    detector = None
    model_loaded = False
# Manual text analysis: score arbitrary pasted text with the phishing model.
st.markdown("### 📝 Manual Text Analysis")
manual_text = st.text_area("Enter text to analyze:", height=100, key="manual_text_input")
# The button is always rendered; a click is only acted on when the text area
# holds something other than whitespace.
if st.button("Analyze Text", key="analyze_text_btn") and manual_text.strip():
    if not model_loaded:
        # Without this guard a failed model load would crash here when
        # detector.predict is called.
        st.error("The phishing model is not loaded, so the text cannot be analyzed.")
    else:
        with st.spinner("Analyzing text..."):
            phishing_score = detector.predict(manual_text)
            risk_color = "red" if phishing_score > 0.5 else "green"
            st.markdown(f"**Phishing Risk Score:** <span style='color:{risk_color}'>{phishing_score:.2%}</span>", unsafe_allow_html=True)
            # Three bands: above 0.8 high, above 0.5 medium, otherwise low.
            if phishing_score > 0.8:
                st.error("⚠️ High Risk: This text shows strong indicators of being a phishing attempt!")
            elif phishing_score > 0.5:
                st.warning("⚠️ Medium Risk: This text shows some suspicious characteristics.")
            else:
                st.success("✅ Low Risk: This text appears to be legitimate.")
# Mailbox scan: fetch recent messages over IMAP and score each one.
st.markdown("### 📨 Email Analysis")
# Short-circuit on model_loaded: the button is only rendered when the model
# is available.
if model_loaded and st.button("Analyze Emails", key="analyze_emails_btn"):
    if email_address and password:
        with st.spinner("Connecting to email..."):
            emails, error = EmailProcessor.get_emails(email_address, password, imap_server, imap_port)
            if error:
                st.error(f"Error connecting to email: {error}")
            elif not emails:
                st.warning("No emails found in inbox.")
            else:
                st.success("Successfully retrieved emails!")
                for position, email_data in enumerate(emails, start=1):
                    with st.expander(f"Email {position}: {email_data['subject']}"):
                        phishing_score = detector.predict(email_data['content'])
                        is_suspicious = phishing_score > 0.5
                        risk_color = "red" if is_suspicious else "green"
                        st.markdown(f"**Phishing Risk Score:** <span style='color:{risk_color}'>{phishing_score:.2%}</span>", unsafe_allow_html=True)
                        # Same three bands as the score colour: low, then
                        # high (> 0.8), with medium as the remainder.
                        if not is_suspicious:
                            st.success("✅ Low Risk: This email appears to be legitimate.")
                        elif phishing_score > 0.8:
                            st.error("⚠️ High Risk: This email shows strong indicators of being a phishing attempt!")
                        else:
                            st.warning("⚠️ Medium Risk: This email shows some suspicious characteristics.")
                        # Widget keys stay zero-based so each text area has a
                        # unique, stable key.
                        st.text_area("Email Content", email_data['content'], height=100, key=f"email_content_{position - 1}")
    else:
        st.warning("Please enter your email credentials.")
# Static help text rendered in the sidebar: usage instructions followed by a
# short description of the app, each preceded by a horizontal rule.
with st.sidebar:
    st.markdown("---")
    st.markdown("""
### Instructions
1. Enter your email credentials
2. For Gmail:
- Use an App Password instead of your regular password
- Enable 2FA and generate an App Password from Google Account settings
3. Click "Analyze Emails" to scan your recent emails
""")
    st.markdown("---")
    st.markdown("""
### About
This application uses a BERT-based model to detect phishing attempts in emails.
You can either:
1. Analyze your emails directly by connecting your email account
2. Manually input text to analyze for phishing content
""")