Spaces:
Configuration error
Configuration error
| """ | |
| CASCADE System Observatory - Log Adapters | |
| Transform any log format into CASCADE events. | |
| Each adapter parses a specific log format and emits standardized events. | |
| The key insight: all logs are just events with timestamps, components, and data. | |
| CASCADE doesn't care WHERE the events come from - it visualizes causation. | |
| Enhanced with: | |
| - drain3: IBM's template mining for auto-discovering log structure | |
| - dateparser: Universal timestamp parsing for any format | |
| """ | |
| import re | |
| import json | |
| import hashlib | |
| import time | |
| from abc import ABC, abstractmethod | |
| from typing import Optional, Dict, Any, List, Generator, Tuple | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| from pathlib import Path | |
| # Universal parsing libraries | |
| try: | |
| import dateparser | |
| HAS_DATEPARSER = True | |
| except ImportError: | |
| HAS_DATEPARSER = False | |
| try: | |
| from drain3 import TemplateMiner | |
| from drain3.template_miner_config import TemplateMinerConfig | |
| HAS_DRAIN3 = True | |
| except ImportError: | |
| HAS_DRAIN3 = False | |
| class ParsedEvent: | |
| """Standardized event parsed from any log format.""" | |
| timestamp: float | |
| event_type: str | |
| component: str | |
| data: Dict[str, Any] | |
| raw_line: str = "" | |
| # Hash chain for provenance | |
| event_hash: str = field(default="") | |
| parent_hash: str = field(default="") | |
| def __post_init__(self): | |
| if not self.event_hash: | |
| self.event_hash = self._compute_hash() | |
| def _compute_hash(self) -> str: | |
| """Compute deterministic hash of this event.""" | |
| content = json.dumps({ | |
| "ts": self.timestamp, | |
| "type": self.event_type, | |
| "component": self.component, | |
| "data": self.data, | |
| "parent": self.parent_hash, | |
| }, sort_keys=True, default=str) | |
| return hashlib.sha256(content.encode()).hexdigest()[:16] | |
| def to_cascade_event(self) -> Dict[str, Any]: | |
| """Convert to CASCADE event format for visualization.""" | |
| return { | |
| "event_id": f"sys_{self.event_hash}", | |
| "timestamp": self.timestamp, | |
| "event_type": self.event_type, | |
| "component": self.component, | |
| "data": { | |
| **self.data, | |
| "hash": self.event_hash, | |
| "parent_hash": self.parent_hash, | |
| }, | |
| } | |
| class LogAdapter(ABC): | |
| """ | |
| Base class for log adapters. | |
| Implement parse_line() to convert your log format to ParsedEvent. | |
| """ | |
| name: str = "base" | |
| description: str = "Base log adapter" | |
| def __init__(self): | |
| self.event_count = 0 | |
| self.last_hash = "" | |
| def parse_line(self, line: str) -> Optional[ParsedEvent]: | |
| """ | |
| Parse a single log line. | |
| Args: | |
| line: Raw log line | |
| Returns: | |
| ParsedEvent if successfully parsed, None to skip this line | |
| """ | |
| pass | |
| def parse_file(self, filepath: str) -> Generator[ParsedEvent, None, None]: | |
| """Parse all lines in a file.""" | |
| with open(filepath, "r", encoding="utf-8", errors="ignore") as f: | |
| for line in f: | |
| event = self.parse_line(line.strip()) | |
| if event: | |
| event.parent_hash = self.last_hash | |
| event.event_hash = event._compute_hash() | |
| self.last_hash = event.event_hash | |
| self.event_count += 1 | |
| yield event | |
| def parse_lines(self, lines: List[str]) -> Generator[ParsedEvent, None, None]: | |
| """Parse a list of lines.""" | |
| for line in lines: | |
| event = self.parse_line(line.strip()) | |
| if event: | |
| event.parent_hash = self.last_hash | |
| event.event_hash = event._compute_hash() | |
| self.last_hash = event.event_hash | |
| self.event_count += 1 | |
| yield event | |
| class UniversalAdapter(LogAdapter): | |
| """ | |
| THE UNIVERSAL ADAPTER - One parser to rule them all. | |
| Handles ANY log format through recursive field discovery: | |
| - JSON at any nesting depth (CASCADE, ELK, Datadog, custom) | |
| - Apache/Nginx access logs | |
| - Kubernetes events | |
| - Syslog format | |
| - Generic timestamped text | |
| - Raw text (fallback) | |
| The philosophy: logs are just events. Every line has: | |
| - A timestamp (explicit or implicit from order) | |
| - A source/component (explicit or inferred) | |
| - A severity/type (explicit or inferred) | |
| - A message (always present) | |
| This adapter finds these fields regardless of format. | |
| """ | |
| name = "universal" | |
| description = "Universal Log Parser - handles any format" | |
| # Field name variations (searched recursively in JSON) | |
| TIMESTAMP_ALIASES = { | |
| "timestamp", "time", "ts", "@timestamp", "datetime", "date", "t", | |
| "created", "created_at", "logged_at", "event_time", "log_time", | |
| "when", "epoch", "unix_time", "utc_time", "local_time" | |
| } | |
| COMPONENT_ALIASES = { | |
| "component", "service", "logger", "source", "module", "name", | |
| "app", "application", "origin", "host", "hostname", "container", | |
| "pod", "namespace", "class", "category", "tag", "facility" | |
| } | |
| EVENT_TYPE_ALIASES = { | |
| "event_type", "level", "severity", "type", "log_level", "loglevel", | |
| "priority", "status", "kind", "action", "verb", "method" | |
| } | |
| MESSAGE_ALIASES = { | |
| "message", "msg", "text", "body", "content", "raw", "raw_message", | |
| "description", "detail", "details", "info", "payload", "log" | |
| } | |
| # Severity indicators (for inferring event type from text) | |
| SEVERITY_PATTERNS = { | |
| "critical": r'\b(CRITICAL|FATAL|EMERGENCY|PANIC)\b', | |
| "error": r'\b(ERROR|ERR|EXCEPTION|FAIL(ED|URE)?)\b', | |
| "warning": r'\b(WARN(ING)?|CAUTION|ALERT)\b', | |
| "info": r'\b(INFO|NOTICE|LOG)\b', | |
| "debug": r'\b(DEBUG|TRACE|VERBOSE)\b', | |
| } | |
| # Timestamp regex patterns (ordered by specificity) | |
| TIMESTAMP_PATTERNS = [ | |
| # ISO 8601 variants | |
| (r'(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:?\d{2})?)', 'iso'), | |
| # Unix timestamp (float or int) | |
| (r'\b(1[5-9]\d{8}(?:\.\d+)?)\b', 'unix'), # 1500000000+ range | |
| # Apache/Nginx format | |
| (r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}(?:\s*[+-]\d{4})?)\]', 'apache'), | |
| # Syslog format | |
| (r'^(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})', 'syslog'), | |
| # Common date formats | |
| (r'(\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})', 'slash'), | |
| (r'(\d{2}-\d{2}-\d{4}\s+\d{2}:\d{2}:\d{2})', 'us'), | |
| ] | |
| # Component extraction patterns (handle leading whitespace after timestamp removal) | |
| COMPONENT_PATTERNS = [ | |
| # [component] or (component) - with optional leading whitespace | |
| r'^\s*\[([^\]]+)\]', | |
| r'^\s*\(([^\)]+)\)', | |
| # component: at start (with optional leading whitespace) | |
| r'^\s*([A-Za-z][\w\-\.]+):', | |
| # Kubernetes style: namespace/pod | |
| r'\b([a-z][\w\-]+/[a-z][\w\-]+)\b', | |
| # Docker container ID | |
| r'\b([a-f0-9]{12})\b', | |
| ] | |
| # Common delimiters to auto-detect (ordered by specificity) | |
| DELIMITERS = [' | ', ' - ', '\t', ' :: ', ' -- '] | |
| def __init__(self): | |
| super().__init__() | |
| self._line_number = 0 | |
| self._base_time = time.time() | |
| self._detected_delimiter = None | |
| self._detected_format = None # Cached format info after learning | |
| self._sample_lines = [] # Collect lines for format learning | |
| self._learning_complete = False | |
| # Initialize drain3 template miner if available | |
| self._template_miner = None | |
| if HAS_DRAIN3: | |
| config = TemplateMinerConfig() | |
| # drain3 config is ready to use without explicit load | |
| self._template_miner = TemplateMiner(config=config) | |
| def parse_lines(self, lines: List[str]) -> Generator[ParsedEvent, None, None]: | |
| """ | |
| Parse lines with upfront format learning. | |
| Override base class to learn format from first N lines before parsing any. | |
| """ | |
| # Learn format from first 50 lines (or all if less) | |
| sample = [l.strip() for l in lines[:50] if l.strip() and not l.strip().startswith('{')] | |
| if sample and not self._learning_complete: | |
| self._learn_format(sample) | |
| # Now parse all lines with learned format | |
| for line in lines: | |
| event = self.parse_line(line.strip()) | |
| if event: | |
| event.parent_hash = self.last_hash | |
| event.event_hash = event._compute_hash() | |
| self.last_hash = event.event_hash | |
| self.event_count += 1 | |
| yield event | |
| def parse_file(self, filepath: str) -> Generator[ParsedEvent, None, None]: | |
| """ | |
| Parse file with upfront format learning. | |
| Override base class to learn format before yielding events. | |
| """ | |
| with open(filepath, "r", encoding="utf-8", errors="ignore") as f: | |
| # Read first batch to learn format | |
| sample_lines = [] | |
| all_lines = [] | |
| for i, line in enumerate(f): | |
| all_lines.append(line) | |
| if i < 50 and line.strip() and not line.strip().startswith('{'): | |
| sample_lines.append(line.strip()) | |
| # Learn format | |
| if sample_lines and not self._learning_complete: | |
| self._learn_format(sample_lines) | |
| # Re-read and parse with learned format | |
| with open(filepath, "r", encoding="utf-8", errors="ignore") as f: | |
| for line in f: | |
| event = self.parse_line(line.strip()) | |
| if event: | |
| event.parent_hash = self.last_hash | |
| event.event_hash = event._compute_hash() | |
| self.last_hash = event.event_hash | |
| self.event_count += 1 | |
| yield event | |
| def _learn_format(self, lines: List[str]) -> None: | |
| """ | |
| Learn log format from sample lines using statistical analysis. | |
| Detects: delimiter, field positions, timestamp format. | |
| """ | |
| if self._learning_complete or len(lines) < 3: | |
| return | |
| # Count delimiter occurrences across lines | |
| delimiter_counts = {d: 0 for d in self.DELIMITERS} | |
| for line in lines[:50]: # Sample first 50 lines | |
| for delim in self.DELIMITERS: | |
| count = line.count(delim) | |
| if count >= 2: # At least 3 fields | |
| delimiter_counts[delim] += count | |
| # Find most common delimiter | |
| best_delim = max(delimiter_counts, key=delimiter_counts.get) | |
| if delimiter_counts[best_delim] > len(lines) * 2: # Significant presence | |
| self._detected_delimiter = best_delim | |
| self._analyze_delimited_format(lines, best_delim) | |
| self._learning_complete = True | |
| def _analyze_delimited_format(self, lines: List[str], delimiter: str) -> None: | |
| """ | |
| Analyze field positions in delimiter-separated log format. | |
| Learns which field contains timestamp, level, component, message. | |
| """ | |
| # Split sample lines | |
| field_samples = [] | |
| for line in lines[:20]: | |
| parts = [p.strip() for p in line.split(delimiter)] | |
| if len(parts) >= 3: | |
| field_samples.append(parts) | |
| if not field_samples: | |
| return | |
| # Analyze each field position | |
| num_fields = min(len(s) for s in field_samples) | |
| format_info = { | |
| 'delimiter': delimiter, | |
| 'timestamp_idx': None, | |
| 'level_idx': None, | |
| 'component_idx': None, | |
| 'message_idx': num_fields - 1, # Default: last field is message | |
| } | |
| LEVEL_KEYWORDS = {'DEBUG', 'INFO', 'WARNING', 'WARN', 'ERROR', 'CRITICAL', 'FATAL', 'TRACE'} | |
| for idx in range(num_fields): | |
| field_values = [s[idx] if idx < len(s) else '' for s in field_samples] | |
| # Check if this field contains timestamps | |
| timestamp_score = sum(1 for v in field_values if self._looks_like_timestamp(v)) | |
| if timestamp_score > len(field_values) * 0.7: | |
| format_info['timestamp_idx'] = idx | |
| continue | |
| # Check if this field contains log levels | |
| level_score = sum(1 for v in field_values if v.upper() in LEVEL_KEYWORDS) | |
| if level_score > len(field_values) * 0.5: | |
| format_info['level_idx'] = idx | |
| continue | |
| # Check if this field looks like component names | |
| # Components are typically: lowercase, underscores/dots, consistent format | |
| component_score = sum(1 for v in field_values | |
| if v and re.match(r'^[a-zA-Z][\w\.\-]*$', v) | |
| and v.upper() not in LEVEL_KEYWORDS | |
| and not self._looks_like_timestamp(v)) | |
| if component_score > len(field_values) * 0.5 and format_info['component_idx'] is None: | |
| format_info['component_idx'] = idx | |
| self._detected_format = format_info | |
| def _looks_like_timestamp(self, value: str) -> bool: | |
| """Check if a string looks like a timestamp.""" | |
| if not value: | |
| return False | |
| # Check for time-like patterns: digits with : or - or . | |
| if re.match(r'^\d{1,4}[-/:]\d{1,2}[-/:T]', value): | |
| return True | |
| if re.match(r'^\d{2}:\d{2}:\d{2}', value): | |
| return True | |
| # Unix timestamp | |
| if re.match(r'^1[5-9]\d{8}', value): | |
| return True | |
| return False | |
| def parse_line(self, line: str) -> Optional[ParsedEvent]: | |
| """ | |
| Universal parse - handles any format with intelligent auto-detection. | |
| """ | |
| if not line or not line.strip(): | |
| return None | |
| line = line.strip() | |
| self._line_number += 1 | |
| # Collect samples for format learning | |
| if not self._learning_complete and len(self._sample_lines) < 50: | |
| self._sample_lines.append(line) | |
| if len(self._sample_lines) >= 10: | |
| self._learn_format(self._sample_lines) | |
| # Try JSON first (handles all structured formats) | |
| if line.startswith('{'): | |
| try: | |
| obj = json.loads(line) | |
| return self._parse_json(obj, line) | |
| except json.JSONDecodeError: | |
| pass | |
| # Use learned delimited format if detected | |
| if self._detected_format: | |
| return self._parse_delimited(line) | |
| # Fall back to traditional text parsing | |
| return self._parse_text(line) | |
| def _parse_delimited(self, line: str) -> ParsedEvent: | |
| """ | |
| Parse line using auto-detected delimiter format. | |
| This is the intelligent parsing path for structured text logs. | |
| """ | |
| fmt = self._detected_format | |
| parts = [p.strip() for p in line.split(fmt['delimiter'])] | |
| # Extract fields by learned positions | |
| timestamp = None | |
| level = 'info' | |
| component = 'system' | |
| message = line | |
| if fmt['timestamp_idx'] is not None and fmt['timestamp_idx'] < len(parts): | |
| ts_str = parts[fmt['timestamp_idx']] | |
| timestamp = self._parse_timestamp_universal(ts_str) | |
| if fmt['level_idx'] is not None and fmt['level_idx'] < len(parts): | |
| level = self._normalize_event_type(parts[fmt['level_idx']]) | |
| if fmt['component_idx'] is not None and fmt['component_idx'] < len(parts): | |
| component = parts[fmt['component_idx']] | |
| if fmt['message_idx'] is not None and fmt['message_idx'] < len(parts): | |
| # Message is everything from message_idx onwards (may be split by delimiter) | |
| msg_start = fmt['message_idx'] | |
| message = fmt['delimiter'].join(parts[msg_start:]) | |
| if timestamp is None: | |
| timestamp = self._base_time + (self._line_number * 0.001) | |
| # Feed to drain3 for template mining (if available) | |
| if self._template_miner: | |
| result = self._template_miner.add_log_message(message) | |
| # Could use result.get_template() for pattern analysis | |
| return ParsedEvent( | |
| timestamp=timestamp, | |
| event_type=level, | |
| component=component, | |
| data={"message": message}, | |
| raw_line=line, | |
| ) | |
| def _parse_timestamp_universal(self, ts_str: str) -> Optional[float]: | |
| """ | |
| Parse any timestamp format using dateparser (if available) or fallback regex. | |
| """ | |
| if not ts_str: | |
| return None | |
| # Try dateparser first (handles almost any format) | |
| if HAS_DATEPARSER: | |
| try: | |
| parsed = dateparser.parse(ts_str, settings={ | |
| 'RETURN_AS_TIMEZONE_AWARE': False, | |
| 'PREFER_DATES_FROM': 'past', | |
| }) | |
| if parsed: | |
| return parsed.timestamp() | |
| except Exception: | |
| pass | |
| # Fallback to existing patterns | |
| return self._parse_timestamp_string(ts_str, 'auto') | |
| def _parse_json(self, obj: dict, raw_line: str) -> ParsedEvent: | |
| """ | |
| Parse JSON object with recursive field discovery. | |
| Handles any nesting depth - finds fields wherever they are. | |
| """ | |
| # Recursively search for known fields | |
| timestamp = self._find_field(obj, self.TIMESTAMP_ALIASES) | |
| component = self._find_field(obj, self.COMPONENT_ALIASES) | |
| event_type = self._find_field(obj, self.EVENT_TYPE_ALIASES) | |
| message = self._find_field(obj, self.MESSAGE_ALIASES) | |
| # Parse timestamp | |
| if timestamp is not None: | |
| ts = self._parse_timestamp_value(timestamp) | |
| else: | |
| ts = self._base_time + (self._line_number * 0.001) | |
| # Normalize event type | |
| if event_type: | |
| event_type = self._normalize_event_type(str(event_type)) | |
| else: | |
| # Infer from message content | |
| event_type = self._infer_severity(str(message) if message else str(obj)) | |
| # Default component | |
| if not component: | |
| component = "system" | |
| else: | |
| component = str(component) | |
| # Build data dict | |
| data = self._flatten_to_data(obj, message) | |
| return ParsedEvent( | |
| timestamp=ts, | |
| event_type=event_type, | |
| component=component, | |
| data=data, | |
| raw_line=raw_line, | |
| ) | |
| def _find_field(self, obj: Any, aliases: set, depth: int = 0) -> Any: | |
| """ | |
| Recursively search for a field by any of its aliases. | |
| Returns the first match found (breadth-first within each level). | |
| """ | |
| if depth > 10: # Prevent infinite recursion | |
| return None | |
| if isinstance(obj, dict): | |
| # Check this level first | |
| for key in obj: | |
| if key.lower() in aliases: | |
| return obj[key] | |
| # Then recurse into nested dicts | |
| for key, value in obj.items(): | |
| if isinstance(value, dict): | |
| result = self._find_field(value, aliases, depth + 1) | |
| if result is not None: | |
| return result | |
| elif isinstance(value, list) and value and isinstance(value[0], dict): | |
| # Check first item of list of dicts | |
| result = self._find_field(value[0], aliases, depth + 1) | |
| if result is not None: | |
| return result | |
| return None | |
| def _flatten_to_data(self, obj: dict, message: Any = None) -> Dict[str, Any]: | |
| """ | |
| Flatten JSON object to data dict for visualization. | |
| Preserves important nested structures while making data accessible. | |
| """ | |
| data = {} | |
| # Set message | |
| if message is not None: | |
| if isinstance(message, dict): | |
| data.update(message) | |
| else: | |
| data["message"] = str(message) | |
| # Extract key fields at any level | |
| for key, value in obj.items(): | |
| key_lower = key.lower() | |
| # Skip already processed fields | |
| if key_lower in self.TIMESTAMP_ALIASES | self.MESSAGE_ALIASES: | |
| continue | |
| # Include scalar values directly | |
| if isinstance(value, (str, int, float, bool)) or value is None: | |
| data[key] = value | |
| # Include small dicts inline | |
| elif isinstance(value, dict) and len(value) <= 5: | |
| for k, v in value.items(): | |
| if isinstance(v, (str, int, float, bool)): | |
| data[f"{key}.{k}"] = v | |
| # Summarize large structures | |
| elif isinstance(value, dict): | |
| data[f"_{key}_keys"] = list(value.keys())[:10] | |
| elif isinstance(value, list): | |
| data[f"_{key}_count"] = len(value) | |
| return data | |
| def _parse_text(self, line: str) -> ParsedEvent: | |
| """ | |
| Parse unstructured text log line. | |
| Extracts timestamp, component, severity from text patterns. | |
| """ | |
| timestamp = None | |
| component = "system" | |
| remaining = line | |
| # Try to extract timestamp | |
| for pattern, fmt in self.TIMESTAMP_PATTERNS: | |
| match = re.search(pattern, line, re.IGNORECASE) | |
| if match: | |
| ts_str = match.group(1) | |
| timestamp = self._parse_timestamp_string(ts_str, fmt) | |
| # Remove timestamp from remaining text | |
| remaining = line[:match.start()] + line[match.end():] | |
| break | |
| if timestamp is None: | |
| timestamp = self._base_time + (self._line_number * 0.001) | |
| # Try to extract component | |
| SEVERITY_WORDS = {'error', 'err', 'warn', 'warning', 'info', 'debug', 'trace', 'fatal', 'critical'} | |
| for pattern in self.COMPONENT_PATTERNS: | |
| match = re.search(pattern, remaining) | |
| if match: | |
| candidate = match.group(1) | |
| # Don't treat severity keywords as components | |
| if candidate.lower() not in SEVERITY_WORDS: | |
| component = candidate | |
| remaining = remaining[match.end():].strip() | |
| break | |
| # Infer severity from text | |
| event_type = self._infer_severity(line) | |
| # Clean up message | |
| message = remaining.strip() | |
| if message.startswith(':'): | |
| message = message[1:].strip() | |
| return ParsedEvent( | |
| timestamp=timestamp, | |
| event_type=event_type, | |
| component=component, | |
| data={"message": message}, | |
| raw_line=line, | |
| ) | |
| def _parse_timestamp_value(self, value: Any) -> float: | |
| """Parse timestamp from any format.""" | |
| if isinstance(value, (int, float)): | |
| # Unix timestamp - check if milliseconds | |
| if value > 1e12: | |
| return value / 1000 | |
| return float(value) | |
| if isinstance(value, str): | |
| return self._parse_timestamp_string(value, 'auto') | |
| return self._base_time + (self._line_number * 0.001) | |
| def _parse_timestamp_string(self, ts_str: str, fmt: str) -> float: | |
| """Parse timestamp string to Unix timestamp.""" | |
| try: | |
| if fmt == 'unix' or (fmt == 'auto' and ts_str.replace('.', '').isdigit()): | |
| val = float(ts_str) | |
| return val / 1000 if val > 1e12 else val | |
| if fmt == 'iso' or fmt == 'auto': | |
| # ISO 8601 | |
| try: | |
| dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) | |
| return dt.timestamp() | |
| except: | |
| pass | |
| if fmt == 'apache': | |
| # Apache: 10/Oct/2000:13:55:36 +0700 | |
| dt = datetime.strptime(ts_str.split()[0], "%d/%b/%Y:%H:%M:%S") | |
| return dt.timestamp() | |
| if fmt == 'syslog': | |
| # Syslog: Oct 10 13:55:36 (no year) | |
| current_year = datetime.now().year | |
| dt = datetime.strptime(f"{current_year} {ts_str}", "%Y %b %d %H:%M:%S") | |
| return dt.timestamp() | |
| # Try common formats | |
| for date_fmt in [ | |
| "%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", | |
| "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S", | |
| "%Y/%m/%d %H:%M:%S", "%d-%m-%Y %H:%M:%S", | |
| ]: | |
| try: | |
| dt = datetime.strptime(ts_str.split('+')[0].split('Z')[0], date_fmt) | |
| return dt.timestamp() | |
| except: | |
| continue | |
| except: | |
| pass | |
| return self._base_time + (self._line_number * 0.001) | |
| def _normalize_event_type(self, event_type: str) -> str: | |
| """Normalize event type to standard values.""" | |
| et = event_type.lower().strip() | |
| # Map common variations | |
| if et in ('err', 'exception', 'fail', 'failed', 'failure', 'fatal', 'critical', 'emergency', 'panic'): | |
| return 'error' | |
| if et in ('warn', 'caution', 'alert'): | |
| return 'warning' | |
| if et in ('information', 'notice', 'log'): | |
| return 'info' | |
| if et in ('trace', 'verbose', 'fine', 'finer', 'finest'): | |
| return 'debug' | |
| if et in ('state_change', 'transition', 'change'): | |
| return 'state_change' | |
| if et in ('checkpoint', 'save', 'snapshot'): | |
| return 'checkpoint' | |
| if et in ('progress', 'step', 'iteration'): | |
| return 'progress' | |
| if et in ('config', 'configuration', 'setting', 'setup'): | |
| return 'config' | |
| if et in ('metric', 'measure', 'stat', 'stats'): | |
| return 'metric' | |
| if et in ('anomaly', 'outlier', 'unusual'): | |
| return 'anomaly' | |
| return et if et else 'info' | |
| def _infer_severity(self, text: str) -> str: | |
| """Infer severity/event type from text content.""" | |
| for severity, pattern in self.SEVERITY_PATTERNS.items(): | |
| if re.search(pattern, text, re.IGNORECASE): | |
| return severity | |
| return 'info' | |
| class JSONLAdapter(LogAdapter): | |
| """ | |
| Parse JSON Lines format (one JSON object per line). | |
| Expected fields (flexible): | |
| - timestamp/time/ts/@timestamp: Unix timestamp or ISO string | |
| - level/severity/type: Event type (info, error, warning, etc.) | |
| - component/service/logger/source: Which component | |
| - message/msg/data: Event data | |
| Also supports CASCADE's nested format: | |
| - {"event": {"timestamp": ..., "component": ..., "event_type": ...}, "metrics": {...}, "triage": {...}} | |
| """ | |
| name = "jsonl" | |
| description = "JSON Lines (structured logs)" | |
| TIMESTAMP_FIELDS = ["timestamp", "time", "ts", "@timestamp", "datetime", "date"] | |
| LEVEL_FIELDS = ["level", "severity", "type", "log_level", "loglevel", "event_type"] | |
| COMPONENT_FIELDS = ["component", "service", "logger", "source", "module", "name"] | |
| MESSAGE_FIELDS = ["message", "msg", "data", "content", "text", "body", "raw_message", "raw"] | |
| def parse_line(self, line: str) -> Optional[ParsedEvent]: | |
| if not line: | |
| return None | |
| try: | |
| obj = json.loads(line) | |
| except json.JSONDecodeError: | |
| return None | |
| # Check for CASCADE nested format: {"event": {...}, "metrics": {...}, "triage": {...}} | |
| if "event" in obj and isinstance(obj["event"], dict): | |
| return self._parse_cascade_format(obj) | |
| # Standard JSONL parsing | |
| return self._parse_standard_format(obj, line) | |
| def _parse_cascade_format(self, obj: dict) -> Optional[ParsedEvent]: | |
| """Parse CASCADE's nested tape format.""" | |
| evt = obj["event"] | |
| # Direct field extraction from CASCADE format | |
| timestamp = evt.get("timestamp", time.time()) | |
| event_type = evt.get("event_type", "info") | |
| component = evt.get("component", "system") | |
| # Build data from CASCADE event | |
| data = {} | |
| # Get raw message | |
| if "raw" in evt: | |
| data["message"] = evt["raw"] | |
| elif "data" in evt and isinstance(evt["data"], dict): | |
| if "raw_message" in evt["data"]: | |
| data["message"] = evt["data"]["raw_message"] | |
| data.update(evt["data"]) | |
| # Include event_id if present | |
| if "event_id" in evt: | |
| data["event_id"] = evt["event_id"] | |
| # Include metrics summary if present (for visualization) | |
| if "metrics" in obj and isinstance(obj["metrics"], dict): | |
| metrics = obj["metrics"] | |
| if "event_count" in metrics: | |
| data["_event_count"] = metrics["event_count"] | |
| if "health_status" in metrics: | |
| data["_health"] = metrics["health_status"].get("overall", "unknown") | |
| # Include triage status if present | |
| if "triage" in obj and isinstance(obj["triage"], dict): | |
| triage = obj["triage"] | |
| data["_triage_status"] = triage.get("status", "UNKNOWN") | |
| data["_triage_action"] = triage.get("action", "") | |
| return ParsedEvent( | |
| timestamp=timestamp, | |
| event_type=event_type, | |
| component=component, | |
| data=data, | |
| raw_line=json.dumps(obj), | |
| ) | |
| def _parse_standard_format(self, obj: dict, line: str) -> Optional[ParsedEvent]: | |
| """Parse standard JSONL format.""" | |
| # Extract timestamp | |
| timestamp = None | |
| for field in self.TIMESTAMP_FIELDS: | |
| if field in obj: | |
| timestamp = self._parse_timestamp(obj[field]) | |
| break | |
| if timestamp is None: | |
| timestamp = time.time() | |
| # Extract event type | |
| event_type = "info" | |
| for field in self.LEVEL_FIELDS: | |
| if field in obj: | |
| event_type = str(obj[field]).lower() | |
| break | |
| # Extract component | |
| component = "system" | |
| for field in self.COMPONENT_FIELDS: | |
| if field in obj: | |
| component = str(obj[field]) | |
| break | |
| # Extract message/data | |
| data = {} | |
| for field in self.MESSAGE_FIELDS: | |
| if field in obj: | |
| msg = obj[field] | |
| if isinstance(msg, dict): | |
| data.update(msg) | |
| else: | |
| data["message"] = str(msg) | |
| break | |
| # Include all other fields in data | |
| for k, v in obj.items(): | |
| if k not in self.TIMESTAMP_FIELDS + self.LEVEL_FIELDS + self.COMPONENT_FIELDS + self.MESSAGE_FIELDS: | |
| data[k] = v | |
| return ParsedEvent( | |
| timestamp=timestamp, | |
| event_type=event_type, | |
| component=component, | |
| data=data, | |
| raw_line=line, | |
| ) | |
| def _parse_timestamp(self, value: Any) -> float: | |
| """Parse various timestamp formats to Unix timestamp.""" | |
| if isinstance(value, (int, float)): | |
| # Already numeric - check if milliseconds | |
| if value > 1e12: | |
| return value / 1000 | |
| return value | |
| if isinstance(value, str): | |
| # Try ISO format | |
| try: | |
| dt = datetime.fromisoformat(value.replace("Z", "+00:00")) | |
| return dt.timestamp() | |
| except: | |
| pass | |
| # Try common formats | |
| for fmt in [ | |
| "%Y-%m-%dT%H:%M:%S.%f", | |
| "%Y-%m-%dT%H:%M:%S", | |
| "%Y-%m-%d %H:%M:%S.%f", | |
| "%Y-%m-%d %H:%M:%S", | |
| "%d/%b/%Y:%H:%M:%S", | |
| ]: | |
| try: | |
| dt = datetime.strptime(value.split("+")[0].split("-")[0] if "+" in value else value, fmt) | |
| return dt.timestamp() | |
| except: | |
| continue | |
| return time.time() | |
| class ApacheLogAdapter(LogAdapter): | |
| """ | |
| Parse Apache Combined Log Format. | |
| Format: %h %l %u %t "%r" %>s %b "%{Referer}i" "%{User-agent}i" | |
| Example: 127.0.0.1 - - [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08 [en] (Win98; I ;Nav)" | |
| """ | |
| name = "apache" | |
| description = "Apache Combined Log Format" | |
| # Regex for Apache combined format | |
| PATTERN = re.compile( | |
| r'^(?P<ip>[\d\.]+)\s+' | |
| r'(?P<ident>\S+)\s+' | |
| r'(?P<user>\S+)\s+' | |
| r'\[(?P<timestamp>[^\]]+)\]\s+' | |
| r'"(?P<method>\w+)\s+(?P<path>\S+)\s+(?P<protocol>[^"]+)"\s+' | |
| r'(?P<status>\d+)\s+' | |
| r'(?P<size>\S+)' | |
| r'(?:\s+"(?P<referer>[^"]*)"\s+"(?P<useragent>[^"]*)")?' | |
| ) | |
| def parse_line(self, line: str) -> Optional[ParsedEvent]: | |
| if not line: | |
| return None | |
| match = self.PATTERN.match(line) | |
| if not match: | |
| return None | |
| d = match.groupdict() | |
| # Parse timestamp [10/Oct/2000:13:55:36 -0700] | |
| try: | |
| ts_str = d["timestamp"].split()[0] # Remove timezone | |
| dt = datetime.strptime(ts_str, "%d/%b/%Y:%H:%M:%S") | |
| timestamp = dt.timestamp() | |
| except: | |
| timestamp = time.time() | |
| # Determine event type by status code | |
| status = int(d.get("status", 200)) | |
| if status >= 500: | |
| event_type = "error" | |
| elif status >= 400: | |
| event_type = "warning" | |
| elif status >= 300: | |
| event_type = "redirect" | |
| else: | |
| event_type = "request" | |
| return ParsedEvent( | |
| timestamp=timestamp, | |
| event_type=event_type, | |
| component=f"http:{d.get('method', 'GET')}", | |
| data={ | |
| "ip": d.get("ip"), | |
| "method": d.get("method"), | |
| "path": d.get("path"), | |
| "status": status, | |
| "size": int(d.get("size", 0)) if d.get("size", "-") != "-" else 0, | |
| "referer": d.get("referer", ""), | |
| "user_agent": d.get("useragent", ""), | |
| }, | |
| raw_line=line, | |
| ) | |
| class NginxLogAdapter(ApacheLogAdapter): | |
| """ | |
| Parse Nginx access logs (same format as Apache combined by default). | |
| """ | |
| name = "nginx" | |
| description = "Nginx Access Log Format" | |
| class KubernetesLogAdapter(LogAdapter): | |
| """ | |
| Parse Kubernetes events and pod logs. | |
| Handles: | |
| - kubectl get events output | |
| - Pod log format: timestamp stdout/stderr F message | |
| - JSON structured logs from pods | |
| """ | |
| name = "kubernetes" | |
| description = "Kubernetes Events & Pod Logs" | |
| # Pod log pattern: 2024-01-01T00:00:00.000000000Z stdout F message | |
| POD_LOG_PATTERN = re.compile( | |
| r'^(?P<timestamp>\d{4}-\d{2}-\d{2}T[\d:.]+Z?)\s+' | |
| r'(?P<stream>stdout|stderr)\s+' | |
| r'(?P<flag>\S+)\s+' | |
| r'(?P<message>.*)$' | |
| ) | |
| # Kubectl events pattern | |
| EVENT_PATTERN = re.compile( | |
| r'^(?P<last_seen>\S+)\s+' | |
| r'(?P<type>\S+)\s+' | |
| r'(?P<reason>\S+)\s+' | |
| r'(?P<object>\S+)\s+' | |
| r'(?P<message>.*)$' | |
| ) | |
| def parse_line(self, line: str) -> Optional[ParsedEvent]: | |
| if not line: | |
| return None | |
| # Try JSON first (structured pod logs) | |
| if line.startswith("{"): | |
| jsonl = JSONLAdapter() | |
| result = jsonl.parse_line(line) | |
| if result: | |
| result.component = f"k8s:{result.component}" | |
| return result | |
| # Try pod log format | |
| match = self.POD_LOG_PATTERN.match(line) | |
| if match: | |
| d = match.groupdict() | |
| try: | |
| dt = datetime.fromisoformat(d["timestamp"].replace("Z", "+00:00")) | |
| timestamp = dt.timestamp() | |
| except: | |
| timestamp = time.time() | |
| return ParsedEvent( | |
| timestamp=timestamp, | |
| event_type="error" if d["stream"] == "stderr" else "log", | |
| component=f"k8s:pod", | |
| data={ | |
| "stream": d["stream"], | |
| "message": d["message"], | |
| }, | |
| raw_line=line, | |
| ) | |
| # Try kubectl events format | |
| match = self.EVENT_PATTERN.match(line) | |
| if match: | |
| d = match.groupdict() | |
| event_type = d.get("type", "Normal").lower() | |
| if event_type == "warning": | |
| event_type = "warning" | |
| elif event_type == "normal": | |
| event_type = "info" | |
| return ParsedEvent( | |
| timestamp=time.time(), # Events don't have exact timestamp in this format | |
| event_type=event_type, | |
| component=f"k8s:{d.get('object', 'unknown').split('/')[0]}", | |
| data={ | |
| "reason": d.get("reason"), | |
| "object": d.get("object"), | |
| "message": d.get("message"), | |
| }, | |
| raw_line=line, | |
| ) | |
| return None | |
| class GenericLogAdapter(LogAdapter): | |
| """ | |
| Parse generic timestamped logs. | |
| Attempts to extract: | |
| - Timestamp from beginning of line | |
| - Log level (INFO, ERROR, WARN, DEBUG, etc.) | |
| - Component name (often in brackets) | |
| - Message | |
| """ | |
| name = "generic" | |
| description = "Generic Timestamped Logs" | |
| # Common timestamp patterns at start of line | |
| TIMESTAMP_PATTERNS = [ | |
| (re.compile(r'^(\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?)'), "%Y-%m-%dT%H:%M:%S"), | |
| (re.compile(r'^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})'), "%Y/%m/%d %H:%M:%S"), | |
| (re.compile(r'^(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})'), None), # Syslog | |
| (re.compile(r'^(\d{10,13})'), None), # Unix timestamp | |
| ] | |
| # Level patterns | |
| LEVEL_PATTERN = re.compile(r'\b(DEBUG|INFO|NOTICE|WARN(?:ING)?|ERROR|CRIT(?:ICAL)?|FATAL|SEVERE|TRACE)\b', re.I) | |
| # Component patterns (in brackets or before colon) | |
| COMPONENT_PATTERN = re.compile(r'\[([^\]]+)\]|^[^:]+:\s*(\S+):') | |
| def parse_line(self, line: str) -> Optional[ParsedEvent]: | |
| if not line: | |
| return None | |
| timestamp = time.time() | |
| remaining = line | |
| # Extract timestamp | |
| for pattern, fmt in self.TIMESTAMP_PATTERNS: | |
| match = pattern.match(line) | |
| if match: | |
| ts_str = match.group(1) | |
| try: | |
| if fmt: | |
| dt = datetime.strptime(ts_str.split(".")[0].replace("T", " "), fmt.replace("T", " ")) | |
| timestamp = dt.timestamp() | |
| elif ts_str.isdigit(): | |
| ts = int(ts_str) | |
| timestamp = ts / 1000 if ts > 1e12 else ts | |
| except: | |
| pass | |
| remaining = line[match.end():].strip() | |
| break | |
| # Extract level | |
| event_type = "info" | |
| level_match = self.LEVEL_PATTERN.search(remaining) | |
| if level_match: | |
| level = level_match.group(1).upper() | |
| if level in ("ERROR", "CRITICAL", "CRIT", "FATAL", "SEVERE"): | |
| event_type = "error" | |
| elif level in ("WARN", "WARNING"): | |
| event_type = "warning" | |
| elif level == "DEBUG": | |
| event_type = "debug" | |
| elif level == "TRACE": | |
| event_type = "trace" | |
| # Extract component | |
| component = "system" | |
| comp_match = self.COMPONENT_PATTERN.search(remaining) | |
| if comp_match: | |
| component = comp_match.group(1) or comp_match.group(2) or "system" | |
| return ParsedEvent( | |
| timestamp=timestamp, | |
| event_type=event_type, | |
| component=component, | |
| data={"message": remaining}, | |
| raw_line=line, | |
| ) | |
| class RegexAdapter(LogAdapter): | |
| r""" | |
| Parse logs using a custom regex pattern. | |
| The regex should have named groups: | |
| - timestamp (optional): Timestamp string | |
| - type/level (optional): Event type | |
| - component (optional): Component name | |
| - message (optional): Message or data | |
| Example pattern: | |
| r'^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (?P<level>\w+) (?P<component>\S+) (?P<message>.*)$' | |
| """ | |
| name = "regex" | |
| description = "Custom Regex Pattern" | |
| def __init__(self, pattern: str, timestamp_format: str = None): | |
| """ | |
| Args: | |
| pattern: Regex pattern with named groups | |
| timestamp_format: strptime format for timestamp group | |
| """ | |
| super().__init__() | |
| self.pattern = re.compile(pattern) | |
| self.timestamp_format = timestamp_format | |
| def parse_line(self, line: str) -> Optional[ParsedEvent]: | |
| if not line: | |
| return None | |
| match = self.pattern.match(line) | |
| if not match: | |
| return None | |
| d = match.groupdict() | |
| # Parse timestamp | |
| timestamp = time.time() | |
| if "timestamp" in d and d["timestamp"]: | |
| if self.timestamp_format: | |
| try: | |
| dt = datetime.strptime(d["timestamp"], self.timestamp_format) | |
| timestamp = dt.timestamp() | |
| except: | |
| pass | |
| else: | |
| # Try to parse automatically | |
| try: | |
| dt = datetime.fromisoformat(d["timestamp"]) | |
| timestamp = dt.timestamp() | |
| except: | |
| pass | |
| # Event type | |
| event_type = d.get("type") or d.get("level") or "info" | |
| event_type = event_type.lower() | |
| # Component | |
| component = d.get("component") or d.get("source") or "system" | |
| # Message/data | |
| message = d.get("message") or d.get("data") or "" | |
| data = {"message": message} | |
| # Include any other captured groups | |
| for k, v in d.items(): | |
| if k not in ("timestamp", "type", "level", "component", "source", "message", "data") and v: | |
| data[k] = v | |
| return ParsedEvent( | |
| timestamp=timestamp, | |
| event_type=event_type, | |
| component=component, | |
| data=data, | |
| raw_line=line, | |
| ) | |
| def auto_detect_adapter(sample_lines: List[str]) -> LogAdapter: | |
| """ | |
| Auto-detect the best adapter for a set of sample lines. | |
| In practice, UniversalAdapter handles everything. | |
| This function exists for backwards compatibility and edge cases. | |
| Args: | |
| sample_lines: First N lines of the log file | |
| Returns: | |
| Most suitable LogAdapter instance (usually UniversalAdapter) | |
| """ | |
| # UniversalAdapter handles everything - it's the future-proof choice | |
| return UniversalAdapter() | |
| def detect_log_format(filepath: str) -> str: | |
| """ | |
| Detect log format from file. | |
| Returns adapter name: 'universal', 'jsonl', 'apache', 'nginx', 'kubernetes', 'generic' | |
| """ | |
| try: | |
| with open(filepath, "r", encoding="utf-8", errors="ignore") as f: | |
| lines = [f.readline() for _ in range(20)] | |
| # Quick heuristic for reporting purposes | |
| samples = [l.strip() for l in lines if l.strip()][:5] | |
| if not samples: | |
| return "universal" | |
| # Check if JSON | |
| json_count = sum(1 for s in samples if s.startswith('{')) | |
| if json_count >= len(samples) // 2: | |
| return "jsonl" | |
| # Check for Apache format | |
| if any(re.search(r'\[\d{2}/\w{3}/\d{4}:', s) for s in samples): | |
| return "apache" | |
| # Check for Kubernetes | |
| if any('"kind"' in s or 'namespace' in s.lower() for s in samples): | |
| return "kubernetes" | |
| return "universal" | |
| except: | |
| return "universal" | |
| def detect_data_type(lines: List[str]) -> Dict[str, Any]: | |
| """ | |
| Detect whether data looks like logs vs a dataset. | |
| Returns: | |
| { | |
| "type": "logs" | "dataset" | "mixed" | "unknown", | |
| "confidence": 0.0-1.0, | |
| "signals": ["what made us think this"], | |
| "recommendation": "Use Log Observatory" | "Use Dataset Forensics" | |
| } | |
| """ | |
| if not lines: | |
| return {"type": "unknown", "confidence": 0.0, "signals": [], "recommendation": "No data"} | |
| # Sample up to 100 lines | |
| samples = [l.strip() for l in lines[:100] if l.strip()] | |
| if not samples: | |
| return {"type": "unknown", "confidence": 0.0, "signals": [], "recommendation": "No data"} | |
| log_signals = [] | |
| dataset_signals = [] | |
| # === LOG SIGNALS === | |
| # Timestamps in log format | |
| timestamp_patterns = [ | |
| r'\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}', # ISO | |
| r'\[\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2}', # Apache | |
| r'\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}', # Syslog | |
| ] | |
| timestamp_matches = sum(1 for s in samples | |
| for p in timestamp_patterns | |
| if re.search(p, s)) | |
| if timestamp_matches > len(samples) * 0.3: | |
| log_signals.append(f"timestamps_found:{timestamp_matches}") | |
| # Log level indicators | |
| log_levels = r'\b(DEBUG|INFO|WARN|WARNING|ERROR|CRITICAL|FATAL|TRACE)\b' | |
| level_matches = sum(1 for s in samples if re.search(log_levels, s, re.IGNORECASE)) | |
| if level_matches > len(samples) * 0.2: | |
| log_signals.append(f"log_levels:{level_matches}") | |
| # Component patterns [component] or component: | |
| component_pattern = r'(\[[\w.-]+\]|^[\w.-]+:)' | |
| component_matches = sum(1 for s in samples if re.search(component_pattern, s)) | |
| if component_matches > len(samples) * 0.2: | |
| log_signals.append(f"components:{component_matches}") | |
| # JSON with event-like keys | |
| event_keys = ['timestamp', 'time', 'ts', 'level', 'message', 'msg', 'component', | |
| 'event', 'event_type', 'severity', 'logger', 'source'] | |
| json_event_count = 0 | |
| for s in samples: | |
| if s.startswith('{'): | |
| try: | |
| obj = json.loads(s) | |
| if any(k in obj for k in event_keys): | |
| json_event_count += 1 | |
| except: | |
| pass | |
| if json_event_count > len(samples) * 0.3: | |
| log_signals.append(f"json_events:{json_event_count}") | |
| # HTTP methods/status codes | |
| http_pattern = r'\b(GET|POST|PUT|DELETE|PATCH)\b|\s[1-5]\d{2}\s' | |
| http_matches = sum(1 for s in samples if re.search(http_pattern, s)) | |
| if http_matches > len(samples) * 0.1: | |
| log_signals.append(f"http_traffic:{http_matches}") | |
| # === DATASET SIGNALS === | |
| # CSV-like structure (consistent columns) | |
| if samples: | |
| first = samples[0] | |
| if ',' in first: | |
| comma_counts = [s.count(',') for s in samples[:10]] | |
| if len(set(comma_counts)) <= 2: # Consistent column count | |
| dataset_signals.append(f"csv_structure:cols={comma_counts[0]+1}") | |
| # JSON with data-like keys (not event keys) | |
| data_keys = ['id', 'name', 'title', 'text', 'content', 'label', 'category', | |
| 'value', 'price', 'count', 'score', 'rating', 'description', | |
| 'user', 'author', 'url', 'image', 'date', 'created'] | |
| json_data_count = 0 | |
| for s in samples: | |
| if s.startswith('{'): | |
| try: | |
| obj = json.loads(s) | |
| # Data if has data keys but NOT event keys | |
| has_data = any(k in obj for k in data_keys) | |
| has_event = any(k in obj for k in event_keys) | |
| if has_data and not has_event: | |
| json_data_count += 1 | |
| except: | |
| pass | |
| if json_data_count > len(samples) * 0.3: | |
| dataset_signals.append(f"json_data:{json_data_count}") | |
| # Long text content (datasets often have text fields) | |
| long_text_count = sum(1 for s in samples if len(s) > 500) | |
| if long_text_count > len(samples) * 0.2: | |
| dataset_signals.append(f"long_text:{long_text_count}") | |
| # Numeric-heavy (datasets often have numbers) | |
| numeric_heavy = sum(1 for s in samples if len(re.findall(r'\d+\.?\d*', s)) > 5) | |
| if numeric_heavy > len(samples) * 0.5: | |
| dataset_signals.append(f"numeric_data:{numeric_heavy}") | |
| # === DECISION === | |
| log_score = len(log_signals) | |
| data_score = len(dataset_signals) | |
| total = log_score + data_score | |
| if total == 0: | |
| return { | |
| "type": "unknown", | |
| "confidence": 0.3, | |
| "signals": ["no clear signals"], | |
| "recommendation": "Try both - Logs tab and Dataset tab" | |
| } | |
| if log_score > data_score * 2: | |
| return { | |
| "type": "logs", | |
| "confidence": min(log_score / 5, 1.0), | |
| "signals": log_signals, | |
| "recommendation": "Use Log Observatory (Observe tab)" | |
| } | |
| elif data_score > log_score * 2: | |
| return { | |
| "type": "dataset", | |
| "confidence": min(data_score / 4, 1.0), | |
| "signals": dataset_signals, | |
| "recommendation": "Use Dataset Forensics (Extract Ghost Log)" | |
| } | |
| else: | |
| return { | |
| "type": "mixed", | |
| "confidence": 0.5, | |
| "signals": log_signals + dataset_signals, | |
| "recommendation": "Data has both log and dataset characteristics - try Logs first" | |
| } | |