# debugZero/server/bug_injector.py
# The-Fool-09's picture
# Upload folder using huggingface_hub
# 3ba81b5 verified
import ast
import random
import copy
# Replacement table for the "wrong_builtin" mutation: each key is a built-in
# function name and its value is the name substituted for it. Every pair is
# listed in both directions, so the mapping is symmetric.
BUILTIN_PAIRS = {
"min": "max", "max": "min",
"any": "all", "all": "any",
"sum": "len", "len": "sum"
}
# Module names that is_safe_injection looks for in import statements; code
# importing any of them is reported as unsafe.
BLOCKED_IMPORTS = ["os", "sys", "subprocess", "shutil", "pathlib"]
def is_safe_injection(code: str) -> bool:
    """Report whether ``code`` avoids importing any blocked module.

    The source is parsed and every absolute ``import`` / ``from ... import``
    statement is inspected, comparing the top-level package name against
    ``BLOCKED_IMPORTS``. Working on the syntax tree means that
    ``import json, os`` and ``import os.path`` are caught, while unrelated
    modules whose names merely start with a blocked name (or comments and
    string literals that mention one) are not rejected.

    Args:
        code: Python source text to inspect.

    Returns:
        ``False`` if a module listed in ``BLOCKED_IMPORTS`` is imported,
        ``True`` otherwise.
    """
    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError):
        # Unparseable text cannot be inspected structurally; fall back to a
        # conservative substring scan rather than declaring it safe.
        return not any(
            f"import {blocked}" in code or f"from {blocked}" in code
            for blocked in BLOCKED_IMPORTS
        )

    blocked_modules = set(BLOCKED_IMPORTS)
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            imported = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
            # Relative imports (level > 0) name package-local modules, not
            # the top-level modules listed in BLOCKED_IMPORTS.
            imported = [node.module]
        else:
            continue
        # Only the top-level package decides: "os.path" counts as "os".
        if any(name.split(".", 1)[0] in blocked_modules for name in imported):
            return False
    return True
class BugInjectorVisitor(ast.NodeTransformer):
    """AST transformer that injects at most one bug of a selected kind.

    ``target_operator`` names the mutation to apply. Children are always
    visited before their parent, and once ``mutated`` becomes ``True`` every
    later visit returns its node untouched, so only the first eligible node
    (in post-order) is changed. Nodes are modified in place.
    """

    # Comparison operator class -> class that replaces it for "wrong_operator".
    _COMPARE_REPLACEMENTS = {
        ast.Lt: ast.GtE,
        ast.LtE: ast.Gt,
        ast.Gt: ast.LtE,
        ast.GtE: ast.Lt,
        ast.Eq: ast.NotEq,
        ast.NotEq: ast.Eq,
    }

    # Arithmetic operator class -> class that replaces it for "wrong_operator".
    _BINOP_REPLACEMENTS = {
        ast.Add: ast.Sub,
        ast.Sub: ast.Add,
        ast.Mult: ast.FloorDiv,
        ast.Div: ast.Mult,
    }

    def __init__(self, target_operator: str):
        super().__init__()
        self.target_operator = target_operator
        self.mutated = False

    def visit_Constant(self, node):
        """off_by_one: shift a non-boolean integer literal by -1 or +1."""
        self.generic_visit(node)
        if self.mutated or self.target_operator != "off_by_one":
            return node
        # bool is a subclass of int, so it has to be excluded explicitly.
        if isinstance(node.value, int) and not isinstance(node.value, bool):
            node.value += random.choice([-1, 1])
            self.mutated = True
        return node

    def visit_Compare(self, node):
        """wrong_operator: replace the first comparison operator."""
        self.generic_visit(node)
        if self.mutated or self.target_operator != "wrong_operator":
            return node
        current = node.ops[0]
        for source_cls, replacement_cls in self._COMPARE_REPLACEMENTS.items():
            if isinstance(current, source_cls):
                node.ops[0] = replacement_cls()
                self.mutated = True
                break
        return node

    def visit_BinOp(self, node):
        """wrong_operator: replace an arithmetic operator."""
        self.generic_visit(node)
        if self.mutated or self.target_operator != "wrong_operator":
            return node
        for source_cls, replacement_cls in self._BINOP_REPLACEMENTS.items():
            if isinstance(node.op, source_cls):
                node.op = replacement_cls()
                self.mutated = True
                break
        return node

    def visit_Call(self, node):
        """wrong_builtin / loop_boundary_shift: mutate a call to a bare name."""
        self.generic_visit(node)
        if self.mutated or not isinstance(node.func, ast.Name):
            return node
        callee = node.func
        if self.target_operator == "wrong_builtin" and callee.id in BUILTIN_PAIRS:
            callee.id = BUILTIN_PAIRS[callee.id]
            self.mutated = True
        elif self.target_operator == "loop_boundary_shift" and callee.id == "range":
            arg_count = len(node.args)
            if arg_count == 1:
                # range(n) -> range(n + 1)
                shift_op = ast.Add()
            elif arg_count == 2:
                # range(a, b) -> range(a - 1, b)
                shift_op = ast.Sub()
            else:
                return node
            node.args[0] = ast.BinOp(
                left=node.args[0],
                op=shift_op,
                right=ast.Constant(value=1),
            )
            self.mutated = True
        return node

    def visit_If(self, node):
        """condition_negation / missing_base_case: mutate an ``if`` statement."""
        self.generic_visit(node)
        if self.mutated:
            return node
        if self.target_operator == "condition_negation":
            # if x > 0 -> if not x > 0
            node.test = ast.UnaryOp(op=ast.Not(), operand=node.test)
            self.mutated = True
        elif self.target_operator == "missing_base_case":
            # Replace the first direct ``return`` in the body with ``pass``.
            position = next(
                (
                    index
                    for index, statement in enumerate(node.body)
                    if isinstance(statement, ast.Return)
                ),
                None,
            )
            if position is not None:
                node.body[position] = ast.Pass()
                self.mutated = True
        return node

    def visit_Slice(self, node):
        """slice_boundary_corruption: move one slice bound by one."""
        self.generic_visit(node)
        if self.mutated or self.target_operator != "slice_boundary_corruption":
            return node
        one = ast.Constant(value=1)
        if node.lower is not None:
            # The lower bound takes priority: xs[a:b] -> xs[a + 1:b]
            node.lower = ast.BinOp(left=node.lower, op=ast.Add(), right=one)
            self.mutated = True
        elif node.upper is not None:
            # Only the upper bound exists: xs[:b] -> xs[:b - 1]
            node.upper = ast.BinOp(left=node.upper, op=ast.Sub(), right=one)
            self.mutated = True
        return node

    def visit_Name(self, node):
        """Names are never mutated directly; just continue the traversal."""
        self.generic_visit(node)
        return node

    def visit_Assign(self, node):
        """variable_swap: exchange the first two names of a tuple target."""
        self.generic_visit(node)
        if self.mutated or self.target_operator != "variable_swap":
            return node
        targets = getattr(node, "targets", None)
        if not targets:
            return node
        first_target = targets[0]
        if isinstance(first_target, ast.Tuple) and len(first_target.elts) >= 2:
            # a, b = x, y -> b, a = x, y
            names = first_target.elts
            names[0], names[1] = names[1], names[0]
            self.mutated = True
        return node
def inject_bug(original_code: str, proposed_operator: str) -> tuple[str, bool]:
    """Apply one mutation of kind ``proposed_operator`` to ``original_code``.

    The injection is accepted only if all of these hold:
    - the original source parses;
    - the mutation actually changed the syntax tree;
    - the mutated source passes ``is_safe_injection``;
    - the mutated source parses again.

    Args:
        original_code: Python source text to mutate.
        proposed_operator: Name of the mutation understood by
            ``BugInjectorVisitor`` (for example ``"wrong_operator"``).

    Returns:
        ``(mutated_code, True)`` when a mutation was applied and accepted,
        otherwise ``(original_code, False)``.
    """
    # 1. The original must be valid Python.
    try:
        tree = ast.parse(original_code)
    except SyntaxError:
        return original_code, False

    # Mutate a copy so the pristine tree stays available for comparison.
    injector = BugInjectorVisitor(proposed_operator)
    mutated_tree = injector.visit(copy.deepcopy(tree))

    # 2. Check that the mutation actually changed something. The comparison
    # is made on the trees, not on text: ast.unparse normalises formatting
    # and drops comments, so its output can differ from the original source
    # even when no node was mutated.
    if ast.dump(mutated_tree) == ast.dump(tree):
        return original_code, False

    ast.fix_missing_locations(mutated_tree)
    mutated_code = ast.unparse(mutated_tree)

    # 3. Blocked imports.
    if not is_safe_injection(mutated_code):
        return original_code, False

    # 4. The generated source must still parse.
    try:
        ast.parse(mutated_code)
    except SyntaxError:
        return original_code, False
    return mutated_code, True
def infer_bug_operator(original_code: str, candidate_code: str) -> str | None:
try:
original_tree = ast.parse(original_code)
candidate_tree = ast.parse(candidate_code)
except SyntaxError:
return None
if ast.dump(original_tree) == ast.dump(candidate_tree):
return None
original_nodes = list(ast.walk(original_tree))
candidate_nodes = list(ast.walk(candidate_tree))
inferred = (
_infer_wrong_builtin(original_nodes, candidate_nodes)
or _infer_loop_boundary_shift(original_nodes, candidate_nodes)
or _infer_slice_boundary_corruption(original_nodes, candidate_nodes)
or _infer_condition_negation(original_nodes, candidate_nodes)
or _infer_wrong_operator(original_nodes, candidate_nodes)
or _infer_off_by_one(original_nodes, candidate_nodes)
)
return inferred
def _infer_wrong_builtin(original_nodes: list[ast.AST], candidate_nodes: list[ast.AST]) -> str | None:
for original_node, candidate_node in zip(original_nodes, candidate_nodes):
if not isinstance(original_node, ast.Call) or not isinstance(candidate_node, ast.Call):
continue
if not isinstance(original_node.func, ast.Name) or not isinstance(candidate_node.func, ast.Name):
continue
expected = BUILTIN_PAIRS.get(original_node.func.id)
if expected and candidate_node.func.id == expected:
return "wrong_builtin"
return None
def _infer_loop_boundary_shift(
original_nodes: list[ast.AST],
candidate_nodes: list[ast.AST],
) -> str | None:
for original_node, candidate_node in zip(original_nodes, candidate_nodes):
if not isinstance(original_node, ast.Call) or not isinstance(candidate_node, ast.Call):
continue
if not isinstance(original_node.func, ast.Name) or original_node.func.id != "range":
continue
if not isinstance(candidate_node.func, ast.Name) or candidate_node.func.id != "range":
continue
if len(original_node.args) != len(candidate_node.args):
continue
for original_arg, candidate_arg in zip(original_node.args, candidate_node.args):
if _is_shifted_by_one(original_arg, candidate_arg):
return "loop_boundary_shift"
return None
def _infer_slice_boundary_corruption(
original_nodes: list[ast.AST],
candidate_nodes: list[ast.AST],
) -> str | None:
for original_node, candidate_node in zip(original_nodes, candidate_nodes):
if not isinstance(original_node, ast.Slice) or not isinstance(candidate_node, ast.Slice):
continue
if original_node.lower is not None and candidate_node.lower is not None:
if _is_shifted_by_one(original_node.lower, candidate_node.lower):
return "slice_boundary_corruption"
if original_node.upper is not None and candidate_node.upper is not None:
if _is_shifted_by_one(original_node.upper, candidate_node.upper):
return "slice_boundary_corruption"
return None
def _infer_condition_negation(
original_nodes: list[ast.AST],
candidate_nodes: list[ast.AST],
) -> str | None:
for original_node, candidate_node in zip(original_nodes, candidate_nodes):
if not isinstance(original_node, ast.If) or not isinstance(candidate_node, ast.If):
continue
if (
isinstance(candidate_node.test, ast.UnaryOp)
and isinstance(candidate_node.test.op, ast.Not)
and ast.dump(candidate_node.test.operand) == ast.dump(original_node.test)
):
return "condition_negation"
return None
def _infer_wrong_operator(original_nodes: list[ast.AST], candidate_nodes: list[ast.AST]) -> str | None:
for original_node, candidate_node in zip(original_nodes, candidate_nodes):
if isinstance(original_node, ast.Compare) and isinstance(candidate_node, ast.Compare):
if original_node.ops and candidate_node.ops and type(original_node.ops[0]) is not type(candidate_node.ops[0]):
return "wrong_operator"
if isinstance(original_node, ast.BinOp) and isinstance(candidate_node, ast.BinOp):
if type(original_node.op) is not type(candidate_node.op):
return "wrong_operator"
return None
def _infer_off_by_one(original_nodes: list[ast.AST], candidate_nodes: list[ast.AST]) -> str | None:
for original_node, candidate_node in zip(original_nodes, candidate_nodes):
if not isinstance(original_node, ast.Constant) or not isinstance(candidate_node, ast.Constant):
continue
if isinstance(original_node.value, bool) or isinstance(candidate_node.value, bool):
continue
if isinstance(original_node.value, int) and isinstance(candidate_node.value, int):
if abs(candidate_node.value - original_node.value) == 1:
return "off_by_one"
return None
def _is_shifted_by_one(original_node: ast.AST, candidate_node: ast.AST) -> bool:
if not isinstance(candidate_node, ast.BinOp):
return False
if ast.dump(candidate_node.left) != ast.dump(original_node):
return False
if not isinstance(candidate_node.right, ast.Constant) or candidate_node.right.value != 1:
return False
return isinstance(candidate_node.op, (ast.Add, ast.Sub))