"""
CASCADE System Observer - Process logs into CASCADE events.
Observes log files or streams and emits CASCADE-compatible events
with full hash chain provenance.
"""
import time
import json
import hashlib
from pathlib import Path
from typing import Optional, Dict, Any, List, Generator, Union
from dataclasses import dataclass, field
from cascade.system.adapter import (
LogAdapter, ParsedEvent, auto_detect_adapter,
JSONLAdapter, ApacheLogAdapter, GenericLogAdapter,
)
from cascade.core.event import Event
from cascade.analysis.metrics import MetricsEngine
@dataclass
class SystemObservation:
    """Result of observing a log source."""
    source: str  # File path or stream name
    adapter_name: str
    events: List[ParsedEvent] = field(default_factory=list)
    # Hash chain
    merkle_root: str = ""
    chain_length: int = 0
    # Statistics
    event_counts: Dict[str, int] = field(default_factory=dict)  # type -> count
    component_counts: Dict[str, int] = field(default_factory=dict)  # component -> count
    time_range: tuple = (0.0, 0.0)  # (min_ts, max_ts)
    # Errors
    parse_errors: int = 0

    def compute_merkle_root(self) -> str:
        """Compute the Merkle root over all event hashes and cache it.

        Also records the chain length (number of events). Returns "" when
        there are no events.
        """
        if not self.events:
            return ""
        level = [e.event_hash for e in self.events]
        # Collapse the level pairwise until a single root hash remains.
        while len(level) > 1:
            if len(level) % 2:
                # Odd count: duplicate the final hash so every node pairs up.
                level = level + [level[-1]]
            level = [
                hashlib.sha256((left + right).encode()).hexdigest()[:16]
                for left, right in zip(level[::2], level[1::2])
            ]
        self.merkle_root = level[0] if level else ""
        self.chain_length = len(self.events)
        return self.merkle_root

    def to_summary(self) -> Dict[str, Any]:
        """Generate a display-ready summary dict of this observation."""
        start, end = self.time_range
        return {
            "source": self.source,
            "adapter": self.adapter_name,
            "total_events": len(self.events),
            "merkle_root": self.merkle_root,
            "chain_length": self.chain_length,
            "event_types": self.event_counts,
            "components": self.component_counts,
            "time_range": {
                "start": start,
                "end": end,
                # end == 0 means no events were timestamped; report 0 duration.
                "duration_sec": end - start if end > 0 else 0,
            },
            "parse_errors": self.parse_errors,
        }
class SystemObserver:
    """
    Observe system logs and emit CASCADE events.

    This is the bridge between external system logs and CASCADE visualization.
    """
    def __init__(self, adapter: Optional[LogAdapter] = None):
        """
        Args:
            adapter: Default log adapter to use for observations. If None,
                the adapter is auto-detected per source.
        """
        self.adapter = adapter
        self.observations: List[SystemObservation] = []
        self.metrics_engine = MetricsEngine()

    def _ingest(
        self,
        adapter: LogAdapter,
        parsed_events,
        observation: SystemObservation,
    ) -> Generator[Event, None, None]:
        """
        Shared ingestion pipeline for observe_file / observe_lines.

        For each ParsedEvent: records it on the observation, updates the
        per-type / per-component counters and the observed time range,
        converts it to a CASCADE Event carrying hash-chain provenance in
        its data payload, feeds the metrics engine, and yields the event.
        When the source is exhausted, finalizes the observation (time range,
        Merkle root) and appends it to self.observations.

        Args:
            adapter: Adapter that produced the parsed events.
            parsed_events: Iterable of ParsedEvent objects.
            observation: Observation being built; mutated in place.
        """
        min_ts = float("inf")
        max_ts = 0.0
        for parsed in parsed_events:
            observation.events.append(parsed)
            # Update stats
            observation.event_counts[parsed.event_type] = observation.event_counts.get(parsed.event_type, 0) + 1
            observation.component_counts[parsed.component] = observation.component_counts.get(parsed.component, 0) + 1
            min_ts = min(min_ts, parsed.timestamp)
            max_ts = max(max_ts, parsed.timestamp)
            # Convert to CASCADE Event, preserving hash-chain provenance
            cascade_event = Event(
                timestamp=parsed.timestamp,
                event_type=parsed.event_type,
                component=parsed.component,
                data={
                    **parsed.data,
                    "hash": parsed.event_hash,
                    "parent_hash": parsed.parent_hash,
                    "source": "system_log",
                    "adapter": adapter.name,
                },
            )
            self.metrics_engine.ingest(cascade_event)
            yield cascade_event
        # Finalize observation; an empty source leaves time_range at (0, 0.0)
        observation.time_range = (min_ts if min_ts != float("inf") else 0, max_ts)
        observation.compute_merkle_root()
        self.observations.append(observation)

    def _resolve_adapter(self, adapter: Optional[LogAdapter], sample: List[str]) -> LogAdapter:
        """Pick an adapter: explicit arg, then the instance default, then auto-detect from sample lines."""
        if adapter is not None:
            return adapter
        # Fix: previously the adapter passed to __init__ was ignored.
        if self.adapter is not None:
            return self.adapter
        return auto_detect_adapter(sample)

    def observe_file(self, filepath: str, adapter: Optional[LogAdapter] = None) -> Generator[Event, None, SystemObservation]:
        """
        Observe a log file and emit CASCADE events.

        Args:
            filepath: Path to log file
            adapter: Override adapter (falls back to the instance default,
                then auto-detection, if None)
        Yields:
            CASCADE Event objects
        Returns:
            SystemObservation with summary and provenance
        Raises:
            FileNotFoundError: If filepath does not exist.
        """
        path = Path(filepath)
        if not path.exists():
            raise FileNotFoundError(f"Log file not found: {filepath}")
        if adapter is None or self.adapter is not None and adapter is None:
            pass  # resolution handled below
        if adapter is None:
            # Sample the first 20 lines for auto-detection
            with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
                sample = [f.readline() for _ in range(20)]
            adapter = self._resolve_adapter(None, sample)
        observation = SystemObservation(
            source=str(filepath),
            adapter_name=adapter.name,
        )
        yield from self._ingest(adapter, adapter.parse_file(filepath), observation)
        return observation

    def observe_lines(self, lines: List[str], source_name: str = "input", adapter: Optional[LogAdapter] = None) -> Generator[Event, None, SystemObservation]:
        """
        Observe log lines (e.g., from text input or upload).

        Args:
            lines: List of log lines
            source_name: Name for this source
            adapter: Override adapter (falls back to the instance default,
                then auto-detection, if None)
        Yields:
            CASCADE Event objects
        Returns:
            SystemObservation with summary and provenance
        """
        adapter = self._resolve_adapter(adapter, lines[:20])
        observation = SystemObservation(
            source=source_name,
            adapter_name=adapter.name,
        )
        yield from self._ingest(adapter, adapter.parse_lines(lines), observation)
        return observation

    def get_all_events_for_viz(self) -> List[Dict[str, Any]]:
        """Get all events formatted for CASCADE visualization."""
        all_events = []
        for obs in self.observations:
            for parsed in obs.events:
                all_events.append({
                    "event": parsed.to_cascade_event(),
                    "metrics": self.metrics_engine.summary(),
                    "triage": self.metrics_engine.triage(),
                })
        return all_events

    def get_provenance_summary(self) -> Dict[str, Any]:
        """Get provenance summary (per-source summaries plus totals) for all observations."""
        return {
            "observations": [obs.to_summary() for obs in self.observations],
            "total_events": sum(len(obs.events) for obs in self.observations),
            "total_sources": len(self.observations),
        }
def observe_log_file(filepath: str) -> Generator[Event, None, SystemObservation]:
    """
    Convenience wrapper: observe a single log file with a fresh observer.

    Usage:
        for event in observe_log_file("access.log"):
            print(event)
    """
    return SystemObserver().observe_file(filepath)
def observe_log_stream(lines: List[str], source: str = "stream") -> Generator[Event, None, SystemObservation]:
    """
    Convenience wrapper: observe a list of log lines with a fresh observer.

    Usage:
        lines = log_text.split("\\n")
        for event in observe_log_stream(lines):
            print(event)
    """
    return SystemObserver().observe_lines(lines, source)
# ------------------------------------------------------------------------------
# TAPE WRITING - Write observations to tape for playback
# ------------------------------------------------------------------------------
def write_system_tape(observation: SystemObservation, tape_dir: str = "./logs") -> str:
    """
    Write system observation to a JSONL tape file for playback.

    The first line is a header record (seq 0) carrying source, adapter,
    and hash-chain provenance; each subsequent line is one event record.

    Args:
        observation: SystemObservation to write
        tape_dir: Directory for tape files (created if missing)
    Returns:
        Path to tape file
    """
    out_dir = Path(tape_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    session_id = int(time.time())
    tape_file = out_dir / f"system_tape_{session_id}.jsonl"
    # Header record carries provenance for the whole tape
    header = {
        "seq": 0,
        "ts": time.time(),
        "type": "header",
        "source": observation.source,
        "adapter": observation.adapter_name,
        "merkle_root": observation.merkle_root,
        "chain_length": observation.chain_length,
    }
    records = [json.dumps(header)]
    # One record per observed event; sequence numbers start at 1
    for seq, parsed in enumerate(observation.events, 1):
        record = {
            "seq": seq,
            "ts": parsed.timestamp,
            "event": parsed.to_cascade_event(),
        }
        records.append(json.dumps(record, default=str))
    with open(tape_file, "w", encoding="utf-8") as f:
        f.write("\n".join(records) + "\n")
    return str(tape_file)