import email
import os
import pickle
import re
import tempfile
from email.policy import default

import streamlit as st
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
|
|
|
|
|
|
class HeaderAnalyzer:
    """Summarise a mapping of email headers and derive a simple spam score."""

    def analyze_header(self, header):
        """Return the key routing headers together with a spam score.

        Args:
            header: Mapping-like object exposing ``get(name)`` for header
                lookup (a plain ``dict`` of header name -> value works).

        Returns:
            dict with keys ``sender``, ``subject``, ``to``, ``date`` (each the
            raw header value, or ``None`` when the header is absent) and
            ``spam_score`` (see :meth:`calculate_spam_score`).
        """
        return {
            "sender": header.get("From"),
            "subject": header.get("Subject"),
            "to": header.get("To"),
            "date": header.get("Date"),
            "spam_score": self.calculate_spam_score(header),
        }

    def calculate_spam_score(self, header):
        """Compute a spam score from the ``X-Spam-*`` headers.

        The numeric value of ``X-Spam-Score`` (when present and parseable) is
        used as the base, and 1 is added when ``X-Spam-Flag`` is "yes"
        (case-insensitive, surrounding whitespace ignored).

        Args:
            header: Mapping-like object exposing ``get(name)``.

        Returns:
            ``0`` when neither header contributes, otherwise the summed score.
        """
        spam_score = 0

        raw_score = header.get("X-Spam-Score")
        if raw_score:
            spam_score += self._parse_score(raw_score)

        spam_flag = header.get("X-Spam-Flag")
        if spam_flag and str(spam_flag).strip().lower() == "yes":
            spam_score += 1

        return spam_score

    @staticmethod
    def _parse_score(raw_score):
        """Convert a raw ``X-Spam-Score`` value to float, or 0.0 if malformed."""
        try:
            return float(raw_score)
        except (TypeError, ValueError):
            # Header values come from the (untrusted) email itself; a
            # non-numeric score must not crash the analysis, so it simply
            # contributes nothing.
            return 0.0
|
|
|
|
class SpamDetector:
    """Hold the pickled subject and URL models and expose prediction helpers."""

    def __init__(self):
        """Load both models from the ``verdict/`` directory.

        NOTE(review): unpickling executes arbitrary code contained in the
        file, so these paths must only ever point at trusted model files.
        """
        self.subj_model = self._load_pickle("verdict/email_subj_model.pkl")
        self.url_model = self._load_pickle("verdict/phishing.pkl")

    @staticmethod
    def _load_pickle(path):
        """Return the object stored in the pickle file at *path*."""
        with open(path, "rb") as handle:
            return pickle.load(handle)

    def predict_subject(self, subject):
        """Preprocess *subject* and return the subject model's prediction.

        NOTE(review): the preprocessed text is passed to ``predict`` as a
        single string; confirm the pickled model accepts that rather than a
        list of documents.
        """
        return self.subj_model.predict(preprocess_subject(subject))

    def predict_url(self, url):
        """Preprocess *url* and return the URL model's prediction.

        NOTE(review): as with ``predict_subject``, a bare string is handed to
        ``predict``; verify against the model's expected input.
        """
        return self.url_model.predict(preprocess_url(url))
|
|
def preprocess_subject(subject):
    """Normalise an email subject into a space-separated string of lemmas.

    Steps: lowercase, replace every non-letter with a space, tokenize with
    NLTK's ``word_tokenize``, drop English stop words, lemmatize each token
    with ``WordNetLemmatizer`` and join the result with single spaces.

    Args:
        subject: Subject text, or ``None`` when the email has no Subject
            header.

    Returns:
        The processed subject; ``""`` when *subject* is ``None`` or empty.
    """
    # An email without a Subject header yields None; treat it like an empty
    # subject instead of failing on ``None.lower()``.
    if not subject:
        return ""

    letters_only = re.sub(r"[^a-zA-Z]", " ", subject.lower())
    tokens = word_tokenize(letters_only)

    stop_words = set(stopwords.words("english"))
    lemmatizer = WordNetLemmatizer()

    return " ".join(
        lemmatizer.lemmatize(token)
        for token in tokens
        if token not in stop_words
    )
|
|
|
|
def preprocess_url(url):
    """Normalise a URL into a space-separated string of alphanumeric tokens.

    Steps: lowercase, replace every non-alphanumeric character with a space,
    split on whitespace, drop English stop words (NLTK ``stopwords`` corpus)
    and join the remaining tokens with single spaces.

    Args:
        url: URL text, or ``None``.

    Returns:
        The processed URL; ``""`` when *url* is ``None`` or empty.
    """
    # Guard against a missing URL instead of failing on ``None.lower()``.
    if not url:
        return ""

    alnum_only = re.sub(r"[^a-zA-Z0-9]", " ", url.lower())
    stop_words = set(stopwords.words("english"))

    return " ".join(
        token for token in alnum_only.split() if token not in stop_words
    )
|
|
|
|
|
|
def home(uploaded_file):
    """Render the landing header, prompting for an upload when none exists.

    Args:
        uploaded_file: The uploader's return value; any falsy value is
            treated as "nothing uploaded yet".
    """
    message = (
        'Begin exploring the data using the menu on the left'
        if uploaded_file
        else 'To begin, please upload an EML file'
    )
    st.header(message)
|
|
|
|
def extract_subject(eml):
    """Return the Subject header of the EML file at path *eml*.

    The file is parsed with the ``email.policy.default`` policy. ``None`` is
    returned when the message carries no Subject header.
    """
    with open(eml, 'rb') as handle:
        message = email.message_from_binary_file(handle, policy=default)
    return message.get('Subject')
|
|
|
|
def extract_attachments(eml):
    """List attachment file names and URLs found in text attachments.

    Args:
        eml: Path of the EML file to parse.

    Returns:
        ``(attachments, urls)``: the file names of the parts yielded by
        ``iter_attachments()`` that have one, and every ``http://`` or
        ``https://`` match found in the content of ``text/*`` parts.
    """
    with open(eml, 'rb') as handle:
        message = email.message_from_binary_file(handle, policy=default)

    url_pattern = re.compile(r'(https?://\S+)')
    names = []
    found_urls = []

    for attachment_part in message.iter_attachments():
        name = attachment_part.get_filename()
        if name:
            names.append(name)

        # Only textual parts are scanned for links.
        if not attachment_part.get_content_type().startswith("text/"):
            continue
        found_urls += url_pattern.findall(attachment_part.get_content())

    return names, found_urls
|
|
|
|
def extract_headers(eml):
    """Return the headers of the EML file at path *eml* as a dict.

    Keys are header names and values are the parsed header values. When a
    header name occurs more than once, the last occurrence wins.
    """
    with open(eml, 'rb') as handle:
        message = email.message_from_binary_file(handle, policy=default)
    return dict(message.items())
|
|
|
|
| |
# ---------------------------------------------------------------------------
# Streamlit page script: draws the title and sidebar, stores the uploaded EML
# in a private temporary file, and renders the page chosen in the sidebar.
# ---------------------------------------------------------------------------
st.title('Email Phishing Explorer')
st.text('This is a web app to allow exploration of phishing emails')

st.sidebar.title('Sidebar')
upload_file = st.sidebar.file_uploader('Upload an EML file')

st.sidebar.title('Navigation')
options = st.sidebar.radio('Select what you want to display:',
                           ['Home', 'Email Subject', 'Email Attachments', 'Email Headers'])

# Path of this run's private copy of the upload; stays None until a file has
# been uploaded. Every use below is guarded by ``upload_file is not None``.
eml_path = None

try:
    if upload_file is not None:
        # A fixed file name in the working directory would be shared (and
        # overwritten) by every concurrent session, so each run writes its
        # own temporary file instead. ``getvalue()`` returns the full upload
        # regardless of the buffer's current read position.
        with tempfile.NamedTemporaryFile(suffix='.eml', delete=False) as tmp_eml:
            eml_path = tmp_eml.name
            tmp_eml.write(upload_file.getvalue())

    header_analyzer = HeaderAnalyzer()
    # NOTE(review): spam_detector is created (loading both pickled models on
    # every run) but none of the pages below use it -- confirm whether the
    # model predictions were meant to be displayed.
    spam_detector = SpamDetector()

    if options == 'Home':
        home(upload_file)

    elif options == 'Email Subject':
        if upload_file is not None:
            subject = extract_subject(eml_path)
            st.header('Email Subject Verdict')
            st.write(f'Subject: {subject}')
            # NOTE(review): only the Subject is passed here, so
            # calculate_spam_score sees no X-Spam-* headers and the score
            # shown is always 0 -- confirm the intended source of this score.
            subject_analysis = header_analyzer.analyze_header({'Subject': subject})
            st.write(f'Spam Score: {subject_analysis["spam_score"]}')
        else:
            st.warning('Please upload an EML file first.')

    elif options == 'Email Attachments':
        if upload_file is not None:
            attachments, urls = extract_attachments(eml_path)
            st.header('Email Attachments Verdict')
            if attachments:
                st.write('Attachments:')
                for attachment in attachments:
                    st.write(attachment)
            else:
                st.write('No attachments found.')

            if urls:
                st.write('URLs in Attachments:')
                for url in urls:
                    st.write(url)
            else:
                st.write('No URLs found in attachments.')
        else:
            st.warning('Please upload an EML file first.')

    elif options == 'Email Headers':
        if upload_file is not None:
            headers = extract_headers(eml_path)
            st.header('Email Headers Verdict')
            if headers:
                st.write('Headers:')
                header_analysis = header_analyzer.analyze_header(headers)
                for key, value in headers.items():
                    st.write(f'{key}: {value}')
                st.write(f'Spam Score: {header_analysis["spam_score"]}')
            else:
                st.write('No headers found.')
        else:
            st.warning('Please upload an EML file first.')
finally:
    # Remove this run's temporary copy even if rendering fails or Streamlit
    # interrupts the script.
    if eml_path is not None and os.path.exists(eml_path):
        os.remove(eml_path)
|
|