Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """ | |
| Configuration Navigator for Operations System | |
| This script helps you navigate and understand your config-driven architecture. | |
| Use it to explore, validate, and modify configurations at both global and tool levels. | |
| Usage: | |
| python scripts/config_navigator.py --help | |
| python scripts/config_navigator.py --list-all | |
| python scripts/config_navigator.py --show agent.response_templates | |
| python scripts/config_navigator.py --show omirl.tasks | |
| """ | |
| import os | |
| import sys | |
| import yaml | |
| import argparse | |
| from pathlib import Path | |
| from typing import Dict, Any, List | |
class ConfigNavigator:
    """Explore the YAML configuration files of the project.

    Two kinds of configuration are recognised:

    * global agent configs in ``<project_root>/agent/config/*.yaml``,
      addressed as ``agent.<name>``;
    * tool configs in ``<project_root>/tools/<tool>/config/*.yaml``,
      addressed as ``<tool>.<name>``.
    """

    def __init__(self, project_root: Path = None):
        """Remember the directories that are scanned for configs.

        Args:
            project_root: Root of the project. ``None`` (the default) uses
                the grandparent directory of this file. A plain string is
                accepted as well and converted to a ``Path``.
        """
        if project_root is None:
            project_root = Path(__file__).parent.parent
        # Coerce so that callers may pass a str without breaking the "/" joins.
        self.project_root = Path(project_root)
        self.agent_config_dir = self.project_root / "agent" / "config"
        self.tools_dir = self.project_root / "tools"

    def list_all_configs(self) -> Dict[str, List[str]]:
        """List all available configuration files.

        Returns:
            A dict with two entries (global agent configs, tool-specific
            configs), each mapping to a sorted list of dotted config paths
            such as ``agent.response_templates`` or ``omirl.tasks``.
        """
        global_key = "π Global Agent Configs"
        tool_key = "π§ Tool-Specific Configs"
        configs = {global_key: [], tool_key: []}

        # Global configs
        if self.agent_config_dir.is_dir():
            configs[global_key] = sorted(
                f"agent.{config_file.stem}"
                for config_file in self.agent_config_dir.glob("*.yaml")
            )

        # Tool configs; sorting makes the listing independent of the
        # order in which the filesystem happens to return entries.
        if self.tools_dir.is_dir():
            for tool_dir in sorted(self.tools_dir.iterdir()):
                if not tool_dir.is_dir():
                    continue
                tool_config_dir = tool_dir / "config"
                if tool_config_dir.is_dir():
                    configs[tool_key].extend(sorted(
                        f"{tool_dir.name}.{config_file.stem}"
                        for config_file in tool_config_dir.glob("*.yaml")
                    ))
        return configs

    def _resolve_config_file(self, config_path: str) -> Path:
        """Translate a dotted ``namespace.config_name`` path into a file path.

        Raises:
            ValueError: If ``config_path`` contains no ``.`` separator.
        """
        if "." not in config_path:
            raise ValueError("Config path must be in format 'namespace.config_name'")
        namespace, config_name = config_path.split(".", 1)
        if namespace == "agent":
            return self.agent_config_dir / f"{config_name}.yaml"
        # Any other namespace is assumed to be a tool name.
        return self.tools_dir / namespace / "config" / f"{config_name}.yaml"

    def _read_config(self, config_path: str) -> Any:
        """Parse a config file and return its content, raising on any failure.

        Unlike ``load_config`` this does not swallow errors, so callers that
        need to know why a config could not be read can report the cause.

        Raises:
            ValueError: If ``config_path`` is not of the dotted form.
            FileNotFoundError: If the resolved file does not exist.
            Exception: Whatever opening the file or ``yaml.safe_load`` raises.
        """
        file_path = self._resolve_config_file(config_path)
        if not file_path.exists():
            raise FileNotFoundError(f"Config file not found: {file_path}")
        with open(file_path, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def load_config(self, config_path: str) -> Dict[str, Any]:
        """Load a specific configuration file (best effort).

        Args:
            config_path: Dotted path such as ``agent.response_templates``.

        Returns:
            The parsed YAML content. On any error a message is printed and
            an empty dict is returned; an empty YAML document also yields an
            empty dict rather than ``None``.
        """
        try:
            config = self._read_config(config_path)
        except Exception as e:
            print(f"β Error loading config '{config_path}': {e}")
            return {}
        # safe_load gives None for an empty document; keep the return usable
        # as a mapping for callers.
        return {} if config is None else config

    def show_config_structure(self, config_path: str, max_depth: int = 3):
        """Print the structure of a configuration file.

        Nothing is printed (beyond a possible load error) when the config
        cannot be loaded or is empty.

        Args:
            config_path: Dotted path of the config to display.
            max_depth: Nesting depth after which the output is elided.
        """
        config = self.load_config(config_path)
        if not config:
            return
        print(f"\nπ **Configuration: {config_path}**")
        print("=" * 60)
        self._print_dict_structure(config, max_depth=max_depth)

    @staticmethod
    def _truncate(text: str, limit: int = 50) -> str:
        """Shorten ``text`` to at most ``limit`` characters, ending in '...'."""
        if len(text) > limit:
            return text[:limit - 3] + "..."
        return text

    def _print_dict_structure(self, obj: Any, indent: int = 0, max_depth: int = 3, current_depth: int = 0):
        """Recursively print the structure of a parsed config value.

        Dicts are printed key by key, lists show their length and a sample,
        and scalars are printed truncated to 50 characters.

        Args:
            obj: The value to print (dict, list or scalar).
            indent: Current indentation level.
            max_depth: Depth at which ``...`` is printed instead of content.
            current_depth: Depth of ``obj`` relative to the starting value.
        """
        pad = " " * indent
        if current_depth >= max_depth:
            print(pad + "...")
            return

        if isinstance(obj, dict):
            for key, value in obj.items():
                if isinstance(value, dict):
                    print(pad + f"π {key}:")
                    self._print_dict_structure(value, indent + 1, max_depth, current_depth + 1)
                elif isinstance(value, list):
                    print(pad + f"π {key}: [{len(value)} items]")
                    if value and current_depth < max_depth - 1:
                        if isinstance(value[0], (dict, list)):
                            # Structured items: describe the first as a sample.
                            self._print_dict_structure(value[0], indent + 1, max_depth, current_depth + 1)
                        else:
                            # Scalar items (strings, numbers, ...): show the
                            # first few values instead of printing nothing.
                            sample = value[:3]
                            print(" " * (indent + 1) + f"β³ {sample}{'...' if len(value) > 3 else ''}")
                else:
                    print(pad + f"π {key}: {self._truncate(str(value))}")
        elif isinstance(obj, list):
            for i, item in enumerate(obj[:3]):  # Show first 3 items
                print(pad + f"[{i}]:")
                self._print_dict_structure(item, indent + 1, max_depth, current_depth + 1)
            if len(obj) > 3:
                print(pad + f"... and {len(obj) - 3} more items")
        else:
            # Bare scalar, e.g. an item of a top-level list.
            print(pad + self._truncate(str(obj)))

    def find_in_configs(self, search_term: str) -> List[tuple]:
        """Search for a term across all configurations (case-insensitive).

        Args:
            search_term: Text to look for in keys and values.

        Returns:
            A list of ``(config_path, category)`` tuples, one per config
            whose content contains the term.
        """
        needle = search_term.lower()
        results = []
        for category, configs in self.list_all_configs().items():
            for config_path in configs:
                config = self.load_config(config_path)
                if self._search_in_dict(config, needle):
                    results.append((config_path, category))
        return results

    def _search_in_dict(self, obj: Any, search_term: str) -> bool:
        """Recursively search a parsed config value for a term.

        Args:
            obj: Dict, list or scalar to inspect.
            search_term: Term to find; expected to be lower-cased already,
                because keys and values are lower-cased before comparison.

        Returns:
            True if the term occurs in any key or scalar value.
        """
        if isinstance(obj, dict):
            # YAML keys need not be strings (e.g. ``404: ...``), so convert
            # before lower-casing instead of calling ``key.lower()``.
            return any(
                search_term in str(key).lower() or self._search_in_dict(value, search_term)
                for key, value in obj.items()
            )
        if isinstance(obj, list):
            return any(self._search_in_dict(item, search_term) for item in obj)
        if obj is None:
            return False
        # Strings and other scalars (numbers, booleans, dates) are matched
        # on their text form so that e.g. a port number can be found.
        return search_term in str(obj).lower()

    def validate_configs(self) -> Dict[str, List[str]]:
        """Validate all configuration files.

        A config is valid when it can be read and parses to non-empty
        content. Configs that fail to load are listed with the error text;
        configs that parse to nothing are listed as ``(empty)``.

        Returns:
            A dict with a "valid" and an "invalid" entry, each a list of
            descriptions.
        """
        valid_key = "β Valid"
        invalid_key = "β Invalid"
        results = {valid_key: [], invalid_key: []}
        for configs in self.list_all_configs().values():
            for config_path in configs:
                try:
                    # Use the raising reader: load_config() hides the error,
                    # which would make every failure look like "(empty)".
                    config = self._read_config(config_path)
                except Exception as e:
                    results[invalid_key].append(f"{config_path} ({str(e)})")
                else:
                    if config:
                        results[valid_key].append(config_path)
                    else:
                        results[invalid_key].append(f"{config_path} (empty)")
        return results

    def show_config_relationships(self):
        """Print a static overview of how the configurations relate."""
        print("\nπ **Configuration Relationships**")
        print("=" * 60)
        print("\nπ **Control Flow:**")
        print("1. π£οΈ User Request")
        print("2. π§ llm_router_config.yaml β Route to tool")
        print("3. π§ tool_registry.yaml β Find tool capabilities")
        print("4. βοΈ tool/config/*.yaml β Execute with parameters")
        print("5. π response_templates.yaml β Format response")
        print("\nπ **Shared Data:**")
        print("β’ geography.yaml β Used by all tools for validation")
        print("β’ response_templates.yaml β Used by all tools for responses")
        print("\nπ§ **Tool-Specific:**")
        print("β’ tools/{tool}/config/tasks.yaml β What the tool can do")
        print("β’ tools/{tool}/config/parameters.yaml β Tool parameters")
        print("β’ tools/{tool}/config/validation_rules.yaml β Input validation")
def main(argv: List[str] = None):
    """Command-line entry point for the configuration navigator.

    Parses the options, then performs the first requested action in this
    order of precedence: ``--list-all``, ``--show``, ``--search``,
    ``--validate``, ``--relationships``. With no action the help text is
    printed.

    Args:
        argv: Arguments to parse, without the program name. ``None`` (the
            default) lets argparse read ``sys.argv[1:]``, so calling
            ``main()`` behaves as before; passing a list allows the CLI to
            be driven programmatically.
    """
    parser = argparse.ArgumentParser(
        description="Navigate the config-driven architecture",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s --list-all # List all configs
%(prog)s --show agent.response_templates # Show agent response templates
%(prog)s --show omirl.tasks # Show OMIRL tasks config
%(prog)s --search "precipitazione" # Search for term in all configs
%(prog)s --validate # Validate all configs
%(prog)s --relationships # Show config relationships
"""
    )
    parser.add_argument("--list-all", action="store_true",
                        help="List all available configuration files")
    parser.add_argument("--show", metavar="CONFIG_PATH",
                        help="Show structure of specific config (e.g., agent.response_templates)")
    parser.add_argument("--search", metavar="TERM",
                        help="Search for a term across all configurations")
    parser.add_argument("--validate", action="store_true",
                        help="Validate all configuration files")
    parser.add_argument("--relationships", action="store_true",
                        help="Show configuration relationships")
    parser.add_argument("--depth", type=int, default=3,
                        help="Maximum depth for structure display (default: 3)")
    args = parser.parse_args(argv)

    # No action requested: show usage without touching the project tree.
    requested = (args.list_all, args.show, args.search, args.validate, args.relationships)
    if not any(requested):
        parser.print_help()
        return

    navigator = ConfigNavigator()

    if args.list_all:
        print("\nπΊοΈ **Available Configurations**")
        print("=" * 60)
        configs = navigator.list_all_configs()
        for category, config_list in configs.items():
            print(f"\n{category}:")
            for config in config_list:
                print(f" β’ {config}")
    elif args.show:
        navigator.show_config_structure(args.show, max_depth=args.depth)
    elif args.search:
        print(f"\nπ **Search Results for '{args.search}'**")
        print("=" * 60)
        results = navigator.find_in_configs(args.search)
        if results:
            for config_path, category in results:
                print(f"π {config_path} ({category})")
        else:
            print("No results found.")
    elif args.validate:
        print("\nβ **Configuration Validation**")
        print("=" * 60)
        results = navigator.validate_configs()
        for status, configs in results.items():
            print(f"\n{status}:")
            for config in configs:
                print(f" β’ {config}")
    elif args.relationships:
        navigator.show_config_relationships()
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()