File size: 5,098 Bytes
fea1bd1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 |
# C:\Users\marco\agente_gemini\HASHIRU_6_1\tools\pyexec.py
from __future__ import annotations
import ast
import io
import textwrap
from contextlib import redirect_stdout
from typing import Any
from config import PY_UNSAFE_DEFAULT
from utils.audit import audit_event
SAFE_BUILTINS: dict[str, Any] = {
"print": print,
"len": len,
"range": range,
"min": min,
"max": max,
"sum": sum,
"enumerate": enumerate,
"sorted": sorted,
"int": int,
"float": float,
"str": str,
"bool": bool,
"list": list,
"dict": dict,
"set": set,
"tuple": tuple,
"abs": abs,
"round": round,
"zip": zip,
"any": any,
"all": all,
}
def _has_forbidden_nodes(tree: ast.AST) -> tuple[bool, str]:
"""Detecta importações e acessos perigosos no modo SAFE."""
for node in ast.walk(tree):
if isinstance(node, (ast.Import, ast.ImportFrom)):
return True, "Uso de import não é permitido no modo SAFE."
if isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
if node.func.id in {"eval", "exec", "__import__", "open"}:
return True, f"Uso de {node.func.id} não é permitido no modo SAFE."
if isinstance(node, ast.Attribute) and isinstance(node.value, ast.Name):
# bloqueios básicos a módulos típicos
if node.value.id in {"os", "sys", "subprocess", "pathlib"}:
return True, f"Acesso a {node.value.id} não é permitido no modo SAFE."
return False, ""
def _parse_flags(args: str) -> dict[str, Any]:
"""
Interpreta flags:
--unsafe (exige token CONFIRMO)
--preview (não executa; apenas mostra o código detectado)
"""
tokens = args.split()
flags = {"unsafe": False, "preview": False, "confirmed": False, "rest": ""}
# Varre tokens preservando ordem e coletando o restante
rest = []
for t in tokens:
tl = t.lower()
if tl == "--unsafe":
flags["unsafe"] = True
elif tl == "--preview":
flags["preview"] = True
elif tl == "confirmo":
flags["confirmed"] = True
else:
rest.append(t)
flags["rest"] = " ".join(rest).strip()
return flags
def _normalize_block(block: str) -> str:
if not block:
return ""
# Remove indent acidental e quebras supérfluas
return textwrap.dedent(block).strip("\n\r ")
async def handle_py(args: str, block: str) -> str:
"""
/py <<<codigo>>> -> Modo SAFE
/py --unsafe CONFIRMO <<<...>>> -> Modo UNSAFE (permite import)
Regras:
- SAFE: sem import, sem eval/exec/__import__/open/os/sys/subprocess.
- UNSAFE: requer --unsafe e CONFIRMO.
- --preview: apenas retorna o código detectado (sem executar).
"""
flags = _parse_flags(args)
code = _normalize_block(block)
if flags["preview"]:
preview = code if code else "(vazio)"
return "**Preview de código (/py)**\n```\n" + preview[:6000] + "\n```"
if not code:
return "Uso: /py [--unsafe CONFIRMO] [--preview] <<<código Python>>>"
try:
if flags["unsafe"] or PY_UNSAFE_DEFAULT:
# Segurança: requer confirmação explícita
if not flags["confirmed"]:
return (
"⚠️ Modo UNSAFE requer confirmação explícita.\n"
"Use: `/py --unsafe CONFIRMO <<<seu código>>>`"
)
audit_event("pyexec_unsafe", {"preview": False})
# Execução "plena" (o usuário assume os riscos)
buf = io.StringIO()
glb: dict[str, Any] = {}
loc: dict[str, Any] = {}
with redirect_stdout(buf):
exec(code, glb, loc) # noqa: S102 - exec aprovado no modo UNSAFE
output = buf.getvalue()
if not output.strip():
output = "_(sem saída)_"
if len(output) > 6000:
output = output[:6000] + "\n...[truncado]..."
return "**/py (UNSAFE)**\n```\n" + output + "\n```"
# Modo SAFE
audit_event("pyexec_safe", {"preview": False})
# Parse/AST + checagens
tree = ast.parse(code, mode="exec")
bad, reason = _has_forbidden_nodes(tree)
if bad:
return f"🚫 Bloqueado no modo SAFE: {reason}"
buf = io.StringIO()
safe_globals = {"__builtins__": SAFE_BUILTINS}
safe_locals: dict[str, Any] = {}
with redirect_stdout(buf):
exec(compile(tree, "<safe>", "exec"), safe_globals, safe_locals)
output = buf.getvalue()
if not output.strip():
output = "_(sem saída)_"
if len(output) > 6000:
output = output[:6000] + "\n...[truncado]..."
return "**/py (SAFE)**\n```\n" + output + "\n```"
except SyntaxError as e:
return f"❌ Erro de sintaxe em /py: {e}"
except Exception as e:
return f"❌ Erro em /py: {e}"
# Alias para compatibilidade, caso o roteador espere outro nome
handle_pyexec = handle_py
|