| | """ |
| | parse.py |
| | Translate .p/ commands to function calls in transformerOS |
| | |
| | This module handles the parsing and translation of the .p/ command language, |
| | transforming structured .p/ commands into executable function calls in the |
| | transformerOS framework. |
| | """ |
| |
|
| | import re |
| | import yaml |
| | import json |
| | import logging |
| | from typing import Dict, List, Optional, Tuple, Union, Any |
| | from pathlib import Path |
| |
|
| | |
| | log = logging.getLogger("transformerOS.pareto_lang.parse") |
| | log.setLevel(logging.INFO) |
| |
|
| | class CommandParser: |
| | """ |
| | Parser for the .p/ command language |
| | |
| | This class handles the parsing and translation of .p/ commands into |
| | executable function calls, enabling a consistent interface between |
| | the symbolic command language and the underlying system operations. |
| | """ |
| | |
| | def __init__(self, commands_file: Optional[str] = None): |
| | """ |
| | Initialize the command parser |
| | |
| | Parameters: |
| | ----------- |
| | commands_file : Optional[str] |
| | Path to the YAML file containing command definitions |
| | If None, uses the default commands.yaml in the same directory |
| | """ |
| | |
| | if commands_file is None: |
| | |
| | commands_file = Path(__file__).parent / "commands.yaml" |
| | |
| | self.commands = self._load_commands(commands_file) |
| | |
| | |
| | self.command_pattern = re.compile(r'\.p/([a-zA-Z_]+)\.([a-zA-Z_]+)(\{.*\})?') |
| | self.param_pattern = re.compile(r'([a-zA-Z_]+)\s*=\s*([^,}]+)') |
| | |
| | log.info(f"CommandParser initialized with {len(self.commands)} command definitions") |
| |
|
| | def _load_commands(self, commands_file: str) -> Dict: |
| | """Load command definitions from YAML file""" |
| | try: |
| | with open(commands_file, 'r') as f: |
| | commands = yaml.safe_load(f) |
| | |
| | |
| | if not isinstance(commands, dict) or "commands" not in commands: |
| | raise ValueError(f"Invalid command file format: {commands_file}") |
| | |
| | log.info(f"Loaded {len(commands['commands'])} command definitions from {commands_file}") |
| | return commands |
| | |
| | except Exception as e: |
| | log.error(f"Failed to load commands from {commands_file}: {e}") |
| | |
| | return {"commands": {}, "version": "unknown"} |
| |
|
| | def parse_command(self, command_str: str) -> Dict: |
| | """ |
| | Parse a .p/ command string into a structured command object |
| | |
| | Parameters: |
| | ----------- |
| | command_str : str |
| | The .p/ command string to parse |
| | |
| | Returns: |
| | -------- |
| | Dict containing the parsed command structure |
| | """ |
| | |
| | command_str = command_str.strip() |
| | |
| | |
| | if not command_str.startswith(".p/"): |
| | raise ValueError(f"Not a valid .p/ command: {command_str}") |
| | |
| | |
| | match = self.command_pattern.match(command_str) |
| | if not match: |
| | raise ValueError(f"Invalid command format: {command_str}") |
| | |
| | domain, operation, params_str = match.groups() |
| | |
| | |
| | params = {} |
| | if params_str: |
| | |
| | params_str = params_str.strip('{}') |
| | |
| | |
| | param_matches = self.param_pattern.findall(params_str) |
| | for param_name, param_value in param_matches: |
| | |
| | params[param_name] = self._parse_parameter_value(param_value) |
| | |
| | |
| | parsed_command = { |
| | "domain": domain, |
| | "operation": operation, |
| | "parameters": params, |
| | "original": command_str |
| | } |
| | |
| | log.debug(f"Parsed command: {parsed_command}") |
| | return parsed_command |
| |
|
| | def _parse_parameter_value(self, value_str: str) -> Any: |
| | """Parse a parameter value string into the appropriate type""" |
| | |
| | value_str = value_str.strip() |
| | |
| | |
| | if (value_str.startswith('"') and value_str.endswith('"')) or \ |
| | (value_str.startswith("'") and value_str.endswith("'")): |
| | return value_str[1:-1] |
| | |
| | |
| | try: |
| | |
| | if value_str.isdigit(): |
| | return int(value_str) |
| | |
| | |
| | return float(value_str) |
| | except ValueError: |
| | pass |
| | |
| | |
| | if value_str.lower() == "true": |
| | return True |
| | elif value_str.lower() == "false": |
| | return False |
| | |
| | |
| | if value_str.lower() == "null" or value_str.lower() == "none": |
| | return None |
| | |
| | |
| | if value_str.startswith('[') and value_str.endswith(']'): |
| | |
| | items = value_str[1:-1].split(',') |
| | return [self._parse_parameter_value(item) for item in items] |
| | |
| | |
| | if value_str.lower() == "complete": |
| | return "complete" |
| | |
| | |
| | return value_str |
| |
|
| | def validate_command(self, parsed_command: Dict) -> Tuple[bool, Optional[str]]: |
| | """ |
| | Validate a parsed command against command definitions |
| | |
| | Parameters: |
| | ----------- |
| | parsed_command : Dict |
| | The parsed command structure to validate |
| | |
| | Returns: |
| | -------- |
| | Tuple of (is_valid, error_message) |
| | """ |
| | domain = parsed_command["domain"] |
| | operation = parsed_command["operation"] |
| | parameters = parsed_command["parameters"] |
| | |
| | |
| | if domain not in self.commands["commands"]: |
| | return False, f"Unknown command domain: {domain}" |
| | |
| | |
| | domain_commands = self.commands["commands"][domain] |
| | if operation not in domain_commands: |
| | return False, f"Unknown operation '{operation}' in domain '{domain}'" |
| | |
| | |
| | command_def = domain_commands[operation] |
| | |
| | |
| | if "required_parameters" in command_def: |
| | for required_param in command_def["required_parameters"]: |
| | if required_param not in parameters: |
| | return False, f"Missing required parameter: {required_param}" |
| | |
| | |
| | if "parameters" in command_def: |
| | for param_name, param_value in parameters.items(): |
| | if param_name in command_def["parameters"]: |
| | expected_type = command_def["parameters"][param_name]["type"] |
| | |
| | |
| | if not self._validate_parameter_type(param_value, expected_type): |
| | return False, f"Parameter '{param_name}' has invalid type. Expected {expected_type}." |
| | |
| | return True, None |
| |
|
| | def _validate_parameter_type(self, value: Any, expected_type: str) -> bool: |
| | """Validate a parameter value against its expected type""" |
| | if expected_type == "string": |
| | return isinstance(value, str) |
| | elif expected_type == "int": |
| | return isinstance(value, int) |
| | elif expected_type == "float": |
| | return isinstance(value, (int, float)) |
| | elif expected_type == "bool": |
| | return isinstance(value, bool) |
| | elif expected_type == "list": |
| | return isinstance(value, list) |
| | elif expected_type == "any": |
| | return True |
| | elif expected_type == "string_or_int": |
| | return isinstance(value, (str, int)) |
| | elif expected_type == "null": |
| | return value is None |
| | else: |
| | |
| | return True |
| |
|
| | def get_function_mapping(self, parsed_command: Dict) -> Dict: |
| | """ |
| | Get the function mapping for a parsed command |
| | |
| | Parameters: |
| | ----------- |
| | parsed_command : Dict |
| | The parsed command structure |
| | |
| | Returns: |
| | -------- |
| | Dict containing function mapping information |
| | """ |
| | domain = parsed_command["domain"] |
| | operation = parsed_command["operation"] |
| | |
| | |
| | try: |
| | command_def = self.commands["commands"][domain][operation] |
| | except KeyError: |
| | log.warning(f"No function mapping found for {domain}.{operation}") |
| | return {"function": None, "module": None, "parameters": {}} |
| | |
| | |
| | function_mapping = command_def.get("function_mapping", {}) |
| | |
| | |
| | mapping = { |
| | "function": function_mapping.get("function", None), |
| | "module": function_mapping.get("module", None), |
| | "parameters": self._map_parameters(parsed_command["parameters"], function_mapping), |
| | "original_command": parsed_command["original"] |
| | } |
| | |
| | return mapping |
| |
|
| | def _map_parameters(self, cmd_params: Dict, function_mapping: Dict) -> Dict: |
| | """Map command parameters to function parameters based on mapping rules""" |
| | result_params = {} |
| | |
| | |
| | param_mapping = function_mapping.get("parameter_mapping", {}) |
| | for cmd_param, func_param in param_mapping.items(): |
| | if cmd_param in cmd_params: |
| | result_params[func_param] = cmd_params[cmd_param] |
| | |
| | |
| | for param_name, param_value in cmd_params.items(): |
| | if param_name not in param_mapping.values(): |
| | |
| | result_params[param_name] = param_value |
| | |
| | return result_params |
| |
|
| | def extract_commands(self, text: str) -> List[Dict]: |
| | """ |
| | Extract all .p/ commands from a text |
| | |
| | Parameters: |
| | ----------- |
| | text : str |
| | The text to extract commands from |
| | |
| | Returns: |
| | -------- |
| | List of parsed command dictionaries |
| | """ |
| | |
| | pattern = r'(\.p/[a-zA-Z_]+\.[a-zA-Z_]+(?:\{[^}]*\})?)' |
| | command_matches = re.findall(pattern, text) |
| | |
| | |
| | parsed_commands = [] |
| | for cmd_str in command_matches: |
| | try: |
| | parsed_cmd = self.parse_command(cmd_str) |
| | parsed_commands.append(parsed_cmd) |
| | except ValueError as e: |
| | log.warning(f"Failed to parse command '{cmd_str}': {e}") |
| | |
| | log.info(f"Extracted {len(parsed_commands)} commands from text") |
| | return parsed_commands |
| |
|
| |
|
| | |
| | def execute_parsed_command(parsed_mapping: Dict) -> Dict: |
| | """ |
| | Execute a parsed command mapping using dynamic imports |
| | |
| | Parameters: |
| | ----------- |
| | parsed_mapping : Dict |
| | The parsed function mapping to execute |
| | |
| | Returns: |
| | -------- |
| | Dict containing execution results |
| | """ |
| | function_name = parsed_mapping["function"] |
| | module_name = parsed_mapping["module"] |
| | parameters = parsed_mapping["parameters"] |
| | |
| | if not function_name or not module_name: |
| | raise ValueError("Invalid function mapping: missing function or module name") |
| | |
| | try: |
| | |
| | module = __import__(module_name, fromlist=[function_name]) |
| | |
| | |
| | function = getattr(module, function_name) |
| | |
| | |
| | result = function(**parameters) |
| | |
| | |
| | return { |
| | "status": "success", |
| | "result": result, |
| | "command": parsed_mapping["original_command"] |
| | } |
| | |
| | except ImportError as e: |
| | log.error(f"Failed to import module {module_name}: {e}") |
| | return { |
| | "status": "error", |
| | "error": f"Module not found: {module_name}", |
| | "command": parsed_mapping["original_command"] |
| | } |
| | |
| | except AttributeError as e: |
| | log.error(f"Function {function_name} not found in module {module_name}: {e}") |
| | return { |
| | "status": "error", |
| | "error": f"Function not found: {function_name}", |
| | "command": parsed_mapping["original_command"] |
| | } |
| | |
| | except Exception as e: |
| | log.error(f"Error executing function {function_name}: {e}") |
| | return { |
| | "status": "error", |
| | "error": str(e), |
| | "command": parsed_mapping["original_command"] |
| | } |
| |
|
| |
|
| | |
| | if __name__ == "__main__": |
| | import argparse |
| | |
| | parser = argparse.ArgumentParser(description="Parse .p/ commands") |
| | parser.add_argument("command", help=".p/ command to parse") |
| | parser.add_argument("--commands-file", help="Path to commands YAML file") |
| | parser.add_argument("--execute", action="store_true", help="Execute the parsed command") |
| | parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") |
| | |
| | args = parser.parse_args() |
| | |
| | |
| | if args.verbose: |
| | logging.basicConfig(level=logging.DEBUG) |
| | else: |
| | logging.basicConfig(level=logging.INFO) |
| | |
| | |
| | cmd_parser = CommandParser(args.commands_file) |
| | |
| | try: |
| | |
| | parsed_cmd = cmd_parser.parse_command(args.command) |
| | print("Parsed Command:") |
| | print(json.dumps(parsed_cmd, indent=2)) |
| | |
| | |
| | valid, error = cmd_parser.validate_command(parsed_cmd) |
| | if valid: |
| | print("Command validation: PASSED") |
| | else: |
| | print(f"Command validation: FAILED - {error}") |
| | |
| | |
| | func_mapping = cmd_parser.get_function_mapping(parsed_cmd) |
| | print("\nFunction Mapping:") |
| | print(json.dumps(func_mapping, indent=2)) |
| | |
| | |
| | if args.execute and valid: |
| | print("\nExecuting command...") |
| | result = execute_parsed_command(func_mapping) |
| | print("\nExecution Result:") |
| | print(json.dumps(result, indent=2)) |
| | |
| | except Exception as e: |
| | print(f"Error: {e}") |
| |
|