shreyas-joshi's picture
Add training scripts and utilities for NodeAudit and GraphReview
902cd29
from __future__ import annotations
import json
import os
from concurrent.futures import ThreadPoolExecutor
import subprocess
from pathlib import Path
import sys
from pydantic import BaseModel
class LinterIssue(BaseModel):
tool: str
line: int
severity: str
code: str
message: str
_PYLINT_SEVERITY_MAP = {
"fatal": "high",
"error": "high",
"warning": "medium",
"refactor": "low",
"convention": "low",
"info": "low",
}
_BANDIT_SEVERITY_MAP = {
"high": "high",
"medium": "medium",
"low": "low",
}
def _timeout_seconds() -> int:
return int(os.getenv("GRAPHREVIEW_LINTER_TIMEOUT_SECONDS", "20"))
def run_pylint(path: Path) -> list[LinterIssue]:
cmd = [
sys.executable,
"-m",
"pylint",
str(path),
"--output-format=json2",
"--score=n",
"--reports=n",
"--errors-only",
]
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False,
timeout=_timeout_seconds(),
)
except subprocess.TimeoutExpired:
return []
payload = (proc.stdout or "").strip()
if not payload:
return []
try:
data = json.loads(payload)
except json.JSONDecodeError:
return []
messages = data.get("messages", []) if isinstance(data, dict) else []
issues: list[LinterIssue] = []
for message in messages:
severity = _PYLINT_SEVERITY_MAP.get(str(message.get("type", "")).lower(), "low")
issues.append(
LinterIssue(
tool="pylint",
line=int(message.get("line", 0)),
severity=severity,
code=str(message.get("messageId", "PL0000")),
message=str(message.get("message", "")),
)
)
return issues
def run_bandit(path: Path) -> list[LinterIssue]:
cmd = [sys.executable, "-m", "bandit", "-q", "-f", "json", str(path)]
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False,
timeout=_timeout_seconds(),
)
except subprocess.TimeoutExpired:
return []
payload = (proc.stdout or "").strip()
if not payload:
return []
try:
data = json.loads(payload)
except json.JSONDecodeError:
return []
results = data.get("results", []) if isinstance(data, dict) else []
issues: list[LinterIssue] = []
for item in results:
raw_sev = str(item.get("issue_severity", "LOW")).lower()
issues.append(
LinterIssue(
tool="bandit",
line=int(item.get("line_number", 0)),
severity=_BANDIT_SEVERITY_MAP.get(raw_sev, "low"),
code=str(item.get("test_id", "B000")),
message=str(item.get("issue_text", "")),
)
)
return issues
def run_pyright(path: Path) -> list[LinterIssue]:
pyright_bin = str((Path(sys.executable).resolve().parent / "pyright"))
cmd = [pyright_bin if Path(pyright_bin).exists() else "pyright", "--strict", "--outputjson", str(path)]
try:
proc = subprocess.run(
cmd,
capture_output=True,
text=True,
check=False,
timeout=_timeout_seconds(),
)
except FileNotFoundError:
# Optional dependency in lightweight/docker environments.
return []
except subprocess.TimeoutExpired:
return []
payload = (proc.stdout or "").strip()
if not payload:
return []
try:
parsed = json.loads(payload)
except json.JSONDecodeError:
return []
issues: list[LinterIssue] = []
for item in parsed.get("generalDiagnostics", []):
if not isinstance(item, dict):
continue
if str(item.get("severity") or "").lower() != "error":
continue
line = int(((item.get("range") or {}).get("start") or {}).get("line") or 0) + 1
issues.append(
LinterIssue(
tool="pyright",
line=line,
severity="high",
code=str(item.get("rule") or "PYRIGHT"),
message=str(item.get("message") or ""),
)
)
return issues
def run_linters(path: Path) -> list[LinterIssue]:
with ThreadPoolExecutor(max_workers=3) as pool:
py_future = pool.submit(run_pylint, path)
ba_future = pool.submit(run_bandit, path)
fl_future = pool.submit(run_pyright, path)
issues = py_future.result()
issues.extend(ba_future.result())
issues.extend(fl_future.result())
return sorted(issues, key=lambda item: (item.line, item.tool, item.code, item.message))