# app.py
import os
import re
import fitz  # PyMuPDF
import tempfile
import base64
from datetime import datetime

from gtts import gTTS
import streamlit as st
from transformers import pipeline
from groq import Groq

# ✅ Hugging Face and GROQ secrets loaded via Hugging Face Spaces Secrets interface
# ⚪ Access secrets securely from environment variables
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
HF_TOKEN = os.getenv("HF_TOKEN")
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")

# ✅ Validate secrets
if not all([GROQ_API_KEY, HF_TOKEN, KAGGLE_USERNAME, KAGGLE_KEY]):
    st.error("❌ One or more required API keys are missing from the environment.")
    st.stop()

# ✅ Initialize Groq client
client = Groq(api_key=GROQ_API_KEY)

# ✅ Load phishing detection pipeline from Hugging Face
phishing_pipe = pipeline(
    "text-classification",
    model="ealvaradob/bert-finetuned-phishing",
    token=HF_TOKEN
)

# ✅ Language and role options
language_choices = ["English", "Urdu", "French"]
role_choices = ["Admin", "Procurement", "Logistics"]

# ✅ Glossary terms
GLOSSARY = {
    "phishing": "Phishing is a scam where attackers trick you into revealing personal information.",
    "malware": "Malicious software designed to harm or exploit systems.",
    "spam": "Unwanted or unsolicited messages.",
    "tone": "The emotional character of the message."
}

# ✅ Translations (demo dictionary-based)
TRANSLATIONS = {
    "Phishing": {"Urdu": "فشنگ", "French": "Hameçonnage"},
    "Spam": {"Urdu": "سپیم", "French": "Courrier indésirable"},
    "Malware": {"Urdu": "میلویئر", "French": "Logiciel malveillant"},
    "Safe": {"Urdu": "محفوظ", "French": "Sûr"}
}

# ✅ In-memory history and audio
if "history" not in st.session_state:
    st.session_state.history = []
if "audio_summary" not in st.session_state:
    st.session_state.audio_summary = ""
if "report_sent" not in st.session_state:
    st.session_state.report_sent = False
if "chat_active" not in st.session_state:
    st.session_state.chat_active = False
if "text_input" not in st.session_state:
    st.session_state.text_input = ""
if "uploaded_file" not in st.session_state:
    st.session_state.uploaded_file = None

# =======================
# Streamlit UI
# =======================
st.set_page_config(page_title="ZeroPhish Gate", layout="wide")
st.title("🛡️ ZeroPhish Gate")
st.markdown("AI-powered phishing message detection and explanation.")

# ✅ New Chat button
if st.button("🆕 New Chat"):
    st.session_state.chat_active = False
    st.session_state.audio_summary = ""
    st.session_state.report_sent = False
    st.session_state.text_input = ""
    st.session_state.uploaded_file = None

# Input fields
col1, col2 = st.columns([3, 1])
with col1:
    st.session_state.text_input = st.text_area(
        "✉️ Paste Suspicious Message",
        value=st.session_state.text_input,
        height=200
    )
    st.session_state.uploaded_file = st.file_uploader(
        "📄 Upload PDF/TXT (optional)", type=["pdf", "txt"]
    )
with col2:
    language = st.selectbox("🌐 Preferred Language", language_choices)
    role = st.selectbox("🧑‍💼 Your Role", role_choices)

analyze_btn = st.button("🔍 Analyze with AI")
clear_btn = st.button("🗑️ Clear History")


# =======================
# Function Definitions
# =======================
def extract_text_from_file(file):
    """Return the text of an uploaded PDF or TXT file, or "" if unsupported."""
    if file is None:
        return ""
    ext = file.name.split(".")[-1].lower()
    if ext == "pdf":
        doc = fitz.open(stream=file.read(), filetype="pdf")
        return "\n".join(page.get_text() for page in doc)
    elif ext == "txt":
        return file.read().decode("utf-8")
    return ""


def analyze_with_huggingface(text):
    """Classify the text and return (label, confidence in percent, threat type)."""
    try:
        # BERT accepts at most 512 tokens, so truncate long inputs such as PDFs.
        result = phishing_pipe(text, truncation=True)
        label = result[0]['label']
        confidence = round(result[0]['score'] * 100, 2)
        threat_type = {
            "PHISHING": "Phishing",
            "SPAM": "Spam",
            "MALWARE": "Malware",
            "LEGITIMATE": "Safe"
        }.get(label.upper(), "Unknown")
        return label, confidence, threat_type
    except Exception as e:
        return "Error", 0, f"Error: {e}"


def semantic_analysis(text):
    """Ask the Groq-hosted LLaMA model to explain the message for the selected role and language."""
    response = client.chat.completions.create(
        model="llama3-8b-8192",
        messages=[
            {"role": "system", "content": "You are a cybersecurity assistant."},
            {"role": "user", "content": f"Explain this suspicious message for a {role} in {language} without ending in questions:\n{text}"}
        ]
    )
    raw = response.choices[0].message.content
    # Drop a trailing follow-up question if the model adds one anyway.
    clean = re.sub(r"Is there anything else you'd like.*", "", raw, flags=re.I).strip()
    return clean


def translate_label(threat_type):
    """Translate the threat type into the selected language, falling back to English."""
    return TRANSLATIONS.get(threat_type, {}).get(language, threat_type)


def text_to_speech(text):
    """Write the text as an MP3 to a temporary file and return its path, or None on failure."""
    try:
        tts = gTTS(text=text, lang='en')
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp:
            tts.save(fp.name)
            return fp.name
    except Exception as e:
        st.error(f"❌ Audio generation error: {e}")
        return None


def create_report(label, score, threat_type, explanation, text):
    """Write a plain-text report to the working directory and return its filename."""
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"Zerophish_Report_{ts}.txt"
    report = f"""
🔍 AI Threat Detection Report

Input Message:
{text}

Prediction: {label}
Threat Type: {threat_type}
Confidence: {score}%

---

🧠 Explanation:
{explanation}
"""
    # The report contains emoji and may contain Urdu or French text, so write UTF-8 explicitly.
    with open(filename, "w", encoding="utf-8") as f:
        f.write(report)
    return filename


def render_history():
    """Show past analyses from this session, newest first."""
    with st.expander("🕓 View Analysis History", expanded=True):
        for i, record in enumerate(reversed(st.session_state.history)):
            with st.container():
                st.markdown(f"**🔢 Entry #{len(st.session_state.history) - i}**")
                st.markdown(f"**📝 Input:** {record['input'][:100]}...")
                st.markdown(f"**🔐 Type:** {record['threat']} | **📊 Confidence:** {record['score']}%")
                st.markdown(f"**📖 Summary:** {record['summary'][:200]}...")
                st.markdown("---")


# =======================
# Run Analysis
# =======================
if clear_btn:
    st.session_state.history.clear()
    st.session_state.audio_summary = ""
    st.session_state.report_sent = False
    st.success("✅ History cleared!")

if analyze_btn:
    st.session_state.report_sent = False
    st.session_state.chat_active = True

    combined_text = st.session_state.text_input
    if st.session_state.uploaded_file:
        extracted = extract_text_from_file(st.session_state.uploaded_file)
        combined_text += "\n" + extracted

    if not combined_text.strip():
        st.warning("⚠️ Please enter some text or upload a file to analyze.")
    else:
        label, score, threat_type = analyze_with_huggingface(combined_text)
        translated_threat = translate_label(threat_type)

        st.subheader("🔍 AI Threat Detection Result")
        st.markdown(f"**Prediction:** {label}")
        st.markdown(f"**Threat Type:** {threat_type} ({translated_threat})")
        st.markdown(f"**Confidence:** {score}%")

        summary = ""
        if threat_type.lower() != "safe":
            with st.expander("🧠 Semantic Reanalysis by LLaMA"):
                summary = semantic_analysis(combined_text)
                st.write(summary)
                st.session_state.audio_summary = summary  # Save for audio playback

                audio_path = text_to_speech(summary)
                if audio_path:
                    with open(audio_path, "rb") as f:
                        st.markdown("### 🔊 Audio Explanation")
                        st.audio(f.read(), format="audio/mp3")
                    os.remove(audio_path)

        # Save history
        st.session_state.history.append({
            "input": combined_text,
            "threat": threat_type,
            "score": score,
            "summary": summary
        })

        # Generate and offer download link
        if summary:
            report_path = create_report(label, score, threat_type, summary, combined_text)
            with open(report_path, "rb") as f:
                b64 = base64.b64encode(f.read()).decode()
            href = f'<a href="data:text/plain;base64,{b64}" download="{report_path}">📄 Download Full Report</a>'
            st.markdown(href, unsafe_allow_html=True)

with st.expander("📜 Glossary Help"):
    for term, definition in GLOSSARY.items():
        st.markdown(f"**{term.capitalize()}**: {definition}")

# ✅ Report to IT section - only visible after analysis
if st.session_state.chat_active:
    st.markdown("---")
    report_it = st.button("📤 Report to IT", key="report_it_btn")
    if report_it:
        st.session_state.report_sent = True
    if st.session_state.report_sent:
        st.success("📨 Report sent to IT successfully.")

render_history()