""" Database and scenario loading helpers. """ from __future__ import annotations import os import random import sqlite3 from pathlib import Path from typing import Any, Dict, Iterable, List from tasks.catalog import TASK_SPECS DEFAULT_DB_PATH = Path(__file__).resolve().parents[1] / "novatech_logs.db" DB_PATH = Path(os.getenv("DB_PATH", str(DEFAULT_DB_PATH))).expanduser().resolve() def _connect() -> sqlite3.Connection: if not DB_PATH.exists(): raise FileNotFoundError(f"Database not found at '{DB_PATH}'") return sqlite3.connect(str(DB_PATH)) def load_thresholds() -> Dict[str, Dict[str, float]]: conn = _connect() rows = conn.execute( "SELECT metric_name, warning_threshold, critical_threshold, consecutive_count FROM anomaly_thresholds" ).fetchall() conn.close() return { row[0]: { "warning": float(row[1]), "critical": float(row[2]), "consecutive": float(row[3]), } for row in rows } def load_patterns() -> Dict[str, Dict[str, str]]: conn = _connect() rows = conn.execute( "SELECT pattern_keyword, severity, description FROM known_error_patterns ORDER BY pattern_id" ).fetchall() conn.close() return {row[0]: {"severity": row[1], "description": row[2]} for row in rows} def load_all_logs() -> List[Dict[str, Any]]: conn = _connect() rows = conn.execute( """ SELECT log_id, timestamp, server_id, log_level, service_name, message, response_time_ms, cpu_usage_percent, memory_usage_percent FROM server_logs ORDER BY timestamp ASC, log_id ASC """ ).fetchall() conn.close() return [ { "log_id": int(row[0]), "timestamp": str(row[1]), "server_id": str(row[2]), "log_level": str(row[3]), "service_name": str(row[4]), "message": str(row[5]), "response_time_ms": int(row[6] or 0), "cpu_usage_percent": float(row[7] or 0.0), "memory_usage_percent": float(row[8] or 0.0), } for row in rows ] def _within_window(log: Dict[str, Any], start: str, end: str) -> bool: return start <= str(log["timestamp"]) <= end def _base_scope(task_id: str, logs: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]: spec = TASK_SPECS[task_id] scope_servers = set(spec["scope_servers"]) scope_services = set(spec["scope_services"]) start = str(spec["incident_window_start"]) end = str(spec["incident_window_end"]) return [ log for log in logs if log["server_id"] in scope_servers and log["service_name"] in scope_services and ( _within_window(log, start, end) or log["log_id"] in set(spec["must_include_ids"]) ) ] def build_task_log_pool(task_id: str, seed: int) -> List[Dict[str, Any]]: spec = TASK_SPECS[task_id] rng = random.Random(seed) all_logs = load_all_logs() must_include_ids = set(spec["must_include_ids"]) base_scope = _base_scope(task_id, all_logs) scope_ids = {log["log_id"] for log in base_scope} for log in all_logs: if log["log_id"] in must_include_ids: scope_ids.add(log["log_id"]) scope_logs = [log for log in all_logs if log["log_id"] in scope_ids] noise_candidates = [ log for log in all_logs if log["log_id"] not in scope_ids and log["server_id"] in set(spec["scope_servers"]) and log["service_name"] in set(spec["scope_services"]) ] sample_size = min(int(spec["noise_sample_size"]), len(noise_candidates)) if sample_size: for log in rng.sample(noise_candidates, sample_size): scope_logs.append(log) enriched = [] for index, log in enumerate(scope_logs): log_copy = dict(log) log_copy["_seed_rank"] = rng.random() + (index * 0.00001) enriched.append(log_copy) return enriched