Spaces:
Sleeping
Sleeping
| # β Combined Flask + Gradio App with Google OAuth for CyberSentinel (Updated with server_metadata_url) | |
| import os | |
| import re | |
| import csv | |
| import fitz # PyMuPDF | |
| import shutil | |
| import smtplib | |
| from datetime import datetime | |
| from flask import Flask, redirect, url_for, session, request | |
| from email.mime.text import MIMEText | |
| from email.mime.multipart import MIMEMultipart | |
| from authlib.integrations.flask_client import OAuth | |
| import gradio as gr | |
| from groq import Groq | |
| from threading import Thread | |
| # === Secrets === | |
| CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID") | |
| CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET") | |
| GROQ_API_KEY = os.getenv("GROQ_API_KEY") | |
| OFFICIAL_EMAIL = "iqrafatima1717@gmail.com" | |
| OFFICIAL_EMAIL_PASS = os.getenv("OFFICIAL_EMAIL_APP_PASS") | |
| # === Flask App === | |
| app = Flask(__name__) | |
| app.secret_key = os.urandom(24) | |
| oauth = OAuth(app) | |
| google = oauth.register( | |
| name='google', | |
| client_id=CLIENT_ID, | |
| client_secret=CLIENT_SECRET, | |
| server_metadata_url='https://accounts.google.com/.well-known/openid-configuration', | |
| client_kwargs={'scope': 'openid email profile'}, | |
| ) | |
| # === Groq Client === | |
| os.environ["GROQ_API_KEY"] = GROQ_API_KEY | |
| client = Groq(api_key=GROQ_API_KEY) | |
| # === App State === | |
| session_email = "" | |
| it_email_choices = ["fiqra753@gmail.com"] | |
| language_choices = ["English", "Urdu", "Arabic", "French", "German", "Spanish", "Portuguese", "Hindi", "Turkish", | |
| "Bengali", "Russian", "Chinese", "Japanese", "Korean", "Swahili", "Indonesian", "Italian", | |
| "Dutch", "Polish", "Thai", "Vietnamese", "Romanian", "Persian", "Punjabi", "Greek", "Hebrew", | |
| "Malay", "Czech", "Danish", "Finnish", "Hungarian", "Norwegian", "Slovak", "Swedish", "Tamil", | |
| "Telugu", "Gujarati", "Marathi", "Pashto", "Serbian", "Croatian", "Ukrainian", "Bulgarian", | |
| "Filipino", "Sinhala", "Mongolian", "Kazakh", "Azerbaijani", "Nepali", "Malayalam"] | |
| # === Core Logic === | |
| def build_prompt_messages(user_input, language="English"): | |
| system_prompt = f""" | |
| You are a cybersecurity assistant built for employees in the supply chain industry. | |
| Your job is to: | |
| - Identify the tone | |
| - Detect threat type | |
| - Score risk 0β100% | |
| - Explain in {language} | |
| - Advise simply | |
| """.strip() | |
| user_prompt = f""" | |
| Analyze: | |
| {user_input} | |
| Format: | |
| 1. Tone: | |
| 2. Threat Type: | |
| 3. Threat Score: | |
| 4. Explanation (in {language}): | |
| 5. Advice (in {language}): | |
| """.strip() | |
| return [ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_prompt} | |
| ] | |
| def extract_text_from_file(file_obj): | |
| if file_obj is None: | |
| return "" | |
| filepath = file_obj.name | |
| ext = filepath.split(".")[-1].lower() | |
| if ext == "pdf": | |
| doc = fitz.open(filepath) | |
| return "\n".join(page.get_text() for page in doc) | |
| elif ext == "txt": | |
| with open(filepath, "r", encoding="utf-8") as f: | |
| return f.read() | |
| return "" | |
| def analyze_message_interface(text_input, uploaded_file, language): | |
| global session_email | |
| file_text = extract_text_from_file(uploaded_file) if uploaded_file else "" | |
| input_text = f"{text_input.strip()}\n\n{file_text.strip()}".strip() | |
| if not input_text: | |
| return "β No input provided.", gr.update(visible=False), gr.update(visible=False) | |
| messages = build_prompt_messages(input_text, language) | |
| response = client.chat.completions.create( | |
| model="llama3-8b-8192", messages=messages, temperature=0.3, max_tokens=700 | |
| ) | |
| result = response.choices[0].message.content.strip() | |
| is_threat = any(term in result.lower() for term in ["phishing", "spam", "malware"]) | |
| return result, gr.update(visible=is_threat), gr.update(visible=is_threat) | |
| def save_report(email, result): | |
| time_now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| with open("cyber_reports.csv", "a", newline='', encoding='utf-8') as csvfile: | |
| csv.writer(csvfile).writerow([time_now, email, result]) | |
| def download_report(): | |
| global session_email | |
| filtered = [] | |
| with open("cyber_reports.csv", "r", encoding='utf-8') as infile: | |
| for row in csv.reader(infile): | |
| if len(row) >= 2 and row[1].strip() == session_email: | |
| filtered.append(row) | |
| with open("report_download.csv", "w", newline='', encoding='utf-8') as outfile: | |
| csv.writer(outfile).writerows(filtered) | |
| return "report_download.csv" | |
| def report_to_it(language, selected_email, analysis_result): | |
| global session_email | |
| subject = "π¨ Suspicious Activity Reported via CyberSentinel" | |
| body = f"""Dear IT, | |
| Reported by user: {session_email} | |
| Result: | |
| {analysis_result} | |
| Please investigate. | |
| β CyberSentinel""" | |
| try: | |
| save_report(session_email, analysis_result) | |
| msg = MIMEMultipart() | |
| msg["From"] = OFFICIAL_EMAIL | |
| msg["To"] = selected_email | |
| msg["Subject"] = subject | |
| msg.attach(MIMEText(body, "plain")) | |
| with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server: | |
| server.login(OFFICIAL_EMAIL, OFFICIAL_EMAIL_PASS) | |
| server.sendmail(OFFICIAL_EMAIL, selected_email, msg.as_string()) | |
| return "β Report sent." | |
| except Exception as e: | |
| return f"β Email failed: {str(e)}" | |
| # === Gradio UI === | |
| def start_gradio(): | |
| global session_email | |
| with gr.Blocks(title="CyberSentinel") as demo: | |
| text_input = gr.Textbox(label="βοΈ Paste Message", lines=6) | |
| file_input = gr.File(label="π Upload PDF/TXT", file_types=[".pdf", ".txt"]) | |
| language = gr.Dropdown(label="π Language", choices=language_choices, value="English") | |
| analyze_btn = gr.Button("π Analyze") | |
| output = gr.Textbox(label="π§ Result", lines=10) | |
| report_btn = gr.Button("π¨ Report to IT", visible=False) | |
| it_email_dropdown = gr.Dropdown(label="π¬ IT Email", choices=it_email_choices, visible=False) | |
| report_msg = gr.Textbox(label="π£ Confirmation", visible=False) | |
| download_btn = gr.Button("β¬οΈ Download Reports") | |
| download_file = gr.File(label="Download CSV", visible=False) | |
| analyze_btn.click(fn=analyze_message_interface, inputs=[text_input, file_input, language], outputs=[output, report_btn, it_email_dropdown]) | |
| report_btn.click(fn=report_to_it, inputs=[language, it_email_dropdown, output], outputs=[report_msg]) | |
| report_btn.click(lambda: gr.update(visible=True), outputs=[report_msg]) | |
| download_btn.click(fn=download_report, outputs=[download_file]) | |
| download_btn.click(lambda: gr.update(visible=True), outputs=[download_file]) | |
| demo.launch(share=True) | |
| # === Flask Routes === | |
| def home(): | |
| email = session.get('email', None) | |
| if email: | |
| global session_email | |
| session_email = email | |
| Thread(target=start_gradio).start() | |
| return f""" | |
| <div style='display: flex; justify-content: center; align-items: center; height: 100vh; flex-direction: column; font-family: sans-serif;'> | |
| <h2>β Logged in as: {email}</h2> | |
| <p>CyberSentinel interface launched in new tab. You can close this tab.</p> | |
| <a href='/logout' style='margin-top: 10px; color: #c00;'>Logout</a> | |
| </div> | |
| """ | |
| return """ | |
| <div style='display: flex; justify-content: center; align-items: center; height: 100vh; flex-direction: column; font-family: sans-serif;'> | |
| <h2>Welcome to CyberSentinel π</h2> | |
| <a href='/login' style='padding: 10px 20px; background-color: #4285F4; color: white; text-decoration: none; border-radius: 5px; font-size: 16px;'> | |
| π Login with Google | |
| </a> | |
| </div> | |
| """ | |
| def login(): | |
| redirect_uri = url_for('auth_callback', _external=True) | |
| return google.authorize_redirect(redirect_uri) | |
| def auth_callback(): | |
| token = google.authorize_access_token() | |
| user_info = google.get('userinfo').json() | |
| session['email'] = user_info['email'] | |
| return redirect('/') | |
| def logout(): | |
| session.clear() | |
| return redirect('/') | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", port=7860) | |