Spaces:
Sleeping
Sleeping
| """Evidence gathering layer using GritQL for deterministic code analysis.""" | |
| import concurrent.futures | |
| import os | |
| import subprocess | |
| import zipfile | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Generator | |
# Catalogue of GritQL patterns to scan for. Each entry is a dict with the keys
# "category", "pattern", "language", "severity_hint" and "domain", in that
# order. A language of None means no "--language" flag accompanies the pattern.
GRITQL_PATTERNS = [
    dict(zip(("category", "pattern", "language", "severity_hint", "domain"), row))
    for row in (
        # --- hard-coded secrets ---
        (
            "secret_password",
            'or { `DB_PASSWORD = $_`, `PASSWORD = $_`, `$PASS = $_` where { $PASS <: r"(?i).*password" } }',
            "python",
            "CRITICAL",
            "security",
        ),
        (
            "secret_api_key",
            'or { `API_KEY = $_`, `SECRET_KEY = $_`, `STRIPE_KEY = $_` }',
            "python",
            "CRITICAL",
            "security",
        ),
        ("secret_aws", '`AWS_SECRET = $_`', "python", "CRITICAL", "security"),
        (
            "secret_js",
            'or { `STRIPE_KEY = $_`, `JWT_SECRET = $_` }',
            None,
            "CRITICAL",
            "security",
        ),
        (
            "connection_string",
            '`self.connection_string = "$CONN"` where { $CONN <: r"mysql://.+" }',
            "python",
            "CRITICAL",
            "security",
        ),
        # --- leftover work markers ---
        ("todo_py", "`# TODO: $_`", "python", "LOW", "quality"),
        ("todo_js", "`// TODO: $_`", None, "LOW", "quality"),
        ("fixme_py", "`# FIXME: $_`", "python", "MEDIUM", "quality"),
        ("fixme_js", "`// FIXME: $_`", None, "MEDIUM", "quality"),
        ("hack_py", "`# HACK: $_`", "python", "MEDIUM", "quality"),
        ("hack_js", "`// HACK: $_`", None, "MEDIUM", "quality"),
        # --- dangerous calls ---
        ("eval_usage", "`eval($_)`", "python", "CRITICAL", "security"),
        ("pickle_load", "`pickle.load($_)`", "python", "CRITICAL", "security"),
        ("os_system", "`os.system($_)`", "python", "CRITICAL", "security"),
        (
            "subprocess_shell",
            "`subprocess.call($_, shell=True)`",
            "python",
            "CRITICAL",
            "security",
        ),
        ("md5_hash", "`hashlib.md5($_)`", "python", "HIGH", "security"),
        # --- string-built SQL ---
        (
            "sql_injection_fstring",
            r'`$S` where { $S <: r"f\"SELECT.*\{.*\}\"" }',
            "python",
            "CRITICAL",
            "security",
        ),
        (
            "sql_injection_js",
            r'`$STR` where { $STR <: r"`SELECT.*\$\{.*\}`" }',
            None,
            "CRITICAL",
            "security",
        ),
    )
]
@dataclass
class Finding:
    """A single finding from the evidence layer.

    Declared as a dataclass so it can be built with keyword arguments and so
    ``metadata`` gets a fresh dict per instance.

    Attributes:
        category: Category name of the pattern that matched.
        file: Path of the file containing the match.
        line: Line number of the match, kept as a string token.
        code: The matched code snippet.
        severity_hint: Severity label (e.g. "CRITICAL", "HIGH", "MEDIUM", "LOW").
        domain: Domain label used for grouping (e.g. "security", "quality").
        metadata: Free-form extra data; defaults to an empty dict.
    """

    category: str
    file: str
    line: str
    code: str
    severity_hint: str
    domain: str
    metadata: dict = field(default_factory=dict)

    def __str__(self) -> str:
        """Render as ``[SEVERITY] file:line — code`` with line and code trimmed."""
        return f"[{self.severity_hint}] {self.file}:{self.line.strip()} — {self.code.strip()}"
@dataclass
class EvidenceReport:
    """Aggregated evidence from all GritQL scans.

    Declared as a dataclass so it can be built with keyword arguments and so
    ``findings`` gets a fresh list per instance.

    Attributes:
        target_path: Path that was scanned; printed as "Target" in the report.
        findings: All findings collected for the target.
        file_count: Number of files scanned; printed as "Files scanned".
        total_patterns: Number of patterns that were run.
        patterns_with_hits: Number of patterns that produced at least one finding.
    """

    target_path: str
    findings: list[Finding] = field(default_factory=list)
    file_count: int = 0
    total_patterns: int = 0
    patterns_with_hits: int = 0

    def findings_by_domain(self) -> dict[str, list[Finding]]:
        """Group findings by their ``domain``, preserving first-seen order."""
        grouped: dict[str, list[Finding]] = {}
        for f in self.findings:
            grouped.setdefault(f.domain, []).append(f)
        return grouped

    def findings_by_severity(self) -> dict[str, list[Finding]]:
        """Group findings by their ``severity_hint``, preserving first-seen order."""
        grouped: dict[str, list[Finding]] = {}
        for f in self.findings:
            grouped.setdefault(f.severity_hint, []).append(f)
        return grouped

    def to_text(self) -> str:
        """Format the full report as text for agent context.

        Returns:
            A header (target, file count, finding count) followed by one
            section per domain, each section ending with a blank line.
        """
        lines = [
            "=== FORENSIC EVIDENCE REPORT ===",
            f"Target: {self.target_path}",
            f"Files scanned: {self.file_count}",
            f"Total findings: {len(self.findings)}",
            "",
        ]
        # findings_by_domain is a method: it must be called before iterating.
        for domain, findings in self.findings_by_domain().items():
            lines.append(f"--- {domain.upper()} EVIDENCE ({len(findings)} findings) ---")
            lines.extend(str(f) for f in findings)
            lines.append("")
        return "\n".join(lines)
def _parse_gritql_output(raw: str) -> list[tuple[str, str, str]]:
    """Turn raw grit CLI text into ``(file, line_number, code_snippet)`` tuples.

    An unindented line containing "." or "/" is taken as a file header.
    Indented lines that start with a digit are matches for the most recent
    header: the first whitespace-delimited token is the line number and the
    remainder (possibly empty) is the code. Blank lines and the
    "Processed ... files" summary line are skipped.
    """
    matches: list[tuple[str, str, str]] = []
    active_file = None
    for raw_line in raw.splitlines():
        text = raw_line.rstrip()
        if not text or (text.startswith("Processed") and "files" in text):
            continue

        indented = text[0].isspace()
        if not indented and ("." in text or "/" in text):
            active_file = text
            continue
        # Only indented lines under a known file header can be matches.
        if not (indented and active_file):
            continue

        body = text.strip()
        if not body[:1].isdigit():
            continue
        fields = body.split(None, 1)
        snippet = fields[1] if len(fields) > 1 else ""
        matches.append((active_file, fields[0], snippet))
    return matches
def run_gritql_scan(pattern_def: dict, target_dir: str) -> list[Finding]:
    """Execute one GritQL pattern against *target_dir* and wrap the matches.

    Args:
        pattern_def: Pattern entry providing "pattern", "category",
            "severity_hint", "domain" and optionally "language".
        target_dir: Directory passed to the grit CLI.

    Returns:
        One Finding per parsed match. Empty when the command times out after
        30 seconds, prints nothing, or reports "found 0 matches".

    Raises:
        RuntimeError: If the ``grit`` executable cannot be found.
    """
    language = pattern_def.get("language")
    cmd = ["grit", "apply", "--dry-run", pattern_def["pattern"], target_dir]
    if language:
        cmd.extend(["--language", language])

    try:
        completed = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    except FileNotFoundError:
        raise RuntimeError("'grit' CLI not found. Install with: npm install -g @getgrit/cli")
    except subprocess.TimeoutExpired:
        # A pattern that runs too long is treated as having no matches.
        return []

    stdout = completed.stdout.strip()
    if "found 0 matches" in stdout or not stdout:
        return []

    return [
        Finding(
            category=pattern_def["category"],
            file=file_path,
            line=line_num,
            code=code,
            severity_hint=pattern_def["severity_hint"],
            domain=pattern_def["domain"],
        )
        for file_path, line_num, code in _parse_gritql_output(stdout)
    ]
def _ensure_grit_initialized(target_dir: str) -> None:
    """Run ``grit init`` inside *target_dir* unless a ``.grit`` entry already exists.

    Initialisation is best-effort: any error raised while launching or
    waiting for the command (15 second limit) is ignored.
    """
    if (Path(target_dir) / ".grit").exists():
        return

    try:
        subprocess.run(["grit", "init"], cwd=target_dir, capture_output=True, timeout=15)
    except Exception:
        # Deliberately swallowed: a failed init must not abort the caller.
        pass
# File-name suffixes that count as "source files" for the report's file total.
_SOURCE_EXTENSIONS = (
    ".py", ".js", ".ts", ".jsx", ".tsx", ".java", ".go", ".rb", ".php", ".c", ".cpp",
)


def _count_source_files(target_dir: str) -> int:
    """Count source files in a directory tree.

    The tree is walked once, and only file entries are counted, so a
    directory whose name happens to end in a source extension (for example
    ``node_modules/chart.js``) is not counted as a file.

    Args:
        target_dir: Root of the tree to inspect.

    Returns:
        Number of files whose name ends with one of ``_SOURCE_EXTENSIONS``;
        0 when the directory does not exist or contains no such file.
    """
    return sum(
        # str.endswith accepts a tuple, so one call covers every extension.
        name.endswith(_SOURCE_EXTENSIONS)
        for _root, _dirs, names in os.walk(target_dir)
        for name in names
    )
def _deduplicate_findings(findings: list[Finding]) -> list[Finding]:
    """Merge findings on the same file+line into one Finding with multiple categories.

    The first finding seen for a ``(file, stripped line)`` key is copied and
    kept. Later findings on the same key contribute only:

    * their category, recorded under ``metadata["categories"]`` once at least
      two distinct categories have been seen (the kept finding's own category
      comes first, and no category is listed twice);
    * their severity, when it is more severe than the kept one. Unknown
      severity labels rank below "LOW".

    The input findings are never modified.

    Args:
        findings: Findings to merge, in the order they should be considered.

    Returns:
        Merged findings in first-seen order.
    """
    import copy  # local import: only this helper needs it

    severity_rank = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3}
    merged: dict[tuple[str, str], Finding] = {}
    for f in findings:
        key = (f.file, f.line.strip())
        kept = merged.get(key)
        if kept is None:
            # Copy the whole record (so no field is dropped) and give it its
            # own metadata dict so later merges never touch the caller's data.
            kept = copy.copy(f)
            kept.metadata = dict(f.metadata)
            merged[key] = kept
            continue

        categories = kept.metadata.get("categories", [kept.category])
        if f.category not in categories:
            # Build a new list: the current one may be shared with the input.
            kept.metadata["categories"] = [*categories, f.category]
        if severity_rank.get(f.severity_hint, 99) < severity_rank.get(kept.severity_hint, 99):
            kept.severity_hint = f.severity_hint
    return list(merged.values())
def safe_extract_zip(zip_path: str, target_dir: str) -> None:
    """Extract a zip file safely, preventing zip-slip attacks.

    Every member is validated before anything is written, so a rejected
    archive leaves *target_dir* untouched.

    Args:
        zip_path: Path of the archive to extract.
        target_dir: Directory the archive is extracted into.

    Raises:
        ValueError: If a member's resolved path falls outside *target_dir*.
    """
    root = os.path.realpath(target_dir)
    # Avoid a doubled separator when the root already ends with one (e.g. "/").
    prefix = root if root.endswith(os.sep) else root + os.sep
    with zipfile.ZipFile(zip_path, "r") as zf:
        for member in zf.infolist():
            member_path = os.path.realpath(os.path.join(target_dir, member.filename))
            # An entry that resolves to the root itself (such as "./") is harmless.
            if member_path != root and not member_path.startswith(prefix):
                raise ValueError(f"Zip slip detected: {member.filename} escapes target directory")
        zf.extractall(target_dir)
def gather_evidence(target_dir: str) -> EvidenceReport:
    """Scan *target_dir* with every entry of ``GRITQL_PATTERNS``, one after another.

    Returns:
        An EvidenceReport holding all findings in pattern order, the source
        file count, the number of patterns run and how many of them matched.
    """
    _ensure_grit_initialized(target_dir)
    source_file_total = _count_source_files(target_dir)

    # One result list per pattern, in catalogue order.
    per_pattern = [run_gritql_scan(spec, target_dir) for spec in GRITQL_PATTERNS]

    return EvidenceReport(
        target_path=target_dir,
        findings=[finding for batch in per_pattern for finding in batch],
        file_count=source_file_total,
        total_patterns=len(GRITQL_PATTERNS),
        patterns_with_hits=sum(1 for batch in per_pattern if batch),
    )
def gather_evidence_streaming(target_dir: str) -> Generator:
    """Scan pattern by pattern, reporting progress along the way.

    Yields:
        A status string before each pattern is run, then, as the last item,
        the completed EvidenceReport.
    """
    _ensure_grit_initialized(target_dir)
    source_file_total = _count_source_files(target_dir)
    pattern_total = len(GRITQL_PATTERNS)

    batches: list[list[Finding]] = []
    for position, spec in enumerate(GRITQL_PATTERNS, start=1):
        yield f"Scanning pattern {position}/{pattern_total}: **{spec['category']}**..."
        batches.append(run_gritql_scan(spec, target_dir))

    yield EvidenceReport(
        target_path=target_dir,
        findings=[finding for batch in batches for finding in batch],
        file_count=source_file_total,
        total_patterns=pattern_total,
        patterns_with_hits=sum(1 for batch in batches if batch),
    )
def gather_evidence_parallel(
    target_dir: str,
    max_workers: int = 4,
    timeout: int = 60,
) -> EvidenceReport:
    """Run all GritQL patterns in parallel and return a structured evidence report.

    Scans run concurrently on a thread pool, but their results are collected
    in pattern order. Deduplication keeps the first finding seen for each
    file+line, so a fixed collection order makes the report deterministic.

    Args:
        target_dir: Directory to scan.
        max_workers: Maximum number of scans running at the same time.
        timeout: Seconds to wait for each scan's result once its turn to be
            collected comes; a scan that is not ready in time contributes no
            findings.

    Returns:
        An EvidenceReport with deduplicated findings.
    """
    _ensure_grit_initialized(target_dir)
    file_count = _count_source_files(target_dir)
    all_findings: list[Finding] = []
    patterns_with_hits = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(run_gritql_scan, p, target_dir)
            for p in GRITQL_PATTERNS
        ]
        # Iterate in submission order rather than completion order.
        for future in futures:
            try:
                findings = future.result(timeout=timeout)
            except Exception:
                # Best-effort: a failed or late scan counts as "no findings".
                # NOTE(review): this also hides the RuntimeError raised when
                # the grit CLI is missing, unlike the sequential variants.
                findings = []
            if findings:
                patterns_with_hits += 1
                all_findings.extend(findings)
    all_findings = _deduplicate_findings(all_findings)
    return EvidenceReport(
        target_path=target_dir,
        findings=all_findings,
        file_count=file_count,
        total_patterns=len(GRITQL_PATTERNS),
        patterns_with_hits=patterns_with_hits,
    )