#!/usr/bin/env python3
"""Codette Tool System — Safe Local Tool Execution

Gives Codette the ability to read files, search code, list directories,
and run safe Python snippets. Tools are sandboxed and read-only by default.

Tool Call Format (in Codette's output):
    tool_name(arg1, arg2)

Tool Result (injected back into context):
    ...output...

Architecture:
    1. Codette generates text that may contain tool calls
    2. Server parses out tool calls
    3. Tools execute with safety limits
    4. Results are fed back for a second generation pass
"""

import os
import re
import ast
import json
import subprocess
import sys
import time
import traceback
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple, Any

# ================================================================
# Safety Configuration
# ================================================================

# Directories Codette is allowed to read from
ALLOWED_ROOTS = [
    Path(r"J:\codette-training-lab"),
    Path(r"C:\Users\Jonathan\Documents"),
]

# File extensions Codette can read
READABLE_EXTENSIONS = {
    ".py", ".js", ".ts", ".html", ".css", ".json", ".yaml", ".yml",
    ".md", ".txt", ".csv", ".toml", ".cfg", ".ini", ".sh", ".bat",
    ".bib", ".tex", ".log", ".jsonl",
}

# Max file size to read (prevent reading huge binaries)
MAX_FILE_SIZE = 500_000  # 500KB

# Max output length per tool result
MAX_OUTPUT_LENGTH = 4000  # chars

# Max lines for file reads
MAX_LINES = 200

# Python execution timeout
PYTHON_TIMEOUT = 10  # seconds

# Interpreter and package directory used by run_python. When the configured
# interpreter is missing, the interpreter running this module is used instead.
PYTHON_EXECUTABLE = r"J:\python.exe"
PYTHON_SITE_PACKAGES = r"J:\Lib\site-packages"

# Result caps for list_files / search_code
MAX_LIST_ENTRIES = 100
MAX_SEARCH_MATCHES = 50

# Directory names search_code never descends into (dot-names are also skipped)
_SKIPPED_DIR_NAMES = ("__pycache__", "node_modules", ".git")


# ================================================================
# Tool Registry
# ================================================================

class ToolRegistry:
    """Registry of available tools with descriptions and handlers.

    ``self.tools`` maps a tool name to a spec dict with the keys
    ``description`` (str), ``examples`` (list of str) and ``handler``
    (callable returning the tool output).
    """

    def __init__(self):
        self.tools: Dict[str, dict] = {}
        self._register_defaults()

    def _register_defaults(self):
        """Register the built-in tool set."""
        self.register("read_file", {
            "description": "Read a file's contents. Args: path (str), start_line (int, optional), end_line (int, optional)",
            "examples": [
                'read_file("inference/codette_server.py")',
                'read_file("configs/adapter_registry.yaml", 1, 50)',
            ],
            "handler": tool_read_file,
        })
        self.register("list_files", {
            "description": "List files in a directory. Args: path (str), pattern (str, optional)",
            "examples": [
                'list_files("inference/")',
                'list_files("datasets/", "*.jsonl")',
            ],
            "handler": tool_list_files,
        })
        self.register("search_code", {
            "description": "Search for a text pattern across files. Args: pattern (str), path (str, optional), file_ext (str, optional)",
            "examples": [
                'search_code("phase_coherence")',
                'search_code("def route", "inference/", ".py")',
            ],
            "handler": tool_search_code,
        })
        self.register("file_info", {
            "description": "Get file metadata (size, modified time, line count). Args: path (str)",
            "examples": [
                'file_info("paper/codette_paper.pdf")',
            ],
            "handler": tool_file_info,
        })
        self.register("run_python", {
            "description": "Execute a short Python snippet and return output. For calculations, data processing, or quick checks. Args: code (str)",
            "examples": [
                'run_python("import math; print(math.pi * 2)")',
                'run_python("print(sorted([3,1,4,1,5,9]))")',
            ],
            "handler": tool_run_python,
        })
        self.register("project_summary", {
            "description": "Get an overview of the Codette project structure. No args.",
            "examples": [
                'project_summary()',
            ],
            "handler": tool_project_summary,
        })

    def register(self, name: str, spec: dict):
        """Add or replace the tool called *name* with the given spec dict."""
        self.tools[name] = spec

    def get_descriptions(self) -> str:
        """Format tool descriptions for injection into system prompt."""
        lines = ["Available tools (use name(args) to call):"]
        for name, spec in self.tools.items():
            lines.append(f"\n {name}: {spec['description']}")
            for ex in spec.get("examples", []):
                lines.append(f" Example: {ex}")
        return "\n".join(lines)

    def execute(self, name: str, args: list, kwargs: dict) -> str:
        """Execute a tool by name with parsed arguments.

        Never raises: an unknown tool name or a handler exception is reported
        as an ``"Error ..."`` string. Output longer than MAX_OUTPUT_LENGTH is
        truncated with a trailing note.
        """
        if name not in self.tools:
            return f"Error: Unknown tool '{name}'. Available: {', '.join(self.tools.keys())}"

        handler = self.tools[name]["handler"]
        try:
            result = handler(*args, **kwargs)
            # Custom handlers may return non-strings; len()/slicing need a str.
            if not isinstance(result, str):
                result = str(result)
            # Truncate if too long
            if len(result) > MAX_OUTPUT_LENGTH:
                result = result[:MAX_OUTPUT_LENGTH] + f"\n... (truncated, {len(result)} chars total)"
            return result
        except Exception as e:
            # Boundary between model-generated calls and tool code: report, don't crash.
            return f"Error executing {name}: {e}"


# ================================================================
# Tool Call Parser
# ================================================================

# NOTE(review): the docstrings in this module talk about tool-call *tags*, but
# no tag delimiters appear in any pattern here — they look to have been lost.
# The scanner below therefore recognises bare ``name(args)`` occurrences, which
# also matches ordinary prose such as "foo (bar)". If the protocol wraps calls
# in markup, anchor _CALL_START_RE on that markup — confirm against the server.
_CALL_START_RE = re.compile(r"(\w+)\s*\(")


def _find_call_end(text: str, open_idx: int) -> int:
    """Return the index just past the ``)`` closing the ``(`` at *open_idx*.

    Parentheses inside single- or double-quoted strings are ignored and
    backslash escapes inside strings are skipped. Returns -1 when the
    parenthesis is never closed (including when a quote is left open).
    """
    depth = 0
    quote = None  # the quote character currently open, if any
    i = open_idx
    n = len(text)
    while i < n:
        ch = text[i]
        if quote is not None:
            if ch == "\\":
                i += 2  # skip the escaped character
                continue
            if ch == quote:
                quote = None
        elif ch in "\"'":
            quote = ch
        elif ch == "(":
            depth += 1
        elif ch == ")":
            depth -= 1
            if depth == 0:
                return i + 1
        i += 1
    return -1


def _iter_tool_calls(text: str) -> Iterator[Tuple[int, int, str, str]]:
    """Yield ``(start, end, name, raw_args)`` for each call found in *text*.

    ``text[start:end]`` is the whole ``name(args)`` span; ``raw_args`` is the
    unparsed text between the outer parentheses.
    """
    pos = 0
    while True:
        match = _CALL_START_RE.search(text, pos)
        if match is None:
            return
        open_idx = match.end() - 1
        end = _find_call_end(text, open_idx)
        if end == -1:
            # Unbalanced quotes/parens: fall back to the first ')' like a lazy
            # regex would, so sloppy calls are still picked up.
            close = text.find(")", open_idx)
            if close == -1:
                return  # no ')' left, so no later call can close either
            end = close + 1
        yield match.start(), end, match.group(1), text[open_idx + 1:end - 1]
        pos = end


def parse_tool_calls(text: str) -> List[Tuple[str, list, dict]]:
    """Parse name(args) tool calls from generated text.

    Returns list of (tool_name, positional_args, keyword_args). Arguments that
    cannot be parsed as Python literals are passed through as one raw string.
    """
    calls = []
    for _start, _end, name, raw_args in _iter_tool_calls(text):
        raw_args = raw_args.strip()
        try:
            args, kwargs = _parse_args(raw_args)
        except Exception:
            # Last-resort fallback: hand the raw text to the tool.
            args, kwargs = [raw_args], {}
        calls.append((name, args, kwargs))
    return calls


def _parse_args(args_str: str) -> Tuple[list, dict]:
    """Safely parse a function-arguments string into (args, kwargs).

    Only Python literals are accepted (evaluated with ``ast.literal_eval``,
    never ``eval``). Both positional and ``key=value`` arguments are
    supported. If the text is not a valid literal argument list, it is
    returned as a single string argument with surrounding quotes stripped.
    """
    if not args_str:
        return [], {}

    try:
        # Parse as the argument list of a dummy call so keywords are handled.
        call = ast.parse(f"_f({args_str})", mode="eval").body
        if not (isinstance(call, ast.Call)
                and isinstance(call.func, ast.Name)
                and call.func.id == "_f"):
            raise ValueError("not a plain argument list")
        args = [ast.literal_eval(node) for node in call.args]
        kwargs = {}
        for kw in call.keywords:
            if kw.arg is None:  # ``**mapping`` unpacking
                raise ValueError("** unpacking is not supported")
            kwargs[kw.arg] = ast.literal_eval(kw.value)
        return args, kwargs
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        # If that fails, treat as a single string argument
        # Strip quotes if present
        cleaned = args_str.strip().strip('"').strip("'")
        return [cleaned], {}


def strip_tool_calls(text: str) -> str:
    """Remove tool calls from text, leaving the rest (whitespace-trimmed)."""
    pieces = []
    cursor = 0
    for start, end, _name, _raw_args in _iter_tool_calls(text):
        pieces.append(text[cursor:start])
        cursor = end
    pieces.append(text[cursor:])
    return "".join(pieces).strip()


def has_tool_calls(text: str) -> bool:
    """Check if text contains any tool calls."""
    return next(_iter_tool_calls(text), None) is not None


# ================================================================
# Path Safety
# ================================================================

def _resolve_path(path_str: str) -> Optional[Path]:
    """Resolve a path, ensuring it's within allowed roots.

    Relative paths are taken relative to ALLOWED_ROOTS[0]. The path is
    resolved before the containment check, so ``..`` segments cannot step
    outside. Returns None when the path is not under any allowed root.
    """
    # Handle relative paths — resolve relative to project root
    p = Path(path_str)
    if not p.is_absolute():
        p = ALLOWED_ROOTS[0] / p

    p = p.resolve()

    # Check against allowed roots
    for root in ALLOWED_ROOTS:
        try:
            p.relative_to(root.resolve())
            return p
        except ValueError:
            continue

    return None  # Not in any allowed root


# ================================================================
# Tool Implementations
# ================================================================

def tool_read_file(path: str, start_line: int = 1, end_line: Optional[int] = None) -> str:
    """Read a file's contents with optional line range.

    Lines are 1-indexed and inclusive. At most MAX_LINES lines are returned,
    each prefixed with its line number. Problems are reported as an
    ``"Error: ..."`` string rather than raised.
    """
    resolved = _resolve_path(path)
    if resolved is None:
        return f"Error: Path '{path}' is outside allowed directories."

    if not resolved.exists():
        return f"Error: File not found: {path}"

    if not resolved.is_file():
        return f"Error: '{path}' is a directory, not a file. Use list_files() instead."

    # Check extension
    if resolved.suffix.lower() not in READABLE_EXTENSIONS:
        return f"Error: Cannot read {resolved.suffix} files. Supported: {', '.join(sorted(READABLE_EXTENSIONS))}"

    # Check size
    size = resolved.stat().st_size
    if size > MAX_FILE_SIZE:
        return f"Error: File too large ({size:,} bytes). Max: {MAX_FILE_SIZE:,} bytes."

    try:
        content = resolved.read_text(encoding='utf-8', errors='replace')
    except Exception as e:
        return f"Error reading file: {e}"

    lines = content.splitlines()
    total = len(lines)

    # Apply line range
    start = max(1, start_line) - 1  # Convert to 0-indexed
    if total and start >= total:
        # Without this the header would advertise an inverted range.
        return f"Error: start_line {start + 1} is past the end of {path} ({total} lines total)."
    end = min(end_line or total, start + MAX_LINES, total)

    selected = lines[start:end]

    # Format with line numbers
    numbered = [f"{i:4d} | {line}" for i, line in enumerate(selected, start=start + 1)]

    header = f"File: {path} ({total} lines total)"
    if start > 0 or end < total:
        header += f" [showing lines {start+1}-{end}]"

    return header + "\n" + "\n".join(numbered)


def tool_list_files(path: str = ".", pattern: Optional[str] = None) -> str:
    """List files in a directory with optional glob pattern.

    At most MAX_LIST_ENTRIES entries are shown. Patterns containing a ``..``
    component are rejected because they would let the glob walk out of the
    directory that passed the allowed-roots check.
    """
    if pattern and ".." in re.split(r"[\\/]", pattern):
        return f"Error: Pattern '{pattern}' may not contain '..'."

    resolved = _resolve_path(path)
    if resolved is None:
        return f"Error: Path '{path}' is outside allowed directories."

    if not resolved.exists():
        return f"Error: Directory not found: {path}"

    if not resolved.is_dir():
        return f"Error: '{path}' is a file, not a directory. Use read_file() instead."

    try:
        if pattern:
            entries = sorted(resolved.glob(pattern))
        else:
            entries = sorted(resolved.iterdir())

        result = [f"Directory: {path}"]
        for entry in entries[:MAX_LIST_ENTRIES]:  # Limit the number of entries
            rel = entry.relative_to(resolved)
            if entry.is_dir():
                result.append(f" [DIR] {rel}/")
            else:
                size = entry.stat().st_size
                if size >= 1024 * 1024:
                    size_str = f"{size / 1024 / 1024:.1f}MB"
                elif size >= 1024:
                    size_str = f"{size / 1024:.1f}KB"
                else:
                    size_str = f"{size}B"
                result.append(f" [FILE] {rel} ({size_str})")

        if len(entries) > MAX_LIST_ENTRIES:
            result.append(f" ... and {len(entries) - MAX_LIST_ENTRIES} more")

        return "\n".join(result)

    except Exception as e:
        return f"Error listing directory: {e}"


def tool_search_code(pattern: str, path: str = ".", file_ext: Optional[str] = None) -> str:
    """Search for a text pattern in files.

    The match is a case-insensitive substring test per line. When *path* is a
    directory it is searched recursively (optionally limited to *file_ext*);
    when it is a file, only that file is searched. Hidden entries,
    ``__pycache__`` and ``node_modules`` below the search root are skipped.
    At most MAX_SEARCH_MATCHES matches are returned.
    """
    resolved = _resolve_path(path)
    if resolved is None:
        return f"Error: Path '{path}' is outside allowed directories."

    if not resolved.exists():
        return f"Error: Path not found: {path}"

    # Determine glob pattern
    if file_ext:
        if not file_ext.startswith("."):
            file_ext = "." + file_ext
        glob = f"**/*{file_ext}"
    else:
        glob = "**/*"

    # str() so a non-string literal such as search_code(42) still searches.
    needle = str(pattern).lower()

    results = []
    files_searched = 0
    matches_found = 0

    try:
        if resolved.is_dir():
            search_root = resolved
            candidates = search_root.glob(glob)
        else:
            # An explicit file means "search this file", not its whole folder.
            search_root = resolved.parent
            candidates = [resolved]

        for filepath in candidates:
            rel = filepath.relative_to(search_root)
            # Skip hidden dirs, __pycache__, node_modules, .git — judged on the
            # path *below* the search root, so a dot-directory somewhere above
            # the project does not silently exclude every file.
            if any(part.startswith('.') or part in _SKIPPED_DIR_NAMES for part in rel.parts):
                continue
            if not filepath.is_file():
                continue
            if filepath.suffix.lower() not in READABLE_EXTENSIONS:
                continue
            if filepath.stat().st_size > MAX_FILE_SIZE:
                continue

            files_searched += 1
            try:
                content = filepath.read_text(encoding='utf-8', errors='replace')
            except OSError:
                continue  # best effort: unreadable files are skipped

            for line_num, line in enumerate(content.splitlines(), 1):
                if needle in line.lower():
                    results.append(f" {rel}:{line_num}: {line.strip()[:120]}")
                    matches_found += 1
                    if matches_found >= MAX_SEARCH_MATCHES:  # Limit results
                        break

            if matches_found >= MAX_SEARCH_MATCHES:
                break

    except Exception as e:
        return f"Error searching: {e}"

    header = f"Search: '{pattern}' in {path} ({matches_found} matches in {files_searched} files)"
    if not results:
        return header + "\n No matches found."
    return header + "\n" + "\n".join(results)


def tool_file_info(path: str) -> str:
    """Get file metadata: size, modification time, type and line count.

    The line count is only reported for files with a readable extension and
    uses the same ``splitlines()`` rule as tool_read_file.
    """
    resolved = _resolve_path(path)
    if resolved is None:
        return f"Error: Path '{path}' is outside allowed directories."

    if not resolved.exists():
        return f"Error: File not found: {path}"

    stat = resolved.stat()
    mtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stat.st_mtime))

    info = [
        f"File: {path}",
        f" Size: {stat.st_size:,} bytes ({stat.st_size / 1024:.1f} KB)",
        f" Modified: {mtime}",
        f" Type: {'directory' if resolved.is_dir() else resolved.suffix or 'no extension'}",
    ]

    # Line count for text files
    if resolved.is_file() and resolved.suffix.lower() in READABLE_EXTENSIONS:
        try:
            lines = len(resolved.read_text(encoding='utf-8', errors='replace').splitlines())
            info.append(f" Lines: {lines:,}")
        except OSError:
            pass  # metadata above is still useful without a line count

    return "\n".join(info)


def tool_run_python(code: str) -> str:
    """Run a Python snippet in a subprocess with a timeout.

    Returns the snippet's stdout (plus stderr, if any), or an ``"Error ..."``
    string when the snippet is rejected, times out, or cannot be started.

    NOTE(security): the substring blocklist below is a best-effort filter, not
    a sandbox — it can be bypassed (e.g. ``from os import ...``). Do not rely
    on it to contain untrusted code.
    """
    # Basic safety checks
    dangerous = ['import os', 'import sys', 'subprocess', 'shutil.rmtree',
                 'os.remove', 'os.unlink', '__import__', 'eval(',
                 'exec(', 'open(', 'write(', 'pathlib']
    for d in dangerous:
        if d in code and 'print' not in code.split(d)[0].split('\n')[-1]:
            # Allow if it's inside a print statement string
            if f'"{d}"' not in code and f"'{d}'" not in code:
                return f"Error: '{d}' is not allowed in run_python for safety. Use read_file/search_code for file operations."

    # Prefer the configured interpreter; fall back to the current one so the
    # tool still works on machines without that installation.
    interpreter = PYTHON_EXECUTABLE if os.path.isfile(PYTHON_EXECUTABLE) else sys.executable
    env = dict(os.environ)
    if os.path.isdir(PYTHON_SITE_PACKAGES):
        env["PYTHONPATH"] = PYTHON_SITE_PACKAGES

    try:
        result = subprocess.run(
            [interpreter, "-c", code],
            capture_output=True,
            text=True,
            timeout=PYTHON_TIMEOUT,
            env=env,
        )
        output = result.stdout
        if result.stderr:
            output += "\nSTDERR: " + result.stderr
        if not output.strip():
            output = "(no output)"
        return output.strip()
    except subprocess.TimeoutExpired:
        return f"Error: Code execution timed out after {PYTHON_TIMEOUT}s."
    except Exception as e:
        return f"Error running code: {e}"


def tool_project_summary() -> str:
    """Generate a quick project structure overview.

    Lists the known key directories (with a recursive file count) and key
    files (with size) that exist under ALLOWED_ROOTS[0].
    """
    root = ALLOWED_ROOTS[0]
    summary = ["Codette Training Lab — Project Structure\n"]

    # Key directories
    key_dirs = [
        ("configs/", "Configuration files (adapter registry, pipeline config)"),
        ("datasets/", "Training data — perspective-tagged JSONL files"),
        ("dataset_engine/", "Dataset generation pipeline"),
        ("evaluation/", "Evaluation scripts and benchmarks"),
        ("inference/", "Local inference server + web UI"),
        ("paper/", "Academic paper (LaTeX, PDF, BibTeX)"),
        ("reasoning_forge/", "Core RC+xi engine, spiderweb, cocoon sync"),
        ("research/", "Research docs, experiments, DreamReweaver"),
        ("scripts/", "Training and pipeline scripts"),
        ("adapters/", "GGUF LoRA adapter files for llama.cpp"),
    ]

    for dirname, desc in key_dirs:
        dirpath = root / dirname
        if dirpath.exists():
            count = sum(1 for entry in dirpath.rglob("*") if entry.is_file())
            summary.append(f" [DIR] {dirname:<30s} {desc} ({count} files)")

    # Key files
    summary.append("\nKey Files:")
    key_files = [
        "HOWTO.md",
        "configs/adapter_registry.yaml",
        "inference/codette_server.py",
        "inference/codette_orchestrator.py",
        "reasoning_forge/quantum_spiderweb.py",
        "reasoning_forge/epistemic_metrics.py",
        "paper/codette_paper.tex",
    ]
    for f in key_files:
        fp = root / f
        if fp.exists():
            size = fp.stat().st_size
            summary.append(f" [FILE] {f} ({size / 1024:.1f} KB)")

    return "\n".join(summary)


# ================================================================
# Tool-Augmented System Prompt
# ================================================================

TOOL_PROMPT_SUFFIX = """

TOOLS: You can read files, search code, and run calculations. When a user asks about code, files, or the project, you MUST use tools to look things up rather than guessing.

Format: tool_name("arg1", "arg2")

{tool_descriptions}

RULES:
1. If the user asks about a file, config, or code: ALWAYS call read_file or search_code FIRST
2. If the user asks "show me" or "what is": call the relevant tool FIRST, then explain
3. For general conversation or reasoning: respond normally without tools
4. Start your response with the tool call on the very first line
"""


def build_tool_system_prompt(base_prompt: str, registry: ToolRegistry) -> str:
    """Augment a system prompt with tool-use instructions.

    Appends TOOL_PROMPT_SUFFIX to *base_prompt*, with the registry's tool
    descriptions substituted for ``{tool_descriptions}``.
    """
    return base_prompt + TOOL_PROMPT_SUFFIX.format(
        tool_descriptions=registry.get_descriptions()
    )


# ================================================================
# Quick Test
# ================================================================
if __name__ == "__main__":
    print("Testing Codette Tools...\n")

    registry = ToolRegistry()
    print(registry.get_descriptions())

    print("\n--- Test: read_file ---")
    print(tool_read_file("configs/adapter_registry.yaml", 1, 10))

    print("\n--- Test: list_files ---")
    print(tool_list_files("inference/"))

    print("\n--- Test: search_code ---")
    print(tool_search_code("phase_coherence", "reasoning_forge/", ".py"))

    print("\n--- Test: file_info ---")
    print(tool_file_info("paper/codette_paper.pdf"))

    print("\n--- Test: run_python ---")
    print(tool_run_python("print(2 ** 10)"))

    print("\n--- Test: project_summary ---")
    print(tool_project_summary())

    print("\n--- Test: parse_tool_calls ---")
    test = 'Let me check that. read_file("configs/adapter_registry.yaml", 1, 20) And also search_code("AEGIS")'
    calls = parse_tool_calls(test)
    for name, args, kwargs in calls:
        print(f" Call: {name}({args})")

    print("\nDone!")