| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| """ |
| Policy Engine — Cricket-shaped gating layer for Faux_Clawdbot. |
| |
| Rim: immutable constitutional constraints (code, not config). |
| Mesh: evolving gating decisions (static bootstrap for now). |
| Audit: persistent JSONL log of every tool call and its outcome. |
| """ |
|
|
| import json |
| import os |
| import re |
| import time |
| from pathlib import Path |
|
|
| |
| |
| |
|
|
# Directory that holds the audit trail: "<directory of this module>/data/audit".
_AUDIT_DIR = Path(__file__).resolve().parent.joinpath("data", "audit")
|
|
|
|
def _ensure_audit_dir() -> None:
    """Create the audit directory (and any missing parents) if it is absent.

    An already-existing directory is not an error.
    """
    os.makedirs(_AUDIT_DIR, exist_ok=True)
|
|
|
|
def _audit_log(
    tool_name: str,
    args: dict,
    allowed: bool,
    reason: str,
) -> None:
    """Append one JSON line describing a tool-call verdict to the audit log.

    The entry is written to ``policy.jsonl`` inside the audit directory,
    which is created on demand.

    Parameters
    ----------
    tool_name : str
        Name of the tool that was checked.
    args : dict
        Arguments of the tool call. Top-level string values longer than
        200 characters are cut to their first 200 characters followed by
        ``...[truncated]`` so huge payloads do not end up in the audit
        trail. Non-string and nested values are logged unchanged.
    allowed : bool
        The verdict that was reached.
    reason : str
        Human-readable explanation of the verdict.
    """
    _ensure_audit_dir()

    sanitized_args = {
        key: (value[:200] + "...[truncated]")
        if isinstance(value, str) and len(value) > 200
        else value
        for key, value in args.items()
    }

    # Read the clock exactly once so "timestamp" and "iso_time" always
    # describe the same instant, even across a second boundary.
    now = time.time()
    entry = {
        "timestamp": now,
        "iso_time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
        "tool": tool_name,
        "args": sanitized_args,
        "allowed": allowed,
        "reason": reason,
    }

    log_path = _AUDIT_DIR / "policy.jsonl"
    with open(log_path, "a", encoding="utf-8") as f:
        # default=str stringifies values json cannot serialize natively
        # (e.g. Path objects) instead of failing the audit write.
        f.write(json.dumps(entry, default=str) + "\n")
|
|
|
|
| |
| |
| |
| |
| |
| |
|
|
| |
# File-name patterns (all case-insensitive) that mark a file as sensitive.
_SENSITIVE_PATTERNS: tuple[re.Pattern, ...] = tuple(
    re.compile(expression, re.IGNORECASE)
    for expression in (
        r"\.env$",
        r"\.env\.",
        r".*\.key$",
        r"^credentials\..*",
    )
)
|
|
| |
# Binary names that shell commands are allowed to start with.
_SHELL_ALLOWLIST: tuple[str, ...] = tuple(
    "python pip pytest git npm node ls grep find wc head tail diff".split()
)
|
|
| |
# Regexes for text that looks like a credential or token. Each entry is
# (expression, flags); only the "Bearer" pattern is case-insensitive.
_SECRET_PATTERNS: tuple[re.Pattern, ...] = tuple(
    re.compile(expression, flags)
    for expression, flags in (
        (r"sk-[A-Za-z0-9]{20,}", 0),
        (r"ghp_[A-Za-z0-9]{36,}", 0),
        (r"gho_[A-Za-z0-9]{36,}", 0),
        (r"ghs_[A-Za-z0-9]{36,}", 0),
        (r"ghr_[A-Za-z0-9]{36,}", 0),
        (r"AKIA[0-9A-Z]{16}", 0),
        (r"Bearer\s+[A-Za-z0-9\-._~+/]+=*", re.IGNORECASE),
        (r"xox[bpras]-[A-Za-z0-9\-]+", 0),
        (r"sk-ant-[A-Za-z0-9\-]{20,}", 0),
        (r"eyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.", 0),
    )
)
|
|
|
|
| def can_access_path( |
| path: str, |
| mode: str, |
| workspace: Path, |
| read_only_paths: list[Path] | None = None, |
| ) -> tuple[bool, str]: |
| """Check whether *path* is allowed for the given *mode* within *workspace*. |
| |
| Parameters |
| ---------- |
| path : str |
| The filesystem path the tool wants to access. |
| mode : str |
| ``"read"`` or ``"write"``. |
| workspace : Path |
| The root directory the bot is allowed to operate within. |
| read_only_paths : list[Path] | None |
| Additional directories that are readable but not writable (e.g. sibling |
| ecosystem repos). Only consulted when the path falls outside *workspace* |
| and mode is ``"read"``. |
| |
| Returns |
| ------- |
| tuple[bool, str] |
| (allowed, reason) |
| """ |
| try: |
| p = Path(path) |
| |
| resolved = (workspace / p).resolve() if not p.is_absolute() else p.resolve() |
| except (OSError, ValueError) as exc: |
| return False, f"Path resolution failed: {exc}" |
|
|
| workspace_resolved = workspace.resolve() |
|
|
| |
| try: |
| resolved.relative_to(workspace_resolved) |
| except ValueError: |
| |
| if mode == "read" and read_only_paths: |
| for ro_root in read_only_paths: |
| try: |
| resolved.relative_to(Path(ro_root).resolve()) |
| return True, f"Read permitted via external allowlist: {ro_root}" |
| except ValueError: |
| continue |
| return False, ( |
| f"Path escapes workspace. Resolved path '{resolved}' " |
| f"is not inside '{workspace_resolved}'." |
| ) |
|
|
| |
| if mode == "write": |
| filename = resolved.name |
| for pat in _SENSITIVE_PATTERNS: |
| if pat.search(filename): |
| return False, ( |
| f"Write denied β '{filename}' matches sensitive file " |
| f"pattern '{pat.pattern}'." |
| ) |
|
|
| return True, "Path access permitted." |
|
|
|
|
def can_execute_shell(command: str) -> tuple[bool, str]:
    """Check whether *command* starts with a binary on the shell allowlist.

    Leading ``VAR=value`` environment assignments are skipped, then the
    basename of the first remaining token is compared with the allowlist.
    A binary matches an allowlist entry when it is exactly that name,
    optionally followed by a version suffix (``python3``, ``python3.11``,
    ``pytest-3``). Names that merely *start* with an allowed name
    (``lsof``, ``gitk``) or that have shell syntax glued on (``ls;rm``)
    are denied.

    Only the first token (the binary name) is checked against the allowlist.

    Returns
    -------
    tuple[bool, str]
        (allowed, reason)
    """
    stripped = command.strip()
    if not stripped:
        return False, "Empty command denied."

    # Drop leading environment assignments such as ``FOO=bar`` so the token
    # that is checked is the program actually being run.
    tokens = stripped.split()
    while tokens and "=" in tokens[0] and not tokens[0].startswith("="):
        tokens = tokens[1:]
    if not tokens:
        return False, "Command is only env var assignments β no binary found."

    # Path-qualified binaries (``/usr/bin/git``) are judged by basename.
    binary = os.path.basename(tokens[0])

    # NOTE(review): later tokens are not inspected, so shell operators that
    # appear as separate tokens (``;``, ``&&``, ``|``, ``$(...)``) pass
    # through unchecked. Confirm how callers execute the command before
    # treating this check as sufficient on its own.
    for allowed_name in _SHELL_ALLOWLIST:
        # Exact name, optionally followed by a version suffix only.
        versioned = re.escape(allowed_name) + r"(?:-?\d+(?:\.\d+)*)?"
        if re.fullmatch(versioned, binary):
            return True, f"Shell command permitted (matched '{allowed_name}')."

    return False, (
        f"Shell command denied β '{binary}' is not on the allowlist. "
        f"Allowed: {', '.join(_SHELL_ALLOWLIST)}."
    )
|
|
|
|
def can_write_content(path: str, content: str) -> tuple[bool, str]:
    """Scan *content* for secret-looking text before it is written.

    *path* is accepted for interface symmetry but is not consulted. The
    patterns are tried in order and the first hit decides the verdict; the
    denial reason shows up to ten characters preceding the hit, with the
    matched text itself replaced by a redaction marker.

    Returns
    -------
    tuple[bool, str]
        (allowed, reason)
    """
    scans = ((candidate, candidate.search(content)) for candidate in _SECRET_PATTERNS)
    first_hit = next(((candidate, found) for candidate, found in scans if found), None)

    if first_hit is None:
        return True, "Content approved β no secret patterns detected."

    candidate, found = first_hit
    # Only the lead-in is echoed back; the secret itself never is.
    lead_in_start = max(0, found.start() - 10)
    context_hint = content[lead_in_start:found.start()] + "***REDACTED***"
    return False, (
        f"Content contains what appears to be a secret or token "
        f"(pattern: '{candidate.pattern}'). Context: ...{context_hint}..."
    )
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
# Tool names that should_gate_for_review() holds back for review.
_GATED_TOOLS: frozenset[str] = frozenset(
    (
        "write_file",
        "edit_file",
        "shell_execute",
        "push_to_github",
        "pull_from_github",
        "notebook_add",
        "notebook_delete",
        "create_shadow_branch",
    )
)
|
|
|
|
| |
| |
# True when the QB_DISPATCHED environment variable is "1", "true" or "yes"
# (case-insensitive). Evaluated once, at import time.
_QB_DISPATCHED = os.environ.get("QB_DISPATCHED", "").lower() in {"1", "true", "yes"}
|
|
|
|
def should_gate_for_review(tool_name: str, args: dict) -> bool:
    """Tell whether a call to *tool_name* must be staged for review.

    When the module-level QB-dispatch flag is set nothing is gated and the
    answer is always ``False``. Otherwise the answer is ``True`` exactly
    for tools listed in the gated-tool set. *args* is accepted but not
    inspected.
    """
    return (not _QB_DISPATCHED) and tool_name in _GATED_TOOLS
|
|
|
|
| |
| |
| |
|
|
def check_tool_call(
    tool_name: str,
    args: dict,
    workspace: Path,
    read_only_paths: list[Path] | None = None,
) -> tuple[bool, str]:
    """Evaluate the Rim checks that apply to *tool_name*, then audit the verdict.

    Checks are chained and stop at the first denial:

    * file and notebook tools: the target path (``args["path"]``, falling
      back to ``args["file_path"]``) must pass the path-access check, in
      read mode for ``read_file`` and write mode for the others;
    * ``write_file`` / ``edit_file``: the new text (``args["content"]`` /
      ``args["new_text"]``) must pass the secret scan;
    * ``shell_execute``: ``args["command"]`` must pass the shell allowlist.

    Tools matching none of these are permitted. Every call, allowed or
    not, is written to the audit log before returning.

    Returns
    -------
    tuple[bool, str]
        (allowed, reason)
    """
    path_checked_tools = (
        "write_file",
        "edit_file",
        "read_file",
        "notebook_add",
        "notebook_delete",
    )
    verdict = (True, "Permitted.")

    # Stage 1: path containment / sensitive-file check.
    if tool_name in path_checked_tools:
        target = args.get("path", args.get("file_path", ""))
        access_mode = "read" if tool_name in ("read_file",) else "write"
        verdict = can_access_path(target, access_mode, workspace, read_only_paths)

    # Stage 2: secret scan of the text about to be written.
    if verdict[0]:
        if tool_name == "write_file":
            payload_key = "content"
        elif tool_name == "edit_file":
            payload_key = "new_text"
        else:
            payload_key = None
        if payload_key is not None:
            payload = args.get(payload_key, "")
            target = args.get("path", args.get("file_path", ""))
            verdict = can_write_content(target, payload)

    # Stage 3: shell allowlist.
    if verdict[0] and tool_name == "shell_execute":
        verdict = can_execute_shell(args.get("command", ""))

    allowed, reason = verdict
    _audit_log(tool_name, args, allowed, reason)
    return allowed, reason
|
|