Spaces:
Sleeping
Sleeping
# logger.py (Model-separated, non-blocking logger, per-model CSVs)
# -------------------------------------------------------------
| import os | |
| import csv | |
| import threading | |
| import time | |
| from datetime import datetime | |
| import numpy as np | |
# Absolute path of the "logs" directory one level above this module's
# directory. It is created at import time if it does not exist yet.
LOG_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", "logs")
)
os.makedirs(LOG_DIR, exist_ok=True)

# Fallback log path: the BCC log file.
LOG_FILE = os.path.join(LOG_DIR, "bcc_logs.csv")
def classify_risk(score):
    """Convert a numeric risk score into the text label shown in the UI.

    A score strictly above 0.8 is "High", a score strictly above 0.4 is
    "Medium", and anything else is "Low".
    """
    # Thresholds are checked from the highest down; the first one exceeded wins.
    for threshold, label in ((0.8, "High"), (0.4, "Medium")):
        if score > threshold:
            return label
    return "Low"
# One CSV file per model.
BCC_LOG_FILE = os.path.join(LOG_DIR, "bcc_logs.csv")
CICIDS_LOG_FILE = os.path.join(LOG_DIR, "cicids_logs.csv")

_MAX_RECENT = 500       # events kept in memory per model
_FLUSH_INTERVAL = 2.0   # seconds between background flushes
_FLUSH_BATCH = 50       # maximum rows taken from the queue per flush

# CSV column order; event keys outside this list are not written to disk.
_headers = [
    "time",
    "src_ip",
    "sport",
    "dst_ip",
    "dport",
    "proto",
    "prediction",
    "risk_level",
    "risk_score",
    "src_country",
    "src_city",
    "src_lat",
    "src_lon",
    "dst_country",
    "dst_city",
    "dst_lat",
    "dst_lon",
]

# In-memory per-model buffers & stats
_model_events = {"bcc": [], "cicids": []}   # model -> list of event dicts
_model_stats = {"bcc": {}, "cicids": {}}    # model -> {prediction: count}

# active model (default)
_active_model = "bcc"
_active_model_lock = threading.Lock()

# writer buffers and locks
_write_buffer = []  # list of dicts, each item must include "model" key
_buffer_lock = threading.Lock()   # guards _write_buffer
_events_lock = threading.Lock()   # guards _model_events and _model_stats
_stop_writer = threading.Event()  # set to ask the writer thread to exit
# -------------------------
# Helpers: file name for model
# -------------------------
def _file_for_model(model):
    """Return the CSV path used for *model*.

    Only "cicids" maps to the CICIDS log; every other value, including
    unknown names, falls back to the BCC log.
    """
    return CICIDS_LOG_FILE if model == "cicids" else BCC_LOG_FILE
# -------------------------
# Full overwrite for a model CSV
# -------------------------
def _flush_full_overwrite_model(model):
    """Rewrite the entire CSV for a specific model from its in-memory buffer.

    Rows still waiting in the append queue for the same file are discarded
    as part of the rewrite. Each of them is either already in the in-memory
    list (and so written here) or has since been dropped from it; appending
    them afterwards would duplicate rows or bring deleted rows back.

    Must be called WITHOUT holding ``_events_lock``: that lock is a plain
    ``threading.Lock`` (not reentrant) and is acquired here.

    Failures are reported on stdout and otherwise ignored (best effort).
    """
    global _write_buffer
    fname = _file_for_model(model)
    try:
        # Snapshot and purge under both locks (always events -> buffer) so an
        # event pushed concurrently can never be lost from the file.
        # NOTE(review): an event that push_event has stored in memory but not
        # yet queued can still end up written twice.
        with _events_lock:
            rows = list(_model_events.get(model, []))
            with _buffer_lock:
                _write_buffer = [
                    queued
                    for queued in _write_buffer
                    if _file_for_model(queued.get("model", "bcc")) != fname
                ]
        with open(fname, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=_headers)
            writer.writeheader()
            # Write only the known columns; missing values become "".
            writer.writerows({k: row.get(k, "") for k in _headers} for row in rows)
    except Exception as e:
        print("[logger] Full overwrite failed:", e)
# -------------------------
# Flush small batches to disk (append)
# -------------------------
def _flush_to_disk():
    """Append up to ``_FLUSH_BATCH`` queued rows to their model CSV files.

    The batch is removed from the queue under ``_buffer_lock`` and written
    after the lock is released. A header line is written first when the
    target file is missing or empty. Write errors are printed and the
    affected rows are not re-queued.
    """
    global _write_buffer
    with _buffer_lock:
        if not _write_buffer:
            return
        pending = _write_buffer[:_FLUSH_BATCH]
        _write_buffer = _write_buffer[len(pending):]

    # Group by model tag so each file is opened once per flush.
    by_model = {}
    for item in pending:
        tag = item.get("model", "bcc")
        if tag not in by_model:
            by_model[tag] = []
        by_model[tag].append(item)

    for tag, items in by_model.items():
        path = _file_for_model(tag)
        try:
            needs_header = not os.path.exists(path) or os.stat(path).st_size == 0
            with open(path, "a", newline="", encoding="utf-8") as out:
                writer = csv.DictWriter(out, fieldnames=_headers)
                if needs_header:
                    writer.writeheader()
                # Only header columns are written; extra keys such as
                # "model" are ignored and missing values become "".
                writer.writerows(
                    {col: item.get(col, "") for col in _headers}
                    for item in items
                )
        except Exception as err:
            print("[logger] Append write error for", tag, ":", err)
# -------------------------
# Background writer thread
# -------------------------
def _writer_thread():
    """Background loop that periodically appends queued rows to disk.

    One batch is flushed every ``_FLUSH_INTERVAL`` seconds until
    ``_stop_writer`` is set. On shutdown the queue is drained completely;
    a single flush call writes at most ``_FLUSH_BATCH`` rows, which would
    drop the rest of a larger backlog.
    """
    # Event.wait() returns True as soon as the event is set, so a stop
    # request is noticed immediately instead of after a full sleep.
    while not _stop_writer.wait(_FLUSH_INTERVAL):
        _flush_to_disk()

    # Flush everything that is still queued. Each call removes rows from a
    # non-empty queue, so this terminates once producers have stopped.
    while True:
        with _buffer_lock:
            if not _write_buffer:
                break
        _flush_to_disk()


# Daemon thread: it never keeps the interpreter alive on its own.
_writer_thr = threading.Thread(target=_writer_thread, daemon=True)
_writer_thr.start()
# -------------------------
# Load existing CSVs into _model_events on startup (keep last _MAX_RECENT)
# -------------------------
def _load_recent_model(model):
    """Return up to the last ``_MAX_RECENT`` rows of a model's CSV as dicts.

    Values come back as strings, as produced by ``csv.DictReader``.
    A missing file yields an empty list. Any other read or parse failure
    is reported on stdout and also yields an empty list.
    """
    fname = _file_for_model(model)
    try:
        # newline="" is what the csv module expects for files it reads, so
        # newlines embedded in quoted fields are interpreted correctly.
        with open(fname, "r", newline="", encoding="utf-8") as f:
            rows = list(csv.DictReader(f))
    except FileNotFoundError:
        return []
    except Exception as e:
        print("[logger] Could not load", fname, ":", e)
        return []
    return rows[-_MAX_RECENT:]
def _load_all_recent():
    """Replace the in-memory event lists of both models with their CSV contents."""
    with _events_lock:
        for name in ("bcc", "cicids"):
            _model_events[name] = _load_recent_model(name)


# Populate the in-memory buffers once, at import time.
_load_all_recent()
# ===============================
# Public API: push_event
# ===============================
def push_event(evt):
    """Record one event for the currently active model.

    evt: dict of event fields (prediction, src_ip, dst_ip, ...). It is
    copied, so the caller's dict is not modified. Missing "time",
    "risk_level" and "risk_score" keys are filled with the current
    HH:MM:SS time, "Low" and 0.

    The event is appended to the model's in-memory list (trimmed to the
    newest ``_MAX_RECENT`` entries), counted in the model's prediction
    stats, and queued for the background writer.
    """
    # The model is fixed at the moment of the push.
    with _active_model_lock:
        model = _active_model

    record = dict(evt)
    defaults = (
        ("time", datetime.now().strftime("%H:%M:%S")),
        ("risk_level", "Low"),
        ("risk_score", 0),
    )
    for key, value in defaults:
        record.setdefault(key, value)

    with _events_lock:
        history = _model_events.setdefault(model, [])
        history.append(record)
        if len(history) > _MAX_RECENT:
            _model_events[model] = history[-_MAX_RECENT:]

        # Stats are keyed by the string form of the prediction.
        counts = _model_stats.setdefault(model, {})
        label = str(record.get("prediction", "Unknown"))
        counts[label] = counts.get(label, 0) + 1

    # The queued copy carries the model tag so the writer picks the right file.
    queued = dict(record)
    queued["model"] = model
    with _buffer_lock:
        _write_buffer.append(queued)
        backlog = len(_write_buffer)

    # When the queue grows large, trigger an extra flush without blocking
    # the caller.
    if backlog > (_FLUSH_BATCH * 4):
        threading.Thread(target=_flush_to_disk, daemon=True).start()
# ===============================
# Public API: get recent & stats
# ===============================
def get_recent_events(model="bcc", n=None):
    """Return a copy of the in-memory event list of *model*.

    With a truthy ``n`` the copy is sliced as ``[-n:]``, i.e. the last
    ``n`` events for a positive ``n``. ``None`` or ``0`` returns all
    events. An unknown model yields an empty list. The list is a new
    object, but the event dicts inside it are shared with the logger.
    """
    with _events_lock:
        snapshot = list(_model_events.get(model, []))
    return snapshot[-n:] if n else snapshot
def get_model_stats(model="bcc"):
    """Return a shallow copy of the prediction counts of *model*.

    The copy keeps callers from mutating the logger's own dict. An unknown
    model yields an empty dict.
    """
    with _events_lock:
        counts = _model_stats.get(model, {})
        return counts.copy()
# -------------------------
# Convenience: summary across active model (legacy)
# -------------------------
def summarize_counts():
    """Return the prediction counts of the currently active model (legacy helper)."""
    return get_model_stats(get_active_model())
# ===============================
# Model selection API
# ===============================
def set_active_model(model):
    """Select the model that subsequent events are attributed to.

    Accepts "bcc" or "cicids" and returns the active model name.
    Raises ValueError for any other value. The per-model in-memory
    buffers are left untouched by a switch.
    """
    global _active_model
    if model in ("bcc", "cicids"):
        with _active_model_lock:
            _active_model = model
            return _active_model
    raise ValueError("invalid model")
def get_active_model():
    """Return the name of the currently active model."""
    with _active_model_lock:
        current = _active_model
    return current
# ===============================
# CLEAR / DELETE (model-wise)
# ===============================
def clear_last_events(model="bcc", n=99999):
    """Remove the newest ``n`` in-memory events of *model* and rewrite its CSV.

    If ``n`` is at least the number of stored events, everything is
    removed. A non-positive ``n`` removes nothing; a plain ``[:-n]`` slice
    would wipe the whole list for ``n == 0`` and keep only a prefix for
    negative values. Stats are recomputed from the events that remain, so
    a partial clear does not zero the counts of events still present.

    Always returns True.
    """
    if n <= 0:
        return True

    with _events_lock:
        events = _model_events.get(model, [])
        remaining = events[:-n] if n < len(events) else []
        _model_events[model] = remaining

        counts = {}
        for e in remaining:
            label = str(e.get("prediction", "Unknown"))
            counts[label] = counts.get(label, 0) + 1
        _model_stats[model] = counts

    # The rewrite acquires _events_lock itself (a non-reentrant Lock), so
    # it has to run after the lock has been released.
    _flush_full_overwrite_model(model)
    return True
def delete_by_index(model="bcc", idx=0):
    """Delete the in-memory event at position ``idx`` for *model*.

    Returns True when an event was removed; the model's stats are then
    recomputed and its CSV rewritten. Returns False when ``idx`` is out
    of range (negative indices are rejected) or the model has no events.
    """
    with _events_lock:
        events = _model_events.get(model, [])
        if not 0 <= idx < len(events):
            return False
        events.pop(idx)
        _model_events[model] = events

        # Recompute the counts from the events that are left.
        counts = {}
        for e in events:
            label = str(e.get("prediction", "Unknown"))
            counts[label] = counts.get(label, 0) + 1
        _model_stats[model] = counts

    # The rewrite acquires _events_lock itself (a non-reentrant Lock), so
    # it has to run after the lock has been released.
    _flush_full_overwrite_model(model)
    return True
def delete_by_prediction(model="bcc", pred=None):
    """Delete every in-memory event of *model* whose prediction equals ``pred``.

    Predictions are compared by their string form, the same key the stats
    use (with "Unknown" for events that have no prediction). A label taken
    from the stats, or a value read back from the CSV, therefore matches
    events that hold the prediction as a non-string value.

    Returns False when ``pred`` is None. Otherwise the stats are
    recomputed, the CSV is rewritten and True is returned, even if no
    event matched.
    """
    if pred is None:
        return False

    target = str(pred)
    with _events_lock:
        kept = [
            e
            for e in _model_events.get(model, [])
            if str(e.get("prediction", "Unknown")) != target
        ]
        _model_events[model] = kept

        counts = {}
        for e in kept:
            label = str(e.get("prediction", "Unknown"))
            counts[label] = counts.get(label, 0) + 1
        _model_stats[model] = counts

    # The rewrite acquires _events_lock itself (a non-reentrant Lock), so
    # it has to run after the lock has been released.
    _flush_full_overwrite_model(model)
    return True
# ===============================
# Shutdown
# ===============================
def shutdown_logger():
    """Ask the background writer to stop and wait up to 3 seconds for it to exit."""
    _stop_writer.set()
    _writer_thr.join(3)