XSS_Payload_Detector / monitor.py
kd7979148's picture
Rename moniter.py to monitor.py
b3ee694 verified
# -*- coding: utf-8 -*-
#################################################
# XSS Log Monitor + BERT Detector
#################################################
import re
import time
import sqlite3
import unicodedata
from urllib.parse import (
urlparse,
parse_qs,
unquote
)
import torch
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification
)
#################################################
# setting
#################################################
LOG_FILE = "access.log"
MODEL_PATH = "xss_detect_trained"
MAX_INPUT_LENGTH = 2000
CHECK_INTERVAL = 0.2
#################################################
# SQLite
#################################################
conn = sqlite3.connect("xss_detection.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS detections (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT,
ip TEXT,
url TEXT,
payload TEXT,
prediction TEXT,
confidence REAL
)
""")
conn.commit()
#################################################
# model load
#################################################
print("[+] Loading Model...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH)
device = torch.device("cpu")
model.to(device)
model.eval()
print("[+] Loading Model Completed.")
#################################################
# label
#################################################
labels = {
0: "NORMAL",
1: "XSS"
}
#################################################
# URL?
#################################################
def is_url(text):
return (
text.startswith("http://")
or text.startswith("https://")
or text.startswith("/")
)
#################################################
# Unicode
#################################################
def normalize_unicode(text):
return unicodedata.normalize("NFKC", text)
#################################################
# URL payload extract
#################################################
def extract_url_payload(url):
try:
parsed = urlparse(url)
raw_query = unquote(parsed.query)
params = parse_qs(parsed.query)
extracted = []
#################################################
# parameter value
#################################################
for key, values in params.items():
for value in values:
decoded = unquote(value)
extracted.append(decoded)
#################################################
# Add when suspicious code exists in query itself
#################################################
if contains_suspicious_code(raw_query):
extracted.append(raw_query)
#################################################
# use path when parameter xde
#################################################
if not extracted:
return parsed.path
return " ".join(extracted)
except:
return url
#################################################
# suspicious code?
#################################################
def contains_suspicious_code(text):
suspicious_patterns = [
# HTML / JS
"<",
">",
"script",
"javascript:",
"onerror",
"onclick",
"onload",
"iframe",
"svg",
# JS
"eval(",
"alert(",
"prompt(",
"confirm(",
"document.cookie",
"document.domain",
"window.location",
# bypassing
"constructor",
"fromcharcode",
"\\x",
"%3c",
"%3e",
"&#",
"base64",
"atob(",
#
"srcdoc",
"data:text/html",
"vbscript:",
"expression("
]
text_lower = text.lower()
for pattern in suspicious_patterns:
if pattern in text_lower:
return True
return False
#################################################
# log parsing
#################################################
def parse_log_line(line):
"""
Apache/Nginx common log format 대응
"""
try:
ip_match = re.search(r'^(\S+)', line)
request_match = re.search(
r'\"(GET|POST|PUT|DELETE|HEAD|OPTIONS)\s+(.+?)\s+HTTP',
line
)
if not ip_match or not request_match:
return None, None
ip = ip_match.group(1)
url = request_match.group(2)
return ip, url
except:
return None, None
#################################################
# BERT
#################################################
def predict_xss(text):
inputs = tokenizer(
text,
return_tensors="pt",
truncation=True,
padding=True,
max_length=128
).to(device)
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
probs = torch.softmax(logits, dim=1)
confidence, pred = torch.max(probs, dim=1)
pred = pred.item()
confidence = confidence.item()
label = labels[pred]
return label, confidence
#################################################
# log
#################################################
def follow(thefile):
thefile.seek(0, 2)
while True:
line = thefile.readline()
if not line:
time.sleep(CHECK_INTERVAL)
continue
yield line
#################################################
# main
#################################################
print(f"[+] Start Monitoring Logs: {LOG_FILE}")
with open(LOG_FILE, "r", encoding="utf-8", errors="ignore") as logfile:
loglines = follow(logfile)
for line in loglines:
try:
ip, url = parse_log_line(line)
if not url:
continue
#################################################
# unicode normalization
#################################################
url = normalize_unicode(url)
#################################################
# URL payload
#################################################
if is_url(url):
target_text = extract_url_payload(url)
else:
target_text = url
#################################################
# length
#################################################
if len(target_text) > MAX_INPUT_LENGTH:
continue
#################################################
# skip when suspicious fragment no exist
#################################################
if not contains_suspicious_code(target_text):
continue
#################################################
# ML
#################################################
label, confidence = predict_xss(target_text)
#################################################
# XSS detected
#################################################
if label == "XSS":
print("\n==============================")
print("[XSS DETECTED]")
print(f"IP : {ip}")
print(f"URL : {url}")
print(f"Payload : {target_text}")
print(f"Confidence : {confidence:.4f}")
print("==============================\n")
#################################################
# DB
#################################################
cursor.execute("""
INSERT INTO detections (
timestamp,
ip,
url,
payload,
prediction,
confidence
)
VALUES (
datetime('now'),
?,
?,
?,
?,
?
)
""", (
ip,
url,
target_text,
label,
confidence
))
conn.commit()
except Exception as e:
print(f"[ERROR] {e}")