# import os
# import re
# import fitz # PyMuPDF
# import tempfile
# import base64
# from datetime import datetime
# from gtts import gTTS
# import streamlit as st
# from transformers import pipeline
# from groq import Groq
# # ✅ Hugging Face and GROQ secrets loaded via Hugging Face Spaces Secrets interface
# # ⚪ Access secrets securely from environment variables
# GROQ_API_KEY = os.getenv("GROQ_API_KEY")
# HF_TOKEN = os.getenv("HF_TOKEN")
# KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
# KAGGLE_KEY = os.getenv("KAGGLE_KEY")
# # ✅ Validate secrets
# if not all([GROQ_API_KEY, HF_TOKEN, KAGGLE_USERNAME, KAGGLE_KEY]):
# st.error("❌ One or more required API keys are missing from the environment.")
# st.stop()
# # ✅ Initialize Groq client
# client = Groq(api_key=GROQ_API_KEY)
# # ✅ Load phishing detection pipeline from Hugging Face
# phishing_pipe = pipeline(
# "text-classification",
# model="ealvaradob/bert-finetuned-phishing",
# token=HF_TOKEN
# )
# # ✅ Language and role options
# language_choices = ["English", "Urdu", "French"]
# role_choices = ["Admin", "Procurement", "Logistics"]
# # ✅ Glossary terms
# GLOSSARY = {
# "phishing": "Phishing is a scam where attackers trick you into revealing personal information.",
# "malware": "Malicious software designed to harm or exploit systems.",
# "spam": "Unwanted or unsolicited messages.",
# "tone": "The emotional character of the message."
# }
# # ✅ Translations (demo dictionary-based)
# TRANSLATIONS = {
# "Phishing": {"Urdu": "فشنگ", "French": "Hameçonnage"},
# "Spam": {"Urdu": "سپیم", "French": "Courrier indésirable"},
# "Malware": {"Urdu": "میلویئر", "French": "Logiciel malveillant"},
# "Safe": {"Urdu": "محفوظ", "French": "Sûr"}
# }
# # ✅ In-memory history and audio
# if "history" not in st.session_state:
# st.session_state.history = []
# if "audio_summary" not in st.session_state:
# st.session_state.audio_summary = ""
# if "report_sent" not in st.session_state:
# st.session_state.report_sent = False
# # =======================
# # Streamlit UI
# # =======================
# st.set_page_config(page_title="ZeroPhish Gate", layout="wide")
# st.title("🛡️ ZeroPhish Gate")
# st.markdown("AI-powered phishing message detection and explanation.")
# # Input fields
# col1, col2 = st.columns([3, 1])
# with col1:
# text_input = st.text_area("✉️ Paste Suspicious Message", height=200)
# uploaded_file = st.file_uploader("📄 Upload PDF/TXT (optional)", type=["pdf", "txt"])
# with col2:
# language = st.selectbox("🌐 Preferred Language", language_choices)
# role = st.selectbox("🧑💼 Your Role", role_choices)
# analyze_btn = st.button("🔍 Analyze with AI")
# clear_btn = st.button("🗑️ Clear History")
# # =======================
# # Function Definitions
# # =======================
# def extract_text_from_file(file):
# if file is None:
# return ""
# ext = file.name.split(".")[-1].lower()
# if ext == "pdf":
# doc = fitz.open(stream=file.read(), filetype="pdf")
# return "\n".join(page.get_text() for page in doc)
# elif ext == "txt":
# return file.read().decode("utf-8")
# return ""
# def analyze_with_huggingface(text):
# try:
# result = phishing_pipe(text)
# label = result[0]['label']
# confidence = round(result[0]['score'] * 100, 2)
# threat_type = {
# "PHISHING": "Phishing",
# "SPAM": "Spam",
# "MALWARE": "Malware",
# "LEGITIMATE": "Safe"
# }.get(label.upper(), "Unknown")
# return label, confidence, threat_type
# except Exception as e:
# return "Error", 0, f"Error: {e}"
# def semantic_analysis(text):
# response = client.chat.completions.create(
# model="llama3-8b-8192",
# messages=[
# {"role": "system", "content": "You are a cybersecurity assistant."},
# {"role": "user", "content": f"Explain this suspicious message for a {role} in {language} without ending in questions:\n{text}"}
# ]
# )
# raw = response.choices[0].message.content
# clean = re.sub(r"Is there anything else you'd like.*", "", raw, flags=re.I).strip()
# return clean
# def translate_label(threat_type):
# return TRANSLATIONS.get(threat_type, {}).get(language, threat_type)
# def text_to_speech(text):
# try:
# tts = gTTS(text=text, lang='en')
# with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp:
# tts.save(fp.name)
# return fp.name
# except Exception as e:
# st.error(f"❌ Audio generation error: {e}")
# return None
# def create_report(label, score, threat_type, explanation, text):
# ts = datetime.now().strftime("%Y%m%d_%H%M%S")
# filename = f"Zerophish_Report_{ts}.txt"
# report = f"""
# 🔍 AI Threat Detection Report
# Input Message:
# {text}
# Prediction: {label}
# Threat Type: {threat_type}
# Confidence: {score}%
# ---
# 🧠 Explanation:
# {explanation}
# """
# with open(filename, "w") as f:
# f.write(report)
# return filename
# def render_history():
# with st.expander("🕓 View Analysis History", expanded=True):
# for i, record in enumerate(reversed(st.session_state.history)):
# with st.container():
# st.markdown(f"**🔢 Entry #{len(st.session_state.history) - i}**")
# st.markdown(f"**📝 Input:** {record['input'][:100]}...")
# st.markdown(f"**🔐 Type:** {record['threat']} | **📊 Confidence:** {record['score']}%")
# st.markdown(f"**📖 Summary:** {record['summary'][:200]}...")
# st.markdown("---")
# # =======================
# # Run Analysis
# # =======================
# if clear_btn:
# st.session_state.history.clear()
# st.session_state.audio_summary = ""
# st.session_state.report_sent = False
# st.success("✅ History cleared!")
# if analyze_btn:
# st.session_state.report_sent = False
# combined_text = text_input
# if uploaded_file:
# extracted = extract_text_from_file(uploaded_file)
# combined_text += "\n" + extracted
# if not combined_text.strip():
# st.warning("⚠️ Please enter some text or upload a file to analyze.")
# else:
# label, score, threat_type = analyze_with_huggingface(combined_text)
# translated_threat = translate_label(threat_type)
# st.subheader("🔍 AI Threat Detection Result")
# st.markdown(f"**Prediction:** {label}")
# st.markdown(f"**Threat Type:** {threat_type} ({translated_threat})")
# st.markdown(f"**Confidence:** {score}%")
# summary = ""
# if threat_type.lower() != "safe":
# with st.expander("🧠 Semantic Reanalysis by LLaMA"):
# summary = semantic_analysis(combined_text)
# st.write(summary)
# st.session_state.audio_summary = summary # Save for audio playback
# audio_path = text_to_speech(summary)
# if audio_path:
# with open(audio_path, "rb") as f:
# st.markdown("### 🔊 Audio Explanation")
# st.audio(f.read(), format="audio/mp3")
# os.remove(audio_path)
# # Save history
# st.session_state.history.append({
# "input": combined_text,
# "threat": threat_type,
# "score": score,
# "summary": summary
# })
# # Generate and offer download link
# if summary:
# report_path = create_report(label, score, threat_type, summary, combined_text)
# with open(report_path, "rb") as f:
# b64 = base64.b64encode(f.read()).decode()
# href = f'📄 Download Full Report'
# st.markdown(href, unsafe_allow_html=True)
# with st.expander("📜 Glossary Help"):
# for term, definition in GLOSSARY.items():
# st.markdown(f"**{term.capitalize()}**: {definition}")
# # ✅ Report to IT section - outside the expander and stable
# st.markdown("---")
# report_it = st.button("📤 Report to IT", key="report_it_btn")
# if report_it:
# st.session_state.report_sent = True
# if st.session_state.report_sent:
# st.success("📨 Report sent to IT successfully.")
# render_history()
# app.py
import os
import re
import fitz # PyMuPDF
import tempfile
import base64
from datetime import datetime
from gtts import gTTS
import streamlit as st
from transformers import pipeline
from groq import Groq
# ✅ Page configuration goes first so that it is the first Streamlit command
# of the run, ahead of the error banner and the cached model loader below.
st.set_page_config(page_title="ZeroPhish Gate", layout="wide")

# ✅ Hugging Face and GROQ secrets loaded via Hugging Face Spaces Secrets interface
# ⚪ Access secrets securely from environment variables
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
HF_TOKEN = os.getenv("HF_TOKEN")
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")

# ✅ Validate secrets: stop the run early instead of failing later mid-request.
# NOTE(review): the Kaggle credentials are required here but are not used
# anywhere in the visible part of this file - confirm they are still needed.
if not all([GROQ_API_KEY, HF_TOKEN, KAGGLE_USERNAME, KAGGLE_KEY]):
    st.error("❌ One or more required API keys are missing from the environment.")
    st.stop()

# ✅ Initialize Groq client
client = Groq(api_key=GROQ_API_KEY)


# ✅ Load phishing detection pipeline from Hugging Face
@st.cache_resource(show_spinner=False)
def _load_phishing_pipeline():
    """Build the text-classification pipeline once and reuse it across reruns.

    Streamlit re-executes this script on every widget interaction; without
    caching, the model would be re-instantiated on each of those reruns.
    """
    return pipeline(
        "text-classification",
        model="ealvaradob/bert-finetuned-phishing",
        token=HF_TOKEN,
    )


phishing_pipe = _load_phishing_pipeline()

# ✅ Language and role options
language_choices = ["English", "Urdu", "French"]
role_choices = ["Admin", "Procurement", "Logistics"]

# ✅ Glossary terms
GLOSSARY = {
    "phishing": "Phishing is a scam where attackers trick you into revealing personal information.",
    "malware": "Malicious software designed to harm or exploit systems.",
    "spam": "Unwanted or unsolicited messages.",
    "tone": "The emotional character of the message.",
}

# ✅ Translations (demo dictionary-based)
TRANSLATIONS = {
    "Phishing": {"Urdu": "فشنگ", "French": "Hameçonnage"},
    "Spam": {"Urdu": "سپیم", "French": "Courrier indésirable"},
    "Malware": {"Urdu": "میلویئر", "French": "Logiciel malveillant"},
    "Safe": {"Urdu": "محفوظ", "French": "Sûr"},
}

# ✅ In-memory history and audio: seed every session key exactly once.
# The dict is rebuilt on each rerun, so the list default is never shared.
_SESSION_DEFAULTS = {
    "history": [],
    "audio_summary": "",
    "report_sent": False,
    "chat_active": False,
    "text_input": "",
    "uploaded_file": None,
    # Suffix for the input widgets' keys; bumping it yields brand-new widgets.
    "input_nonce": 0,
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default

# =======================
# Streamlit UI
# =======================
st.title("🛡️ ZeroPhish Gate")
st.markdown("AI-powered phishing message detection and explanation.")

# ✅ New Chat button
if st.button("🆕 New Chat"):
    st.session_state.chat_active = False
    st.session_state.audio_summary = ""
    st.session_state.report_sent = False
    st.session_state.text_input = ""
    st.session_state.uploaded_file = None
    # The widgets below keep their own state and would immediately write the
    # old text/file back; a new key makes Streamlit create empty widgets.
    st.session_state.input_nonce += 1

# Input fields
col1, col2 = st.columns([3, 1])
with col1:
    st.session_state.text_input = st.text_area(
        "✉️ Paste Suspicious Message",
        height=200,
        key=f"text_input_widget_{st.session_state.input_nonce}",
    )
    st.session_state.uploaded_file = st.file_uploader(
        "📄 Upload PDF/TXT (optional)",
        type=["pdf", "txt"],
        key=f"uploaded_file_widget_{st.session_state.input_nonce}",
    )
with col2:
    language = st.selectbox("🌐 Preferred Language", language_choices)
    role = st.selectbox("🧑💼 Your Role", role_choices)
    analyze_btn = st.button("🔍 Analyze with AI")
    clear_btn = st.button("🗑️ Clear History")
# =======================
# Function Definitions
# =======================
def extract_text_from_file(file):
    """Return the text content of an uploaded PDF or TXT file.

    Args:
        file: A binary file-like object exposing ``name`` and ``read()``
            (for example the object returned by ``st.file_uploader``), or
            ``None``.

    Returns:
        The extracted text. An empty string is returned when ``file`` is
        ``None`` or its extension is neither ``pdf`` nor ``txt``.
    """
    if file is None:
        return ""

    # Defensive rewind: if the stream was already consumed, read() would
    # otherwise return b"" and the upload would silently look empty.
    if hasattr(file, "seek"):
        file.seek(0)

    ext = file.name.split(".")[-1].lower()
    if ext == "pdf":
        doc = fitz.open(stream=file.read(), filetype="pdf")
        try:
            return "\n".join(page.get_text() for page in doc)
        finally:
            # Release the document explicitly instead of leaving it open.
            doc.close()
    if ext == "txt":
        # Suspicious messages arrive in arbitrary encodings; substitute
        # undecodable bytes rather than aborting the whole analysis.
        return file.read().decode("utf-8", errors="replace")
    return ""
# Maps upper-cased classifier labels to the threat names shown in the UI.
# NOTE(review): "BENIGN" is included because the configured model is believed
# to label harmless text as "benign" - verify against the model card.
_THREAT_TYPE_BY_LABEL = {
    "PHISHING": "Phishing",
    "SPAM": "Spam",
    "MALWARE": "Malware",
    "LEGITIMATE": "Safe",
    "BENIGN": "Safe",
}


def analyze_with_huggingface(text):
    """Classify ``text`` with the module-level ``phishing_pipe`` pipeline.

    Args:
        text: The message to classify.

    Returns:
        A ``(label, confidence, threat_type)`` tuple: the raw label from the
        classifier, its score as a percentage rounded to two decimals, and
        the display name for that label (``"Unknown"`` when unmapped).
        On any failure the tuple is ``("Error", 0, "Error: <message>")``;
        no exception is raised to the caller.
    """
    try:
        # truncation=True lets the tokenizer cut over-long input down to the
        # model's maximum length instead of raising on long messages/PDFs.
        result = phishing_pipe(text, truncation=True)
        top = result[0]
        label = top["label"]
        confidence = round(top["score"] * 100, 2)
        threat_type = _THREAT_TYPE_BY_LABEL.get(label.upper(), "Unknown")
        return label, confidence, threat_type
    except Exception as e:
        # Deliberate best-effort boundary: callers receive an error tuple.
        return "Error", 0, f"Error: {e}"
def semantic_analysis(text):
    """Ask the Groq-hosted LLM to explain a suspicious message.

    The prompt is built from the module-level ``role`` and ``language``
    selections and sent through the module-level ``client``.

    Args:
        text: The message to explain.

    Returns:
        The model's explanation with any trailing
        "Is there anything else you'd like..." line removed, or an empty
        string when the response carries no text.

    Raises:
        Whatever ``client.chat.completions.create`` raises; errors from the
        API call are not handled here.
    """
    # NOTE(review): ``text`` is untrusted and is interpolated directly into
    # the prompt, so it can attempt to override the instructions.
    response = client.chat.completions.create(
        model="llama3-8b-8192",
        messages=[
            {"role": "system", "content": "You are a cybersecurity assistant."},
            {"role": "user", "content": f"Explain this suspicious message for a {role} in {language} without ending in questions:\n{text}"},
        ],
    )
    if not response.choices:
        return ""
    # The content may be None; normalise so the regex always gets a string.
    raw = response.choices[0].message.content or ""
    clean = re.sub(r"Is there anything else you'd like.*", "", raw, flags=re.I).strip()
    return clean
def translate_label(threat_type):
    """Return ``threat_type`` in the currently selected UI language.

    Looks the name up in ``TRANSLATIONS`` for the module-level ``language``;
    the untranslated ``threat_type`` is returned when either the threat name
    or the language has no entry.
    """
    per_language = TRANSLATIONS.get(threat_type, {})
    return per_language.get(language, threat_type)
# gTTS language codes for the languages offered in the UI.
_TTS_LANGUAGE_CODES = {"English": "en", "Urdu": "ur", "French": "fr"}


def text_to_speech(text, lang=None):
    """Synthesise ``text`` to a temporary MP3 file with gTTS.

    Args:
        text: The text to speak.
        lang: Optional gTTS language code. When omitted, the code for the
            module-level ``language`` selection is used, falling back to
            ``"en"`` for languages without a mapping.

    Returns:
        The path of the generated MP3 file, which the caller is responsible
        for deleting, or ``None`` when there is nothing to speak or the
        synthesis fails (a Streamlit error is shown in the failure case).
    """
    # Nothing to speak: skip the synthesis call entirely.
    if not text or not text.strip():
        return None

    if lang is None:
        # The explanation is requested in the selected language, so the
        # voice should match it instead of always being English.
        lang = _TTS_LANGUAGE_CODES.get(language, "en")

    audio_path = None
    try:
        tts = gTTS(text=text, lang=lang)
        # Reserve a name and close the handle before saving, so gTTS writes
        # to a file that is not still held open by this process.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp:
            audio_path = fp.name
        tts.save(audio_path)
        return audio_path
    except Exception as e:
        # Do not leave a partial or empty temp file behind on failure.
        if audio_path and os.path.exists(audio_path):
            os.remove(audio_path)
        st.error(f"❌ Audio generation error: {e}")
        return None
def create_report(label, score, threat_type, explanation, text):
    """Write a plain-text threat report and return its file name.

    The file is created in the current working directory and named
    ``Zerophish_Report_<YYYYmmdd_HHMMSS>.txt`` from the current local time.

    Args:
        label: Raw classifier label.
        score: Confidence percentage.
        threat_type: Display name of the detected threat.
        explanation: Explanation text to include in the report.
        text: The analysed input message.

    Returns:
        The name of the file that was written.
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"Zerophish_Report_{ts}.txt"
    report = f"""
🔍 AI Threat Detection Report
Input Message:
{text}
Prediction: {label}
Threat Type: {threat_type}
Confidence: {score}%
---
🧠 Explanation:
{explanation}
"""
    # The report contains emoji and may contain non-Latin text; an explicit
    # encoding avoids UnicodeEncodeError where the locale default is not UTF-8.
    with open(filename, "w", encoding="utf-8") as f:
        f.write(report)
    return filename
def render_history():
    """Render the session's analysis history, most recent entry first.

    Each entry shows its 1-based position in the history, the first 100
    characters of the input, the threat type with its confidence, and the
    first 200 characters of the summary.
    """
    entries = st.session_state.history
    total = len(entries)
    with st.expander("🕓 View Analysis History", expanded=True):
        # Walk the entry numbers downwards so the newest entry is on top.
        for number in range(total, 0, -1):
            item = entries[number - 1]
            input_preview = item['input'][:100]
            summary_preview = item['summary'][:200]
            with st.container():
                st.markdown(f"**🔢 Entry #{number}**")
                st.markdown(f"**📝 Input:** {input_preview}...")
                st.markdown(f"**🔐 Type:** {item['threat']} | **📊 Confidence:** {item['score']}%")
                st.markdown(f"**📖 Summary:** {summary_preview}...")
                st.markdown("---")
# =======================
# Run Analysis
# =======================
def _run_analysis(combined_text):
    """Classify ``combined_text``, render the results and record them.

    Shows the classifier verdict, and for anything not classified as safe
    also an LLM explanation with audio playback and a downloadable report.
    A classifier failure is reported and nothing is added to the history.
    """
    label, score, threat_type = analyze_with_huggingface(combined_text)
    if label == "Error":
        # analyze_with_huggingface reports failures through the "Error" label
        # with the message in threat_type; this is not a detected threat.
        st.error(f"❌ Classification failed. {threat_type}")
        return

    translated_threat = translate_label(threat_type)
    st.subheader("🔍 AI Threat Detection Result")
    st.markdown(f"**Prediction:** {label}")
    st.markdown(f"**Threat Type:** {threat_type} ({translated_threat})")
    st.markdown(f"**Confidence:** {score}%")

    summary = ""
    if threat_type.lower() != "safe":
        with st.expander("🧠 Semantic Reanalysis by LLaMA"):
            try:
                summary = semantic_analysis(combined_text)
            except Exception as e:
                # Top-level boundary: an LLM/API failure must not hide the
                # classifier verdict that has already been rendered.
                st.error(f"❌ Semantic analysis error: {e}")
            else:
                st.write(summary)
        st.session_state.audio_summary = summary  # Save for audio playback

        # Only synthesise audio when there is an explanation to read out.
        audio_path = text_to_speech(summary) if summary else None
        if audio_path:
            try:
                with open(audio_path, "rb") as f:
                    audio_bytes = f.read()
            finally:
                os.remove(audio_path)
            st.markdown("### 🔊 Audio Explanation")
            st.audio(audio_bytes, format="audio/mp3")

    # Save history
    st.session_state.history.append({
        "input": combined_text,
        "threat": threat_type,
        "score": score,
        "summary": summary,
    })

    # Generate and offer download link
    if summary:
        report_path = create_report(label, score, threat_type, summary, combined_text)
        try:
            with open(report_path, "rb") as f:
                b64 = base64.b64encode(f.read()).decode()
        finally:
            # The report embeds the user's message; it is served inline
            # below, so no copy needs to stay on disk.
            os.remove(report_path)
        report_name = os.path.basename(report_path)
        # Inline data-URI anchor: downloading does not trigger a rerun, so
        # the results above stay on screen.
        href = (
            f'<a href="data:text/plain;charset=utf-8;base64,{b64}" '
            f'download="{report_name}">📄 Download Full Report</a>'
        )
        st.markdown(href, unsafe_allow_html=True)


if clear_btn:
    st.session_state.history.clear()
    st.session_state.audio_summary = ""
    st.session_state.report_sent = False
    st.success("✅ History cleared!")

if analyze_btn:
    st.session_state.report_sent = False
    st.session_state.chat_active = True
    combined_text = st.session_state.text_input
    if st.session_state.uploaded_file:
        extracted = extract_text_from_file(st.session_state.uploaded_file)
        combined_text += "\n" + extracted
    if not combined_text.strip():
        st.warning("⚠️ Please enter some text or upload a file to analyze.")
    else:
        _run_analysis(combined_text)

with st.expander("📜 Glossary Help"):
    for term, definition in GLOSSARY.items():
        st.markdown(f"**{term.capitalize()}**: {definition}")

# ✅ Report to IT section - only visible after analysis
if st.session_state.chat_active:
    st.markdown("---")
    report_it = st.button("📤 Report to IT", key="report_it_btn")
    if report_it:
        st.session_state.report_sent = True
    # The flag lives in session state so the confirmation survives reruns.
    if st.session_state.report_sent:
        st.success("📨 Report sent to IT successfully.")

render_history()