Spaces:
Sleeping
Sleeping
| # import os | |
| # import re | |
| # import fitz # PyMuPDF | |
| # import tempfile | |
| # import base64 | |
| # from datetime import datetime | |
| # from gtts import gTTS | |
| # import streamlit as st | |
| # from transformers import pipeline | |
| # from groq import Groq | |
| # # β Hugging Face and GROQ secrets loaded via Hugging Face Spaces Secrets interface | |
| # # βͺ Access secrets securely from environment variables | |
| # GROQ_API_KEY = os.getenv("GROQ_API_KEY") | |
| # HF_TOKEN = os.getenv("HF_TOKEN") | |
| # KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME") | |
| # KAGGLE_KEY = os.getenv("KAGGLE_KEY") | |
| # # β Validate secrets | |
| # if not all([GROQ_API_KEY, HF_TOKEN, KAGGLE_USERNAME, KAGGLE_KEY]): | |
| # st.error("β One or more required API keys are missing from the environment.") | |
| # st.stop() | |
| # # β Initialize Groq client | |
| # client = Groq(api_key=GROQ_API_KEY) | |
| # # β Load phishing detection pipeline from Hugging Face | |
| # phishing_pipe = pipeline( | |
| # "text-classification", | |
| # model="ealvaradob/bert-finetuned-phishing", | |
| # token=HF_TOKEN | |
| # ) | |
| # # β Language and role options | |
| # language_choices = ["English", "Urdu", "French"] | |
| # role_choices = ["Admin", "Procurement", "Logistics"] | |
| # # β Glossary terms | |
| # GLOSSARY = { | |
| # "phishing": "Phishing is a scam where attackers trick you into revealing personal information.", | |
| # "malware": "Malicious software designed to harm or exploit systems.", | |
| # "spam": "Unwanted or unsolicited messages.", | |
| # "tone": "The emotional character of the message." | |
| # } | |
| # # β Translations (demo dictionary-based) | |
| # TRANSLATIONS = { | |
| # "Phishing": {"Urdu": "ΩΨ΄ΩΪ―", "French": "HameΓ§onnage"}, | |
| # "Spam": {"Urdu": "Ψ³ΩΎΫΩ ", "French": "Courrier indΓ©sirable"}, | |
| # "Malware": {"Urdu": "Ω ΫΩΩΫΨ¦Ψ±", "French": "Logiciel malveillant"}, | |
| # "Safe": {"Urdu": "Ω ΨΩΩΨΈ", "French": "SΓ»r"} | |
| # } | |
| # # β In-memory history and audio | |
| # if "history" not in st.session_state: | |
| # st.session_state.history = [] | |
| # if "audio_summary" not in st.session_state: | |
| # st.session_state.audio_summary = "" | |
| # if "report_sent" not in st.session_state: | |
| # st.session_state.report_sent = False | |
| # # ======================= | |
| # # Streamlit UI | |
| # # ======================= | |
| # st.set_page_config(page_title="ZeroPhish Gate", layout="wide") | |
| # st.title("π‘οΈ ZeroPhish Gate") | |
| # st.markdown("AI-powered phishing message detection and explanation.") | |
| # # Input fields | |
| # col1, col2 = st.columns([3, 1]) | |
| # with col1: | |
| # text_input = st.text_area("βοΈ Paste Suspicious Message", height=200) | |
| # uploaded_file = st.file_uploader("π Upload PDF/TXT (optional)", type=["pdf", "txt"]) | |
| # with col2: | |
| # language = st.selectbox("π Preferred Language", language_choices) | |
| # role = st.selectbox("π§βπΌ Your Role", role_choices) | |
| # analyze_btn = st.button("π Analyze with AI") | |
| # clear_btn = st.button("ποΈ Clear History") | |
| # # ======================= | |
| # # Function Definitions | |
| # # ======================= | |
| # def extract_text_from_file(file): | |
| # if file is None: | |
| # return "" | |
| # ext = file.name.split(".")[-1].lower() | |
| # if ext == "pdf": | |
| # doc = fitz.open(stream=file.read(), filetype="pdf") | |
| # return "\n".join(page.get_text() for page in doc) | |
| # elif ext == "txt": | |
| # return file.read().decode("utf-8") | |
| # return "" | |
| # def analyze_with_huggingface(text): | |
| # try: | |
| # result = phishing_pipe(text) | |
| # label = result[0]['label'] | |
| # confidence = round(result[0]['score'] * 100, 2) | |
| # threat_type = { | |
| # "PHISHING": "Phishing", | |
| # "SPAM": "Spam", | |
| # "MALWARE": "Malware", | |
| # "LEGITIMATE": "Safe" | |
| # }.get(label.upper(), "Unknown") | |
| # return label, confidence, threat_type | |
| # except Exception as e: | |
| # return "Error", 0, f"Error: {e}" | |
| # def semantic_analysis(text): | |
| # response = client.chat.completions.create( | |
| # model="llama3-8b-8192", | |
| # messages=[ | |
| # {"role": "system", "content": "You are a cybersecurity assistant."}, | |
| # {"role": "user", "content": f"Explain this suspicious message for a {role} in {language} without ending in questions:\n{text}"} | |
| # ] | |
| # ) | |
| # raw = response.choices[0].message.content | |
| # clean = re.sub(r"Is there anything else you'd like.*", "", raw, flags=re.I).strip() | |
| # return clean | |
| # def translate_label(threat_type): | |
| # return TRANSLATIONS.get(threat_type, {}).get(language, threat_type) | |
| # def text_to_speech(text): | |
| # try: | |
| # tts = gTTS(text=text, lang='en') | |
| # with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp: | |
| # tts.save(fp.name) | |
| # return fp.name | |
| # except Exception as e: | |
| # st.error(f"β Audio generation error: {e}") | |
| # return None | |
| # def create_report(label, score, threat_type, explanation, text): | |
| # ts = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| # filename = f"Zerophish_Report_{ts}.txt" | |
| # report = f""" | |
| # π AI Threat Detection Report | |
| # Input Message: | |
| # {text} | |
| # Prediction: {label} | |
| # Threat Type: {threat_type} | |
| # Confidence: {score}% | |
| # --- | |
| # π§ Explanation: | |
| # {explanation} | |
| # """ | |
| # with open(filename, "w") as f: | |
| # f.write(report) | |
| # return filename | |
| # def render_history(): | |
| # with st.expander("π View Analysis History", expanded=True): | |
| # for i, record in enumerate(reversed(st.session_state.history)): | |
| # with st.container(): | |
| # st.markdown(f"**π’ Entry #{len(st.session_state.history) - i}**") | |
| # st.markdown(f"**π Input:** {record['input'][:100]}...") | |
| # st.markdown(f"**π Type:** {record['threat']} | **π Confidence:** {record['score']}%") | |
| # st.markdown(f"**π Summary:** {record['summary'][:200]}...") | |
| # st.markdown("---") | |
| # # ======================= | |
| # # Run Analysis | |
| # # ======================= | |
| # if clear_btn: | |
| # st.session_state.history.clear() | |
| # st.session_state.audio_summary = "" | |
| # st.session_state.report_sent = False | |
| # st.success("β History cleared!") | |
| # if analyze_btn: | |
| # st.session_state.report_sent = False | |
| # combined_text = text_input | |
| # if uploaded_file: | |
| # extracted = extract_text_from_file(uploaded_file) | |
| # combined_text += "\n" + extracted | |
| # if not combined_text.strip(): | |
| # st.warning("β οΈ Please enter some text or upload a file to analyze.") | |
| # else: | |
| # label, score, threat_type = analyze_with_huggingface(combined_text) | |
| # translated_threat = translate_label(threat_type) | |
| # st.subheader("π AI Threat Detection Result") | |
| # st.markdown(f"**Prediction:** {label}") | |
| # st.markdown(f"**Threat Type:** {threat_type} ({translated_threat})") | |
| # st.markdown(f"**Confidence:** {score}%") | |
| # summary = "" | |
| # if threat_type.lower() != "safe": | |
| # with st.expander("π§ Semantic Reanalysis by LLaMA"): | |
| # summary = semantic_analysis(combined_text) | |
| # st.write(summary) | |
| # st.session_state.audio_summary = summary # Save for audio playback | |
| # audio_path = text_to_speech(summary) | |
| # if audio_path: | |
| # with open(audio_path, "rb") as f: | |
| # st.markdown("### π Audio Explanation") | |
| # st.audio(f.read(), format="audio/mp3") | |
| # os.remove(audio_path) | |
| # # Save history | |
| # st.session_state.history.append({ | |
| # "input": combined_text, | |
| # "threat": threat_type, | |
| # "score": score, | |
| # "summary": summary | |
| # }) | |
| # # Generate and offer download link | |
| # if summary: | |
| # report_path = create_report(label, score, threat_type, summary, combined_text) | |
| # with open(report_path, "rb") as f: | |
| # b64 = base64.b64encode(f.read()).decode() | |
| # href = f'<a href="data:file/txt;base64,{b64}" download="{report_path}">π Download Full Report</a>' | |
| # st.markdown(href, unsafe_allow_html=True) | |
| # with st.expander("π Glossary Help"): | |
| # for term, definition in GLOSSARY.items(): | |
| # st.markdown(f"**{term.capitalize()}**: {definition}") | |
| # # β Report to IT section - outside the expander and stable | |
| # st.markdown("---") | |
| # report_it = st.button("π€ Report to IT", key="report_it_btn") | |
| # if report_it: | |
| # st.session_state.report_sent = True | |
| # if st.session_state.report_sent: | |
| # st.success("π¨ Report sent to IT successfully.") | |
| # render_history() | |
| # app.py | |
| import os | |
| import re | |
| import fitz # PyMuPDF | |
| import tempfile | |
| import base64 | |
| from datetime import datetime | |
| from gtts import gTTS | |
| import streamlit as st | |
| from transformers import pipeline | |
| from groq import Groq | |
# ---------------------------------------------------------------------------
# Secrets
# ---------------------------------------------------------------------------
# All credentials are read from environment variables (on Hugging Face Spaces
# these are populated from the Space's "Secrets" settings).
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
HF_TOKEN = os.getenv("HF_TOKEN")
KAGGLE_USERNAME = os.getenv("KAGGLE_USERNAME")
KAGGLE_KEY = os.getenv("KAGGLE_KEY")

# Stop the script run early when any secret is missing or empty.
# NOTE(review): the Kaggle credentials are validated here but are not used
# anywhere else in the visible code -- confirm they are still required.
if not all([GROQ_API_KEY, HF_TOKEN, KAGGLE_USERNAME, KAGGLE_KEY]):
    st.error("β One or more required API keys are missing from the environment.")
    st.stop()

# Groq client used by semantic_analysis().
client = Groq(api_key=GROQ_API_KEY)


@st.cache_resource(show_spinner=False)
def _load_phishing_pipeline(token):
    """Build the Hugging Face text-classification pipeline once per process.

    Streamlit re-executes this script on every user interaction; without the
    cache the model would be re-instantiated on each rerun. ``show_spinner`` is
    disabled so that no UI element is emitted before ``st.set_page_config``.
    """
    return pipeline(
        "text-classification",
        model="ealvaradob/bert-finetuned-phishing",
        token=token,
    )


# Classifier used by analyze_with_huggingface().
phishing_pipe = _load_phishing_pipeline(HF_TOKEN)

# ---------------------------------------------------------------------------
# Static options and lookup tables
# ---------------------------------------------------------------------------
language_choices = ["English", "Urdu", "French"]
role_choices = ["Admin", "Procurement", "Logistics"]

# Short definitions shown in the "Glossary Help" expander.
GLOSSARY = {
    "phishing": "Phishing is a scam where attackers trick you into revealing personal information.",
    "malware": "Malicious software designed to harm or exploit systems.",
    "spam": "Unwanted or unsolicited messages.",
    "tone": "The emotional character of the message."
}

# Demo, dictionary-based translations of the threat labels. English has no
# entry on purpose: translate_label() falls back to the untranslated label.
# NOTE(review): these values were restored from mis-decoded UTF-8 text; have a
# French/Urdu reader confirm them.
TRANSLATIONS = {
    "Phishing": {"Urdu": "فشنگ", "French": "Hameçonnage"},
    "Spam": {"Urdu": "سپیم", "French": "Courrier indésirable"},
    "Malware": {"Urdu": "میلویئر", "French": "Logiciel malveillant"},
    "Safe": {"Urdu": "محفوظ", "French": "Sûr"}
}

# ---------------------------------------------------------------------------
# Session state defaults
# ---------------------------------------------------------------------------
# Only keys that are still missing are initialised, so values survive reruns.
_SESSION_DEFAULTS = {
    "history": [],
    "audio_summary": "",
    "report_sent": False,
    "chat_active": False,
    "text_input": "",
    "uploaded_file": None,
    # Bumped by "New Chat" so the file uploader is recreated empty.
    "uploader_nonce": 0,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# =======================
# Streamlit UI
# =======================
# NOTE(review): the emoji prefixes in the UI labels below appear mis-decoded;
# they are kept verbatim because the original code points cannot be recovered.
st.set_page_config(page_title="ZeroPhish Gate", layout="wide")
st.title("π‘οΈ ZeroPhish Gate")
st.markdown("AI-powered phishing message detection and explanation.")

# "New Chat" resets the per-conversation state but keeps the analysis history.
if st.button("π New Chat"):
    st.session_state.chat_active = False
    st.session_state.audio_summary = ""
    st.session_state.report_sent = False
    st.session_state.text_input = ""
    st.session_state.uploaded_file = None
    # Assigning None above is not enough: the uploader widget below would hand
    # back the previously uploaded file again. Changing its key makes
    # Streamlit create a fresh, empty uploader.
    st.session_state.uploader_nonce += 1

# Input fields
# NOTE(review): indentation was lost in this copy of the file; the two buttons
# are assumed to live in the right-hand column -- confirm the intended layout.
col1, col2 = st.columns([3, 1])
with col1:
    st.session_state.text_input = st.text_area(
        "βοΈ Paste Suspicious Message",
        value=st.session_state.text_input,
        height=200,
    )
    st.session_state.uploaded_file = st.file_uploader(
        "π Upload PDF/TXT (optional)",
        type=["pdf", "txt"],
        key=f"uploaded_file_widget_{st.session_state.uploader_nonce}",
    )
with col2:
    language = st.selectbox("π Preferred Language", language_choices)
    role = st.selectbox("π§βπΌ Your Role", role_choices)
    analyze_btn = st.button("π Analyze with AI")
    clear_btn = st.button("ποΈ Clear History")
| # ======================= | |
| # Function Definitions | |
| # ======================= | |
def extract_text_from_file(file):
    """Return the text content of an uploaded PDF or TXT file.

    Args:
        file: A file-like object exposing ``name`` and ``read()`` (such as the
            value returned by ``st.file_uploader``), or ``None``.

    Returns:
        The extracted text. An empty string is returned when ``file`` is
        ``None`` or when its extension is neither ``pdf`` nor ``txt``.
    """
    if file is None:
        return ""

    # Rewind so a stream that was already consumed (e.g. on an earlier
    # analysis of the same upload) is read from the start again.
    if hasattr(file, "seek"):
        file.seek(0)

    ext = file.name.split(".")[-1].lower()
    if ext == "pdf":
        doc = fitz.open(stream=file.read(), filetype="pdf")
        try:
            return "\n".join(page.get_text() for page in doc)
        finally:
            # Release the PyMuPDF document even if text extraction fails.
            doc.close()
    if ext == "txt":
        # Undecodable bytes become U+FFFD instead of raising, so a text file
        # saved in another encoding can still be analysed.
        return file.read().decode("utf-8", errors="replace")
    return ""
def analyze_with_huggingface(text):
    """Classify ``text`` with the module-level ``phishing_pipe`` pipeline.

    Args:
        text: The message to classify.

    Returns:
        A ``(label, confidence, threat_type)`` tuple where ``label`` is the raw
        label of the first pipeline result, ``confidence`` is its score as a
        percentage rounded to two decimals, and ``threat_type`` is the
        display category ("Phishing", "Spam", "Malware", "Safe" or
        "Unknown"). On any failure ``("Error", 0, "Error: <details>")`` is
        returned instead of raising.
    """
    try:
        # Ask the tokenizer to truncate: input longer than the model's maximum
        # sequence length would otherwise make the pipeline call fail.
        result = phishing_pipe(text, truncation=True)
        label = result[0]['label']
        confidence = round(result[0]['score'] * 100, 2)
        threat_type = {
            "PHISHING": "Phishing",
            "SPAM": "Spam",
            "MALWARE": "Malware",
            "LEGITIMATE": "Safe",
            # NOTE(review): the configured model is believed to emit "benign"
            # for its negative class -- confirm against the model card.
            "BENIGN": "Safe",
        }.get(label.upper(), "Unknown")
        return label, confidence, threat_type
    except Exception as e:
        # Best-effort by design: the caller renders the error text as the
        # threat type instead of crashing the page.
        return "Error", 0, f"Error: {e}"
def semantic_analysis(text):
    """Ask the Groq chat model to explain a suspicious message.

    The prompt is tailored with the module-level ``role`` and ``language``
    selections. A trailing "Is there anything else you'd like ..." follow-up
    line is stripped from the reply before it is returned.

    Args:
        text: The suspicious message to explain.

    Returns:
        The cleaned explanation text.
    """
    # NOTE(review): confirm this model ID is still served by Groq.
    chat_messages = [
        {"role": "system", "content": "You are a cybersecurity assistant."},
        {
            "role": "user",
            "content": (
                f"Explain this suspicious message for a {role} in {language} "
                f"without ending in questions:\n{text}"
            ),
        },
    ]
    completion = client.chat.completions.create(
        model="llama3-8b-8192",
        messages=chat_messages,
    )
    reply = completion.choices[0].message.content
    # "." does not match newlines, so only the rest of that one line is removed.
    without_follow_up = re.sub(
        r"Is there anything else you'd like.*", "", reply, flags=re.I
    )
    return without_follow_up.strip()
def translate_label(threat_type):
    """Translate a threat label into the currently selected ``language``.

    Falls back to ``threat_type`` itself when the label has no entry in
    ``TRANSLATIONS`` or no translation for the selected language.
    """
    by_language = TRANSLATIONS.get(threat_type)
    if by_language is None:
        return threat_type
    return by_language.get(language, threat_type)
def text_to_speech(text, lang="en"):
    """Synthesize ``text`` to a temporary MP3 file with gTTS.

    Args:
        text: The text to read aloud.
        lang: Language code passed to gTTS. Defaults to ``"en"``, which is
            what this function always used before the parameter existed.

    Returns:
        The path of the generated ``.mp3`` file (the caller is responsible for
        deleting it), or ``None`` when generation failed; in that case the
        error is shown with ``st.error``.
    """
    audio_path = None
    try:
        tts = gTTS(text=text, lang=lang)
        # Only reserve a unique file name here and close the handle right
        # away, so gTTS writes to a path nothing else holds open.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp:
            audio_path = fp.name
        tts.save(audio_path)
        return audio_path
    except Exception as e:
        # The temp file was created with delete=False; remove it ourselves so
        # a failed synthesis does not leave an orphaned file behind.
        if audio_path and os.path.exists(audio_path):
            os.remove(audio_path)
        st.error(f"β Audio generation error: {e}")
        return None
def create_report(label, score, threat_type, explanation, text):
    """Write a plain-text threat report to the current working directory.

    Args:
        label: Raw classifier label.
        score: Confidence as a percentage.
        threat_type: Display category of the threat.
        explanation: Explanation text to include in the report.
        text: The analysed input message.

    Returns:
        The name of the file that was written,
        ``Zerophish_Report_<YYYYmmdd_HHMMSS>.txt``. The caller is responsible
        for deleting it.
    """
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"Zerophish_Report_{ts}.txt"
    # The leading and trailing empty entries reproduce the newline at the
    # start and end of the report body.
    report = "\n".join([
        "",
        "π AI Threat Detection Report",
        "Input Message:",
        f"{text}",
        f"Prediction: {label}",
        f"Threat Type: {threat_type}",
        f"Confidence: {score}%",
        "---",
        "π§ Explanation:",
        f"{explanation}",
        "",
    ])
    # Explicit UTF-8: the report contains non-ASCII characters (and the input
    # may too), which a non-UTF-8 platform default encoding cannot write.
    with open(filename, "w", encoding="utf-8") as f:
        f.write(report)
    return filename
def _preview(value, limit):
    """Return ``value`` cut to ``limit`` characters, adding "..." only if cut."""
    if len(value) <= limit:
        return value
    return f"{value[:limit]}..."


def render_history():
    """Render the stored analysis history, newest entry first.

    Reads ``st.session_state.history`` (a list of dicts with the keys
    ``input``, ``threat``, ``score`` and ``summary``) and shows each record in
    an expanded "View Analysis History" expander. Input and summary are
    shortened to 100 and 200 characters; the ellipsis is only added when text
    was actually cut, so an empty summary no longer shows up as "...".
    """
    history = st.session_state.history
    total = len(history)
    with st.expander("π View Analysis History", expanded=True):
        # Entry numbers count down so the newest record carries the highest one.
        for offset, record in enumerate(reversed(history)):
            with st.container():
                st.markdown(f"**π’ Entry #{total - offset}**")
                st.markdown(f"**π Input:** {_preview(record['input'], 100)}")
                st.markdown(f"**π Type:** {record['threat']} | **π Confidence:** {record['score']}%")
                st.markdown(f"**π Summary:** {_preview(record['summary'], 200)}")
                st.markdown("---")
| # ======================= | |
| # Run Analysis | |
| # ======================= | |
# NOTE(review): indentation was lost in this copy of the file; the nesting
# below is reconstructed from how the variables are used -- confirm the layout
# (in particular which elements sit inside the expander).

# "Clear History" wipes the stored records and the per-analysis flags.
if clear_btn:
    st.session_state.history.clear()
    st.session_state.audio_summary = ""
    st.session_state.report_sent = False
    st.success("β History cleared!")

if analyze_btn:
    st.session_state.report_sent = False
    st.session_state.chat_active = True

    # Analyse the pasted text together with the text of the uploaded file.
    combined_text = st.session_state.text_input
    if st.session_state.uploaded_file:
        extracted = extract_text_from_file(st.session_state.uploaded_file)
        combined_text += "\n" + extracted

    if not combined_text.strip():
        st.warning("β οΈ Please enter some text or upload a file to analyze.")
    else:
        label, score, threat_type = analyze_with_huggingface(combined_text)
        translated_threat = translate_label(threat_type)
        st.subheader("π AI Threat Detection Result")
        st.markdown(f"**Prediction:** {label}")
        st.markdown(f"**Threat Type:** {threat_type} ({translated_threat})")
        st.markdown(f"**Confidence:** {score}%")

        summary = ""
        # Anything not classified as safe gets an LLM explanation.
        if threat_type.lower() != "safe":
            with st.expander("π§ Semantic Reanalysis by LLaMA"):
                try:
                    summary = semantic_analysis(combined_text)
                except Exception as e:
                    # A failed LLM call should not abort the whole page: show
                    # the error and continue without an explanation.
                    st.error(f"Semantic analysis failed: {e}")
                    summary = ""
                st.write(summary)
                st.session_state.audio_summary = summary  # Save for audio playback
                # Skip speech synthesis when there is nothing to read aloud.
                audio_path = text_to_speech(summary) if summary else None
                if audio_path:
                    with open(audio_path, "rb") as f:
                        st.markdown("### π Audio Explanation")
                        st.audio(f.read(), format="audio/mp3")
                    os.remove(audio_path)

        # Save history
        st.session_state.history.append({
            "input": combined_text,
            "threat": threat_type,
            "score": score,
            "summary": summary
        })

        # Generate and offer download link
        if summary:
            report_path = create_report(label, score, threat_type, summary, combined_text)
            try:
                with open(report_path, "rb") as f:
                    b64 = base64.b64encode(f.read()).decode()
            finally:
                # The report is embedded in the link as a data URI, so the
                # file on disk is no longer needed; remove it so reports do
                # not pile up in the working directory.
                os.remove(report_path)
            href = f'<a href="data:file/txt;base64,{b64}" download="{report_path}">π Download Full Report</a>'
            st.markdown(href, unsafe_allow_html=True)

with st.expander("π Glossary Help"):
    for term, definition in GLOSSARY.items():
        st.markdown(f"**{term.capitalize()}**: {definition}")

# Report to IT section - only visible after analysis
# NOTE(review): the button only sets a session flag; nothing is actually sent
# anywhere in the visible code.
if st.session_state.chat_active:
    st.markdown("---")
    report_it = st.button("π€ Report to IT", key="report_it_btn")
    if report_it:
        st.session_state.report_sent = True
    if st.session_state.report_sent:
        st.success("π¨ Report sent to IT successfully.")

render_history()