Text Classification
Transformers
Safetensors
English
distilbert
cybersecurity
xss
security
web
payload-detection
web-security
Instructions to use kd7979148/XSS_Payload_Detector with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use kd7979148/XSS_Payload_Detector with Transformers:
```python
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("text-classification", model="kd7979148/XSS_Payload_Detector")

# Load model directly
from transformers import AutoTokenizer, AutoModelForSequenceClassification

tokenizer = AutoTokenizer.from_pretrained("kd7979148/XSS_Payload_Detector")
model = AutoModelForSequenceClassification.from_pretrained("kd7979148/XSS_Payload_Detector")
```
- Notebooks
- Google Colab
- Kaggle
| # -*- coding: utf-8 -*- | |
| ################################################# | |
| # XSS Log Monitor + BERT Detector | |
| ################################################# | |
| import re | |
| import time | |
| import sqlite3 | |
| import unicodedata | |
| from urllib.parse import ( | |
| urlparse, | |
| parse_qs, | |
| unquote | |
| ) | |
| import torch | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForSequenceClassification | |
| ) | |
| ################################################# | |
| # settings: log file to tail, model directory, max payload length in characters (longer payloads are skipped), polling sleep in seconds | |
| ################################################# | |
| LOG_FILE = "access.log" | |
| MODEL_PATH = "xss_detect_trained" | |
| MAX_INPUT_LENGTH = 2000 | |
| CHECK_INTERVAL = 0.2 | |
| ################################################# | |
| # SQLite: open xss_detection.db and create the detections table if it does not exist yet | |
| ################################################# | |
| conn = sqlite3.connect("xss_detection.db") | |
| cursor = conn.cursor() | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS detections ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| timestamp TEXT, | |
| ip TEXT, | |
| url TEXT, | |
| payload TEXT, | |
| prediction TEXT, | |
| confidence REAL | |
| ) | |
| """) | |
| conn.commit() | |
| ################################################# | |
| # model load: tokenizer and sequence-classification model from MODEL_PATH, moved to the CPU device and switched to eval mode | |
| ################################################# | |
| print("[+] Loading Model...") | |
| tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH) | |
| model = AutoModelForSequenceClassification.from_pretrained(MODEL_PATH) | |
| device = torch.device("cpu") | |
| model.to(device) | |
| model.eval() | |
| print("[+] Loading Model Completed.") | |
| ################################################# | |
| # label: maps the predicted class index to a label name. NOTE(review): assumes the model was trained with 0 = benign and 1 = XSS -- confirm against the model config | |
| ################################################# | |
| labels = { | |
| 0: "NORMAL", | |
| 1: "XSS" | |
| } | |
| ################################################# | |
| # URL? | |
| ################################################# | |
def is_url(text):
    """Return True when *text* looks like a URL or an absolute request path.

    A value qualifies when it starts with ``http://``, ``https://`` or ``/``.
    The comparison is case-sensitive and no further validation is done.

    Args:
        text: Request target taken from a log line.

    Returns:
        bool: True if *text* starts with one of the recognised prefixes.
    """
    # str.startswith accepts a tuple, so one call covers every prefix.
    return text.startswith(("http://", "https://", "/"))
| ################################################# | |
| # Unicode | |
| ################################################# | |
def normalize_unicode(text):
    """Return *text* converted to Unicode normalization form NFKC.

    NFKC folds compatibility characters (for example full-width forms)
    into their canonical equivalents.
    """
    normalized = unicodedata.normalize("NFKC", text)
    return normalized
| ################################################# | |
| # URL payload extract | |
| ################################################# | |
def extract_url_payload(url):
    """Extract the part of *url* that should be scanned for XSS.

    The query string is split into parameters and every value is collected.
    If the decoded query string as a whole contains a suspicious fragment,
    it is appended as well. When nothing was collected, the URL path is
    returned instead.

    Args:
        url: Full URL or request path.

    Returns:
        str: The collected values joined by single spaces, the URL path when
        there is nothing to collect, or *url* unchanged when it cannot be
        parsed.
    """
    try:
        parsed = urlparse(url)
        raw_query = unquote(parsed.query)
        params = parse_qs(parsed.query)

        extracted = []
        for values in params.values():
            # parse_qs already percent-decodes each value; the additional
            # unquote() pass also unwraps double-encoded input
            # (%253C -> %3C -> <).
            extracted.extend(unquote(value) for value in values)

        # Add the query itself when it contains suspicious code.
        if contains_suspicious_code(raw_query):
            extracted.append(raw_query)
    except ValueError:
        # urlparse rejects malformed URLs (e.g. an unterminated IPv6
        # literal); fall back to the raw text. A bare ``except`` is avoided
        # so that KeyboardInterrupt/SystemExit are not swallowed.
        return url

    # Use the path when there are no parameters to scan.
    if not extracted:
        return parsed.path
    return " ".join(extracted)
| ################################################# | |
| # suspicious code? | |
| ################################################# | |
# Lower-case substrings that mark a text as worth scanning. Built once at
# import time instead of on every call.
_SUSPICIOUS_PATTERNS = (
    # HTML / JS
    "<",
    ">",
    "script",
    "javascript:",
    "onerror",
    "onclick",
    "onload",
    "iframe",
    "svg",
    # JS
    "eval(",
    "alert(",
    "prompt(",
    "confirm(",
    "document.cookie",
    "document.domain",
    "window.location",
    # bypassing
    "constructor",
    "fromcharcode",
    "\\x",
    "%3c",
    "%3e",
    "&#",
    "base64",
    "atob(",
    # other vectors
    "srcdoc",
    "data:text/html",
    "vbscript:",
    "expression(",
)


def contains_suspicious_code(text):
    """Return True when *text* contains any known suspicious fragment.

    Matching is a case-insensitive substring test, so ordinary words that
    happen to contain a pattern (for example "description" contains
    "script") also match.

    Args:
        text: String to inspect.

    Returns:
        bool: True if at least one pattern occurs in *text*.
    """
    text_lower = text.lower()
    return any(pattern in text_lower for pattern in _SUSPICIOUS_PATTERNS)
| ################################################# | |
| # log parsing | |
| ################################################# | |
# Compiled once; both patterns are applied to every log line.
_LOG_IP_RE = re.compile(r'^(\S+)')
_LOG_REQUEST_RE = re.compile(
    r'\"(GET|POST|PUT|DELETE|HEAD|OPTIONS|PATCH|TRACE|CONNECT)\s+(.+?)\s+HTTP'
)


def parse_log_line(line):
    """Extract the client address and request target from an access-log line.

    Handles the Apache/Nginx common log format: the first
    whitespace-delimited token is taken as the client address and the
    request target is read from the quoted ``"METHOD target HTTP/x"`` part.

    Args:
        line: One line of the access log.

    Returns:
        tuple: ``(ip, url)`` on success, ``(None, None)`` when the line does
        not match the expected format or is not a string.
    """
    try:
        ip_match = _LOG_IP_RE.search(line)
        request_match = _LOG_REQUEST_RE.search(line)
    except TypeError:
        # Non-string input (e.g. None) cannot be searched.
        return None, None

    if not ip_match or not request_match:
        return None, None
    return ip_match.group(1), request_match.group(2)
| ################################################# | |
| # BERT | |
| ################################################# | |
def predict_xss(text):
    """Classify *text* with the loaded sequence-classification model.

    The input is tokenized with truncation to at most 128 tokens, run
    through the model without gradient tracking, and the class with the
    highest softmax probability is selected.

    Args:
        text: Payload string to classify.

    Returns:
        tuple: ``(label, confidence)`` where *label* is the entry of
        ``labels`` for the predicted class index and *confidence* is that
        class's softmax probability as a float.
    """
    encoded = tokenizer(
        text,
        max_length=128,
        truncation=True,
        padding=True,
        return_tensors="pt",
    ).to(device)

    with torch.no_grad():
        logits = model(**encoded).logits

    probabilities = torch.softmax(logits, dim=1)
    top_probability, top_index = torch.max(probabilities, dim=1)
    return labels[top_index.item()], top_probability.item()
| ################################################# | |
| # log | |
| ################################################# | |
def follow(thefile):
    """Yield lines appended to *thefile*, starting from its current end.

    The generator never terminates: whenever no new line is available it
    sleeps for ``CHECK_INTERVAL`` seconds and then polls again.
    """
    # Jump to the end of the file so only lines written from now on are seen.
    thefile.seek(0, 2)
    while True:
        new_line = thefile.readline()
        if new_line:
            yield new_line
        else:
            time.sleep(CHECK_INTERVAL)
| ################################################# | |
| # main: tail LOG_FILE and run every new request line through the prefilter and the model | |
| ################################################# | |
| print(f"[+] Start Monitoring Logs: {LOG_FILE}") | |
| with open(LOG_FILE, "r", encoding="utf-8", errors="ignore") as logfile: | |
| loglines = follow(logfile) | |
| for line in loglines: | |
| try: | |
| ip, url = parse_log_line(line) | |
| if not url: | |
| continue | |
| ################################################# | |
| # Unicode normalization (NFKC) of the request target | |
| ################################################# | |
| url = normalize_unicode(url) | |
| ################################################# | |
| # URL payload: for URL-like targets scan the extracted payload, otherwise the raw target | |
| ################################################# | |
| if is_url(url): | |
| target_text = extract_url_payload(url) | |
| else: | |
| target_text = url | |
| ################################################# | |
| # length: skip payloads longer than MAX_INPUT_LENGTH characters | |
| ################################################# | |
| if len(target_text) > MAX_INPUT_LENGTH: | |
| continue | |
| ################################################# | |
| # skip when no suspicious fragment exists (prefilter before the model) | |
| ################################################# | |
| if not contains_suspicious_code(target_text): | |
| continue | |
| ################################################# | |
| # ML: classify the payload with the model | |
| ################################################# | |
| label, confidence = predict_xss(target_text) | |
| ################################################# | |
| # XSS detected: print an alert | |
| ################################################# | |
| if label == "XSS": | |
| print("\n==============================") | |
| print("[XSS DETECTED]") | |
| print(f"IP : {ip}") | |
| print(f"URL : {url}") | |
| print(f"Payload : {target_text}") | |
| print(f"Confidence : {confidence:.4f}") | |
| print("==============================\n") | |
| ################################################# | |
| # DB: store the result with a parameterized INSERT. NOTE(review): indentation is lost in this listing; presumably this runs only inside the XSS branch -- confirm | |
| ################################################# | |
| cursor.execute(""" | |
| INSERT INTO detections ( | |
| timestamp, | |
| ip, | |
| url, | |
| payload, | |
| prediction, | |
| confidence | |
| ) | |
| VALUES ( | |
| datetime('now'), | |
| ?, | |
| ?, | |
| ?, | |
| ?, | |
| ? | |
| ) | |
| """, ( | |
| ip, | |
| url, | |
| target_text, | |
| label, | |
| confidence | |
| )) | |
| conn.commit() | |
| except Exception as e: | |
| print(f"[ERROR] {e}") | |