# -*- coding: utf-8 -*- ################################################# # XSS Log Monitor + BERT Detector ################################################# import re import time import sqlite3 import unicodedata from urllib.parse import ( urlparse, parse_qs, unquote ) import torch from transformers import ( AutoTokenizer, AutoModelForSequenceClassification ) ################################################# # setting ################################################# LOG_FILE = "access.log" MODEL_PATH = "xss_detect_trained" MAX_INPUT_LENGTH = 2000 CHECK_INTERVAL = 0.2 ################################################# # SQLite ################################################# conn = sqlite3.connect("xss_detection.db") cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS detections ( id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp TEXT, ip TEXT, url TEXT, payload TEXT, prediction TEXT, confidence REAL ) """) conn.commit() ################################################# # model load ################################################# print("[+] Loading Model...") tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH) model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH) device = torch.device("cpu") model.to(device) model.eval() print("[+] Loading Model Completed.") ################################################# # label ################################################# labels = { 0: "NORMAL", 1: "XSS" } ################################################# # URL? ################################################# def is_url(text): return ( text.startswith("http://") or text.startswith("https://") or text.startswith("/") ) ################################################# # Unicode ################################################# def normalize_unicode(text): return unicodedata.normalize("NFKC", text) ################################################# # URL payload extract ################################################# def extract_url_payload(url): try: parsed = urlparse(url) raw_query = unquote(parsed.query) params = parse_qs(parsed.query) extracted = [] ################################################# # parameter value ################################################# for key, values in params.items(): for value in values: decoded = unquote(value) extracted.append(decoded) ################################################# # Add when suspicious code exists in query itself ################################################# if contains_suspicious_code(raw_query): extracted.append(raw_query) ################################################# # use path when parameter xde ################################################# if not extracted: return parsed.path return " ".join(extracted) except: return url ################################################# # suspicious code? ################################################# def contains_suspicious_code(text): suspicious_patterns = [ # HTML / JS "<", ">", "script", "javascript:", "onerror", "onclick", "onload", "iframe", "svg", # JS "eval(", "alert(", "prompt(", "confirm(", "document.cookie", "document.domain", "window.location", # bypassing "constructor", "fromcharcode", "\\x", "%3c", "%3e", "&#", "base64", "atob(", # "srcdoc", "data:text/html", "vbscript:", "expression(" ] text_lower = text.lower() for pattern in suspicious_patterns: if pattern in text_lower: return True return False ################################################# # log parsing ################################################# def parse_log_line(line): """ Apache/Nginx common log format 대응 """ try: ip_match = re.search(r'^(\S+)', line) request_match = re.search( r'\"(GET|POST|PUT|DELETE|HEAD|OPTIONS)\s+(.+?)\s+HTTP', line ) if not ip_match or not request_match: return None, None ip = ip_match.group(1) url = request_match.group(2) return ip, url except: return None, None ################################################# # BERT ################################################# def predict_xss(text): inputs = tokenizer( text, return_tensors="pt", truncation=True, padding=True, max_length=128 ).to(device) with torch.no_grad(): outputs = model(**inputs) logits = outputs.logits probs = torch.softmax(logits, dim=1) confidence, pred = torch.max(probs, dim=1) pred = pred.item() confidence = confidence.item() label = labels[pred] return label, confidence ################################################# # log ################################################# def follow(thefile): thefile.seek(0, 2) while True: line = thefile.readline() if not line: time.sleep(CHECK_INTERVAL) continue yield line ################################################# # main ################################################# print(f"[+] Start Monitoring Logs: {LOG_FILE}") with open(LOG_FILE, "r", encoding="utf-8", errors="ignore") as logfile: loglines = follow(logfile) for line in loglines: try: ip, url = parse_log_line(line) if not url: continue ################################################# # unicode normalization ################################################# url = normalize_unicode(url) ################################################# # URL payload ################################################# if is_url(url): target_text = extract_url_payload(url) else: target_text = url ################################################# # length ################################################# if len(target_text) > MAX_INPUT_LENGTH: continue ################################################# # skip when suspicious fragment no exist ################################################# if not contains_suspicious_code(target_text): continue ################################################# # ML ################################################# label, confidence = predict_xss(target_text) ################################################# # XSS detected ################################################# if label == "XSS": print("\n==============================") print("[XSS DETECTED]") print(f"IP : {ip}") print(f"URL : {url}") print(f"Payload : {target_text}") print(f"Confidence : {confidence:.4f}") print("==============================\n") ################################################# # DB ################################################# cursor.execute(""" INSERT INTO detections ( timestamp, ip, url, payload, prediction, confidence ) VALUES ( datetime('now'), ?, ?, ?, ?, ? ) """, ( ip, url, target_text, label, confidence )) conn.commit() except Exception as e: print(f"[ERROR] {e}")