CyberSentinel / app.py
IqraFatima's picture
Update app.py
027febb verified
# ✅ Combined Flask + Gradio App with Google OAuth for CyberSentinel (Updated with server_metadata_url)
import os
import re
import csv
import fitz # PyMuPDF
import shutil
import smtplib
from datetime import datetime
from flask import Flask, redirect, url_for, session, request
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from authlib.integrations.flask_client import OAuth
import gradio as gr
from groq import Groq
from threading import Thread
# === Secrets ===
# Credentials are pulled from the process environment; each lookup yields
# None when its variable is not set.
CLIENT_ID = os.environ.get("GOOGLE_CLIENT_ID")
CLIENT_SECRET = os.environ.get("GOOGLE_CLIENT_SECRET")
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
# Mailbox used as the SMTP login and "From" address of IT reports.
OFFICIAL_EMAIL = "iqrafatima1717@gmail.com"
OFFICIAL_EMAIL_PASS = os.environ.get("OFFICIAL_EMAIL_APP_PASS")
# === Flask App ===
app = Flask(__name__)
# Prefer a stable key from the environment so signed session cookies stay
# valid across restarts and between worker processes.  When FLASK_SECRET_KEY
# is not set, fall back to a random per-process key (the previous behaviour).
app.secret_key = os.getenv("FLASK_SECRET_KEY") or os.urandom(24)
oauth = OAuth(app)
google = oauth.register(
    name='google',
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    # Endpoints (authorize, token, userinfo, JWKS) are discovered from here.
    server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
    # Base for relative API paths: the callback requests the bare path
    # 'userinfo', which has no scheme or host unless a base URL is registered.
    api_base_url='https://openidconnect.googleapis.com/v1/',
    client_kwargs={'scope': 'openid email profile'},
)
# === Groq Client ===
# GROQ_API_KEY was itself read from the environment, so it is either already
# present in os.environ or missing altogether.  Re-export it only when it
# exists: assigning None to os.environ raises
# "TypeError: str expected, not NoneType" at import time, which hides the
# real problem (the key is not configured).
if GROQ_API_KEY:
    os.environ["GROQ_API_KEY"] = GROQ_API_KEY
client = Groq(api_key=GROQ_API_KEY)
# === App State ===
# Email of the logged-in user.  Assigned by the "/" route and read when
# reports are saved, mailed or downloaded.
session_email = ""
# Recipients offered in the "IT Email" dropdown.
it_email_choices = ["fiqra753@gmail.com"]
# Languages offered for the explanation/advice part of the analysis.
language_choices = [
    "English", "Urdu", "Arabic", "French", "German",
    "Spanish", "Portuguese", "Hindi", "Turkish", "Bengali",
    "Russian", "Chinese", "Japanese", "Korean", "Swahili",
    "Indonesian", "Italian", "Dutch", "Polish", "Thai",
    "Vietnamese", "Romanian", "Persian", "Punjabi", "Greek",
    "Hebrew", "Malay", "Czech", "Danish", "Finnish",
    "Hungarian", "Norwegian", "Slovak", "Swedish", "Tamil",
    "Telugu", "Gujarati", "Marathi", "Pashto", "Serbian",
    "Croatian", "Ukrainian", "Bulgarian", "Filipino", "Sinhala",
    "Mongolian", "Kazakh", "Azerbaijani", "Nepali", "Malayalam",
]
# === Core Logic ===
def build_prompt_messages(user_input, language="English"):
    """Assemble the chat messages sent to the model for one analysis.

    Args:
        user_input: Text to be analysed; inserted verbatim into the user turn.
        language: Language the explanation and advice should be written in.

    Returns:
        A two-element list of ``{"role", "content"}`` dicts: the system
        instructions followed by the user request with its answer template.
    """
    instructions = "\n".join([
        "You are a cybersecurity assistant built for employees in the supply chain industry.",
        "Your job is to:",
        "- Identify the tone",
        "- Detect threat type",
        "- Score risk 0–100%",
        f"- Explain in {language}",
        "- Advise simply",
    ])
    request = "\n".join([
        "Analyze:",
        f"{user_input}",
        "Format:",
        "1. Tone:",
        "2. Threat Type:",
        "3. Threat Score:",
        f"4. Explanation (in {language}):",
        f"5. Advice (in {language}):",
    ])
    system_message = {"role": "system", "content": instructions}
    user_message = {"role": "user", "content": request}
    return [system_message, user_message]
def extract_text_from_file(file_obj):
    """Return the text of an uploaded PDF or TXT file.

    Args:
        file_obj: ``None``, a filesystem path (``str`` / ``os.PathLike``), or
            an object whose ``name`` attribute holds the path of the upload.

    Returns:
        The extracted text, or ``""`` when nothing was uploaded or the file
        extension is neither ``pdf`` nor ``txt``.
    """
    if file_obj is None:
        return ""
    # Accept a bare path as well as a file wrapper exposing ``.name``.
    if isinstance(file_obj, (str, os.PathLike)):
        filepath = os.fspath(file_obj)
    else:
        filepath = file_obj.name
    # Text after the last dot; the whole name when there is no dot.
    ext = filepath.rpartition(".")[2].lower()
    if ext == "pdf":
        # Close the document once its pages are read instead of leaking the
        # open file handle.
        with fitz.open(filepath) as doc:
            return "\n".join(page.get_text() for page in doc)
    if ext == "txt":
        # Undecodable bytes become U+FFFD rather than aborting the analysis.
        with open(filepath, "r", encoding="utf-8", errors="replace") as f:
            return f.read()
    return ""
def analyze_message_interface(text_input, uploaded_file, language):
    """Analyse pasted and/or uploaded text and decide whether to offer reporting.

    Args:
        text_input: Text pasted by the user; ``None`` is treated as empty.
        uploaded_file: Optional uploaded PDF/TXT whose text is appended.
        language: Language requested for the explanation and advice.

    Returns:
        A 3-tuple ``(result_text, report_button_update, email_dropdown_update)``.
        Both updates make their component visible only when the model's answer
        mentions "phishing", "spam" or "malware".  On empty input or a failed
        model call the text is an error message and both stay hidden.
    """
    file_text = extract_text_from_file(uploaded_file) if uploaded_file else ""
    # Guard against a missing textbox value before stripping.
    pasted = text_input or ""
    input_text = f"{pasted.strip()}\n\n{file_text.strip()}".strip()
    if not input_text:
        return "❌ No input provided.", gr.update(visible=False), gr.update(visible=False)
    messages = build_prompt_messages(input_text, language)
    try:
        response = client.chat.completions.create(
            model="llama3-8b-8192", messages=messages, temperature=0.3, max_tokens=700
        )
    except Exception as exc:
        # UI boundary: report the failure in the result box, in the same
        # style as the email path, instead of raising into the UI framework.
        return f"❌ Analysis failed: {exc}", gr.update(visible=False), gr.update(visible=False)
    # The content may be absent; treat that as an empty answer.
    result = (response.choices[0].message.content or "").strip()
    lowered = result.lower()
    # Keyword heuristic: any mention of these terms enables the report controls.
    is_threat = any(term in lowered for term in ["phishing", "spam", "malware"])
    return result, gr.update(visible=is_threat), gr.update(visible=is_threat)
def save_report(email, result):
    """Append one report row to ``cyber_reports.csv`` in the working directory.

    The row is ``[timestamp, email, result]`` where the timestamp is the
    current local time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    row = [stamp, email, result]
    with open("cyber_reports.csv", mode="a", encoding="utf-8", newline="") as report_log:
        writer = csv.writer(report_log)
        writer.writerow(row)
def download_report():
    """Write the current user's saved reports to ``report_download.csv``.

    Rows of ``cyber_reports.csv`` whose second column, stripped of
    surrounding whitespace, equals the module-level ``session_email`` are
    copied unchanged.  When no report log exists yet the download file is
    written empty instead of raising ``FileNotFoundError``.

    Returns:
        The relative path ``"report_download.csv"``.
    """
    try:
        # newline='' lets the csv module handle line endings itself, which
        # matters because the result column holds multi-line text.
        with open("cyber_reports.csv", "r", newline='', encoding='utf-8') as infile:
            filtered = [
                row
                for row in csv.reader(infile)
                if len(row) >= 2 and row[1].strip() == session_email
            ]
    except FileNotFoundError:
        # Nothing has been reported yet.
        filtered = []
    with open("report_download.csv", "w", newline='', encoding='utf-8') as outfile:
        csv.writer(outfile).writerows(filtered)
    return "report_download.csv"
def report_to_it(language, selected_email, analysis_result):
    """Log the analysis result and mail it to the selected IT address.

    Args:
        language: Accepted for the UI wiring; not used here.
        selected_email: Recipient address chosen in the dropdown.
        analysis_result: Analysis text to include in the report.

    Returns:
        A confirmation string on success, otherwise a failure string that
        embeds the exception text.
    """
    global session_email
    mail_subject = "🚨 Suspicious Activity Reported via CyberSentinel"
    mail_body = "\n".join([
        "Dear IT,",
        f"Reported by user: {session_email}",
        "Result:",
        f"{analysis_result}",
        "Please investigate.",
        "β€” CyberSentinel",
    ])
    try:
        # The report is logged first, so it is kept even if mailing fails.
        save_report(session_email, analysis_result)
        envelope = MIMEMultipart()
        headers = (
            ("From", OFFICIAL_EMAIL),
            ("To", selected_email),
            ("Subject", mail_subject),
        )
        for header_name, header_value in headers:
            envelope[header_name] = header_value
        envelope.attach(MIMEText(mail_body, "plain"))
        with smtplib.SMTP_SSL("smtp.gmail.com", 465) as smtp:
            smtp.login(OFFICIAL_EMAIL, OFFICIAL_EMAIL_PASS)
            smtp.sendmail(OFFICIAL_EMAIL, selected_email, envelope.as_string())
    except Exception as exc:
        return f"❌ Email failed: {str(exc)}"
    return "βœ… Report sent."
# === Gradio UI ===
def start_gradio():
    """Build the CyberSentinel Gradio interface and launch it with sharing on.

    Layout: message box, file upload, language picker and Analyze button feed
    the result box.  The report button and IT-email dropdown start hidden and
    are toggled by the analysis handler.  A download button exposes the
    user's saved reports as a CSV file.
    """
    global session_email
    with gr.Blocks(title="CyberSentinel") as demo:
        # --- input widgets ---
        message_box = gr.Textbox(label="βœ‰οΈ Paste Message", lines=6)
        upload_box = gr.File(label="πŸ“„ Upload PDF/TXT", file_types=[".pdf", ".txt"])
        language_picker = gr.Dropdown(
            label="🌐 Language",
            choices=language_choices,
            value="English",
        )
        analyze_button = gr.Button("πŸ” Analyze")
        # --- analysis output and reporting controls ---
        result_box = gr.Textbox(label="🧠 Result", lines=10)
        report_button = gr.Button("🚨 Report to IT", visible=False)
        it_email_picker = gr.Dropdown(
            label="πŸ“¬ IT Email",
            choices=it_email_choices,
            visible=False,
        )
        confirmation_box = gr.Textbox(label="πŸ“£ Confirmation", visible=False)
        # --- report download ---
        download_button = gr.Button("⬇️ Download Reports")
        csv_file = gr.File(label="Download CSV", visible=False)

        # --- event wiring (registered in the same order as before) ---
        analyze_button.click(
            fn=analyze_message_interface,
            inputs=[message_box, upload_box, language_picker],
            outputs=[result_box, report_button, it_email_picker],
        )
        report_button.click(
            fn=report_to_it,
            inputs=[language_picker, it_email_picker, result_box],
            outputs=[confirmation_box],
        )
        # Second handler on the same button: reveal the confirmation box.
        report_button.click(lambda: gr.update(visible=True), outputs=[confirmation_box])
        download_button.click(fn=download_report, outputs=[csv_file])
        # Second handler on the same button: reveal the file component.
        download_button.click(lambda: gr.update(visible=True), outputs=[csv_file])
    demo.launch(share=True)
# === Flask Routes ===
# Page shown to a logged-in user; ``{email}`` is filled with the escaped address.
_LOGGED_IN_PAGE = """
<div style='display: flex; justify-content: center; align-items: center; height: 100vh; flex-direction: column; font-family: sans-serif;'>
<h2>βœ… Logged in as: {email}</h2>
<p>CyberSentinel interface launched in new tab. You can close this tab.</p>
<a href='/logout' style='margin-top: 10px; color: #c00;'>Logout</a>
</div>
"""
# Page shown to an anonymous visitor.
_WELCOME_PAGE = """
<div style='display: flex; justify-content: center; align-items: center; height: 100vh; flex-direction: column; font-family: sans-serif;'>
<h2>Welcome to CyberSentinel πŸ‘‹</h2>
<a href='/login' style='padding: 10px 20px; background-color: #4285F4; color: white; text-decoration: none; border-radius: 5px; font-size: 16px;'>
πŸ” Login with Google
</a>
</div>
"""
# Background thread running the Gradio UI; None until the first launch.
_gradio_thread = None


def _ensure_gradio_running():
    """Start the Gradio UI in a background thread unless one is still alive."""
    global _gradio_thread
    # Previously every request to "/" started another thread, and with it
    # another Gradio launch.  Relaunch only when no launcher thread is alive.
    # NOTE(review): assumes the launch call keeps its thread alive while the
    # UI is being served - confirm against the installed Gradio version.
    if _gradio_thread is None or not _gradio_thread.is_alive():
        _gradio_thread = Thread(target=start_gradio)
        _gradio_thread.start()


@app.route('/')
def home():
    """Serve the landing page.

    For a logged-in session this records the user's email in the
    module-level ``session_email``, makes sure the Gradio UI is running and
    returns the "logged in" page.  Otherwise it returns the welcome page
    with the Google login link.
    """
    # Local import keeps this block self-contained.
    from html import escape

    global session_email
    email = session.get('email', None)
    if not email:
        return _WELCOME_PAGE
    session_email = email
    _ensure_gradio_running()
    # The address comes from an external identity provider; escape it before
    # embedding it in HTML.
    return _LOGGED_IN_PAGE.format(email=escape(email))
@app.route('/login')
def login():
    """Send the browser to Google's authorization page.

    The absolute URL of the ``auth_callback`` route is passed as the
    redirect target for the provider to return to.
    """
    return google.authorize_redirect(url_for('auth_callback', _external=True))
@app.route('/auth_callback')
def auth_callback():
    """Complete the Google login and remember the user's email in the session.

    Exchanges the authorization response for a token, obtains the user's
    profile claims, stores ``email`` in the Flask session and redirects to
    the landing page.
    """
    token = google.authorize_access_token()
    # With the "openid" scope the token response carries the parsed ID-token
    # claims under "userinfo".  The earlier relative request for 'userinfo'
    # depended on a registered API base URL that this app does not set.
    # Fall back to the provider's discovered userinfo endpoint if the claims
    # are absent.
    user_info = token.get('userinfo') or google.userinfo(token=token)
    session['email'] = user_info['email']
    return redirect('/')
@app.route('/logout')
def logout():
    """Log the user out and return to the landing page.

    Clears the Flask session and also resets the module-level
    ``session_email``, which the report and download handlers read.
    Without the reset those handlers would keep acting for the previous
    user after logout.
    """
    global session_email
    session.clear()
    session_email = ""
    return redirect('/')
if __name__ == "__main__":
    # Script entry point: serve the Flask app on all interfaces, port 7860.
    app.run(port=7860, host="0.0.0.0")