Spaces:
Configuration error
Configuration error
| """ | |
| CASCADE Diagnostic Report - Generate comprehensive diagnostic reports. | |
| Combines: | |
| - CodeTracer execution traces | |
| - BugDetector static analysis | |
| - ExecutionMonitor runtime anomalies | |
| - GhostLog forensic reconstruction | |
| Into a unified diagnostic report. | |
| """ | |
| import time | |
| import json | |
| import hashlib | |
| from typing import Any, Dict, List, Optional, Set | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from datetime import datetime | |
| from cascade.core.graph import CausationGraph | |
| from cascade.forensics.analyzer import DataForensics, GhostLog | |
| class DiagnosticFinding: | |
| """A single diagnostic finding.""" | |
| finding_id: str | |
| category: str # "static", "runtime", "forensic", "trace" | |
| severity: str # "critical", "error", "warning", "info" | |
| title: str | |
| description: str | |
| location: Optional[Dict[str, Any]] = None # file, line, function | |
| evidence: List[str] = field(default_factory=list) | |
| related_findings: List[str] = field(default_factory=list) | |
| suggestions: List[str] = field(default_factory=list) | |
| confidence: float = 1.0 | |
| timestamp: float = field(default_factory=time.time) | |
| class DiagnosticReport: | |
| """ | |
| A comprehensive diagnostic report. | |
| Aggregates findings from multiple sources: | |
| - Static analysis (BugDetector) | |
| - Runtime monitoring (ExecutionMonitor) | |
| - Execution tracing (CodeTracer) | |
| - Forensic analysis (GhostLog) | |
| """ | |
| report_id: str | |
| title: str | |
| created_at: float | |
| target: str # File, directory, or module analyzed | |
| findings: List[DiagnosticFinding] = field(default_factory=list) | |
| summary: Dict[str, Any] = field(default_factory=dict) | |
| # Source data | |
| static_analysis: Dict[str, Any] = field(default_factory=dict) | |
| runtime_analysis: Dict[str, Any] = field(default_factory=dict) | |
| trace_analysis: Dict[str, Any] = field(default_factory=dict) | |
| forensic_analysis: Dict[str, Any] = field(default_factory=dict) | |
| def add_finding(self, finding: DiagnosticFinding): | |
| """Add a finding to the report.""" | |
| self.findings.append(finding) | |
| def get_findings_by_severity(self, severity: str) -> List[DiagnosticFinding]: | |
| """Get findings filtered by severity.""" | |
| return [f for f in self.findings if f.severity == severity] | |
| def get_findings_by_category(self, category: str) -> List[DiagnosticFinding]: | |
| """Get findings filtered by category.""" | |
| return [f for f in self.findings if f.category == category] | |
| def compute_summary(self): | |
| """Compute summary statistics.""" | |
| self.summary = { | |
| "total_findings": len(self.findings), | |
| "by_severity": {}, | |
| "by_category": {}, | |
| "critical_count": 0, | |
| "has_critical": False, | |
| } | |
| for finding in self.findings: | |
| # Count by severity | |
| sev = finding.severity | |
| self.summary["by_severity"][sev] = self.summary["by_severity"].get(sev, 0) + 1 | |
| # Count by category | |
| cat = finding.category | |
| self.summary["by_category"][cat] = self.summary["by_category"].get(cat, 0) + 1 | |
| self.summary["critical_count"] = self.summary["by_severity"].get("critical", 0) | |
| self.summary["has_critical"] = self.summary["critical_count"] > 0 | |
| def to_dict(self) -> Dict[str, Any]: | |
| """Convert to dictionary.""" | |
| return { | |
| "report_id": self.report_id, | |
| "title": self.title, | |
| "created_at": self.created_at, | |
| "target": self.target, | |
| "summary": self.summary, | |
| "findings": [ | |
| { | |
| "id": f.finding_id, | |
| "category": f.category, | |
| "severity": f.severity, | |
| "title": f.title, | |
| "description": f.description, | |
| "location": f.location, | |
| "evidence": f.evidence, | |
| "suggestions": f.suggestions, | |
| "confidence": f.confidence, | |
| } | |
| for f in self.findings | |
| ], | |
| "static_analysis": self.static_analysis, | |
| "runtime_analysis": self.runtime_analysis, | |
| "trace_analysis": self.trace_analysis, | |
| "forensic_analysis": self.forensic_analysis, | |
| } | |
| def to_json(self, indent: int = 2) -> str: | |
| """Convert to JSON string.""" | |
| return json.dumps(self.to_dict(), indent=indent, default=str) | |
| def save(self, path: str): | |
| """Save report to file.""" | |
| with open(path, 'w') as f: | |
| f.write(self.to_json()) | |
| def load(cls, path: str) -> "DiagnosticReport": | |
| """Load report from file.""" | |
| with open(path, 'r') as f: | |
| data = json.load(f) | |
| report = cls( | |
| report_id=data["report_id"], | |
| title=data["title"], | |
| created_at=data["created_at"], | |
| target=data["target"], | |
| ) | |
| report.summary = data.get("summary", {}) | |
| report.static_analysis = data.get("static_analysis", {}) | |
| report.runtime_analysis = data.get("runtime_analysis", {}) | |
| report.trace_analysis = data.get("trace_analysis", {}) | |
| report.forensic_analysis = data.get("forensic_analysis", {}) | |
| for f_data in data.get("findings", []): | |
| finding = DiagnosticFinding( | |
| finding_id=f_data["id"], | |
| category=f_data["category"], | |
| severity=f_data["severity"], | |
| title=f_data["title"], | |
| description=f_data["description"], | |
| location=f_data.get("location"), | |
| evidence=f_data.get("evidence", []), | |
| suggestions=f_data.get("suggestions", []), | |
| confidence=f_data.get("confidence", 1.0), | |
| ) | |
| report.findings.append(finding) | |
| return report | |
| class DiagnosticEngine: | |
| """ | |
| Engine for running comprehensive diagnostics. | |
| Usage: | |
| engine = DiagnosticEngine() | |
| # Analyze a file | |
| report = engine.analyze_file("path/to/file.py") | |
| # Analyze a directory | |
| report = engine.analyze_directory("path/to/project") | |
| # Analyze with runtime monitoring | |
| report = engine.analyze_execution(my_function, args) | |
| # Print report | |
| print(report.to_markdown()) | |
| """ | |
| def __init__(self): | |
| from .code_tracer import CodeTracer | |
| from .bug_detector import BugDetector | |
| from .execution_monitor import ExecutionMonitor | |
| self.tracer = CodeTracer() | |
| self.detector = BugDetector() | |
| self.monitor_class = ExecutionMonitor | |
| self._report_counter = 0 | |
| def analyze_file(self, file_path: str) -> DiagnosticReport: | |
| """Run static analysis on a single file.""" | |
| self._report_counter += 1 | |
| report = DiagnosticReport( | |
| report_id=self._generate_report_id(file_path), | |
| title=f"Diagnostic Report: {Path(file_path).name}", | |
| created_at=time.time(), | |
| target=file_path, | |
| ) | |
| # Run static analysis | |
| issues = self.detector.scan_file(file_path) | |
| for issue in issues: | |
| finding = DiagnosticFinding( | |
| finding_id=issue.issue_id, | |
| category="static", | |
| severity=issue.severity, | |
| title=issue.pattern_name.replace("_", " ").title(), | |
| description=issue.message, | |
| location={ | |
| "file": issue.file_path, | |
| "line": issue.line_number, | |
| "column": issue.column, | |
| }, | |
| evidence=[issue.code_snippet] if issue.code_snippet else [], | |
| suggestions=[issue.suggestion] if issue.suggestion else [], | |
| confidence=issue.confidence, | |
| ) | |
| report.add_finding(finding) | |
| report.static_analysis = self.detector.get_summary() | |
| report.compute_summary() | |
| return report | |
| def analyze_directory(self, dir_path: str, recursive: bool = True) -> DiagnosticReport: | |
| """Run static analysis on a directory.""" | |
| self._report_counter += 1 | |
| report = DiagnosticReport( | |
| report_id=self._generate_report_id(dir_path), | |
| title=f"Diagnostic Report: {Path(dir_path).name}", | |
| created_at=time.time(), | |
| target=dir_path, | |
| ) | |
| # Run static analysis | |
| issues = self.detector.scan_directory(dir_path, recursive) | |
| for issue in issues: | |
| finding = DiagnosticFinding( | |
| finding_id=issue.issue_id, | |
| category="static", | |
| severity=issue.severity, | |
| title=issue.pattern_name.replace("_", " ").title(), | |
| description=issue.message, | |
| location={ | |
| "file": issue.file_path, | |
| "line": issue.line_number, | |
| "column": issue.column, | |
| }, | |
| evidence=[issue.code_snippet] if issue.code_snippet else [], | |
| suggestions=[issue.suggestion] if issue.suggestion else [], | |
| confidence=issue.confidence, | |
| ) | |
| report.add_finding(finding) | |
| report.static_analysis = self.detector.get_summary() | |
| report.compute_summary() | |
| return report | |
| def analyze_execution(self, func, *args, **kwargs) -> DiagnosticReport: | |
| """Run diagnostics on function execution.""" | |
| self._report_counter += 1 | |
| func_name = getattr(func, '__name__', str(func)) | |
| report = DiagnosticReport( | |
| report_id=self._generate_report_id(func_name), | |
| title=f"Execution Diagnostic: {func_name}", | |
| created_at=time.time(), | |
| target=func_name, | |
| ) | |
| # Create a monitor for this execution | |
| monitor = self.monitor_class() | |
| result = None | |
| exception = None | |
| with monitor.monitoring(): | |
| try: | |
| result = func(*args, **kwargs) | |
| except Exception as e: | |
| exception = e | |
| # Convert anomalies to findings | |
| for anomaly in monitor.get_anomalies(): | |
| finding = DiagnosticFinding( | |
| finding_id=f"anomaly_{anomaly.frame_id}_{anomaly.timestamp}", | |
| category="runtime", | |
| severity=anomaly.severity, | |
| title=anomaly.anomaly_type.replace("_", " ").title(), | |
| description=anomaly.description, | |
| location=anomaly.context, | |
| confidence=1.0, | |
| ) | |
| report.add_finding(finding) | |
| # Add execution summary | |
| report.runtime_analysis = monitor.get_summary() | |
| report.runtime_analysis["hotspots"] = [ | |
| {"function": f, "total_ms": t, "calls": c} | |
| for f, t, c in monitor.get_hotspots(10) | |
| ] | |
| if exception: | |
| report.runtime_analysis["exception"] = str(exception) | |
| report.compute_summary() | |
| return report | |
| def _generate_report_id(self, target: str) -> str: | |
| """Generate a unique report ID.""" | |
| content = f"{target}:{time.time()}:{self._report_counter}" | |
| return hashlib.sha256(content.encode()).hexdigest()[:16] | |
| def to_markdown(self, report: DiagnosticReport) -> str: | |
| """Convert a report to Markdown format.""" | |
| lines = [ | |
| f"# {report.title}", | |
| "", | |
| f"**Report ID:** `{report.report_id}`", | |
| f"**Generated:** {datetime.fromtimestamp(report.created_at).isoformat()}", | |
| f"**Target:** `{report.target}`", | |
| "", | |
| "## Summary", | |
| "", | |
| f"- **Total Findings:** {report.summary.get('total_findings', 0)}", | |
| ] | |
| # Severity breakdown | |
| by_severity = report.summary.get("by_severity", {}) | |
| if by_severity: | |
| lines.append("") | |
| lines.append("### By Severity") | |
| lines.append("") | |
| icons = {"critical": "🔴", "error": "❌", "warning": "⚠️", "info": "ℹ️"} | |
| for sev in ["critical", "error", "warning", "info"]: | |
| count = by_severity.get(sev, 0) | |
| if count: | |
| lines.append(f"- {icons.get(sev, '•')} **{sev.title()}:** {count}") | |
| # Findings | |
| if report.findings: | |
| lines.extend(["", "## Findings", ""]) | |
| for finding in sorted(report.findings, | |
| key=lambda f: ["critical", "error", "warning", "info"].index(f.severity) | |
| if f.severity in ["critical", "error", "warning", "info"] else 99): | |
| icon = {"critical": "🔴", "error": "❌", "warning": "⚠️", "info": "ℹ️"}.get(finding.severity, "•") | |
| lines.append(f"### {icon} {finding.title}") | |
| lines.append("") | |
| lines.append(f"**Severity:** {finding.severity} | **Category:** {finding.category}") | |
| lines.append("") | |
| lines.append(finding.description) | |
| if finding.location: | |
| loc = finding.location | |
| if "file" in loc: | |
| lines.append("") | |
| lines.append(f"**Location:** `{loc.get('file', '')}:{loc.get('line', '')}`") | |
| if finding.evidence: | |
| lines.append("") | |
| lines.append("**Evidence:**") | |
| for ev in finding.evidence: | |
| lines.append(f"```") | |
| lines.append(ev) | |
| lines.append(f"```") | |
| if finding.suggestions: | |
| lines.append("") | |
| lines.append("**Suggestions:**") | |
| for sug in finding.suggestions: | |
| lines.append(f"- {sug}") | |
| lines.append("") | |
| return "\n".join(lines) | |
| # ============================================================================= | |
| # CONVENIENCE FUNCTION | |
| # ============================================================================= | |
| def diagnose(target, **kwargs) -> DiagnosticReport: | |
| """ | |
| Convenience function to run diagnostics. | |
| Usage: | |
| # Analyze a file | |
| report = diagnose("path/to/file.py") | |
| # Analyze a directory | |
| report = diagnose("path/to/project/") | |
| # Analyze a function | |
| report = diagnose(my_function, arg1, arg2) | |
| """ | |
| engine = DiagnosticEngine() | |
| if callable(target): | |
| # It's a function | |
| return engine.analyze_execution(target, **kwargs) | |
| elif isinstance(target, str): | |
| path = Path(target) | |
| if path.is_file(): | |
| return engine.analyze_file(target) | |
| elif path.is_dir(): | |
| return engine.analyze_directory(target) | |
| raise ValueError(f"Cannot diagnose target: {target}") | |