| import subprocess, json, ast, os |
| from pathlib import Path |
|
|
def run(cmd, cwd=None):
    """Execute *cmd* and return whatever it wrote to standard output.

    Args:
        cmd: Command passed straight to ``subprocess.run``.
        cwd: Optional working directory for the child process.

    Returns:
        The child's stdout decoded as text. stderr is captured but
        discarded, and the exit status is not checked.
    """
    completed = subprocess.run(
        cmd,
        cwd=cwd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    return completed.stdout
|
|
def scan_semgrep(path):
    """Run Semgrep with the project's taint rules and return its findings.

    Invokes ``semgrep scan --config rules/taint.yaml --json <path>`` and
    parses the JSON document written to stdout. The rules file is given as
    a relative path, so it is resolved against the current working
    directory of this process.

    Args:
        path: File or directory handed to Semgrep as the scan target.

    Returns:
        The list stored under the ``"results"`` key of Semgrep's report.
        An empty list is returned when Semgrep cannot be started, prints
        nothing, prints something that is not JSON, or prints JSON that
        does not contain a ``"results"`` list.
    """
    try:
        completed = subprocess.run(
            ["semgrep", "scan", "--config", "rules/taint.yaml", "--json", path],
            capture_output=True,
            text=True,
        )
    except OSError as err:
        # A missing or non-executable binary must not abort the whole scan;
        # report it and let the remaining scanners contribute their findings.
        print(f"Semgrep could not be executed: {err}")
        return []

    raw = completed.stdout.strip()
    if not raw:
        return []

    try:
        report = json.loads(raw)
    except json.JSONDecodeError:
        print("Semgrep returned non-JSON output:")
        print(raw)
        return []

    # Valid JSON is not necessarily an object (e.g. a bare list or string),
    # and "results" may be null; callers iterate the return value.
    if not isinstance(report, dict):
        return []
    results = report.get("results", [])
    return results if isinstance(results, list) else []
|
|
def scan_bandit(path):
    """Run Bandit recursively over *path* and return its findings.

    Invokes ``bandit -r <path> -f json`` and parses the JSON document
    written to stdout. The exit status is not inspected.

    Args:
        path: File or directory handed to Bandit as the scan target.

    Returns:
        The list stored under the ``"results"`` key of Bandit's report.
        An empty list is returned when Bandit cannot be started, prints
        nothing, prints something that is not JSON, or prints JSON that
        does not contain a ``"results"`` list.
    """
    try:
        completed = subprocess.run(
            ["bandit", "-r", path, "-f", "json"],
            capture_output=True,
            text=True,
        )
    except OSError as err:
        # A missing or non-executable binary must not abort the whole scan;
        # report it and let the remaining scanners contribute their findings.
        print(f"Bandit could not be executed: {err}")
        return []

    raw = completed.stdout.strip()
    if not raw:
        return []

    try:
        report = json.loads(raw)
    except json.JSONDecodeError:
        print("Bandit returned non-JSON output:")
        print(raw)
        return []

    # Valid JSON is not necessarily an object (e.g. a bare list or string),
    # and "results" may be null; callers iterate the return value.
    if not isinstance(report, dict):
        return []
    results = report.get("results", [])
    return results if isinstance(results, list) else []
| |
|
|
def ast_inspect(file):
    """Flag direct ``eval(...)`` / ``exec(...)`` calls in a Python source file.

    Only calls whose callee is the bare name ``eval`` or ``exec`` are
    matched; attribute access such as ``builtins.eval(...)`` is not.

    Args:
        file: Path (``str`` or ``os.PathLike``) of the file to inspect.

    Returns:
        A list holding one warning string per matching call. The list is
        empty when nothing matches or when the file cannot be read or
        parsed.
    """
    try:
        # Hand the raw bytes to the parser so it honours a BOM or PEP 263
        # coding cookie; decoding with the locale's default encoding made
        # such files fail and be skipped without any finding.
        tree = ast.parse(Path(file).read_bytes(), filename=str(file))
    except (OSError, SyntaxError, ValueError, RecursionError, MemoryError):
        # Best-effort scan: an unreadable or unparsable file yields no
        # findings instead of aborting the caller's directory walk.
        return []

    return [
        "Use of eval/exec enables arbitrary code execution."
        for node in ast.walk(tree)
        if isinstance(node, ast.Call)
        and getattr(node.func, "id", "") in {"eval", "exec"}
    ]
|
|
def explain(finding):
    """Build the Markdown "why / how to fix" blurb for a finding.

    Args:
        finding: Mapping that may carry an ``"extra"`` mapping with a
            ``"message"`` entry; both are optional.

    Returns:
        A string with a ``**Why:**`` line holding the message (empty when
        absent), a blank line, and a fixed ``**How to fix:**`` hint.
    """
    extra = finding.get("extra", {})
    why = extra.get("message", "")
    return (
        f"**Why:** {why}\n"
        "\n**How to fix:** Replace unsafe API with validated input or safe alternative."
    )
|
|
def scan_path(path):
    """Scan *path* with Semgrep, Bandit and the local AST check.

    Findings from all three sources are normalised into dictionaries with
    the keys ``"file"``, ``"line"``, ``"severity"``, ``"what"`` and
    ``"explain"``, and returned in that source order.

    Args:
        path: File or directory to scan. It is passed unchanged to the
            Semgrep and Bandit wrappers; the AST check visits every
            ``*.py`` file beneath a directory, or the file itself when
            *path* is a single ``.py`` file.

    Returns:
        A list of finding dictionaries. AST findings carry ``"line": 0``
        because ``ast_inspect`` reports messages without a location.

    Raises:
        KeyError: If a Semgrep or Bandit result lacks one of the fields
            read below.
    """
    findings = [
        {
            "file": hit["path"],
            "line": hit["start"]["line"],
            "severity": hit["extra"]["severity"],
            "what": hit["extra"]["message"],
            "explain": explain(hit),
        }
        for hit in scan_semgrep(path)
    ]

    findings.extend(
        {
            "file": hit["filename"],
            "line": hit["line_number"],
            "severity": hit["issue_severity"],
            "what": hit["issue_text"],
            "explain": "Insecure API usage detected.",
        }
        for hit in scan_bandit(path)
    )

    root = Path(path)
    if root.is_file():
        # rglob() yields nothing for a regular file, which used to skip the
        # AST check entirely when a single source file was scanned.
        py_files = [root] if root.suffix == ".py" else []
    else:
        py_files = root.rglob("*.py")

    for py in py_files:
        findings.extend(
            {
                "file": str(py),
                "line": 0,
                "severity": "WARNING",
                "what": issue,
                "explain": "AST analysis confirms unsafe construct.",
            }
            for issue in ast_inspect(py)
        )

    return findings