PRobe / environment /scanner.py
mahithakur's picture
Fix JSON parsing and environment bugs
c22ceaa
"""
Simulated Static-Analysis Scanner - live tool interaction layer.
Calling run_scanner(task, seed) simulates what happens when an agent invokes
an external security/lint scanner (e.g. Bandit, Semgrep, Pylint) against
the code under review.
Noise model (seed-controlled for reproducibility)
--------------------------------------------------
Recall: Each real issue is reported with probability SCANNER_RECALL (0.70).
~30 pct of issues are silently missed - the agent cannot rely
solely on the scanner.
Precision: Each of MAX_FALSE_POSITIVES slots independently injects one
false-positive finding with probability SCANNER_NOISE_RATE, so a run
contains 0 to MAX_FALSE_POSITIVES false positives. They reference
plausible-but-wrong line numbers so the agent must verify first.
Important: scanner results are NOT automatically graded. The agent must
still call ADD_COMMENT with the correct line + keyword to earn reward.
"""
from __future__ import annotations
import random
import zlib
from typing import Any
# XOR mask keeps the scanner RNG independent from the mutation-engine RNG
# even when both receive the same episode seed.
# run_scanner seeds its RNG with `seed ^ _SCANNER_RNG_SEED_MASK`.
_SCANNER_RNG_SEED_MASK: int = 0xDEAD_BEEF

# Public API of this module: only run_scanner is exported by a star import.
__all__ = ["run_scanner"]

# -- Noise parameters --------------------------------------------------------
# Each real issue is reported when an RNG draw falls below SCANNER_RECALL;
# otherwise it is counted as missed.
SCANNER_RECALL: float = 0.70  # probability each real issue is reported
# Drawn once per false-positive slot (MAX_FALSE_POSITIVES slots per run), so
# this is a per-slot probability, not a per-run one.
SCANNER_NOISE_RATE: float = 0.40  # probability each false-positive slot fires
MAX_FALSE_POSITIVES: int = 2  # cap on false positives per run

# -- False-positive templates ------------------------------------------------
# Plausible-sounding scanner warnings that do not correspond to any real
# ground-truth issue. Line numbers are randomised at runtime.
# Every template carries exactly the keys "rule", "message", "category" and
# "severity"; _build_false_positive_findings copies those four fields and adds
# the "line" and "verified" fields itself.
_FALSE_POSITIVE_TEMPLATES: list[dict[str, str]] = [
    {
        "rule": "B105",
        "message": "Possible hardcoded password: variable assigned string literal",
        "category": "security",
        "severity": "LOW",
    },
    {
        "rule": "B324",
        "message": "Use of weak MD4 or MD5 hash for security; consider stronger algorithm",
        "category": "security",
        "severity": "MEDIUM",
    },
    {
        "rule": "W0611",
        "message": "Imported but unused module detected in scope",
        "category": "style",
        "severity": "LOW",
    },
    {
        "rule": "C0301",
        "message": "Line too long (82 chars) - PEP 8 violation",
        "category": "style",
        "severity": "LOW",
    },
    {
        "rule": "B007",
        "message": "Loop control variable used in inner scope only; consider renaming to _",
        "category": "style",
        "severity": "LOW",
    },
    {
        "rule": "W0702",
        "message": "No exception type(s) specified in bare except clause",
        "category": "bug",
        "severity": "MEDIUM",
    },
    {
        "rule": "B603",
        "message": "subprocess call without shell=True; verify no injection risk",
        "category": "security",
        "severity": "LOW",
    },
]

# Tool name + version strings; run_scanner picks one at random for the "tool"
# field of its result.
_SCANNER_TOOL_NAMES: list[str] = ["bandit 1.7.5", "semgrep 1.45.0", "pylint 3.1.0"]

# Returned verbatim as the "note" field of every run_scanner result.
_UNVERIFIED_FINDINGS_NOTE: str = (
    "Scanner findings are UNVERIFIED. Use GET_CONTEXT or ADD_COMMENT "
    "to confirm each finding before including it in your review. "
    "False positives will be penalised."
)
def run_scanner(task: dict[str, Any], seed: int) -> dict[str, Any]:
    """
    Produce a simulated static-analysis report for the task's code.

    The report mixes noisily recalled real issues with injected false
    positives. Every random draw comes from a single RNG seeded with
    ``seed ^ _SCANNER_RNG_SEED_MASK``, in this fixed order: true-positive
    recall, false-positive injection, shuffle, tool-name choice.

    Parameters
    ----------
    task:
        A (possibly mutated) task dict. ``task["code"]`` is read to count
        lines; ``task["issues"]`` (optional) supplies the real issues.
    seed:
        Reproducibility seed. Different seeds produce different recall/FP draws.

    Returns
    -------
    dict with keys:
        tool: Scanner name + version string.
        findings: list[dict] - shuffled mix of recalled issues and FPs.
        missed_count: How many real issues were silently missed.
        note: Reminder that findings must be verified before commenting.
    """
    scanner_rng = random.Random(seed ^ _SCANNER_RNG_SEED_MASK)

    # Line count bounds every reported line number; never let it drop below 1.
    line_total = len(task["code"].split("\n")) or 1

    recalled, missed = _build_true_positive_findings(task, line_total, scanner_rng)
    noise = _build_false_positive_findings(task, line_total, scanner_rng)

    # Interleave real and fake findings so position gives nothing away.
    report_findings = [*recalled, *noise]
    scanner_rng.shuffle(report_findings)

    # The tool name is drawn last, after the shuffle, from the same RNG.
    tool_name = scanner_rng.choice(_SCANNER_TOOL_NAMES)

    report: dict[str, Any] = {
        "tool": tool_name,
        "findings": report_findings,
        "missed_count": missed,
        "note": _UNVERIFIED_FINDINGS_NOTE,
    }
    return report
# -- Private helpers ---------------------------------------------------------
def _build_true_positive_findings(
    task: dict[str, Any],
    total_code_lines: int,
    rng: random.Random,
) -> tuple[list[dict[str, Any]], int]:
    """
    Turn the task's real issues into scanner findings, dropping some at random.

    Each issue in ``task["issues"]`` (an absent key means no issues) is kept
    when a draw from ``rng`` falls below SCANNER_RECALL; otherwise it only
    increments the missed counter. A kept issue is reported at the midpoint
    of its ``line_range`` shifted by -1, 0 or +1 and clamped to
    ``1..total_code_lines``.

    Returns a 2-tuple of (findings_list, missed_count).
    """
    reported: list[dict[str, Any]] = []
    skipped = 0

    for real_issue in task.get("issues", []):
        if rng.random() < SCANNER_RECALL:
            first_line, last_line = real_issue["line_range"]

            # Jitter (biased toward 0) keeps the agent from blindly trusting
            # the reported line number.
            offset = rng.choice([-1, 0, 0, 1])
            midpoint = (first_line + last_line) // 2
            clamped_line = max(1, min(total_code_lines, midpoint + offset))

            category = real_issue.get("category", "bug")
            severity = real_issue.get("severity", "warning")
            reported.append({
                "line": clamped_line,
                "rule": _pick_rule_for_category(category, rng),
                "message": _build_issue_message(real_issue),
                "category": category,
                "severity": _map_severity_to_scanner_level(severity),
                "verified": False,
            })
        else:
            # Scanner "failed" to notice this issue.
            skipped += 1

    return reported, skipped
def _build_false_positive_findings(
    task: dict[str, Any],
    total_code_lines: int,
    rng: random.Random,
) -> list[dict[str, Any]]:
    """
    Generate between 0 and MAX_FALSE_POSITIVES bogus findings.

    MAX_FALSE_POSITIVES draws are made from ``rng``; each draw below
    SCANNER_NOISE_RATE adds one false positive. Every false positive is a
    random template from _FALSE_POSITIVE_TEMPLATES placed on a random line
    that is not covered by any real issue's ``line_range`` (or on any line,
    if real issues cover the whole file).
    """
    how_many = sum(
        rng.random() < SCANNER_NOISE_RATE for _ in range(MAX_FALSE_POSITIVES)
    )
    if not how_many:
        return []

    # Lines covered by genuine issues are off-limits for noise.
    occupied: set[int] = set()
    for real_issue in task.get("issues", []):
        span = real_issue["line_range"]
        occupied.update(range(span[0], span[1] + 1))

    every_line = range(1, total_code_lines + 1)
    free_lines = [number for number in every_line if number not in occupied]
    if not free_lines:
        # Tiny synthetic tasks can be fully covered by real issues; fall back
        # to the whole file so rng.choice always has candidates.
        free_lines = list(every_line)

    copied_fields = ("rule", "message", "category", "severity")
    noise: list[dict[str, Any]] = []
    for _ in range(how_many):
        # Draw order matters for reproducibility: template first, then line.
        chosen_template = rng.choice(_FALSE_POSITIVE_TEMPLATES)
        fake: dict[str, Any] = {"line": rng.choice(free_lines)}
        for field in copied_fields:
            fake[field] = chosen_template[field]
        fake["verified"] = False
        noise.append(fake)
    return noise
def _pick_rule_for_category(category: str, rng: random.Random) -> str:
    """
    Choose a plausible lint/security rule ID for an issue category.

    One rule is drawn with ``rng`` from the pool registered for ``category``;
    categories without a pool use the "bug" pool.
    """
    catalogue: dict[str, list[str]] = {
        "security": ["B101", "B102", "B105", "B106", "B201", "B301", "B501"],
        "bug": ["E501", "W0611", "W0702", "E711", "E712"],
        "performance": ["W0640", "C0200", "W0108"],
        "style": ["C0301", "C0303", "W0611", "C0114"],
        "design": ["R0201", "R0902", "R0914", "W0107"],
    }
    try:
        candidates = catalogue[category]
    except KeyError:
        # Unknown category: treat it like a generic bug.
        candidates = catalogue["bug"]
    return rng.choice(candidates)
def _build_issue_message(issue: dict[str, Any]) -> str:
    """
    Build a plausible but intentionally vague scanner message for an issue.

    The message is ``"<first six words of description> - <generic text>"``;
    the prefix is omitted when the issue has no description. The generic text
    is picked from the pool for the issue's category ("bug" pool for unknown
    or missing categories) using an RNG seeded only by the issue id, so a
    given issue id always yields the same generic text.

    Non-string ids (e.g. integers) are converted with ``str()`` and a missing
    or ``None`` id / description is treated as the empty string, so malformed
    task data degrades gracefully instead of raising AttributeError.
    """
    messages_by_category: dict[str, list[str]] = {
        "security": [
            "Potential security vulnerability detected in this expression",
            "Sensitive data handling - review for exposure risk",
            "Input not sanitised before use in sensitive operation",
            "Hardcoded value detected; consider externalising to configuration",
        ],
        "bug": [
            "Potential logic error or incorrect operator usage",
            "Variable assigned but possibly never read in all paths",
            "Exception handling may suppress important errors",
            "Index or range expression may be incorrect",
        ],
        "performance": [
            "Repeated operation inside loop - consider hoisting",
            "Unbounded collection growth detected",
            "Synchronous call inside async context",
            "Sequential requests could be batched",
        ],
        "style": [
            "Code style violation detected",
            "Unused identifier in scope",
            "Magic number - consider named constant",
        ],
        "design": [
            "Resource may not be properly released",
            "Thread or task lifecycle not managed",
            "Retry logic missing backoff strategy",
        ],
    }
    category = issue.get("category", "bug")
    message_pool = messages_by_category.get(category, messages_by_category["bug"])

    # Prefix with the first few words of the issue description so the message
    # is loosely tied to the real issue without being an exact keyword match.
    # `or ""` also covers a description that is present but None.
    description = issue.get("description") or ""
    description_prefix = " ".join(str(description).split()[:6])
    message_prefix = f"{description_prefix} - " if description_prefix else ""

    # Use a separate RNG seeded on the issue id so this issue always maps to
    # the same scanner message regardless of evaluation order or FP count.
    # zlib.crc32 instead of builtin hash(): hash() of strings is salted with a
    # per-process random value (PEP 456), so the same issue id would produce
    # different messages across Python invocations, breaking the module's
    # reproducibility-from-seed contract.
    raw_id = issue.get("id")
    # str() keeps string ids unchanged while accepting integer ids.
    issue_id = "" if raw_id is None else str(raw_id)
    issue_rng = random.Random(zlib.crc32(issue_id.encode("utf-8")))
    return message_prefix + issue_rng.choice(message_pool)
def _map_severity_to_scanner_level(severity: str) -> str:
    """
    Translate a PRobe severity label into a scanner-style level.

    "info" becomes "LOW", "warning" becomes "MEDIUM", and both "error" and
    "critical" become "HIGH". The match is exact (case-sensitive); any other
    label yields "MEDIUM".
    """
    scanner_levels: dict[str, str] = {"info": "LOW", "warning": "MEDIUM"}
    scanner_levels.update(dict.fromkeys(("error", "critical"), "HIGH"))
    try:
        return scanner_levels[severity]
    except KeyError:
        # Unrecognised label: fall back to the middle level.
        return "MEDIUM"