| """ | |
| CASCADE System Observer - Process logs into CASCADE events. | |
| Observes log files or streams and emits CASCADE-compatible events | |
| with full hash chain provenance. | |
| """ | |
| import time | |
| import json | |
| import hashlib | |
| from pathlib import Path | |
| from typing import Optional, Dict, Any, List, Generator, Union | |
| from dataclasses import dataclass, field | |
| from cascade.system.adapter import ( | |
| LogAdapter, ParsedEvent, auto_detect_adapter, | |
| JSONLAdapter, ApacheLogAdapter, GenericLogAdapter, | |
| ) | |
| from cascade.core.event import Event | |
| from cascade.analysis.metrics import MetricsEngine | |
@dataclass
class SystemObservation:
    """Result of observing a single log source.

    Accumulates the parsed events, per-type / per-component counts, and a
    Merkle root over the event hash chain for provenance.

    NOTE: the @dataclass decorator is required here — without it the
    ``field(default_factory=...)`` assignments are bare Field sentinels and
    the generated keyword constructor does not exist.
    """

    source: str        # file path or stream name
    adapter_name: str  # name of the LogAdapter that produced the events
    events: List["ParsedEvent"] = field(default_factory=list)
    # Hash chain
    merkle_root: str = ""
    chain_length: int = 0
    # Statistics
    event_counts: Dict[str, int] = field(default_factory=dict)      # event_type -> count
    component_counts: Dict[str, int] = field(default_factory=dict)  # component -> count
    time_range: tuple = (0.0, 0.0)  # (min_ts, max_ts)
    # Errors
    parse_errors: int = 0

    def compute_merkle_root(self) -> str:
        """Compute and store the Merkle root of all event hashes.

        Returns:
            16-hex-char root, or "" when there are no events. Also updates
            ``chain_length`` to the number of events.
        """
        if not self.events:
            return ""
        hashes = [e.event_hash for e in self.events]
        # Combine pairwise, level by level, until a single root remains.
        while len(hashes) > 1:
            if len(hashes) % 2 == 1:
                hashes.append(hashes[-1])  # duplicate last node if odd
            new_level = []
            for i in range(0, len(hashes), 2):
                combined = hashes[i] + hashes[i + 1]
                new_level.append(hashlib.sha256(combined.encode()).hexdigest()[:16])
            hashes = new_level
        self.merkle_root = hashes[0] if hashes else ""
        self.chain_length = len(self.events)
        return self.merkle_root

    def to_summary(self) -> Dict[str, Any]:
        """Generate a JSON-serializable summary for display."""
        return {
            "source": self.source,
            "adapter": self.adapter_name,
            "total_events": len(self.events),
            "merkle_root": self.merkle_root,
            "chain_length": self.chain_length,
            "event_types": self.event_counts,
            "components": self.component_counts,
            "time_range": {
                "start": self.time_range[0],
                "end": self.time_range[1],
                "duration_sec": self.time_range[1] - self.time_range[0] if self.time_range[1] > 0 else 0,
            },
            "parse_errors": self.parse_errors,
        }
class SystemObserver:
    """
    Observe system logs and emit CASCADE events.

    This is the bridge between external system logs and CASCADE
    visualization. An adapter passed to the constructor becomes the default
    for every observe_* call; a per-call adapter overrides it, and when
    neither is given one is auto-detected from the input.
    """

    def __init__(self, adapter: Optional["LogAdapter"] = None):
        """
        Args:
            adapter: Default log adapter for all observations. If None,
                each observe_* call auto-detects one from its input.
        """
        self.adapter = adapter
        self.observations: List["SystemObservation"] = []
        self.metrics_engine = MetricsEngine()

    @staticmethod
    def _tally(observation: "SystemObservation", parsed: "ParsedEvent") -> None:
        """Fold one parsed event into the observation's event/component counts."""
        observation.events.append(parsed)
        observation.event_counts[parsed.event_type] = observation.event_counts.get(parsed.event_type, 0) + 1
        observation.component_counts[parsed.component] = observation.component_counts.get(parsed.component, 0) + 1

    def _to_cascade_event(self, parsed: "ParsedEvent", adapter_name: str) -> "Event":
        """Convert a ParsedEvent into a CASCADE Event, carrying hash provenance."""
        return Event(
            timestamp=parsed.timestamp,
            event_type=parsed.event_type,
            component=parsed.component,
            data={
                **parsed.data,
                "hash": parsed.event_hash,
                "parent_hash": parsed.parent_hash,
                "source": "system_log",
                "adapter": adapter_name,
            },
        )

    def observe_file(self, filepath: str, adapter: Optional["LogAdapter"] = None) -> "Generator[Event, None, SystemObservation]":
        """
        Observe a log file and emit CASCADE events.

        Args:
            filepath: Path to log file
            adapter: Override adapter (falls back to the instance default,
                then auto-detection from the first 20 lines)

        Yields:
            CASCADE Event objects

        Returns:
            SystemObservation with summary and provenance

        Raises:
            FileNotFoundError: if *filepath* does not exist
        """
        path = Path(filepath)
        if not path.exists():
            raise FileNotFoundError(f"Log file not found: {filepath}")
        # Resolution order: explicit argument > instance default > auto-detect.
        if adapter is None:
            adapter = self.adapter
        if adapter is None:
            with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
                sample = [f.readline() for _ in range(20)]
            adapter = auto_detect_adapter(sample)
        observation = SystemObservation(
            source=str(filepath),
            adapter_name=adapter.name,
        )
        min_ts = float("inf")
        max_ts = 0.0
        for parsed in adapter.parse_file(filepath):
            self._tally(observation, parsed)
            min_ts = min(min_ts, parsed.timestamp)
            max_ts = max(max_ts, parsed.timestamp)
            cascade_event = self._to_cascade_event(parsed, adapter.name)
            self.metrics_engine.ingest(cascade_event)
            yield cascade_event
        # Finalize observation
        observation.time_range = (min_ts if min_ts != float("inf") else 0, max_ts)
        observation.compute_merkle_root()
        self.observations.append(observation)
        return observation

    def observe_lines(self, lines: List[str], source_name: str = "input", adapter: Optional["LogAdapter"] = None) -> "Generator[Event, None, SystemObservation]":
        """
        Observe log lines (e.g., from text input or upload).

        Args:
            lines: List of log lines
            source_name: Name for this source
            adapter: Override adapter (falls back to the instance default,
                then auto-detection from the first 20 lines)

        Yields:
            CASCADE Event objects

        Returns:
            SystemObservation with summary and provenance
        """
        if adapter is None:
            adapter = self.adapter
        if adapter is None:
            adapter = auto_detect_adapter(lines[:20])
        observation = SystemObservation(
            source=source_name,
            adapter_name=adapter.name,
        )
        min_ts = float("inf")
        max_ts = 0.0
        for parsed in adapter.parse_lines(lines):
            self._tally(observation, parsed)
            min_ts = min(min_ts, parsed.timestamp)
            max_ts = max(max_ts, parsed.timestamp)
            cascade_event = self._to_cascade_event(parsed, adapter.name)
            self.metrics_engine.ingest(cascade_event)
            yield cascade_event
        observation.time_range = (min_ts if min_ts != float("inf") else 0, max_ts)
        observation.compute_merkle_root()
        self.observations.append(observation)
        return observation

    def get_all_events_for_viz(self) -> List[Dict[str, Any]]:
        """Get all events formatted for CASCADE visualization."""
        all_events = []
        for obs in self.observations:
            for parsed in obs.events:
                all_events.append({
                    "event": parsed.to_cascade_event(),
                    "metrics": self.metrics_engine.summary(),
                    "triage": self.metrics_engine.triage(),
                })
        return all_events

    def get_provenance_summary(self) -> Dict[str, Any]:
        """Get provenance summary for all observations."""
        return {
            "observations": [obs.to_summary() for obs in self.observations],
            "total_events": sum(len(obs.events) for obs in self.observations),
            "total_sources": len(self.observations),
        }
def observe_log_file(filepath: str) -> Generator[Event, None, SystemObservation]:
    """
    Convenience function to observe a log file with a fresh observer.

    Usage:
        for event in observe_log_file("access.log"):
            print(event)
    """
    return SystemObserver().observe_file(filepath)
def observe_log_stream(lines: List[str], source: str = "stream") -> Generator[Event, None, SystemObservation]:
    """
    Convenience function to observe log lines already held in memory.

    Usage:
        lines = log_text.split("\\n")
        for event in observe_log_stream(lines):
            print(event)
    """
    return SystemObserver().observe_lines(lines, source)
# ───────────────────────────────────────────────────────────────────────────────
# TAPE WRITING - Write observations to tape for playback
# ───────────────────────────────────────────────────────────────────────────────
def write_system_tape(observation: "SystemObservation", tape_dir: str = "./logs") -> str:
    """
    Write a system observation to a JSONL tape file for playback.

    The first record is a header carrying provenance (source, adapter,
    Merkle root, chain length); subsequent records hold the events in
    order, one JSON object per line.

    Args:
        observation: SystemObservation to write
        tape_dir: Directory for tape files (created if missing)

    Returns:
        Path to tape file
    """
    tape_path = Path(tape_dir)
    tape_path.mkdir(parents=True, exist_ok=True)
    session_id = int(time.time())
    filename = tape_path / f"system_tape_{session_id}.jsonl"
    # Two tapes written within the same second used to share a filename and
    # silently overwrite each other; append a suffix until the name is free.
    suffix = 0
    while filename.exists():
        suffix += 1
        filename = tape_path / f"system_tape_{session_id}_{suffix}.jsonl"
    with open(filename, "w", encoding="utf-8") as f:
        # Write header
        header = {
            "seq": 0,
            "ts": time.time(),
            "type": "header",
            "source": observation.source,
            "adapter": observation.adapter_name,
            "merkle_root": observation.merkle_root,
            "chain_length": observation.chain_length,
        }
        f.write(json.dumps(header) + "\n")
        # Write events; default=str deliberately stringifies any
        # non-JSON-native values inside the event payload.
        for i, parsed in enumerate(observation.events, 1):
            record = {
                "seq": i,
                "ts": parsed.timestamp,
                "event": parsed.to_cascade_event(),
            }
            f.write(json.dumps(record, default=str) + "\n")
    return str(filename)