"""
CASCADE System Observer - Process logs into CASCADE events.

Observes log files or streams and emits CASCADE-compatible events
with full hash chain provenance.
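
Typical usage (a sketch; the log path and event consumer are illustrative):

    observer = SystemObserver()
    for event in observer.observe_file("app.log"):
        ...  # hand each CASCADE Event to a consumer/visualizer
    print(observer.get_provenance_summary())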
"""

import time
import json
import hashlib
from pathlib import Path
from typing import Optional, Dict, Any, List, Generator, Tuple
from dataclasses import dataclass, field

from cascade.system.adapter import (
    LogAdapter, ParsedEvent, auto_detect_adapter,
    JSONLAdapter, ApacheLogAdapter, GenericLogAdapter,
)
from cascade.core.event import Event
from cascade.analysis.metrics import MetricsEngine


@dataclass
class SystemObservation:
    """Result of observing a log source."""
    
    source: str  # File path or stream name
    adapter_name: str
    events: List[ParsedEvent] = field(default_factory=list)
    
    # Hash chain
    merkle_root: str = ""
    chain_length: int = 0
    
    # Statistics
    event_counts: Dict[str, int] = field(default_factory=dict)  # type -> count
    component_counts: Dict[str, int] = field(default_factory=dict)  # component -> count
    time_range: Tuple[float, float] = (0.0, 0.0)  # (min_ts, max_ts)
    
    # Errors
    parse_errors: int = 0
    
    def compute_merkle_root(self) -> str:
        """Compute the Merkle root of all event hashes.

        Pairs of hashes are combined bottom-up; an odd node is paired with
        a duplicate of itself. Caches the root and chain length on self.
        """
        if not self.events:
            return ""
        
        hashes = [e.event_hash for e in self.events]
        
        # Build Merkle tree
        while len(hashes) > 1:
            if len(hashes) % 2 == 1:
                hashes.append(hashes[-1])  # Duplicate last if odd
            
            new_level = []
            for i in range(0, len(hashes), 2):
                combined = hashes[i] + hashes[i + 1]
                new_hash = hashlib.sha256(combined.encode()).hexdigest()[:16]
                new_level.append(new_hash)
            hashes = new_level
        
        self.merkle_root = hashes[0] if hashes else ""
        self.chain_length = len(self.events)
        return self.merkle_root
    
    def to_summary(self) -> Dict[str, Any]:
        """Generate summary for display."""
        return {
            "source": self.source,
            "adapter": self.adapter_name,
            "total_events": len(self.events),
            "merkle_root": self.merkle_root,
            "chain_length": self.chain_length,
            "event_types": self.event_counts,
            "components": self.component_counts,
            "time_range": {
                "start": self.time_range[0],
                "end": self.time_range[1],
                "duration_sec": self.time_range[1] - self.time_range[0] if self.time_range[1] > 0 else 0,
            },
            "parse_errors": self.parse_errors,
        }


class SystemObserver:
    """
    Observe system logs and emit CASCADE events.
    
    This is the bridge between external system logs and CASCADE visualization.
    """
    
    def __init__(self, adapter: Optional[LogAdapter] = None):
        """
        Args:
            adapter: Log adapter to use. If None, one is auto-detected
                from a sample of each source.
        """
        self.adapter = adapter
        self.observations: List[SystemObservation] = []
        self.metrics_engine = MetricsEngine()
    
    def observe_file(self, filepath: str, adapter: Optional[LogAdapter] = None) -> Generator[Event, None, SystemObservation]:
        """
        Observe a log file and emit CASCADE events.
        
        Args:
            filepath: Path to log file
            adapter: Override adapter (auto-detect if None)
            
        Yields:
            CASCADE Event objects
            
        Returns:
            SystemObservation with summary and provenance. Since this is a
            generator, the return value arrives via StopIteration (or
            ``yield from``) once iteration completes.
        """
        path = Path(filepath)
        if not path.exists():
            raise FileNotFoundError(f"Log file not found: {filepath}")
        
        # Auto-detect adapter if needed
        if adapter is None:
            with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
                # Drop empty reads past EOF so short files don't skew detection.
                sample = [line for line in (f.readline() for _ in range(20)) if line]
            adapter = auto_detect_adapter(sample)
        
        observation = SystemObservation(
            source=str(filepath),
            adapter_name=adapter.name,
        )
        
        min_ts = float("inf")
        max_ts = 0.0
        
        # Parse file
        for parsed in adapter.parse_file(filepath):
            observation.events.append(parsed)
            
            # Update stats
            observation.event_counts[parsed.event_type] = observation.event_counts.get(parsed.event_type, 0) + 1
            observation.component_counts[parsed.component] = observation.component_counts.get(parsed.component, 0) + 1
            
            min_ts = min(min_ts, parsed.timestamp)
            max_ts = max(max_ts, parsed.timestamp)
            
            # Convert to CASCADE Event and yield
            cascade_event = Event(
                timestamp=parsed.timestamp,
                event_type=parsed.event_type,
                component=parsed.component,
                data={
                    **parsed.data,
                    "hash": parsed.event_hash,
                    "parent_hash": parsed.parent_hash,
                    "source": "system_log",
                    "adapter": adapter.name,
                },
            )
            
            self.metrics_engine.ingest(cascade_event)
            yield cascade_event
        
        # Finalize observation
        observation.time_range = (min_ts if min_ts != float("inf") else 0.0, max_ts)
        observation.compute_merkle_root()
        self.observations.append(observation)
        
        return observation
    
    def observe_lines(self, lines: List[str], source_name: str = "input", adapter: Optional[LogAdapter] = None) -> Generator[Event, None, SystemObservation]:
        """
        Observe log lines (e.g., from text input or upload).
        
        Args:
            lines: List of log lines
            source_name: Name for this source
            adapter: Override adapter (auto-detect if None)
            
        Yields:
            CASCADE Event objects
            
        Returns:
            SystemObservation with summary and provenance (delivered via
            StopIteration, as with observe_file).
        """
        if adapter is None:
            adapter = auto_detect_adapter(lines[:20])
        
        observation = SystemObservation(
            source=source_name,
            adapter_name=adapter.name,
        )
        
        min_ts = float("inf")
        max_ts = 0.0
        
        for parsed in adapter.parse_lines(lines):
            observation.events.append(parsed)
            
            observation.event_counts[parsed.event_type] = observation.event_counts.get(parsed.event_type, 0) + 1
            observation.component_counts[parsed.component] = observation.component_counts.get(parsed.component, 0) + 1
            
            min_ts = min(min_ts, parsed.timestamp)
            max_ts = max(max_ts, parsed.timestamp)
            
            cascade_event = Event(
                timestamp=parsed.timestamp,
                event_type=parsed.event_type,
                component=parsed.component,
                data={
                    **parsed.data,
                    "hash": parsed.event_hash,
                    "parent_hash": parsed.parent_hash,
                    "source": "system_log",
                    "adapter": adapter.name,
                },
            )
            
            self.metrics_engine.ingest(cascade_event)
            yield cascade_event
        
        observation.time_range = (min_ts if min_ts != float("inf") else 0.0, max_ts)
        observation.compute_merkle_root()
        self.observations.append(observation)
        
        return observation
    
    def get_all_events_for_viz(self) -> List[Dict[str, Any]]:
        """Get all events formatted for CASCADE visualization."""
        all_events = []
        for obs in self.observations:
            for parsed in obs.events:
                all_events.append({
                    "event": parsed.to_cascade_event(),
                    # summary()/triage() are evaluated now, so every entry
                    # carries the same final snapshot of the metrics engine.
                    "metrics": self.metrics_engine.summary(),
                    "triage": self.metrics_engine.triage(),
                })
        return all_events
    
    def get_provenance_summary(self) -> Dict[str, Any]:
        """Get provenance summary for all observations."""
        return {
            "observations": [obs.to_summary() for obs in self.observations],
            "total_events": sum(len(obs.events) for obs in self.observations),
            "total_sources": len(self.observations),
        }


def observe_log_file(filepath: str) -> Generator[Event, None, SystemObservation]:
    """
    Convenience function to observe a log file.
    
    Usage:
        for event in observe_log_file("access.log"):
            print(event)
    """
    observer = SystemObserver()
    return observer.observe_file(filepath)


def observe_log_stream(lines: List[str], source: str = "stream") -> Generator[Event, None, SystemObservation]:
    """
    Convenience function to observe log lines.
    
    Usage:
        lines = log_text.split("\\n")
        for event in observe_log_stream(lines):
            print(event)
    """
    observer = SystemObserver()
    return observer.observe_lines(lines, source)


# ═══════════════════════════════════════════════════════════════════════════════
# TAPE WRITING - Write observations to tape for playback
# ═══════════════════════════════════════════════════════════════════════════════

def write_system_tape(observation: SystemObservation, tape_dir: str = "./logs") -> str:
    """
    Write system observation to tape file for playback.
    
    Args:
        observation: SystemObservation to write
        tape_dir: Directory for tape files
        
    Returns:
        Path to tape file
    """
    tape_path = Path(tape_dir)
    tape_path.mkdir(parents=True, exist_ok=True)
    
    session_id = int(time.time())
    filename = tape_path / f"system_tape_{session_id}.jsonl"
    
    with open(filename, "w", encoding="utf-8") as f:
        # Write header
        header = {
            "seq": 0,
            "ts": time.time(),
            "type": "header",
            "source": observation.source,
            "adapter": observation.adapter_name,
            "merkle_root": observation.merkle_root,
            "chain_length": observation.chain_length,
        }
        f.write(json.dumps(header) + "\n")
        
        # Write events
        for i, parsed in enumerate(observation.events, 1):
            record = {
                "seq": i,
                "ts": parsed.timestamp,
                "event": parsed.to_cascade_event(),
            }
            f.write(json.dumps(record, default=str) + "\n")
    
    return str(filename)
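

# A sketch of the matching tape reader (assumes the JSONL layout written by
# write_system_tape: a "header" record at seq 0 followed by event records).
def read_system_tape(tape_file: str) -> Generator[Dict[str, Any], None, None]:
    """Yield the header and event records from a system tape file."""
    with open(tape_file, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)


if __name__ == "__main__":
    # Minimal end-to-end sketch: observe the log file named on the command
    # line (assumed to be in a format an adapter can auto-detect), drain the
    # generator, capture the SystemObservation from StopIteration, and write
    # a playback tape.
    import sys

    if len(sys.argv) > 1:
        observer = SystemObserver()
        gen = observer.observe_file(sys.argv[1])
        count = 0
        while True:
            try:
                next(gen)
                count += 1
            except StopIteration as stop:
                observation = stop.value  # the generator's return value
                break
        print(f"Observed {count} events, merkle root {observation.merkle_root}")
        print(f"Tape written to {write_system_tape(observation)}")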