tostido's picture
feat(diagnostics): Add code bug exposure system v0.6.0
1754798
"""
CASCADE Diagnostic Report - Generate comprehensive diagnostic reports.
Combines:
- CodeTracer execution traces
- BugDetector static analysis
- ExecutionMonitor runtime anomalies
- GhostLog forensic reconstruction
Into a unified diagnostic report.
"""
import time
import json
import hashlib
from typing import Any, Dict, List, Optional, Set
from dataclasses import dataclass, field
from pathlib import Path
from datetime import datetime
from cascade.core.graph import CausationGraph
from cascade.forensics.analyzer import DataForensics, GhostLog
@dataclass
class DiagnosticFinding:
"""A single diagnostic finding."""
finding_id: str
category: str # "static", "runtime", "forensic", "trace"
severity: str # "critical", "error", "warning", "info"
title: str
description: str
location: Optional[Dict[str, Any]] = None # file, line, function
evidence: List[str] = field(default_factory=list)
related_findings: List[str] = field(default_factory=list)
suggestions: List[str] = field(default_factory=list)
confidence: float = 1.0
timestamp: float = field(default_factory=time.time)
@dataclass
class DiagnosticReport:
"""
A comprehensive diagnostic report.
Aggregates findings from multiple sources:
- Static analysis (BugDetector)
- Runtime monitoring (ExecutionMonitor)
- Execution tracing (CodeTracer)
- Forensic analysis (GhostLog)
"""
report_id: str
title: str
created_at: float
target: str # File, directory, or module analyzed
findings: List[DiagnosticFinding] = field(default_factory=list)
summary: Dict[str, Any] = field(default_factory=dict)
# Source data
static_analysis: Dict[str, Any] = field(default_factory=dict)
runtime_analysis: Dict[str, Any] = field(default_factory=dict)
trace_analysis: Dict[str, Any] = field(default_factory=dict)
forensic_analysis: Dict[str, Any] = field(default_factory=dict)
def add_finding(self, finding: DiagnosticFinding):
"""Add a finding to the report."""
self.findings.append(finding)
def get_findings_by_severity(self, severity: str) -> List[DiagnosticFinding]:
"""Get findings filtered by severity."""
return [f for f in self.findings if f.severity == severity]
def get_findings_by_category(self, category: str) -> List[DiagnosticFinding]:
"""Get findings filtered by category."""
return [f for f in self.findings if f.category == category]
def compute_summary(self):
"""Compute summary statistics."""
self.summary = {
"total_findings": len(self.findings),
"by_severity": {},
"by_category": {},
"critical_count": 0,
"has_critical": False,
}
for finding in self.findings:
# Count by severity
sev = finding.severity
self.summary["by_severity"][sev] = self.summary["by_severity"].get(sev, 0) + 1
# Count by category
cat = finding.category
self.summary["by_category"][cat] = self.summary["by_category"].get(cat, 0) + 1
self.summary["critical_count"] = self.summary["by_severity"].get("critical", 0)
self.summary["has_critical"] = self.summary["critical_count"] > 0
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"report_id": self.report_id,
"title": self.title,
"created_at": self.created_at,
"target": self.target,
"summary": self.summary,
"findings": [
{
"id": f.finding_id,
"category": f.category,
"severity": f.severity,
"title": f.title,
"description": f.description,
"location": f.location,
"evidence": f.evidence,
"suggestions": f.suggestions,
"confidence": f.confidence,
}
for f in self.findings
],
"static_analysis": self.static_analysis,
"runtime_analysis": self.runtime_analysis,
"trace_analysis": self.trace_analysis,
"forensic_analysis": self.forensic_analysis,
}
def to_json(self, indent: int = 2) -> str:
"""Convert to JSON string."""
return json.dumps(self.to_dict(), indent=indent, default=str)
def save(self, path: str):
"""Save report to file."""
with open(path, 'w') as f:
f.write(self.to_json())
@classmethod
def load(cls, path: str) -> "DiagnosticReport":
"""Load report from file."""
with open(path, 'r') as f:
data = json.load(f)
report = cls(
report_id=data["report_id"],
title=data["title"],
created_at=data["created_at"],
target=data["target"],
)
report.summary = data.get("summary", {})
report.static_analysis = data.get("static_analysis", {})
report.runtime_analysis = data.get("runtime_analysis", {})
report.trace_analysis = data.get("trace_analysis", {})
report.forensic_analysis = data.get("forensic_analysis", {})
for f_data in data.get("findings", []):
finding = DiagnosticFinding(
finding_id=f_data["id"],
category=f_data["category"],
severity=f_data["severity"],
title=f_data["title"],
description=f_data["description"],
location=f_data.get("location"),
evidence=f_data.get("evidence", []),
suggestions=f_data.get("suggestions", []),
confidence=f_data.get("confidence", 1.0),
)
report.findings.append(finding)
return report
class DiagnosticEngine:
"""
Engine for running comprehensive diagnostics.
Usage:
engine = DiagnosticEngine()
# Analyze a file
report = engine.analyze_file("path/to/file.py")
# Analyze a directory
report = engine.analyze_directory("path/to/project")
# Analyze with runtime monitoring
report = engine.analyze_execution(my_function, args)
# Print report
print(report.to_markdown())
"""
def __init__(self):
from .code_tracer import CodeTracer
from .bug_detector import BugDetector
from .execution_monitor import ExecutionMonitor
self.tracer = CodeTracer()
self.detector = BugDetector()
self.monitor_class = ExecutionMonitor
self._report_counter = 0
def analyze_file(self, file_path: str) -> DiagnosticReport:
"""Run static analysis on a single file."""
self._report_counter += 1
report = DiagnosticReport(
report_id=self._generate_report_id(file_path),
title=f"Diagnostic Report: {Path(file_path).name}",
created_at=time.time(),
target=file_path,
)
# Run static analysis
issues = self.detector.scan_file(file_path)
for issue in issues:
finding = DiagnosticFinding(
finding_id=issue.issue_id,
category="static",
severity=issue.severity,
title=issue.pattern_name.replace("_", " ").title(),
description=issue.message,
location={
"file": issue.file_path,
"line": issue.line_number,
"column": issue.column,
},
evidence=[issue.code_snippet] if issue.code_snippet else [],
suggestions=[issue.suggestion] if issue.suggestion else [],
confidence=issue.confidence,
)
report.add_finding(finding)
report.static_analysis = self.detector.get_summary()
report.compute_summary()
return report
def analyze_directory(self, dir_path: str, recursive: bool = True) -> DiagnosticReport:
"""Run static analysis on a directory."""
self._report_counter += 1
report = DiagnosticReport(
report_id=self._generate_report_id(dir_path),
title=f"Diagnostic Report: {Path(dir_path).name}",
created_at=time.time(),
target=dir_path,
)
# Run static analysis
issues = self.detector.scan_directory(dir_path, recursive)
for issue in issues:
finding = DiagnosticFinding(
finding_id=issue.issue_id,
category="static",
severity=issue.severity,
title=issue.pattern_name.replace("_", " ").title(),
description=issue.message,
location={
"file": issue.file_path,
"line": issue.line_number,
"column": issue.column,
},
evidence=[issue.code_snippet] if issue.code_snippet else [],
suggestions=[issue.suggestion] if issue.suggestion else [],
confidence=issue.confidence,
)
report.add_finding(finding)
report.static_analysis = self.detector.get_summary()
report.compute_summary()
return report
def analyze_execution(self, func, *args, **kwargs) -> DiagnosticReport:
"""Run diagnostics on function execution."""
self._report_counter += 1
func_name = getattr(func, '__name__', str(func))
report = DiagnosticReport(
report_id=self._generate_report_id(func_name),
title=f"Execution Diagnostic: {func_name}",
created_at=time.time(),
target=func_name,
)
# Create a monitor for this execution
monitor = self.monitor_class()
result = None
exception = None
with monitor.monitoring():
try:
result = func(*args, **kwargs)
except Exception as e:
exception = e
# Convert anomalies to findings
for anomaly in monitor.get_anomalies():
finding = DiagnosticFinding(
finding_id=f"anomaly_{anomaly.frame_id}_{anomaly.timestamp}",
category="runtime",
severity=anomaly.severity,
title=anomaly.anomaly_type.replace("_", " ").title(),
description=anomaly.description,
location=anomaly.context,
confidence=1.0,
)
report.add_finding(finding)
# Add execution summary
report.runtime_analysis = monitor.get_summary()
report.runtime_analysis["hotspots"] = [
{"function": f, "total_ms": t, "calls": c}
for f, t, c in monitor.get_hotspots(10)
]
if exception:
report.runtime_analysis["exception"] = str(exception)
report.compute_summary()
return report
def _generate_report_id(self, target: str) -> str:
"""Generate a unique report ID."""
content = f"{target}:{time.time()}:{self._report_counter}"
return hashlib.sha256(content.encode()).hexdigest()[:16]
def to_markdown(self, report: DiagnosticReport) -> str:
"""Convert a report to Markdown format."""
lines = [
f"# {report.title}",
"",
f"**Report ID:** `{report.report_id}`",
f"**Generated:** {datetime.fromtimestamp(report.created_at).isoformat()}",
f"**Target:** `{report.target}`",
"",
"## Summary",
"",
f"- **Total Findings:** {report.summary.get('total_findings', 0)}",
]
# Severity breakdown
by_severity = report.summary.get("by_severity", {})
if by_severity:
lines.append("")
lines.append("### By Severity")
lines.append("")
icons = {"critical": "🔴", "error": "❌", "warning": "⚠️", "info": "ℹ️"}
for sev in ["critical", "error", "warning", "info"]:
count = by_severity.get(sev, 0)
if count:
lines.append(f"- {icons.get(sev, '•')} **{sev.title()}:** {count}")
# Findings
if report.findings:
lines.extend(["", "## Findings", ""])
for finding in sorted(report.findings,
key=lambda f: ["critical", "error", "warning", "info"].index(f.severity)
if f.severity in ["critical", "error", "warning", "info"] else 99):
icon = {"critical": "🔴", "error": "❌", "warning": "⚠️", "info": "ℹ️"}.get(finding.severity, "•")
lines.append(f"### {icon} {finding.title}")
lines.append("")
lines.append(f"**Severity:** {finding.severity} | **Category:** {finding.category}")
lines.append("")
lines.append(finding.description)
if finding.location:
loc = finding.location
if "file" in loc:
lines.append("")
lines.append(f"**Location:** `{loc.get('file', '')}:{loc.get('line', '')}`")
if finding.evidence:
lines.append("")
lines.append("**Evidence:**")
for ev in finding.evidence:
lines.append(f"```")
lines.append(ev)
lines.append(f"```")
if finding.suggestions:
lines.append("")
lines.append("**Suggestions:**")
for sug in finding.suggestions:
lines.append(f"- {sug}")
lines.append("")
return "\n".join(lines)
# =============================================================================
# CONVENIENCE FUNCTION
# =============================================================================
def diagnose(target, **kwargs) -> DiagnosticReport:
"""
Convenience function to run diagnostics.
Usage:
# Analyze a file
report = diagnose("path/to/file.py")
# Analyze a directory
report = diagnose("path/to/project/")
# Analyze a function
report = diagnose(my_function, arg1, arg2)
"""
engine = DiagnosticEngine()
if callable(target):
# It's a function
return engine.analyze_execution(target, **kwargs)
elif isinstance(target, str):
path = Path(target)
if path.is_file():
return engine.analyze_file(target)
elif path.is_dir():
return engine.analyze_directory(target)
raise ValueError(f"Cannot diagnose target: {target}")