Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import os | |
| from concurrent.futures import ThreadPoolExecutor | |
| import subprocess | |
| from pathlib import Path | |
| import sys | |
| from pydantic import BaseModel | |
| class LinterIssue(BaseModel): | |
| tool: str | |
| line: int | |
| severity: str | |
| code: str | |
| message: str | |
| _PYLINT_SEVERITY_MAP = { | |
| "fatal": "high", | |
| "error": "high", | |
| "warning": "medium", | |
| "refactor": "low", | |
| "convention": "low", | |
| "info": "low", | |
| } | |
| _BANDIT_SEVERITY_MAP = { | |
| "high": "high", | |
| "medium": "medium", | |
| "low": "low", | |
| } | |
| def _timeout_seconds() -> int: | |
| return int(os.getenv("GRAPHREVIEW_LINTER_TIMEOUT_SECONDS", "20")) | |
| def run_pylint(path: Path) -> list[LinterIssue]: | |
| cmd = [ | |
| sys.executable, | |
| "-m", | |
| "pylint", | |
| str(path), | |
| "--output-format=json2", | |
| "--score=n", | |
| "--reports=n", | |
| "--errors-only", | |
| ] | |
| try: | |
| proc = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| check=False, | |
| timeout=_timeout_seconds(), | |
| ) | |
| except subprocess.TimeoutExpired: | |
| return [] | |
| payload = (proc.stdout or "").strip() | |
| if not payload: | |
| return [] | |
| try: | |
| data = json.loads(payload) | |
| except json.JSONDecodeError: | |
| return [] | |
| messages = data.get("messages", []) if isinstance(data, dict) else [] | |
| issues: list[LinterIssue] = [] | |
| for message in messages: | |
| severity = _PYLINT_SEVERITY_MAP.get(str(message.get("type", "")).lower(), "low") | |
| issues.append( | |
| LinterIssue( | |
| tool="pylint", | |
| line=int(message.get("line", 0)), | |
| severity=severity, | |
| code=str(message.get("messageId", "PL0000")), | |
| message=str(message.get("message", "")), | |
| ) | |
| ) | |
| return issues | |
| def run_bandit(path: Path) -> list[LinterIssue]: | |
| cmd = [sys.executable, "-m", "bandit", "-q", "-f", "json", str(path)] | |
| try: | |
| proc = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| check=False, | |
| timeout=_timeout_seconds(), | |
| ) | |
| except subprocess.TimeoutExpired: | |
| return [] | |
| payload = (proc.stdout or "").strip() | |
| if not payload: | |
| return [] | |
| try: | |
| data = json.loads(payload) | |
| except json.JSONDecodeError: | |
| return [] | |
| results = data.get("results", []) if isinstance(data, dict) else [] | |
| issues: list[LinterIssue] = [] | |
| for item in results: | |
| raw_sev = str(item.get("issue_severity", "LOW")).lower() | |
| issues.append( | |
| LinterIssue( | |
| tool="bandit", | |
| line=int(item.get("line_number", 0)), | |
| severity=_BANDIT_SEVERITY_MAP.get(raw_sev, "low"), | |
| code=str(item.get("test_id", "B000")), | |
| message=str(item.get("issue_text", "")), | |
| ) | |
| ) | |
| return issues | |
| def run_pyright(path: Path) -> list[LinterIssue]: | |
| pyright_bin = str((Path(sys.executable).resolve().parent / "pyright")) | |
| cmd = [pyright_bin if Path(pyright_bin).exists() else "pyright", "--strict", "--outputjson", str(path)] | |
| try: | |
| proc = subprocess.run( | |
| cmd, | |
| capture_output=True, | |
| text=True, | |
| check=False, | |
| timeout=_timeout_seconds(), | |
| ) | |
| except FileNotFoundError: | |
| # Optional dependency in lightweight/docker environments. | |
| return [] | |
| except subprocess.TimeoutExpired: | |
| return [] | |
| payload = (proc.stdout or "").strip() | |
| if not payload: | |
| return [] | |
| try: | |
| parsed = json.loads(payload) | |
| except json.JSONDecodeError: | |
| return [] | |
| issues: list[LinterIssue] = [] | |
| for item in parsed.get("generalDiagnostics", []): | |
| if not isinstance(item, dict): | |
| continue | |
| if str(item.get("severity") or "").lower() != "error": | |
| continue | |
| line = int(((item.get("range") or {}).get("start") or {}).get("line") or 0) + 1 | |
| issues.append( | |
| LinterIssue( | |
| tool="pyright", | |
| line=line, | |
| severity="high", | |
| code=str(item.get("rule") or "PYRIGHT"), | |
| message=str(item.get("message") or ""), | |
| ) | |
| ) | |
| return issues | |
| def run_linters(path: Path) -> list[LinterIssue]: | |
| with ThreadPoolExecutor(max_workers=3) as pool: | |
| py_future = pool.submit(run_pylint, path) | |
| ba_future = pool.submit(run_bandit, path) | |
| fl_future = pool.submit(run_pyright, path) | |
| issues = py_future.result() | |
| issues.extend(ba_future.result()) | |
| issues.extend(fl_future.result()) | |
| return sorted(issues, key=lambda item: (item.line, item.tool, item.code, item.message)) | |