# ✅ Combined Flask + Gradio App with Google OAuth for CyberSentinel (Updated with server_metadata_url) import os import re import csv import fitz # PyMuPDF import shutil import smtplib from datetime import datetime from flask import Flask, redirect, url_for, session, request from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from authlib.integrations.flask_client import OAuth import gradio as gr from groq import Groq from threading import Thread # === Secrets === CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID") CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET") GROQ_API_KEY = os.getenv("GROQ_API_KEY") OFFICIAL_EMAIL = "iqrafatima1717@gmail.com" OFFICIAL_EMAIL_PASS = os.getenv("OFFICIAL_EMAIL_APP_PASS") # === Flask App === app = Flask(__name__) app.secret_key = os.urandom(24) oauth = OAuth(app) google = oauth.register( name='google', client_id=CLIENT_ID, client_secret=CLIENT_SECRET, server_metadata_url='https://accounts.google.com/.well-known/openid-configuration', client_kwargs={'scope': 'openid email profile'}, ) # === Groq Client === os.environ["GROQ_API_KEY"] = GROQ_API_KEY client = Groq(api_key=GROQ_API_KEY) # === App State === session_email = "" it_email_choices = ["fiqra753@gmail.com"] language_choices = ["English", "Urdu", "Arabic", "French", "German", "Spanish", "Portuguese", "Hindi", "Turkish", "Bengali", "Russian", "Chinese", "Japanese", "Korean", "Swahili", "Indonesian", "Italian", "Dutch", "Polish", "Thai", "Vietnamese", "Romanian", "Persian", "Punjabi", "Greek", "Hebrew", "Malay", "Czech", "Danish", "Finnish", "Hungarian", "Norwegian", "Slovak", "Swedish", "Tamil", "Telugu", "Gujarati", "Marathi", "Pashto", "Serbian", "Croatian", "Ukrainian", "Bulgarian", "Filipino", "Sinhala", "Mongolian", "Kazakh", "Azerbaijani", "Nepali", "Malayalam"] # === Core Logic === def build_prompt_messages(user_input, language="English"): system_prompt = f""" You are a cybersecurity assistant built for employees in the supply chain industry. Your job is to: - Identify the tone - Detect threat type - Score risk 0–100% - Explain in {language} - Advise simply """.strip() user_prompt = f""" Analyze: {user_input} Format: 1. Tone: 2. Threat Type: 3. Threat Score: 4. Explanation (in {language}): 5. Advice (in {language}): """.strip() return [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ] def extract_text_from_file(file_obj): if file_obj is None: return "" filepath = file_obj.name ext = filepath.split(".")[-1].lower() if ext == "pdf": doc = fitz.open(filepath) return "\n".join(page.get_text() for page in doc) elif ext == "txt": with open(filepath, "r", encoding="utf-8") as f: return f.read() return "" def analyze_message_interface(text_input, uploaded_file, language): global session_email file_text = extract_text_from_file(uploaded_file) if uploaded_file else "" input_text = f"{text_input.strip()}\n\n{file_text.strip()}".strip() if not input_text: return "❌ No input provided.", gr.update(visible=False), gr.update(visible=False) messages = build_prompt_messages(input_text, language) response = client.chat.completions.create( model="llama3-8b-8192", messages=messages, temperature=0.3, max_tokens=700 ) result = response.choices[0].message.content.strip() is_threat = any(term in result.lower() for term in ["phishing", "spam", "malware"]) return result, gr.update(visible=is_threat), gr.update(visible=is_threat) def save_report(email, result): time_now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") with open("cyber_reports.csv", "a", newline='', encoding='utf-8') as csvfile: csv.writer(csvfile).writerow([time_now, email, result]) def download_report(): global session_email filtered = [] with open("cyber_reports.csv", "r", encoding='utf-8') as infile: for row in csv.reader(infile): if len(row) >= 2 and row[1].strip() == session_email: filtered.append(row) with open("report_download.csv", "w", newline='', encoding='utf-8') as outfile: csv.writer(outfile).writerows(filtered) return "report_download.csv" def report_to_it(language, selected_email, analysis_result): global session_email subject = "🚨 Suspicious Activity Reported via CyberSentinel" body = f"""Dear IT, Reported by user: {session_email} Result: {analysis_result} Please investigate. — CyberSentinel""" try: save_report(session_email, analysis_result) msg = MIMEMultipart() msg["From"] = OFFICIAL_EMAIL msg["To"] = selected_email msg["Subject"] = subject msg.attach(MIMEText(body, "plain")) with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server: server.login(OFFICIAL_EMAIL, OFFICIAL_EMAIL_PASS) server.sendmail(OFFICIAL_EMAIL, selected_email, msg.as_string()) return "✅ Report sent." except Exception as e: return f"❌ Email failed: {str(e)}" # === Gradio UI === def start_gradio(): global session_email with gr.Blocks(title="CyberSentinel") as demo: text_input = gr.Textbox(label="✉️ Paste Message", lines=6) file_input = gr.File(label="📄 Upload PDF/TXT", file_types=[".pdf", ".txt"]) language = gr.Dropdown(label="🌐 Language", choices=language_choices, value="English") analyze_btn = gr.Button("🔍 Analyze") output = gr.Textbox(label="🧠 Result", lines=10) report_btn = gr.Button("🚨 Report to IT", visible=False) it_email_dropdown = gr.Dropdown(label="📬 IT Email", choices=it_email_choices, visible=False) report_msg = gr.Textbox(label="📣 Confirmation", visible=False) download_btn = gr.Button("⬇️ Download Reports") download_file = gr.File(label="Download CSV", visible=False) analyze_btn.click(fn=analyze_message_interface, inputs=[text_input, file_input, language], outputs=[output, report_btn, it_email_dropdown]) report_btn.click(fn=report_to_it, inputs=[language, it_email_dropdown, output], outputs=[report_msg]) report_btn.click(lambda: gr.update(visible=True), outputs=[report_msg]) download_btn.click(fn=download_report, outputs=[download_file]) download_btn.click(lambda: gr.update(visible=True), outputs=[download_file]) demo.launch(share=True) # === Flask Routes === @app.route('/') def home(): email = session.get('email', None) if email: global session_email session_email = email Thread(target=start_gradio).start() return f"""

✅ Logged in as: {email}

CyberSentinel interface launched in new tab. You can close this tab.

Logout
""" return """

Welcome to CyberSentinel 👋

🔐 Login with Google
""" @app.route('/login') def login(): redirect_uri = url_for('auth_callback', _external=True) return google.authorize_redirect(redirect_uri) @app.route('/auth_callback') def auth_callback(): token = google.authorize_access_token() user_info = google.get('userinfo').json() session['email'] = user_info['email'] return redirect('/') @app.route('/logout') def logout(): session.clear() return redirect('/') if __name__ == "__main__": app.run(host="0.0.0.0", port=7860)