| """ | |
| CASCADE Forensics - Main Analyzer | |
| The data remembers. This module reads those memories. | |
| Generates: | |
| - GHOST LOG: Inferred sequence of operations | |
| - SKELETON: Probable system architecture | |
| - DNA: Technology fingerprints | |
| - SOUL: Behavioral predictions | |
| """ | |

import hashlib
import json
import time
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional

from cascade.forensics.artifacts import (
    Artifact, ArtifactDetector,
    TimestampArtifacts, IDPatternArtifacts, TextArtifacts,
    NumericArtifacts, NullPatternArtifacts, SchemaArtifacts,
)
from cascade.forensics.fingerprints import TechFingerprinter, Fingerprint


@dataclass
class InferredOperation:
    """A single inferred operation from the ghost log."""

    sequence: int
    operation: str
    description: str
    confidence: float
    evidence: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "seq": self.sequence,
            "op": self.operation,
            "desc": self.description,
            "confidence": self.confidence,
            "evidence": self.evidence,
        }


@dataclass
class GhostLog:
    """
    Inferred processing history - the ghost of the system.

    This is a reconstruction of what PROBABLY happened
    based on artifacts left in the data.
    """

    operations: List[InferredOperation] = field(default_factory=list)

    # Provenance
    analysis_timestamp: float = field(default_factory=time.time)
    data_hash: str = ""
    ghost_hash: str = ""

    def add_operation(self, op: str, desc: str, confidence: float,
                      evidence: Optional[List[str]] = None):
        """Add an inferred operation to the ghost log."""
        self.operations.append(InferredOperation(
            sequence=len(self.operations) + 1,
            operation=op,
            description=desc,
            confidence=confidence,
            evidence=evidence or [],
        ))

    def finalize(self) -> str:
        """Compute hash of the ghost log for provenance."""
        content = json.dumps([op.to_dict() for op in self.operations], sort_keys=True)
        self.ghost_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
        return self.ghost_hash

    def to_dict(self) -> Dict[str, Any]:
        return {
            "operations": [op.to_dict() for op in self.operations],
            "analysis_timestamp": self.analysis_timestamp,
            "data_hash": self.data_hash,
            "ghost_hash": self.ghost_hash,
        }

    def to_narrative(self) -> str:
        """Generate human-readable narrative of inferred processing."""
        if not self.operations:
            return "No processing artifacts detected."
        lines = ["## Ghost Log - Inferred Processing History\n"]
        lines.append("*Based on artifacts left in the data, this is what probably happened:*\n")
        for op in self.operations:
            conf_str = "●" * int(op.confidence * 5) + "○" * (5 - int(op.confidence * 5))
            lines.append(f"**{op.sequence}. {op.operation}** [{conf_str}]")
            lines.append(f" {op.description}")
            if op.evidence:
                lines.append(f" *Evidence: {', '.join(op.evidence[:3])}*")
            lines.append("")
        return "\n".join(lines)


@dataclass
class ForensicsReport:
    """Complete forensics analysis report."""

    # Artifacts detected
    artifacts: List[Artifact] = field(default_factory=list)

    # Inferred processing
    ghost_log: GhostLog = field(default_factory=GhostLog)

    # Technology fingerprints
    fingerprints: List[Fingerprint] = field(default_factory=list)

    # Synthesized architecture
    likely_stack: Dict[str, Any] = field(default_factory=dict)

    # Security concerns
    security_concerns: List[Dict[str, Any]] = field(default_factory=list)

    # Metadata
    analysis_timestamp: float = field(default_factory=time.time)
    row_count: int = 0
    column_count: int = 0
    data_hash: str = ""

    def to_dict(self) -> Dict[str, Any]:
        return {
            "artifacts": [a.to_dict() for a in self.artifacts],
            "ghost_log": self.ghost_log.to_dict(),
            "fingerprints": [f.to_dict() for f in self.fingerprints],
            "likely_stack": self.likely_stack,
            "security_concerns": self.security_concerns,
            "metadata": {
                "timestamp": self.analysis_timestamp,
                "rows": self.row_count,
                "columns": self.column_count,
                "data_hash": self.data_hash,
            }
        }

    def summary(self) -> Dict[str, Any]:
        """Generate summary for display."""
        return {
            "artifacts_found": len(self.artifacts),
            "operations_inferred": len(self.ghost_log.operations),
            "technologies_identified": len(self.fingerprints),
            "security_concerns": len(self.security_concerns),
            "top_fingerprints": [f.technology for f in self.fingerprints[:5]],
            "data_hash": self.data_hash,
            "ghost_hash": self.ghost_log.ghost_hash,
        }
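
    # Shape of summary() output (keys are exact; the values below are invented
    # purely for illustration, not real analyzer output):
    #
    #     {
    #         "artifacts_found": 12,
    #         "operations_inferred": 7,
    #         "technologies_identified": 3,
    #         "security_concerns": 1,
    #         "top_fingerprints": ["postgresql", "pandas", "django"],
    #         "data_hash": "9f2c...",
    #         "ghost_hash": "41ab...",
    #     }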


class DataForensics:
    """
    Main forensics analyzer.

    Usage:
        forensics = DataForensics()
        report = forensics.analyze(df)
        print(report.ghost_log.to_narrative())
        print(report.likely_stack)
    """

    def __init__(self):
        self.detectors = [
            TimestampArtifacts(),
            IDPatternArtifacts(),
            TextArtifacts(),
            NumericArtifacts(),
            NullPatternArtifacts(),
            SchemaArtifacts(),
        ]
        self.fingerprinter = TechFingerprinter()

    def analyze(self, df) -> ForensicsReport:
        """
        Analyze a dataframe for processing artifacts.

        Args:
            df: Pandas DataFrame to analyze

        Returns:
            ForensicsReport with all findings
        """
        report = ForensicsReport()
        report.row_count = len(df)
        report.column_count = len(df.columns)

        # Compute data hash
        try:
            # Sample hash for large datasets
            if len(df) > 10000:
                sample = df.sample(10000, random_state=42)
            else:
                sample = df
            content = sample.to_json()
            report.data_hash = hashlib.sha256(content.encode()).hexdigest()[:16]
        except Exception:
            report.data_hash = "unknown"

        # Run all detectors
        all_artifacts = []
        for detector in self.detectors:
            try:
                # Some detectors analyze all columns at once
                if hasattr(detector, 'detect_all'):
                    artifacts = detector.detect_all(df)
                    all_artifacts.extend(artifacts)
                # Column-by-column analysis (runs for every detector)
                for col in df.columns:
                    artifacts = detector.detect(df, col)
                    all_artifacts.extend(artifacts)
            except Exception:
                # Don't let one detector crash the whole analysis
                pass
        report.artifacts = all_artifacts

        # Build ghost log from artifacts
        report.ghost_log = self._build_ghost_log(all_artifacts, df)
        report.ghost_log.data_hash = report.data_hash
        report.ghost_log.finalize()

        # Generate technology fingerprints
        report.fingerprints = self.fingerprinter.analyze(all_artifacts)
        report.likely_stack = self.fingerprinter.get_likely_stack()
        report.security_concerns = self.fingerprinter.get_security_concerns()

        return report

    def _build_ghost_log(self, artifacts: List[Artifact], df) -> GhostLog:
        """
        Build inferred processing history from artifacts.

        This is where we reconstruct the sequence of operations
        that probably created this data.
        """
        ghost = GhostLog()

        # Group artifacts by type for logical ordering
        by_type: Dict[str, List[Artifact]] = {}
        for a in artifacts:
            by_type.setdefault(a.artifact_type, []).append(a)

        # Infer operations in logical order
        # 1. Data sourcing (schema artifacts come first)
        if "framework_fingerprint" in by_type:
            for a in by_type["framework_fingerprint"]:
                ghost.add_operation(
                    "DATA_SOURCE",
                    f"Data originated from {a.details.get('framework', 'database')}: {a.evidence}",
                    a.confidence,
                    [a.evidence]
                )
        if "naming_convention" in by_type:
            for a in by_type["naming_convention"]:
                ghost.add_operation(
                    "SCHEMA_ORIGIN",
                    f"Schema follows {a.details.get('convention', 'unknown')} convention",
                    a.confidence,
                    [a.evidence]
                )

        # 2. Merging (if multiple sources detected)
        if "mixed_conventions" in by_type or "id_prefix" in by_type:
            ghost.add_operation(
                "DATA_MERGE",
                "Multiple data sources were merged together",
                0.75,
                [a.evidence for a in by_type.get("mixed_conventions", []) + by_type.get("id_prefix", [])]
            )

        # 3. ID generation
        if "uuid_version" in by_type:
            for a in by_type["uuid_version"]:
                ghost.add_operation(
                    "ID_GENERATION",
                    f"IDs generated using {a.details.get('meaning', 'UUID')}",
                    a.confidence,
                    [a.evidence]
                )
        if "hash_id" in by_type:
            for a in by_type["hash_id"]:
                ghost.add_operation(
                    "ID_GENERATION",
                    f"IDs are {a.details.get('probable_algorithm', 'hash')}-based (content-addressed)",
                    a.confidence,
                    [a.evidence]
                )

        # 4. Processing / Transformation
        if "case_normalization" in by_type:
            for a in by_type["case_normalization"]:
                ghost.add_operation(
                    "TEXT_NORMALIZATION",
                    f"Text converted to {a.details.get('case', 'normalized')} case",
                    a.confidence,
                    [a.evidence]
                )
        if "whitespace_trimming" in by_type:
            ghost.add_operation(
                "TEXT_CLEANING",
                "Whitespace trimmed from text fields",
                0.70,
                [a.evidence for a in by_type["whitespace_trimming"]]
            )
        if "truncation" in by_type:
            for a in by_type["truncation"]:
                ghost.add_operation(
                    "FIELD_TRUNCATION",
                    f"Text truncated at {a.details.get('max_length', '?')} characters",
                    a.confidence,
                    [a.evidence]
                )
        if "numeric_rounding" in by_type:
            for a in by_type["numeric_rounding"]:
                ghost.add_operation(
                    "NUMERIC_ROUNDING",
                    f"Numbers rounded: {a.evidence}",
                    a.confidence,
                    [a.evidence]
                )

        # 5. Filtering / Deletion
        if "sequential_id_gaps" in by_type:
            for a in by_type["sequential_id_gaps"]:
                gap_ratio = a.details.get('gap_ratio', 0)
                ghost.add_operation(
                    "RECORD_FILTERING",
                    f"~{gap_ratio*100:.0f}% of records were filtered or deleted",
                    a.confidence,
                    [a.evidence]
                )
        if "hard_cutoff" in by_type:
            for a in by_type["hard_cutoff"]:
                ghost.add_operation(
                    "VALUE_CAPPING",
                    f"Values capped at {a.details.get('cutoff', '?')}",
                    a.confidence,
                    [a.evidence]
                )

        # 6. Batch processing patterns
        if "timestamp_rounding" in by_type:
            for a in by_type["timestamp_rounding"]:
                ghost.add_operation(
                    "BATCH_PROCESSING",
                    f"Data processed in batches: {a.evidence}",
                    a.confidence,
                    [a.evidence]
                )
        if "regular_intervals" in by_type:
            for a in by_type["regular_intervals"]:
                ghost.add_operation(
                    "SCHEDULED_JOB",
                    f"Regular processing schedule detected: {a.details.get('interval_desc', 'unknown')}",
                    a.confidence,
                    [a.evidence]
                )
        if "temporal_clustering" in by_type:
            ghost.add_operation(
                "BURST_PROCESSING",
                "Event-driven or burst batch processing detected",
                0.75,
                [a.evidence for a in by_type["temporal_clustering"]]
            )

        # 7. Data quality issues
        if "encoding_artifact" in by_type:
            for a in by_type["encoding_artifact"]:
                ghost.add_operation(
                    "ENCODING_ERROR",
                    f"Character encoding conversion failed: {a.evidence}",
                    a.confidence,
                    [a.evidence]
                )
        if "sentinel_value" in by_type:
            for a in by_type["sentinel_value"]:
                ghost.add_operation(
                    "NULL_HANDLING",
                    f"NULLs represented as sentinel value {a.details.get('sentinel', '?')}",
                    a.confidence,
                    [a.evidence]
                )
        if "high_null_rate" in by_type:
            for a in by_type["high_null_rate"]:
                ghost.add_operation(
                    "OPTIONAL_FIELD",
                    f"Column {a.column} is optional or had ETL issues ({a.details.get('null_rate', 0)*100:.0f}% null)",
                    a.confidence,
                    [a.evidence]
                )

        # 8. Export (often the last step)
        if any("PANDAS" in a.inferred_operation for a in artifacts):
            ghost.add_operation(
                "DATA_EXPORT",
                "Data exported via Pandas to CSV",
                0.90,
                ["Unnamed column artifact"]
            )

        return ghost

    def analyze_file(self, filepath: str) -> ForensicsReport:
        """
        Analyze a data file.

        Supports: CSV, JSON, JSONL, Parquet, Excel
        """
        import pandas as pd
        from pathlib import Path

        path = Path(filepath)
        suffix = path.suffix.lower()
        if suffix == '.csv':
            df = pd.read_csv(filepath)
        elif suffix == '.json':
            df = pd.read_json(filepath)
        elif suffix == '.jsonl':
            df = pd.read_json(filepath, lines=True)
        elif suffix == '.parquet':
            df = pd.read_parquet(filepath)
        elif suffix in ['.xlsx', '.xls']:
            df = pd.read_excel(filepath)
        else:
            # Try CSV as default
            df = pd.read_csv(filepath)
        return self.analyze(df)


def analyze_dataframe(df) -> ForensicsReport:
    """Convenience function to analyze a dataframe."""
    forensics = DataForensics()
    return forensics.analyze(df)


def analyze_file(filepath: str) -> ForensicsReport:
    """Convenience function to analyze a file."""
    forensics = DataForensics()
    return forensics.analyze_file(filepath)
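

if __name__ == "__main__":
    # Minimal smoke-test sketch, assuming pandas is installed. The toy frame
    # below is invented purely for illustration: gappy sequential IDs, a null,
    # and repeated dates give the detectors something to chew on, though what
    # they actually flag depends on their thresholds.
    import pandas as pd

    demo = pd.DataFrame({
        "user_id": [1, 2, 5, 9],  # hypothetical IDs with gaps
        "email": ["a@example.com", "b@example.com", None, "d@example.com"],
        "created_at": pd.to_datetime(
            ["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"]
        ),
    })

    report = analyze_dataframe(demo)
    print(report.ghost_log.to_narrative())
    print(json.dumps(report.summary(), indent=2))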