# ai-nids-backend/utils/logger.py
# logger.py (Model-separated, non-blocking logger, per-model CSVs)
# -------------------------------------------------------------
import os
import csv
import threading
import time
from datetime import datetime
import numpy as np
LOG_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "logs"))
os.makedirs(LOG_DIR, exist_ok=True)
# Legacy single-file path (falls back to the BCC log)
LOG_FILE = os.path.join(LOG_DIR, "bcc_logs.csv")
def classify_risk(score):
"""Simple helper to convert numeric score to text for the UI"""
if score > 0.8: return "High"
if score > 0.4: return "Medium"
return "Low"
BCC_LOG_FILE = os.path.join(LOG_DIR, "bcc_logs.csv")
CICIDS_LOG_FILE = os.path.join(LOG_DIR, "cicids_logs.csv")
_MAX_RECENT = 500      # max recent events kept in memory (and reloaded from CSV) per model
_FLUSH_INTERVAL = 2.0  # seconds between background flushes to disk
_FLUSH_BATCH = 50      # max buffered rows appended per flush pass
_headers = [
"time", "src_ip", "sport", "dst_ip", "dport", "proto",
"prediction", "risk_level", "risk_score",
"src_country", "src_city", "src_lat", "src_lon",
"dst_country", "dst_city", "dst_lat", "dst_lon"
]
# In-memory per-model buffers & stats
_model_events = {
"bcc": [], # list of dicts
"cicids": []
}
_model_stats = {
"bcc": {},
"cicids": {}
}
# active model (default)
_active_model_lock = threading.Lock()
_active_model = "bcc"
# writer buffers and locks
_write_buffer = [] # list of dicts, each item must include "model" key
_buffer_lock = threading.Lock()
_events_lock = threading.Lock()
_stop_writer = threading.Event()
# -------------------------
# Helpers: file name for model
# -------------------------
def _file_for_model(model):
if model == "cicids":
return CICIDS_LOG_FILE
return BCC_LOG_FILE
# -------------------------
# Full overwrite for a model CSV
# -------------------------
def _flush_full_overwrite_model(model):
"""Rewrite the entire CSV for a specific model from its in-memory buffer."""
fname = _file_for_model(model)
try:
with _events_lock:
rows = list(_model_events.get(model, []))
with open(fname, "w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=_headers)
writer.writeheader()
for row in rows:
writer.writerow({k: row.get(k, "") for k in _headers})
# optional debug print
# print(f"[logger] {model} CSV fully rewritten: {len(rows)} rows -> {fname}")
except Exception as e:
print("[logger] Full overwrite failed:", e)
# -------------------------
# Flush small batches to disk (append)
# -------------------------
def _flush_to_disk():
global _write_buffer
with _buffer_lock:
if not _write_buffer:
return
batch = _write_buffer[:_FLUSH_BATCH]
_write_buffer = _write_buffer[len(batch):]
# group by model for efficient writes
groups = {}
for row in batch:
m = row.get("model", "bcc")
groups.setdefault(m, []).append(row)
for model, rows in groups.items():
fname = _file_for_model(model)
try:
file_empty = not os.path.exists(fname) or os.stat(fname).st_size == 0
with open(fname, "a", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=_headers)
if file_empty:
writer.writeheader()
for r in rows:
# write only header keys (ignore extra)
writer.writerow({k: r.get(k, "") for k in _headers})
except Exception as e:
print("[logger] Append write error for", model, ":", e)
# -------------------------
# Background writer thread
# -------------------------
def _writer_thread():
while not _stop_writer.is_set():
time.sleep(_FLUSH_INTERVAL)
_flush_to_disk()
# flush remaining on shutdown
_flush_to_disk()
_writer_thr = threading.Thread(target=_writer_thread, daemon=True)
_writer_thr.start()
# -------------------------
# Load existing CSVs into _model_events on startup (keep last _MAX_RECENT)
# -------------------------
def _load_recent_model(model):
fname = _file_for_model(model)
if not os.path.exists(fname):
return []
try:
with open(fname, "r", encoding="utf-8") as f:
reader = list(csv.DictReader(f))
return reader[-_MAX_RECENT:]
except Exception:
return []
def _load_all_recent():
global _model_events
with _events_lock:
_model_events["bcc"] = _load_recent_model("bcc")
_model_events["cicids"] = _load_recent_model("cicids")
_load_all_recent()
# ===============================
# Public API: push_event
# ===============================
def push_event(evt):
"""
evt: dict containing event fields expected (prediction, src_ip, dst_ip, etc.)
Uses current active model to store event.
Also enqueues to write buffer for background flush.
"""
global _write_buffer
# attach model at time of push
with _active_model_lock:
model = _active_model
e = dict(evt)
e.setdefault("time", datetime.now().strftime("%H:%M:%S"))
e.setdefault("risk_level", "Low")
e.setdefault("risk_score", 0)
# add to in-memory buffer for model
with _events_lock:
_model_events.setdefault(model, [])
_model_events[model].append(e)
if len(_model_events[model]) > _MAX_RECENT:
_model_events[model] = _model_events[model][-_MAX_RECENT:]
# update stats
pred = str(e.get("prediction", "Unknown"))
_model_stats.setdefault(model, {})
_model_stats[model][pred] = _model_stats[model].get(pred, 0) + 1
# add to write buffer with model tag for background writer
item = dict(e)
item["model"] = model
with _buffer_lock:
_write_buffer.append(item)
# if buffer grows big, flush asynchronously
if len(_write_buffer) > (_FLUSH_BATCH * 4):
threading.Thread(target=_flush_to_disk, daemon=True).start()
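# Example caller (hypothetical capture callback; keys mirror _headers, values are made up):
#   push_event({
#       "src_ip": "10.0.0.5", "sport": 443, "dst_ip": "10.0.0.9", "dport": 51522,
#       "proto": "TCP", "prediction": "DDoS",
#       "risk_score": 0.91, "risk_level": classify_risk(0.91),
#   })
# Columns missing from the dict are written as empty strings; "time", "risk_level"
# and "risk_score" receive defaults when omitted.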
# ===============================
# Public API: get recent & stats
# ===============================
def get_recent_events(model="bcc", n=None):
with _events_lock:
data = list(_model_events.get(model, []))
if n:
return data[-n:]
return data
def get_model_stats(model="bcc"):
with _events_lock:
# return a shallow copy to avoid external mutation
return dict(_model_stats.get(model, {}))
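# Illustrative return shapes (values are made up):
#   get_recent_events("bcc", n=2) -> [{"time": "12:00:01", "prediction": "Benign", ...}, {...}]
#   get_model_stats("bcc")        -> {"Benign": 120, "DDoS": 3}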
# -------------------------
# Convenience: summary across active model (legacy)
# -------------------------
def summarize_counts():
with _active_model_lock:
model = _active_model
return get_model_stats(model)
# ===============================
# Model selection API
# ===============================
def set_active_model(model):
if model not in ("bcc", "cicids"):
raise ValueError("invalid model")
with _active_model_lock:
global _active_model
_active_model = model
# no immediate clearing — in-memory buffers persist per model
return _active_model
def get_active_model():
with _active_model_lock:
return _active_model
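# Sketch of a caller switching models (only "bcc" and "cicids" are accepted):
#   set_active_model("cicids")   # subsequent push_event() calls log to cicids_logs.csv
#   get_active_model()           # -> "cicids"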
# ===============================
# CLEAR / DELETE (model-wise)
# ===============================
def clear_last_events(model="bcc", n=99999):
    with _events_lock:
        ev = _model_events.get(model, [])
        if n >= len(ev):
            _model_events[model] = []
        else:
            _model_events[model] = ev[:-n]
        # recompute stats from whatever remains (empty if everything was cleared)
        _model_stats[model] = {}
        for e in _model_events[model]:
            pred = str(e.get("prediction", "Unknown"))
            _model_stats[model][pred] = _model_stats[model].get(pred, 0) + 1
    # rewrite the model CSV outside the lock; the overwrite re-acquires _events_lock
    _flush_full_overwrite_model(model)
    return True
def delete_by_index(model="bcc", idx=0):
    removed = False
    with _events_lock:
        ev = _model_events.get(model, [])
        if 0 <= idx < len(ev):
            ev.pop(idx)
            _model_events[model] = ev
            # recompute stats (simple recompute over the remaining events)
            _model_stats[model] = {}
            for e in ev:
                pred = str(e.get("prediction", "Unknown"))
                _model_stats[model][pred] = _model_stats[model].get(pred, 0) + 1
            removed = True
    if removed:
        # rewrite the model CSV outside the lock to avoid re-acquiring _events_lock
        _flush_full_overwrite_model(model)
    return removed
def delete_by_prediction(model="bcc", pred=None):
    if pred is None:
        return False
    with _events_lock:
        ev = _model_events.get(model, [])
        _model_events[model] = [e for e in ev if e.get("prediction") != pred]
        # recompute stats from the remaining events
        _model_stats[model] = {}
        for e in _model_events[model]:
            p = str(e.get("prediction", "Unknown"))
            _model_stats[model][p] = _model_stats[model].get(p, 0) + 1
    # rewrite the model CSV outside the lock; the overwrite re-acquires _events_lock
    _flush_full_overwrite_model(model)
    return True
# ===============================
# Shutdown
# ===============================
def shutdown_logger():
_stop_writer.set()
_writer_thr.join(timeout=3)
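# Minimal smoke test when this module is run directly (a sketch; the event values
# are made up). A real app would instead import the module and may register
# shutdown_logger with atexit so buffered rows are flushed on exit.
if __name__ == "__main__":
    push_event({
        "src_ip": "192.0.2.10", "sport": 1234, "dst_ip": "192.0.2.20", "dport": 80,
        "proto": "TCP", "prediction": "PortScan",
        "risk_score": 0.7, "risk_level": classify_risk(0.7),
    })
    print("active model:", get_active_model())
    print("recent:", get_recent_events(get_active_model(), n=1))
    print("stats:", get_model_stats(get_active_model()))
    shutdown_logger()  # flush any buffered rows and stop the background writer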