Spaces:
Sleeping
Sleeping
File size: 8,049 Bytes
027febb fcd3fd8 feaee1b fcd3fd8 feaee1b 82b477b feaee1b fcd3fd8 feaee1b fcd3fd8 027febb fcd3fd8 feaee1b fcd3fd8 feaee1b efcc57f feaee1b efcc57f fcd3fd8 82b477b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 |
# β
Combined Flask + Gradio App with Google OAuth for CyberSentinel (Updated with server_metadata_url)
import os
import re
import csv
import fitz # PyMuPDF
import shutil
import smtplib
from datetime import datetime
from flask import Flask, redirect, url_for, session, request
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from authlib.integrations.flask_client import OAuth
import gradio as gr
from groq import Groq
from threading import Thread
# === Secrets ===
CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID")
CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
OFFICIAL_EMAIL = "iqrafatima1717@gmail.com"
OFFICIAL_EMAIL_PASS = os.getenv("OFFICIAL_EMAIL_APP_PASS")
# === Flask App ===
app = Flask(__name__)
app.secret_key = os.urandom(24)
oauth = OAuth(app)
google = oauth.register(
name='google',
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={'scope': 'openid email profile'},
)
# === Groq Client ===
os.environ["GROQ_API_KEY"] = GROQ_API_KEY
client = Groq(api_key=GROQ_API_KEY)
# === App State ===
session_email = ""
it_email_choices = ["fiqra753@gmail.com"]
language_choices = ["English", "Urdu", "Arabic", "French", "German", "Spanish", "Portuguese", "Hindi", "Turkish",
"Bengali", "Russian", "Chinese", "Japanese", "Korean", "Swahili", "Indonesian", "Italian",
"Dutch", "Polish", "Thai", "Vietnamese", "Romanian", "Persian", "Punjabi", "Greek", "Hebrew",
"Malay", "Czech", "Danish", "Finnish", "Hungarian", "Norwegian", "Slovak", "Swedish", "Tamil",
"Telugu", "Gujarati", "Marathi", "Pashto", "Serbian", "Croatian", "Ukrainian", "Bulgarian",
"Filipino", "Sinhala", "Mongolian", "Kazakh", "Azerbaijani", "Nepali", "Malayalam"]
# === Core Logic ===
def build_prompt_messages(user_input, language="English"):
system_prompt = f"""
You are a cybersecurity assistant built for employees in the supply chain industry.
Your job is to:
- Identify the tone
- Detect threat type
- Score risk 0β100%
- Explain in {language}
- Advise simply
""".strip()
user_prompt = f"""
Analyze:
{user_input}
Format:
1. Tone:
2. Threat Type:
3. Threat Score:
4. Explanation (in {language}):
5. Advice (in {language}):
""".strip()
return [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
def extract_text_from_file(file_obj):
if file_obj is None:
return ""
filepath = file_obj.name
ext = filepath.split(".")[-1].lower()
if ext == "pdf":
doc = fitz.open(filepath)
return "\n".join(page.get_text() for page in doc)
elif ext == "txt":
with open(filepath, "r", encoding="utf-8") as f:
return f.read()
return ""
def analyze_message_interface(text_input, uploaded_file, language):
global session_email
file_text = extract_text_from_file(uploaded_file) if uploaded_file else ""
input_text = f"{text_input.strip()}\n\n{file_text.strip()}".strip()
if not input_text:
return "β No input provided.", gr.update(visible=False), gr.update(visible=False)
messages = build_prompt_messages(input_text, language)
response = client.chat.completions.create(
model="llama3-8b-8192", messages=messages, temperature=0.3, max_tokens=700
)
result = response.choices[0].message.content.strip()
is_threat = any(term in result.lower() for term in ["phishing", "spam", "malware"])
return result, gr.update(visible=is_threat), gr.update(visible=is_threat)
def save_report(email, result):
time_now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open("cyber_reports.csv", "a", newline='', encoding='utf-8') as csvfile:
csv.writer(csvfile).writerow([time_now, email, result])
def download_report():
global session_email
filtered = []
with open("cyber_reports.csv", "r", encoding='utf-8') as infile:
for row in csv.reader(infile):
if len(row) >= 2 and row[1].strip() == session_email:
filtered.append(row)
with open("report_download.csv", "w", newline='', encoding='utf-8') as outfile:
csv.writer(outfile).writerows(filtered)
return "report_download.csv"
def report_to_it(language, selected_email, analysis_result):
global session_email
subject = "π¨ Suspicious Activity Reported via CyberSentinel"
body = f"""Dear IT,
Reported by user: {session_email}
Result:
{analysis_result}
Please investigate.
β CyberSentinel"""
try:
save_report(session_email, analysis_result)
msg = MIMEMultipart()
msg["From"] = OFFICIAL_EMAIL
msg["To"] = selected_email
msg["Subject"] = subject
msg.attach(MIMEText(body, "plain"))
with smtplib.SMTP_SSL("smtp.gmail.com", 465) as server:
server.login(OFFICIAL_EMAIL, OFFICIAL_EMAIL_PASS)
server.sendmail(OFFICIAL_EMAIL, selected_email, msg.as_string())
return "β
Report sent."
except Exception as e:
return f"β Email failed: {str(e)}"
# === Gradio UI ===
def start_gradio():
global session_email
with gr.Blocks(title="CyberSentinel") as demo:
text_input = gr.Textbox(label="βοΈ Paste Message", lines=6)
file_input = gr.File(label="π Upload PDF/TXT", file_types=[".pdf", ".txt"])
language = gr.Dropdown(label="π Language", choices=language_choices, value="English")
analyze_btn = gr.Button("π Analyze")
output = gr.Textbox(label="π§ Result", lines=10)
report_btn = gr.Button("π¨ Report to IT", visible=False)
it_email_dropdown = gr.Dropdown(label="π¬ IT Email", choices=it_email_choices, visible=False)
report_msg = gr.Textbox(label="π£ Confirmation", visible=False)
download_btn = gr.Button("β¬οΈ Download Reports")
download_file = gr.File(label="Download CSV", visible=False)
analyze_btn.click(fn=analyze_message_interface, inputs=[text_input, file_input, language], outputs=[output, report_btn, it_email_dropdown])
report_btn.click(fn=report_to_it, inputs=[language, it_email_dropdown, output], outputs=[report_msg])
report_btn.click(lambda: gr.update(visible=True), outputs=[report_msg])
download_btn.click(fn=download_report, outputs=[download_file])
download_btn.click(lambda: gr.update(visible=True), outputs=[download_file])
demo.launch(share=True)
# === Flask Routes ===
@app.route('/')
def home():
email = session.get('email', None)
if email:
global session_email
session_email = email
Thread(target=start_gradio).start()
return f"""
<div style='display: flex; justify-content: center; align-items: center; height: 100vh; flex-direction: column; font-family: sans-serif;'>
<h2>β
Logged in as: {email}</h2>
<p>CyberSentinel interface launched in new tab. You can close this tab.</p>
<a href='/logout' style='margin-top: 10px; color: #c00;'>Logout</a>
</div>
"""
return """
<div style='display: flex; justify-content: center; align-items: center; height: 100vh; flex-direction: column; font-family: sans-serif;'>
<h2>Welcome to CyberSentinel π</h2>
<a href='/login' style='padding: 10px 20px; background-color: #4285F4; color: white; text-decoration: none; border-radius: 5px; font-size: 16px;'>
π Login with Google
</a>
</div>
"""
@app.route('/login')
def login():
redirect_uri = url_for('auth_callback', _external=True)
return google.authorize_redirect(redirect_uri)
@app.route('/auth_callback')
def auth_callback():
token = google.authorize_access_token()
user_info = google.get('userinfo').json()
session['email'] = user_info['email']
return redirect('/')
@app.route('/logout')
def logout():
session.clear()
return redirect('/')
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)
|