# RustworkX Agent Framework — Full Documentation

A modern graph-based framework for multi-agent systems

A flexible, high-performance alternative to LangGraph with dynamic topology, decentralized memory, and full access to graph structures

--- ## πŸ“‹ Table of Contents - [Introduction](#introduction) - [Installation](#installation) - [Quick Start](#quick-start) - [Key Concepts](#key-concepts) - [Core Components](#core-components) - [RoleGraph](#rolegraph) - [AgentProfile](#agentprofile) - [TaskNode](#tasknode) - [NodeEncoder](#nodeencoder) - [MACPRunner](#macprunner) - [Scheduler](#scheduler) - [Memory System](#memory-system) - [Streaming API](#streaming-api) - [Token Budget](#token-budget-budget-system) - [Error Handling](#error-handling-error-handling) - [Graph Algorithms](#graph-algorithms-graph-algorithms) - [Metrics Tracking](#metrics-tracking-metrics-tracker) - [Visualization](#visualization-visualization) - [Graph Schemas](#graph-schemas-schema-system) - [Builder API](#builder-api-detailed) - [Event System](#event-system-event-system) - [Callback System (LangChain-like)](#callback-system) - [State Storage](#state-storage-state-storage) - [Async Utilities](#async-utilities-async-utils) - [Conditional Routing](#conditional-routing-conditional-routing) - [Agent Tools (Tools)](#agent-tools-tools) - [Advanced Features](#advanced-features) - [Execution Optimization and Token Savings](#execution-optimization-and-token-savings) - [Multi-Model Support](#multi-model-support-multi-model-support) - [Structured Prompt β€” modern chat LLMs (recommended)](#structured-prompt--modern-chat-llms-recommended) - [Built-in factory helpers](#built-in-factory-helpers-recommended-zero-boilerplate) - [Dynamic Topology](#dynamic-topology) - [GNN Routing](#gnn-routing) - [Hidden Channels](#hidden-channels) - [Adaptive Execution](#adaptive-execution) - [Configuration](#configuration) - [Usage Examples](#usage-examples) - [API Reference](#api-reference) - [FAQ](#faq) --- ## Introduction **RustworkX Agent Framework** (gMAS) is a framework for building multi-agent systems that uses the `rustworkx` library for high-performance graph operations. It addresses key limitations of existing solutions such as LangGraph: ### Why is gMAS better than LangGraph? | Feature | LangGraph | gMAS Framework | |-------------|-----------|----------------| | **Topology** | Fixed | **Dynamic** (runtime changes via hooks) | | **Token optimization** | Minimal | **Automatic** (filtering isolated nodes, disabled nodes, early stopping) | | **Memory** | Centralized | Decentralized (agents’ local state) | | **Graph** | Hidden from the developer | First-class citizen (full access) | | **Representations** | Text only | Text + embeddings + hidden states | | **Typing and validation** | Minimal | **Full Pydantic validation** (type safety) | | **Data schemas** | Informal | **Pydantic BaseModel** (auto-validation, serialization) | | **Multi-model** | Limited | Full support for different LLMs per agent | | **Parallelism** | Limited | Full async/parallel support | | **ML integration** | None | PyTorch Geometric, GNN routing, RL hooks | | **Serialization** | Manual | **Automatic** (Pydantic `.model_dump()`) | | **Runtime adaptation** | None | **Topology hooks, early stopping, disabled nodes** | | **Callbacks** | BaseCallbackHandler | **Full compatibility** (same methods: on_run_start, on_agent_end, on_tool_start/end/error, etc.) | --- ## Installation ### Requirements - Python 3.12+ - PyTorch 2.0+ - **Pydantic 2.0+** (required β€” the framework is fully built on Pydantic) ### Via pip (from sources) ```bash git clone https://github.com/yourusername/rustworkx-agent-framework.git cd rustworkx-agent-framework pip install -e . 
``` ### Dependencies ```bash # Core (required) pip install rustworkx>=0.13 pydantic>=2.0 pydantic-settings>=2.0 torch>=2.0 loguru>=0.7 # For embeddings (optional) pip install sentence-transformers>=2.0 # For GNN routing (optional) pip install torch-geometric>=2.0 # For visualization (optional) pip install rich>=13.0 graphviz>=0.20 ``` ### Install all optional dependencies ```bash pip install -e ".[all]" ``` ### Important: Pydantic 2.0+ gMAS Framework **requires Pydantic 2.0+** and is incompatible with Pydantic 1.x. All models (`AgentProfile`, `TaskNode`, schemas, configurations) use the Pydantic v2 API: - `.model_dump()` instead of `.dict()` - `.model_validate()` instead of `.parse_obj()` - `.model_dump_json()` instead of `.json()` If you have Pydantic 1.x installed: ```bash pip install --upgrade "pydantic>=2.0" ``` --- ## Quick Start ### Minimal example ```python from core import AgentProfile, RoleGraph from execution import MACPRunner from builder import build_property_graph # 1. Define agents agents = [ AgentProfile( agent_id="solver", display_name="Math Solver", description="Solves math problems step by step", tools=["calculator"], ), AgentProfile( agent_id="checker", display_name="Answer Checker", description="Checks solutions for correctness", ), ] # 2. Define connections between agents workflow_edges = [("solver", "checker")] # 3. Build the graph graph = build_property_graph( agents, workflow_edges=workflow_edges, query="What is 25 Γ— 17?", ) # 4. Define an LLM call function def my_llm_caller(prompt: str) -> str: # Integrate your LLM here (OpenAI, Anthropic, local, etc.) return call_your_llm(prompt) # 5. Run execution runner = MACPRunner(llm_caller=my_llm_caller) result = runner.run_round(graph) # 6. Get results print(f"Answer: {result.final_answer}") print(f"Execution order: {result.execution_order}") print(f"Tokens used: {result.total_tokens}") ``` ### Quick Start: with monitoring (Callbacks) ```python from execution import MACPRunner, RunnerConfig from callbacks import ( StdoutCallbackHandler, MetricsCallbackHandler, collect_metrics, ) # 1. Add callback handlers config = RunnerConfig( callbacks=[ StdoutCallbackHandler(show_outputs=True), # Console output MetricsCallbackHandler(), # Metrics collection ] ) runner = MACPRunner(llm_caller=my_llm_caller, config=config) result = runner.run_round(graph) # 2. Or use a context manager with collect_metrics() as metrics: result = runner.run_round(graph) print(f"Total tokens: {metrics.total_tokens}") print(f"Execution time: {metrics.total_duration_ms}ms") print(f"Agent calls: {metrics.get_metrics()['agent_calls']}") ``` ### Quick Start: multi-model (different LLM for each agent) ```python from builder import GraphBuilder from execution import MACPRunner, LLMCallerFactory # 1. Create a builder and add agents with different models builder = GraphBuilder() # Agent 1: strong model for complex analysis builder.add_agent( agent_id="analyst", display_name="Senior Analyst", llm_backbone="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.0, max_tokens=2000, ) # Agent 2: smaller model for formatting builder.add_agent( agent_id="formatter", display_name="Report Formatter", llm_backbone="gpt-4o-mini", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.3, max_tokens=500, ) # 2. Define edges builder.add_workflow_edge("analyst", "formatter") # 3. Set the query and build the graph builder.add_task(query="Analyze Q4 sales") graph = builder.build() # 4. 
Create an LLM factory (automatically creates callers for each agent) factory = LLMCallerFactory.create_openai_factory() # 5. Run execution runner = MACPRunner(llm_factory=factory) result = runner.run_round(graph) # 6. Get the result print(f"Final answer: {result.final_answer}") print("Savings: use gpt-4 only for analysis, gpt-4o-mini for formatting") ``` ### Quick Start: token optimization and dynamic topology ```python from builder import GraphBuilder from execution import ( MACPRunner, RunnerConfig, EarlyStopCondition, TopologyAction ) # 1. Create a graph with explicit boundaries builder = GraphBuilder() builder.add_agent("input", persona="Input processor") builder.add_agent("solver", persona="Problem solver") builder.add_agent("checker", persona="Solution checker") builder.add_agent("expert", persona="Expert reviewer (expensive)") builder.add_agent("output", persona="Output formatter") builder.add_agent("optional", persona="Optional analyzer") builder.add_workflow_edge("input", "solver") builder.add_workflow_edge("solver", "checker") builder.add_workflow_edge("checker", "output") # expert is connected dynamically when needed # Set boundaries (for filtering unreachable nodes) builder.set_start_node("input") builder.set_end_node("output") builder.add_task(query="Solve the problem") builder.connect_task_to_agents() graph = builder.build() # 2. Disable optional nodes graph.disable("optional") # Will not run, token savings # 3. Hook for topology adaptation def adaptive_hook(ctx, graph): # If checker found an error β€” add expert if ctx.agent_id == "checker" and "ERROR" in (ctx.response or ""): return TopologyAction( add_edges=[("checker", "expert", 1.0), ("expert", "output", 1.0)], trigger_rebuild=True ) # If solver is confident β€” skip checker if ctx.agent_id == "solver" and "CONFIDENT" in (ctx.response or ""): return TopologyAction(skip_agents=["checker"]) return None # 4. Configure runner with optimization config = RunnerConfig( adaptive=True, enable_dynamic_topology=True, topology_hooks=[adaptive_hook], early_stop_conditions=[ EarlyStopCondition.on_keyword("FINAL_ANSWER"), EarlyStopCondition.on_token_limit(5000), ], ) runner = MACPRunner(llm_caller=my_llm, config=config) # 5. Execute with filtering unreachable nodes result = runner.run_round( graph, filter_unreachable=True # Exclude nodes not on the input->output path ) # 6. Result print(f"Executed: {result.execution_order}") print(f"Pruned: {result.pruned_agents}") # optional + unreachable print(f"Early stopped: {result.early_stopped}") print(f"Topology mods: {result.topology_modifications}") # was expert added? print(f"Tokens: {result.total_tokens}") ``` --- ## Key Concepts ### Pydantic-oriented architecture gMAS Framework is **fully built on Pydantic** for type safety, validation, and data serialization. 
All key models inherit from `pydantic.BaseModel`: #### Core Pydantic models in the framework | Model | Purpose | Notes | |--------|-----------|-------------| | `AgentProfile` | Agent profile | `frozen=True` (immutable), `arbitrary_types_allowed` for torch.Tensor | | `AgentLLMConfig` | Agent LLM configuration | Validates model parameters, supports env vars | | `TaskNode` | Task node | Stores the query and task context | | `GraphSchema` | Schema of the whole graph | Nodes (dict), edges (list), metadata | | `AgentNodeSchema` | Agent-node schema | LLM config, tools, metrics, embeddings | | `TaskNodeSchema` | Task-node schema | Query, status, deadline | | `BaseEdgeSchema` | Base edge schema | Weight, probability, cost metrics | | `WorkflowEdgeSchema` | Workflow edge | Conditions, priority, transformations | | `CostMetrics` | Cost metrics | Tokens, latency, trust, reliability | | `LLMConfig` | Full LLM configuration | Model name, base URL, API key, generation parameters | | `VisualizationStyle` | Visualization styles | Settings for colors, shapes, what to show | | `NodeStyle` | Node style | Shape, colors, icon | | `EdgeStyle` | Edge style | Line style, arrow, colors | | `ValidationResult` | Validation result | Errors, warnings | | `FeatureConfig` | GNN configuration | Feature dimensions | | `TrainingConfig` | Training configuration | Learning rate, epochs, optimizer | #### Benefits of Pydantic in gMAS 1. **Automatic type validation** ```python # Pydantic automatically checks types agent = AgentProfile( agent_id="test", # str - OK display_name="Test Agent", # str - OK tools=["search", "calc"], # list[str] - OK ) # Validation error for a wrong type agent = AgentProfile(agent_id=123) # ❌ ValidationError: agent_id must be str ``` 2. **Default values** ```python # Pydantic fills fields with default values agent = AgentProfile(agent_id="test", display_name="Test") print(agent.tools) # [] (empty list by default) print(agent.persona) # "" (empty string by default) ``` 3. **Automatic type conversion** ```python # Pydantic validators can automatically convert types schema = AgentNodeSchema( id="test", embedding=torch.tensor([0.1, 0.2, 0.3]) # torch.Tensor β†’ list[float] ) print(type(schema.embedding)) # ``` 4. **Nested models** ```python # Pydantic validates nested models agent = AgentProfile( agent_id="test", display_name="Test", llm_config=AgentLLMConfig( # Nested Pydantic model model_name="gpt-4", temperature=0.7, ) ) ``` 5. **Serialization and deserialization** ```python # Built-in Pydantic methods data = agent.model_dump() # β†’ dict json_str = agent.model_dump_json(indent=2) # β†’ JSON string # Load from dict/JSON loaded = AgentProfile.model_validate(data) loaded_json = AgentProfile.model_validate_json(json_str) ``` 6. **Immutability** ```python # frozen=True for AgentProfile agent = AgentProfile(agent_id="test", display_name="Test") agent.agent_id = "new_id" # ❌ ValidationError: frozen model # Use copy methods for changes updated = agent.model_copy(update={"display_name": "New Name"}) ``` 7. **Extensibility** ```python # extra="allow" enables arbitrary fields schema = GraphSchema( name="MyGraph", custom_field="custom_value", # Additional field another_field=123, # Another one ) ``` ### Declarative typing Thanks to Pydantic, all types are declarative and are checked both statically (mypy, pyright) and dynamically (at runtime): ```python from core import AgentProfile from core.schema import AgentNodeSchema, LLMConfig # Static typing (IDE autocompletion) agent: AgentProfile = AgentProfile(...) 
config: LLMConfig = LLMConfig(...)
schema: AgentNodeSchema = AgentNodeSchema(...)

# Dynamic validation (runtime)
from pydantic import ValidationError  # needed to catch the runtime validation error

try:
    bad_agent = AgentProfile(agent_id=None)  # ❌ None instead of str
except ValidationError as e:
    print(e.errors())  # Detailed error information
```

---

### Decentralized data storage

Unlike centralized architectures, gMAS uses a **decentralized** approach:

- **Embeddings** are stored inside `AgentProfile.embedding`
- **Hidden states** are stored inside `AgentProfile.hidden_state`
- **Local memory** is stored inside `AgentProfile.state`
- `RoleGraph.embeddings` is an accessor that gathers embeddings from all agents into a single tensor

This allows each agent to own its representations and ensures node independence.

### System architecture

```
┌──────────────────────────────────────────────────────────┐
│                        RoleGraph                         │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │  Agent   │──│  Agent   │──│  Agent   │──│  Agent   │  │
│  │ Profile  │  │ Profile  │  │ Profile  │  │ Profile  │  │
│  │(embedding│  │(embedding│  │(embedding│  │(embedding│  │
│  │  state)  │  │  state)  │  │  state)  │  │  state)  │  │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘  │
│       ↑             ↑             ↑             ↑        │
│       └─────────────┴──────┬──────┴─────────────┘        │
│                Adjacency matrix (A_com)                  │
└────────────────────────────┬─────────────────────────────┘
                             ▼
┌──────────────────────────────────────────────────────────┐
│                        MACPRunner                         │
│   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐    │
│   │  Scheduler  │   │   Memory    │   │   Budget    │    │
│   │             │   │    Pool     │   │   Tracker   │    │
│   └─────────────┘   └─────────────┘   └─────────────┘    │
└────────────────────────────┬─────────────────────────────┘
                             ▼
                   ┌───────────────────┐
                   │    MACPResult     │
                   │ • messages        │
                   │ • final_answer    │
                   │ • metrics         │
                   └───────────────────┘
```

### Data flow

1. **Create agents** → `AgentProfile` describes the role, capabilities, and tools
2. **Build the graph** → `build_property_graph` creates a `RoleGraph` with topology
3. **Planning** → `Scheduler` determines the execution order
4. **Execution** → `MACPRunner` runs agents sequentially or in parallel
5. **Result** → `MACPResult` contains all agents' responses and metrics

---

## Core Components

### RoleGraph

`RoleGraph` is the central data structure representing the agent graph.
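Before the full `RoleGraph` API reference below, here is a minimal sketch of the decentralized storage pattern described above: each agent keeps its own embedding, and `graph.embeddings` simply stacks the per-agent tensors on demand. Only APIs documented in this section are used; the hash-encoder size and the printed shape are illustrative assumptions.

```python
from builder import build_property_graph
from core import AgentProfile, NodeEncoder

agents = [
    AgentProfile(agent_id="solver", display_name="Math Solver"),
    AgentProfile(agent_id="checker", display_name="Answer Checker"),
]
graph = build_property_graph(
    agents,
    workflow_edges=[("solver", "checker")],
    query="What is 2 + 2?",
)

# Each agent owns its embedding: encode the profile text and store it on the agent
encoder = NodeEncoder(model_name="hash:256")  # hash fallback, no model download needed
vectors = encoder.encode([a.to_text() for a in graph.agents])
for agent, vec in zip(graph.agents, vectors):
    graph.update_agent(agent.agent_id, agent.with_embedding(vec))

# RoleGraph.embeddings is only an accessor that gathers the per-agent tensors
print(graph.embeddings.shape)  # expected: one row per agent, e.g. torch.Size([2, 256])
```

Because the vectors live on the agents themselves, removing or replacing a node carries its representation with it.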
```python from core import RoleGraph # === Graph properties === graph.num_nodes # Number of nodes graph.num_edges # Number of edges graph.agents # List of AgentProfile objects graph.node_ids # List of node IDs ["agent1", "agent2", ...] graph.role_sequence # Role order (legacy) graph.A_com # Adjacency matrix (torch.Tensor, N x N) graph.edge_index # Edge index in PyG format (torch.Tensor, 2 x E) graph.edge_attr # Edge attributes (torch.Tensor, E x feature_dim) graph.embeddings # Accessor: gathers agent embeddings into a tensor (N x dim) graph.graph # Internal rustworkx.PyDiGraph object graph.task_node # TaskNode if enabled, otherwise None graph.query # Task query (string) # === Node operations === # Add a node graph.add_node( agent, # AgentProfile connections_to=["other"], # List of IDs for outgoing edges connections_from=["prev"], # List of IDs for incoming edges weight=1.0, # Default edge weight ) # Remove a node with a state migration policy graph.remove_node( "agent_id", policy=StateMigrationPolicy.ARCHIVE, # DISCARD, COPY, ARCHIVE ) # Replace a node graph.replace_node( old_node_id="old", new_agent=new_agent_profile, policy=StateMigrationPolicy.COPY, # Copy state keep_connections=True, # Preserve edges ) # Get an agent agent = graph.get_agent_by_id("agent_id") # Get node index in the matrix idx = graph.get_node_index("agent_id") # -> int # Existence check if "agent_id" in graph.node_ids: ... # === Edge operations === # Add an edge graph.add_edge( source="agent1", target="agent2", weight=0.8, edge_type="workflow", # Edge type (optional) metadata={"priority": 1}, # Additional data ) # Remove an edge graph.remove_edge("agent1", "agent2") # Update edge weight graph.update_edge_weight("agent1", "agent2", new_weight=0.9) # Get neighbors out_neighbors = graph.get_neighbors("agent_id", direction="out") # Outgoing in_neighbors = graph.get_neighbors("agent_id", direction="in") # Incoming all_neighbors = graph.get_neighbors("agent_id", direction="both") # All # Check whether an edge exists has_edge = graph.has_edge("agent1", "agent2") # Get edge weight weight = graph.get_edge_weight("agent1", "agent2") # === Execution bounds (start/end nodes) === # Set start and end nodes for optimization graph.set_start_node("input_agent") graph.set_end_node("output_agent") # Or set both at once graph.set_execution_bounds("input_agent", "output_agent") # Inspect bounds print(f"Start: {graph.start_node}, End: {graph.end_node}") # === Disabled nodes === # Disable nodes (they remain in the graph but will not be executed) graph.disable("agent1") # One node graph.disable(["agent2", "agent3"]) # Multiple nodes # Enable back graph.enable("agent1") # One node graph.enable(["agent2", "agent3"]) # Multiple nodes graph.enable() # All disabled nodes # Check status graph.is_enabled("agent1") # -> bool graph.get_enabled() # -> ["agent1", ...] graph.get_disabled() # -> ["agent2", ...] 
# Use case: token savings based on algorithms if rl_model.predict(graph_state) < threshold: graph.disable("expensive_agent") # === Reachability analysis === # Get nodes reachable from start_node reachable = graph.get_reachable_from("input_agent") # Get nodes that can reach end_node reaching = graph.get_nodes_reaching("output_agent") # Get relevant nodes (on the path start -> end) relevant = graph.get_relevant_nodes() # Automatically uses graph.start_node and graph.end_node # Get isolated nodes (not on the path start -> end) isolated = graph.get_isolated_nodes() # Optimized execution order (without isolated nodes) order = graph.get_optimized_execution_order() # === Conditional edges === # Add an edge with a condition from execution.scheduler import ConditionContext def condition_func(context: ConditionContext) -> bool: return context.state.get("quality") > 0.8 graph.add_conditional_edge( source="writer", target="editor", condition=condition_func, weight=0.9, ) # === Dynamic topology updates === # Full update of the adjacency matrix graph.update_communication( a_new, # New adjacency matrix (torch.Tensor) s_tilde=scores, # Quality score matrix (optional) p_matrix=probabilities, # Transition probability matrix (optional) ) # === Conversion and export === # Serialize to a dictionary data = graph.to_dict() # { # "agents": [...], # "adjacency": [[...]], # "query": "...", # "task_node": {...}, # } # Convert to PyTorch Geometric Data pyg_data = graph.to_pyg_data() # Data(x=node_features, edge_index=edges, edge_attr=weights) # Extract a subgraph subgraph = graph.subgraph(["agent1", "agent2", "agent3"]) # Copy the graph graph_copy = graph.copy() # === Integrity checks === # Verify consistency of internal structures graph.verify_integrity(raise_on_error=True) # Quick check is_valid = graph.is_consistent() # === Graph analysis === # Check whether it is a DAG (directed acyclic graph) is_dag = graph.is_dag() # Get topological order (if DAG) if graph.is_dag(): topo_order = graph.topological_sort() # === Agent updates === # Update an agent's embedding agent = graph.get_agent_by_id("solver") agent = agent.with_embedding(new_embedding) graph.update_agent("solver", agent) # Update an agent's state agent = agent.append_state({"role": "assistant", "content": "Response"}) graph.update_agent("solver", agent) # === Batch operations === # Update multiple agents updates = { "agent1": updated_agent1, "agent2": updated_agent2, } graph.batch_update_agents(updates) # Add multiple edges edges = [ ("a", "b", 0.8), ("b", "c", 0.9), ("c", "d", 0.7), ] graph.batch_add_edges(edges) ``` #### State migration policies When removing or replacing a node, you can specify a migration policy: ```python from core.graph import StateMigrationPolicy # DISCARD β€” state is removed graph.remove_node("agent_id", policy=StateMigrationPolicy.DISCARD) # COPY β€” state is copied into the new node graph.replace_node("old_id", new_agent, policy=StateMigrationPolicy.COPY) # ARCHIVE β€” state is saved to external storage graph.remove_node("agent_id", policy=StateMigrationPolicy.ARCHIVE) ``` --- ### AgentProfile `AgentProfile` is an **immutable Pydantic model** (`BaseModel` with `frozen=True`) representing an agent profile with description, tools, state, and LLM configuration. 
> **Important**: > - `AgentProfile` inherits from `pydantic.BaseModel`, providing **automatic type validation** and **type safety** > - Embeddings and hidden states are stored **at the agent level**, not at the graph level > - **Multi-model support** β€” each agent can have its own LLM configuration > - Immutability (`frozen=True`) β€” methods return new objects #### AgentProfile structure (Pydantic model) | Field | Type | Description | |------|-----|----------| | `agent_id` | `str` | Unique agent identifier (required) | | `display_name` | `str` | Display name (required) | | `persona` | `str` | Agent role/persona (e.g., "Expert analyst") | | `description` | `str` | Textual description of agent capabilities | | `llm_backbone` | `str \| None` | LLM model identifier (legacy; use `llm_config`) | | `llm_config` | `AgentLLMConfig \| None` | **Pydantic model** for the agent’s LLM configuration | | `tools` | `list[str]` | List of available tools (shell, code_interpreter, file_search, web_search, custom) | | `raw` | `Mapping[str, Any]` | Arbitrary extra data | | `embedding` | `torch.Tensor \| None` | Agent vector representation (arbitrary_types_allowed) | | `state` | `list[dict[str, Any]]` | Local state / message history | | `hidden_state` | `torch.Tensor \| None` | Hidden state passed between agents | #### AgentLLMConfig (Pydantic model) ```python from core.agent import AgentLLMConfig # AgentLLMConfig - a Pydantic model for LLM configuration llm_config = AgentLLMConfig( model_name="gpt-4", # Model name base_url="https://api.openai.com/v1", # API endpoint api_key="$OPENAI_API_KEY", # Key (or $ENV_VAR) max_tokens=2000, # Max tokens temperature=0.7, # Temperature timeout=60.0, # Timeout in seconds top_p=0.9, # Top-p sampling stop_sequences=["END", "STOP"], # Stop sequences extra_params={"frequency_penalty": 0.5}, # Extra parameters ) # AgentLLMConfig methods api_key = llm_config.resolve_api_key() # Resolve $ENV_VAR is_set = llm_config.is_configured() # Check whether configured params = llm_config.to_generation_params() # Build params for the LLM ``` #### Creating and working with AgentProfile ```python from core import AgentProfile from core.agent import AgentLLMConfig # 1. Basic creation (Pydantic validates types) agent = AgentProfile( agent_id="analyzer", # Unique ID (str, required) display_name="Data Analyzer", # Display name (str, required) persona="Expert data analyst", # Role/persona (str, default="") description="Analyzes data and produces insights", # Description (str, default="") tools=["python", "sql"], # Available tools (list[str], default=[]) ) # 2. Creation with LLM config (Pydantic model) llm_config = AgentLLMConfig( model_name="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", # Resolved from environment temperature=0.7, max_tokens=2000, ) agent = AgentProfile( agent_id="researcher", display_name="Researcher", llm_config=llm_config, # Pydantic validates the nested model tools=["web_search"], ) # 3. State operations (immutable β€” returns a NEW object) agent = agent.append_state({"role": "user", "content": "Hello!"}) agent = agent.with_state([{"role": "system", "content": "You are helpful"}]) agent = agent.clear_state() # 4. Embeddings (arbitrary_types_allowed for torch.Tensor) import torch embedding = torch.randn(384) agent = agent.with_embedding(embedding) hidden_state = torch.randn(768) agent = agent.with_hidden_state(hidden_state) # 5. 
LLM config operations agent = agent.with_llm_config(llm_config) # Get the agent model name (priority: llm_config.model_name β†’ llm_backbone) model_name = agent.get_model_name() # "gpt-4" # Check if a custom LLM configuration is set if agent.has_custom_llm(): print(f"Agent uses custom LLM: {agent.llm_config.model_name}") print(f"Base URL: {agent.llm_config.base_url}") print(f"Generation params: {agent.llm_config.to_generation_params()}") # 6. Serialization (Pydantic methods) # For encoder (text) text = agent.to_text() # For persistence (dict, includes llm_config) data = agent.to_dict() # Pydantic serialization methods agent_dict = agent.model_dump() # Dict[str, Any] agent_json = agent.model_dump_json(indent=2) # JSON string # 7. Deserialization (Pydantic methods) loaded_agent = AgentProfile.model_validate(agent_dict) loaded_from_json = AgentProfile.model_validate_json(agent_json) ``` #### Example: agents with different LLMs ```python from core import AgentProfile from core.agent import AgentLLMConfig # Agent 1: strong model for analysis analyst = AgentProfile( agent_id="analyst", display_name="Senior Analyst", persona="Expert data analyst with 10 years experience", description="Performs deep analysis of complex data", llm_config=AgentLLMConfig( model_name="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.0, # Deterministic for analysis max_tokens=2000, ), tools=["python", "sql", "visualization"], ) # Agent 2: cheaper model for formatting formatter = AgentProfile( agent_id="formatter", display_name="Report Formatter", persona="Technical writer", description="Formats analysis results into readable reports", llm_config=AgentLLMConfig( model_name="gpt-4o-mini", # Cheaper for simple tasks base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.3, max_tokens=500, ), tools=["markdown", "latex"], ) # Agent 3: local model local_agent = AgentProfile( agent_id="local_llm", display_name="Local Assistant", llm_config=AgentLLMConfig( model_name="llama3:70b", base_url="http://localhost:11434/v1", # Ollama temperature=0.5, ), ) ``` #### Benefits of Pydantic validation 1. **Automatic type checking** when creating objects 2. **Default values** for optional fields 3. **Immutability** (`frozen=True`) prevents accidental changes 4. **Nested models** (`AgentLLMConfig` is validated automatically) 5. **Serialization/deserialization** via `.model_dump()` and `.model_validate()` 6. **Support for arbitrary types** (`arbitrary_types_allowed`) for torch.Tensor --- ### TaskNode `TaskNode` is an **immutable Pydantic model** (`BaseModel` with `frozen=True`) representing a virtual task node that stores the task query and can be connected to all agents. > **Important**: `TaskNode` inherits from `pydantic.BaseModel`, providing automatic type validation and immutability (just like `AgentProfile`). 
#### TaskNode structure (Pydantic model) | Field | Type | Description | |------|-----|----------| | `agent_id` (`id`) | `str` | Task node identifier (default `__task__`) | | `type` | `str` | Node type (`"task"`, automatically) | | `query` | `str` | Task statement / query | | `description` | `str` | Additional context description | | `embedding` | `torch.Tensor \| None` | Task embedding (arbitrary_types_allowed) | | `display_name` | `str` | Display name (default `"Task"`) | | `persona` | `str` | Task persona/role (default empty) | | `llm_backbone` | `str \| None` | Model identifier, if needed | | `tools` | `list[str]` | Tools available to the task node (default=[]) | | `state` | `list[dict[str, Any]]` | Local task state / message history (default=[]) | ```python from core import TaskNode # Pydantic validates types on creation task = TaskNode( agent_id="__task__", # can be overridden (str) query="Draft a market research plan", # required (str) description="A task for the whole team of agents", # optional (str, default="") ) # Task embedding (optional, arbitrary_types_allowed for torch.Tensor) import torch task_embedding = torch.randn(384) task = task.with_embedding(task_embedding) # TaskNode is immutable (frozen=True), use copy methods updated_task = task.model_copy(update={"description": "New description"}) # Pydantic serialization task_dict = task.model_dump() task_json = task.model_dump_json(indent=2) # Deserialization loaded = TaskNode.model_validate(task_dict) ``` > When using `build_property_graph(..., include_task_node=True)`, the task node is created automatically and connected to agents via context/update edges. #### TaskNode methods (immutable) ```python # Embedding operations (returns a new object) task = task.with_embedding(embedding_tensor) # State operations (returns a new object) task = task.append_state({"role": "system", "content": "Context"}) task = task.with_state([{"role": "user", "content": "Query"}]) task = task.clear_state() # Convert to text task_text = task.to_text() # For encoder # Convert to dict task_data = task.to_dict() # For persistence ``` --- ### NodeEncoder `NodeEncoder` converts textual agent descriptions into vector representations. ```python from core import NodeEncoder # sentence-transformers (recommended) encoder = NodeEncoder( model_name="sentence-transformers/all-MiniLM-L6-v2", normalize_embeddings=True, ) # hash fallback (fast, no model required) encoder = NodeEncoder(model_name="hash:256") # Encode texts texts = [agent.to_text() for agent in agents] embeddings = encoder.encode(texts) # torch.Tensor (N x dim) # Get dimensionality dim = encoder.embedding_dim ``` --- ### MACPRunner `MACPRunner` is the executor of the Multi-Agent Communication Protocol. ```python from execution import MACPRunner, RunnerConfig # βœ… Recommended for modern chat LLMs (OpenAI, GigaChat, etc.) # Sends proper system/user roles β€” no flat-string workaround needed. 
from openai import OpenAI client = OpenAI(api_key="sk-...") def my_structured_caller(messages: list[dict]) -> str: resp = client.chat.completions.create(model="gpt-4o", messages=messages) return resp.choices[0].message.content or "" runner = MACPRunner(structured_llm_caller=my_structured_caller) # Legacy setup β€” one flat-string LLM for all agents (still supported) runner = MACPRunner( llm_caller=sync_llm_function, # Callable[[str], str] async_llm_caller=async_llm_function, # Callable[[str], Awaitable[str]] token_counter=my_token_counter, # Token counting ) # Multi-model setup (different LLMs for different agents) from execution import LLMCallerFactory, create_openai_caller # Option 1: Use a factory (recommended) factory = LLMCallerFactory.create_openai_factory( default_model="gpt-4o-mini", default_base_url="https://api.openai.com/v1", ) runner = MACPRunner(llm_factory=factory) # Option 2: A dictionary of callers per agent runner = MACPRunner( llm_callers={ "analyst": create_openai_caller(model="gpt-4", temperature=0.0), "writer": create_openai_caller(model="gpt-4o-mini", temperature=0.7), }, async_llm_callers={ "analyst": create_openai_caller(model="gpt-4", is_async=True), "writer": create_openai_caller(model="gpt-4o-mini", is_async=True), }, ) # Option 3: Combined (factory + overrides for specific agents) runner = MACPRunner( llm_factory=factory, # Default for everyone llm_callers={"critical_agent": specialized_caller}, # Override for critical_agent ) # Advanced configuration config = RunnerConfig( timeout=60.0, # Per-agent timeout adaptive=True, # Adaptive mode enable_parallel=True, # Parallel execution max_parallel_size=5, # Max parallel agents max_retries=2, # Retries on errors update_states=True, # Update agent states enable_memory=True, # Enable memory callbacks=[StdoutCallbackHandler()], # Callbacks for logging ) runner = MACPRunner(llm_caller=my_llm, config=config) # Synchronous execution result = runner.run_round(graph) # With explicit execution bounds and filtering result = runner.run_round( graph, start_agent_id="input", # Start agent (overrides graph.start_node) final_agent_id="output", # Final agent (overrides graph.end_node) filter_unreachable=True, # Exclude isolated nodes (token savings) update_states=True, # Update agent states ) # Asynchronous execution result = await runner.arun_round( graph, start_agent_id="input", final_agent_id="output", filter_unreachable=True, ) # Execution with hidden channels result = runner.run_round_with_hidden(graph, hidden_encoder=encoder) ``` #### RunnerConfig (full specification) ```python from execution import RunnerConfig, RoutingPolicy, PruningConfig, BudgetConfig, ErrorPolicy, ErrorAction config = RunnerConfig( # === Basic parameters === timeout=60.0, # Per-agent timeout (sec) max_retries=3, # Max attempts on errors update_states=True, # Update AgentProfile.state # === Adaptive mode === # adaptive controls conditional edges, pruning, fallback, and routing # policies. It does NOT affect whether agents run in parallel. adaptive=True, # Enable conditional routing & pruning routing_policy=RoutingPolicy.WEIGHTED_TOPO, # Routing policy # === Parallel execution === # enable_parallel works independently of adaptive: when True, # independent agents (those with all predecessors done) are executed # concurrently via asyncio.gather. Works with both astream() and # arun_round(), regardless of the adaptive flag. 
enable_parallel=True, # Parallel group execution max_parallel_size=5, # Max agents in a parallel group # === Pruning === pruning_config=PruningConfig( min_weight_threshold=0.1, # Min edge weight min_probability_threshold=0.05, # Min transition probability max_consecutive_errors=3, # Max consecutive errors token_budget=10000, # Token budget for pruning enable_fallback=True, # Use fallback agents max_fallback_attempts=2, # Max fallback attempts quality_scorer=None, # Quality scoring function min_quality_threshold=0.3, # Min quality to continue ), # === Budget === budget_config=BudgetConfig( total_token_limit=50000, node_token_limit=2000, max_prompt_length=4000, max_response_length=2000, warn_at_usage_ratio=0.8, total_time_limit_seconds=600, total_request_limit=100, ), # === Memory === enable_memory=True, # Enable memory system memory_config=MemoryConfig( working_max_entries=20, long_term_max_entries=100, working_default_ttl=3600.0, auto_compress=True, promote_after_accesses=3, ), memory_context_limit=5, # Memory entries injected into the prompt # === Hidden channels === enable_hidden_channels=True, # Passing hidden_state hidden_combine_strategy="mean", # mean, sum, concat, attention pass_embeddings=True, # Pass embeddings # === Task query broadcast === broadcast_task_to_all=True, # True: task query is sent to all agents # False: only to agents connected to the task node # === Dynamic topology (runtime modification) === enable_dynamic_topology=True, # Enable runtime graph modifications topology_hooks=[my_hook_func], # Sync hooks for topology modification async_topology_hooks=[async_hook], # Async hooks for topology modification early_stop_conditions=[ # Early stopping conditions EarlyStopCondition.on_keyword("FINAL ANSWER"), EarlyStopCondition.on_token_limit(10000), EarlyStopCondition.on_custom(lambda ctx: my_logic(ctx)), ], # === Callbacks (monitoring and logging) === callbacks=[ # Callback handlers StdoutCallbackHandler( # Console output show_prompts=False, show_outputs=True, ), MetricsCallbackHandler(), # Metrics aggregation FileCallbackHandler("run.jsonl"), # File logging ], # === Error handling === error_policy=ErrorPolicy( on_timeout=ErrorAction.RETRY, on_retry_exhausted=ErrorAction.PRUNE, on_budget_exceeded=ErrorAction.ABORT, on_validation_error=ErrorAction.ABORT, ), # === Streaming === enable_token_streaming=False, # Enable token-level streaming if LLM supports it ) ``` #### Execution result (MACPResult) ```python result.messages # Dict[agent_id -> response] result.final_answer # Final agent answer result.final_agent_id # Final agent ID result.execution_order # Execution order result.agent_states # Updated agent states result.total_tokens # Total tokens result.total_time # Execution time (sec) result.topology_changed_count # Number of topology changes result.fallback_count # Number of fallbacks result.pruned_agents # Pruned agents (including disabled and isolated) result.errors # List of errors result.hidden_states # Agents' hidden states result.metrics # ExecutionMetrics with detailed statistics # New fields (dynamic topology) result.early_stopped # bool: whether early stopping occurred result.early_stop_reason # str: early stop reason result.topology_modifications # int: number of topology modifications ``` --- ### Scheduler The scheduler determines the agent execution order. 
```python from execution import ( build_execution_order, get_parallel_groups, AdaptiveScheduler, RoutingPolicy, PruningConfig, ) # Simple topological order order = build_execution_order(graph.A_com, agent_ids) # Parallel execution groups groups = get_parallel_groups(graph.A_com, agent_ids) # Result: [["a", "b"], ["c"], ["d", "e"]] # Adaptive scheduler scheduler = AdaptiveScheduler( policy=RoutingPolicy.WEIGHTED_TOPO, # Routing policy pruning_config=PruningConfig( min_weight_threshold=0.1, # Min edge weight min_probability_threshold=0.05, # Min probability max_consecutive_errors=3, # Max consecutive errors token_budget=10000, # Token budget enable_fallback=True, # Enable fallback max_fallback_attempts=2, # Max fallback attempts ), beam_width=3, # Beam search width ) # Build a plan plan = scheduler.build_plan( a_agents, # Agent adjacency matrix agent_ids, # List of IDs p_matrix=probs, # Probability matrix end_agent="final", # Final agent ) # Working with the plan step = plan.get_current_step() plan.mark_completed("agent_id", tokens=100) plan.mark_failed("agent_id") plan.mark_skipped("agent_id") ``` #### Routing policies (detailed) ```python from execution import RoutingPolicy, AdaptiveScheduler # ========== 1. TOPOLOGICAL (Topological sort) ========== # Description: Classic topological sort for a DAG # Use case: Simple pipelines without adaptivity # Complexity: O(V + E) scheduler = AdaptiveScheduler(policy=RoutingPolicy.TOPOLOGICAL) plan = scheduler.build_plan(adjacency, agent_ids) # Example: # A β†’ B β†’ C β†’ D # Order: [A, B, C, D] # ========== 2. WEIGHTED_TOPO (Weighted topological) ========== # Description: Topological sort with priority based on edge weights # Use case: When you need to account for connection importance # Complexity: O(V + E log V) scheduler = AdaptiveScheduler(policy=RoutingPolicy.WEIGHTED_TOPO) plan = scheduler.build_plan(adjacency, agent_ids) # Example: # β”Œβ”€(0.9)β†’ B ─┐ # A ─── β”œβ†’ D # └─(0.3)β†’ C β”€β”˜ # Order: [A, B, C, D] (B runs before C because 0.9 > 0.3) # ========== 3. GREEDY (Greedy selection) ========== # Description: At each step, selects the agent with the maximum edge weight # Use case: Optimize for connection quality # Complexity: O(VΒ²) scheduler = AdaptiveScheduler(policy=RoutingPolicy.GREEDY) plan = scheduler.build_plan( adjacency, agent_ids, start_node="coordinator", end_node="final", ) # Example: # Start β†’ A(0.9) β†’ B(0.8) β†’ End # Start β†’ C(0.5) β†’ D(0.7) β†’ End # Selected: Start β†’ A β†’ B β†’ End (higher total weight) # ========== 4. BEAM_SEARCH (Beam search) ========== # Description: Keeps beam_width best paths and selects the optimal one # Use case: Balance between quality and speed # Complexity: O(V * beam_width * E) scheduler = AdaptiveScheduler( policy=RoutingPolicy.BEAM_SEARCH, beam_width=3, # Keep 3 best paths ) plan = scheduler.build_plan( adjacency, agent_ids, p_matrix=probability_matrix, # Transition probabilities ) # Example with beam_width=2: # Start ─┬→ A(0.8) ─┬→ B(0.9) β†’ End [path 1: 0.72] # β”‚ β””β†’ C(0.6) β†’ End [path 2: 0.48] # β””β†’ D(0.7) ─→ E(0.8) β†’ End [path 3: 0.56] # Beam keeps paths 1 and 3, drops path 2 # Final choice: path 1 # ========== 5. 
K_SHORTEST (K shortest paths) ========== # Description: Finds K shortest paths and selects the best by a criterion # Use case: When alternative routes are required # Complexity: O(K * (V + E) log V) scheduler = AdaptiveScheduler( policy=RoutingPolicy.K_SHORTEST, k_paths=5, # Find 5 shortest paths ) plan = scheduler.build_plan( adjacency, agent_ids, start_node="input", end_node="output", path_metric=PathMetric.WEIGHTED, # HOP_COUNT, WEIGHTED, RELIABILITY ) # Example: # Found paths: # 1. input β†’ A β†’ B β†’ output (cost=3, hops=3) # 2. input β†’ C β†’ output (cost=4, hops=2) # 3. input β†’ A β†’ D β†’ output (cost=5, hops=3) # 4. input β†’ E β†’ F β†’ output (cost=6, hops=3) # 5. input β†’ G β†’ output (cost=7, hops=2) # Selection by metric: path 1 (minimum cost) # ========== 6. GNN_BASED (GNN-based) ========== # Description: Uses a trained GNN to predict the optimal route # Use case: Adaptive routing based on history # Requires: A trained GNN model from core.gnn import GNNRouterInference scheduler = AdaptiveScheduler( policy=RoutingPolicy.GNN_BASED, gnn_router=gnn_inference, # GNNRouterInference object gnn_threshold=0.7, # Min confidence to use the GNN ) # If confidence < threshold, fallback policy is used scheduler.set_fallback_policy(RoutingPolicy.WEIGHTED_TOPO) plan = scheduler.build_plan( adjacency, agent_ids, metrics_tracker=tracker, # For GNN features ) # ========== Policy comparison ========== # | Policy | Adaptivity | Complexity | Quality | Use case | # |----------------|----------------|----------------|----------|--------------------------------| # | TOPOLOGICAL | No | O(V+E) | ⭐ | Simple pipelines | # | WEIGHTED_TOPO | Low | O(V+EΒ·logV) | ⭐⭐ | Priority-based pipelines | # | GREEDY | Medium | O(VΒ²) | ⭐⭐⭐ | Weight-optimized routing | # | BEAM_SEARCH | High | O(VΒ·kΒ·E) | ⭐⭐⭐⭐ | Quality/speed balance | # | K_SHORTEST | High | O(KΒ·VΒ·logV) | ⭐⭐⭐⭐ | Alternative route search | # | GNN_BASED | Very high | O(GNN) | ⭐⭐⭐⭐⭐ | Trained systems | # ========== Choosing a policy based on the task ========== # Simple linear pipeline config = RunnerConfig(routing_policy=RoutingPolicy.TOPOLOGICAL) # Graph with different agent priorities config = RunnerConfig(routing_policy=RoutingPolicy.WEIGHTED_TOPO) # Optimize route quality config = RunnerConfig(routing_policy=RoutingPolicy.GREEDY) # Balance exploration vs exploitation config = RunnerConfig( routing_policy=RoutingPolicy.BEAM_SEARCH, adaptive=True, ) scheduler = AdaptiveScheduler(policy=RoutingPolicy.BEAM_SEARCH, beam_width=3) # Need fallback alternatives config = RunnerConfig(routing_policy=RoutingPolicy.K_SHORTEST) scheduler = AdaptiveScheduler(policy=RoutingPolicy.K_SHORTEST, k_paths=3) # Advanced trained system config = RunnerConfig(routing_policy=RoutingPolicy.GNN_BASED) scheduler = AdaptiveScheduler( policy=RoutingPolicy.GNN_BASED, gnn_router=trained_router, ) ``` --- ### Memory System A stratified memory system with **working** and **long-term** levels, supporting TTL, tags, priorities, and automatic compression. 
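The promotion path at a glance: a minimal sketch using the `AgentMemory` API shown later in this section. It assumes that read operations (for example `search_by_tags()`) count toward `promote_after_accesses`; if they do not in your build, `memory.promote(entry_key)` achieves the same move explicitly.

```python
from utils.memory import AgentMemory, MemoryConfig, MemoryLevel

config = MemoryConfig(
    working_max_entries=10,
    long_term_max_entries=50,
    promote_after_accesses=3,  # promote an entry after it has been accessed 3 times
)
memory = AgentMemory("analyst", config)

# A fresh entry starts in working memory
memory.add(
    content="Correlation between X and Y = 0.87",
    level=MemoryLevel.WORKING,
    priority=8,
    tags={"finding"},
)

# Repeated reads count as accesses (assumption); after the third, the entry is promoted
for _ in range(3):
    memory.search_by_tags({"finding"}, level=MemoryLevel.WORKING)

stats = memory.get_stats()
print(stats["promotion_count"])                      # 1 once the entry has moved
print(len(memory.get(level=MemoryLevel.LONG_TERM)))  # the finding now lives in long-term memory
```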
#### Memory architecture ``` β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ AgentMemory β”‚ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ β”‚ Working Memory β”‚ β”‚ Long-term Memory β”‚ β”‚ β”‚ β”‚ (TTL: 1 hour) β”‚ β”‚ (TTL: ∞) β”‚ β”‚ β”‚ β”‚ Max: 20 entries β”‚ β”‚ Max: 100 entries β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ β”‚ - Recent messages │────▢│ - Important facts β”‚ β”‚ β”‚ β”‚ - Temp context β”‚ β”‚ - Key insights β”‚ β”‚ β”‚ β”‚ - Active tasks β”‚ β”‚ - Historical data β”‚ β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ β–² β–² β”‚ β”‚ β”‚ promotion β”‚ β”‚ β”‚ β”‚ (after N accesses) β”‚ β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ sharing β–Ό β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ SharedMemoryPool β”‚ β”‚ Memory sharing between agents β”‚ β”‚ - Broadcast: one β†’ all β”‚ β”‚ - Share: one β†’ selected β”‚ β”‚ - Query: search by tags β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ ``` --- #### Basic usage of AgentMemory ```python from utils.memory import ( AgentMemory, MemoryConfig, MemoryLevel, MemoryEntry, ) # 1. Memory configuration config = MemoryConfig( # Working memory (short-term) working_max_entries=20, # Max entries working_default_ttl=3600.0, # TTL: 1 hour # Long-term memory long_term_max_entries=100, # Max entries long_term_default_ttl=None, # No expiration # Automatic management auto_compress=True, # Auto-compress on limit overflow compress_strategy="truncate", # truncate, summarize promote_after_accesses=3, # Promote to long-term after N accesses # Prioritization use_priority=True, # Consider priorities when evicting priority_weight=0.3, # Priority weight vs recency ) # 2. Create an agent memory memory = AgentMemory("researcher", config) # 3. Add entries # 3.1. Add messages (the simplest way) memory.add_message(role="user", content="Analyze the dataset") memory.add_message(role="assistant", content="I will analyze it") # 3.2. Add with parameters memory.add( content={"type": "insight", "text": "Pattern detected in data"}, level=MemoryLevel.WORKING, # WORKING or LONG_TERM priority=5, # 0-10 (higher = more important) tags={"insight", "data"}, # Tags for search ttl=7200.0, # Custom TTL (2 hours) metadata={"source": "analysis", "confidence": 0.95}, ) # 3.3. Add directly into long-term memory.add( content="Critical finding: correlation coefficient = 0.87", level=MemoryLevel.LONG_TERM, priority=10, tags={"critical", "finding"}, ) # 4. Retrieve entries # 4.1. Get recent messages messages = memory.get_messages(limit=5) for msg in messages: print(f"{msg['role']}: {msg['content']}") # 4.2. 
Get from working memory working_entries = memory.get(level=MemoryLevel.WORKING, limit=10) for entry in working_entries: print(f"[{entry.priority}] {entry.content}") # 4.3. Get from long-term memory longterm_entries = memory.get(level=MemoryLevel.LONG_TERM) # 4.4. Search by tags insights = memory.search_by_tags({"insight"}, level=MemoryLevel.WORKING) critical = memory.search_by_tags({"critical"}, level=MemoryLevel.LONG_TERM) # 4.5. Get all entries all_entries = memory.get_all() # 5. Memory management # 5.1. Remove an entry memory.remove(entry_key) # 5.2. Clear a level memory.clear(level=MemoryLevel.WORKING) # 5.3. Force compression memory.compress(level=MemoryLevel.WORKING) # 5.4. Promote an entry to long-term memory.promote(entry_key) # 5.5. Update an entry memory.update(entry_key, new_content={"updated": "data"}) # 6. Stats stats = memory.get_stats() print(f"Working: {stats['working_count']}/{stats['working_max']}") print(f"Long-term: {stats['longterm_count']}/{stats['longterm_max']}") print(f"Total accesses: {stats['total_accesses']}") print(f"Promotions: {stats['promotion_count']}") ``` --- #### SharedMemoryPool β€” memory sharing between agents ```python from utils.memory import SharedMemoryPool # 1. Create a pool pool = SharedMemoryPool(max_shared_entries=1000) # 2. Register agents memory_a = AgentMemory("agent_a", config) memory_b = AgentMemory("agent_b", config) memory_c = AgentMemory("agent_c", config) pool.register(memory_a) pool.register(memory_b) pool.register(memory_c) # 3. Broadcast β€” send to everyone pool.broadcast( from_agent="agent_a", entry={ "content": "Important discovery: X correlates with Y", "priority": 8, "tags": {"discovery", "shared"}, }, ) # All agents will receive this entry in working memory # 4. Share β€” send to specific agents pool.share( from_agent="agent_a", entry={"content": "Secret info", "priority": 9}, to_agents=["agent_b", "agent_c"], ) # Only agent_b and agent_c receive the entry # 5. Query β€” request information from the pool results = pool.query( tags={"discovery"}, min_priority=5, limit=10, ) for result in results: print(f"From {result['source_agent']}: {result['content']}") # 6. Subscribe to updates (callback) def on_shared_entry(entry, from_agent, to_agents): print(f"{from_agent} shared: {entry['content']}") pool.subscribe("agent_b", on_shared_entry) # 7. Remove from the pool pool.unregister("agent_c") # 8. Clear the pool pool.clear() ``` --- #### Memory compression ```python from utils.memory import ( TruncateCompressor, SummaryCompressor, ) # 1. Truncate β€” simple removal of old entries compressor = TruncateCompressor(keep_ratio=0.5) # Keep 50% memory = AgentMemory("agent", config) memory.set_compressor(compressor) # When over the limit, 50% of old entries are removed automatically # 2. Summary β€” summarization using an LLM def summarize_llm(entries: list[MemoryEntry]) -> str: texts = [e.content for e in entries] combined = "\n".join(texts) return my_llm(f"Summarize these entries: {combined}") compressor = SummaryCompressor( summarizer=summarize_llm, chunk_size=10, # Summarize in chunks of 10 entries ) memory.set_compressor(compressor) # On compression, 10 entries are replaced with 1 summarized entry # 3. 
Custom compressor from utils.memory import MemoryCompressor class SmartCompressor(MemoryCompressor): def compress(self, entries: list[MemoryEntry], target_count: int) -> list[MemoryEntry]: # Remove low-priority and old entries sorted_entries = sorted( entries, key=lambda e: (e.priority, e.timestamp), reverse=True, ) return sorted_entries[:target_count] memory.set_compressor(SmartCompressor()) ``` --- #### Integrating memory with the Runner ```python from execution import MACPRunner, RunnerConfig # 1. Configuration with memory enabled config = RunnerConfig( enable_memory=True, memory_config=MemoryConfig( working_max_entries=20, long_term_max_entries=100, auto_compress=True, promote_after_accesses=3, ), memory_context_limit=5, # How many entries to inject into the prompt enable_shared_memory=True, # Enable SharedMemoryPool ) runner = MACPRunner(llm_caller=my_llm, config=config) # 2. Run β€” memory is updated automatically result1 = runner.run_round(graph) # 3. Access an agent’s memory memory = runner.get_agent_memory("researcher") entries = memory.get_messages(limit=10) print(f"Researcher memory: {entries}") # 4. Manually add to memory runner.add_to_memory( "researcher", content="External knowledge: XYZ", level=MemoryLevel.LONG_TERM, priority=8, ) # 5. Second round β€” agents retain context graph.query = "Continue analysis from previous round" result2 = runner.run_round(graph) # 6. Export memories memory_export = runner.export_memories() # { # "agent_a": {"working": [...], "long_term": [...]}, # "agent_b": {"working": [...], "long_term": [...]}, # } # 7. Import memories (restore state) runner.import_memories(memory_export) # 8. Clear memory for all agents runner.clear_all_memories() ``` --- #### Advanced usage: Semantic memory search ```python from utils.memory import SemanticMemoryIndex from core import NodeEncoder # 1. Create a semantic index encoder = NodeEncoder(model_name="sentence-transformers/all-MiniLM-L6-v2") semantic_index = SemanticMemoryIndex(encoder) # 2. Add entries to the index memory = AgentMemory("agent", config) for entry in memory.get_all(): semantic_index.add(entry.key, entry.content, entry.tags) # 3. Semantic search query = "findings about correlation" results = semantic_index.search( query, top_k=5, min_similarity=0.7, filter_tags={"finding"}, ) for result in results: print(f"[{result['similarity']:.3f}] {result['content']}") # 4. 
Integration with AgentMemory memory.enable_semantic_search(encoder) # Now you can search semantically results = memory.semantic_search( query="data patterns", top_k=3, level=MemoryLevel.LONG_TERM, ) ``` --- #### Practical example: Multi-round conversation with memory ```python # Create a graph with memory agents = [ AgentProfile(agent_id="analyzer", display_name="Data Analyzer"), AgentProfile(agent_id="reporter", display_name="Report Writer"), ] graph = build_property_graph( agents, workflow_edges=[("analyzer", "reporter")], query="Analyze dataset.csv", ) # Memory-enabled configuration config = RunnerConfig( enable_memory=True, memory_config=MemoryConfig( working_max_entries=15, long_term_max_entries=50, auto_compress=True, promote_after_accesses=2, ), memory_context_limit=5, enable_shared_memory=True, ) runner = MACPRunner(llm_caller=my_llm, config=config) # Round 1: Initial analysis graph.query = "Analyze the dataset and find key patterns" result1 = runner.run_round(graph) print(f"Round 1 answer: {result1.final_answer}") # Analyzer saved findings to memory analyzer_memory = runner.get_agent_memory("analyzer") print(f"Analyzer memory entries: {len(analyzer_memory.get_all())}") # Round 2: Deeper analysis (agents remember the previous round) graph.query = "Based on previous findings, analyze correlations" result2 = runner.run_round(graph) print(f"Round 2 answer: {result2.final_answer}") # Round 3: Report generation graph.query = "Generate final report summarizing all findings" result3 = runner.run_round(graph) print(f"Round 3 answer: {result3.final_answer}") # Reporter used accumulated memory for a complete report reporter_memory = runner.get_agent_memory("reporter") # Export full history history = { "round_1": result1.to_dict(), "round_2": result2.to_dict(), "round_3": result3.to_dict(), "memories": runner.export_memories(), } import json with open("conversation_history.json", "w") as f: json.dump(history, f, indent=2) ``` --- ### Streaming API LangGraph-like streaming for real-time output. ```python from execution import ( MACPRunner, StreamEventType, StreamBuffer, format_event, print_stream, ) runner = MACPRunner(llm_caller=my_llm) # Synchronous streaming for event in runner.stream(graph): if event.event_type == StreamEventType.AGENT_OUTPUT: print(f"{event.agent_id}: {event.content}") elif event.event_type == StreamEventType.TOKEN: print(event.token, end="", flush=True) # Asynchronous streaming async for event in runner.astream(graph): print(format_event(event)) # Using a buffer buffer = StreamBuffer() for event in runner.stream(graph): buffer.add(event) # ... 
handle the event print(f"Final answer: {buffer.final_answer}") print(f"Agent outputs: {buffer.agent_outputs}") # Convenience printing answer = print_stream(runner.stream(graph), show_tokens=True) ``` #### Event types (full specification) ```python from execution.streaming import StreamEventType, StreamEvent # === Execution lifecycle === StreamEventType.RUN_START # Fields: run_id, query, num_agents, config StreamEventType.RUN_END # Fields: run_id, success, total_time, total_tokens, execution_order, final_answer # === Agent events === StreamEventType.AGENT_START # Fields: agent_id, step_index, predecessors, prompt_preview StreamEventType.AGENT_OUTPUT # Fields: agent_id, step_index, content, tokens_used, latency_ms StreamEventType.AGENT_ERROR # Fields: agent_id, step_index, error_type, error_message, will_retry # === Token streaming === StreamEventType.TOKEN # Fields: agent_id, token (str), token_index # === Adaptive execution === StreamEventType.TOPOLOGY_CHANGED # Fields: reason, old_plan, new_plan, remaining_steps StreamEventType.PRUNE # Fields: agent_id, reason (low_weight/low_probability/budget/quality) StreamEventType.FALLBACK # Fields: original_agent, fallback_agent, reason, attempt # === Parallel execution === StreamEventType.PARALLEL_START # Fields: group_agents (list), group_index StreamEventType.PARALLEL_END # Fields: group_agents, completed_count, failed_count, duration_ms # === Budget === StreamEventType.BUDGET_WARNING # Fields: budget_type (tokens/requests/time), current, limit, ratio StreamEventType.BUDGET_EXCEEDED # Fields: budget_type, current, limit, action_taken # === Memory === StreamEventType.MEMORY_WRITE # Fields: agent_id, memory_level (working/long_term), entry_key StreamEventType.MEMORY_READ # Fields: agent_id, memory_level, entry_key, found StreamEventType.MEMORY_PROMOTED # Fields: agent_id, entry_key, from_level, to_level # === Metrics === StreamEventType.METRICS_UPDATE # Fields: agent_id, metrics (dict with reliability, latency, quality, cost) # Example: handling all event types for event in runner.stream(graph): match event.event_type: case StreamEventType.RUN_START: print(f"Starting run {event.run_id} with {event.num_agents} agents") case StreamEventType.AGENT_START: print(f"Agent {event.agent_id} starting (step {event.step_index})") case StreamEventType.AGENT_OUTPUT: print(f"Agent {event.agent_id}: {event.content[:100]}...") print(f" Tokens: {event.tokens_used}, Latency: {event.latency_ms}ms") case StreamEventType.TOKEN: print(event.token, end="", flush=True) case StreamEventType.TOPOLOGY_CHANGED: print(f"⟳ Topology changed: {event.reason}") print(f" New plan: {event.new_plan}") case StreamEventType.PRUNE: print(f"βœ‚ Pruned {event.agent_id}: {event.reason}") case StreamEventType.FALLBACK: print(f"β€· Fallback: {event.original_agent} β†’ {event.fallback_agent}") case StreamEventType.PARALLEL_START: print(f"β«Έ Starting parallel group: {event.group_agents}") case StreamEventType.PARALLEL_END: print(f"β«· Parallel group done: {event.completed_count}/{len(event.group_agents)}") case StreamEventType.BUDGET_WARNING: print(f"⚠ Budget warning: {event.budget_type} at {event.ratio:.1%}") case StreamEventType.BUDGET_EXCEEDED: print(f"❌ Budget exceeded: {event.budget_type}") case StreamEventType.RUN_END: print(f"βœ“ Execution completed in {event.total_time:.2f}s") print(f" Total tokens: {event.total_tokens}") print(f" Final answer: {event.final_answer[:100]}...") ``` --- ## Advanced Features ### Execution optimization and token savings The framework provides several 
mechanisms to optimize execution and reduce token usage: #### 1. Filtering isolated nodes Automatically exclude nodes that are not on the path from start to end: ```python # Set execution bounds graph.set_execution_bounds("input", "output") # Filter isolated nodes during execution result = runner.run_round( graph, filter_unreachable=True # Exclude nodes not on the input->output path ) # Nodes unrelated to the input->output path will not be executed print(f"Agents excluded: {len(result.pruned_agents or [])}") ``` **Example:** ```python builder = GraphBuilder() builder.add_agent("a1") builder.add_agent("a2") builder.add_agent("a3") builder.add_agent("isolated") # Not connected to a1->a3 builder.add_workflow_edge("a1", "a2") builder.add_workflow_edge("a2", "a3") builder.set_execution_bounds("a1", "a3") graph = builder.build() # Reachability analysis relevant = graph.get_relevant_nodes() # {"a1", "a2", "a3"} isolated = graph.get_isolated_nodes() # {"isolated"} result = runner.run_round(graph, filter_unreachable=True) # "isolated" will not run β†’ token savings ``` #### 2. Node deactivation (Disabled Nodes) Temporarily deactivate nodes without removing them from the graph: ```python # Deactivate based on metrics/RL if quality_score < threshold: graph.disable("expensive_agent") # Or multiple nodes graph.disable(["agent1", "agent2"]) # Check if graph.is_enabled("agent1"): ... # Re-enable graph.enable("agent1") graph.enable() # All result = runner.run_round(graph) # Deactivated nodes appear in result.pruned_agents ``` **Use case: RL control** ```python # An RL agent decides which nodes to deactivate for agent_id in graph.node_ids: rl_score = rl_model.predict(graph_state, agent_id) if rl_score < 0.3: graph.disable(agent_id) result = runner.run_round(graph) ``` #### 3. Early stopping Stop execution when a condition is met: ```python from execution import EarlyStopCondition, RunnerConfig # By keyword stop1 = EarlyStopCondition.on_keyword("FINAL ANSWER") # By token limit stop2 = EarlyStopCondition.on_token_limit(5000) # By number of agents stop3 = EarlyStopCondition.on_agent_count(3) # By metadata (for RL/metrics) stop4 = EarlyStopCondition.on_metadata( "quality", 0.95, comparator=lambda v, t: v > t ) # Custom logic stop5 = EarlyStopCondition.on_custom( lambda ctx: my_evaluator.is_done(ctx.messages), reason="Evaluator decided task is done", min_agents_executed=2 # At least 2 agents before checking ) # Combination (OR) stop_any = EarlyStopCondition.combine_any([ EarlyStopCondition.on_keyword("DONE"), EarlyStopCondition.on_token_limit(10000), ]) config = RunnerConfig( early_stop_conditions=[stop1, stop2, stop5] ) runner = MACPRunner(llm_caller=my_llm, config=config) result = runner.run_round(graph) if result.early_stopped: print(f"Reason: {result.early_stop_reason}") saved = len(graph.node_ids) - len(result.execution_order) print(f"Agents saved: {saved}") ``` #### 4. 
Runtime topology (Topology Hooks) Modify the graph **during execution** based on intermediate results: ```python from execution import TopologyAction, StepContext def adaptive_topology(ctx: StepContext, graph) -> TopologyAction: """Hook is called after each agent.""" # ctx.agent_id β€” current agent # ctx.response β€” its response # ctx.messages β€” all responses # ctx.execution_order β€” execution order # ctx.remaining_agents β€” remaining agents # ctx.total_tokens β€” tokens used # Add an edge if review is needed if "uncertain" in (ctx.response or "").lower(): return TopologyAction( add_edges=[(ctx.agent_id, "reviewer", 1.0)], trigger_rebuild=True ) # Remove an edge if confident: return TopologyAction( remove_edges=[("agent1", "checker")] ) # Skip agents if ctx.total_tokens > 8000: return TopologyAction( skip_agents=["expensive_agent"] ) # Early stop if "DONE" in (ctx.response or ""): return TopologyAction( early_stop=True, early_stop_reason="Task completed" ) return None config = RunnerConfig( enable_dynamic_topology=True, topology_hooks=[adaptive_topology] ) ``` #### 5. Combined optimization Use all mechanisms together for maximum optimization: ```python from execution import ( GraphBuilder, MACPRunner, RunnerConfig, EarlyStopCondition, TopologyAction, StepContext ) # Build a graph builder = GraphBuilder() builder.add_agent("input") builder.add_agent("solver") builder.add_agent("checker") builder.add_agent("expert") # Expensive agent builder.add_agent("formatter") builder.add_agent("optional") # Optional builder.add_workflow_edge("input", "solver") builder.add_workflow_edge("solver", "checker") builder.add_workflow_edge("checker", "formatter") # Set execution bounds builder.set_execution_bounds("input", "formatter") graph = builder.build() # Disable optional nodes graph.disable("optional") # Adaptation hooks def smart_topology(ctx: StepContext, graph) -> TopologyAction: # If solver is confident β€” skip checker if ctx.agent_id == "solver" and ctx.metadata.get("confidence", 0) > 0.95: return TopologyAction(skip_agents=["checker"]) # If checker found an issue β€” add expert if ctx.agent_id == "checker" and "ERROR" in (ctx.response or ""): return TopologyAction( add_edges=[("checker", "expert", 1.0), ("expert", "formatter", 1.0)], trigger_rebuild=True ) return None # Configure runner with optimization config = RunnerConfig( adaptive=True, enable_dynamic_topology=True, topology_hooks=[smart_topology], early_stop_conditions=[ EarlyStopCondition.on_keyword("FINAL_ANSWER"), EarlyStopCondition.on_token_limit(10000), ], pruning_config=PruningConfig(token_budget=15000), ) runner = MACPRunner(llm_caller=my_llm, config=config) result = runner.run_round( graph, filter_unreachable=True # Exclude isolated nodes ) # Optimization analysis print(f"Agents executed: {len(result.execution_order)}") print(f"Pruned: {len(result.pruned_agents or [])}") print(f"Early stopped: {result.early_stopped}") print(f"Modifications: {result.topology_modifications}") print(f"Tokens: {result.total_tokens}") ``` --- ### Multi-Model Support (Multi-Model Support) Each agent in the graph can use its own LLM model with individual settings. 
This makes it possible to: - **Optimize costs** β€” use expensive models only for complex tasks - **Balance performance** β€” fast models for simple operations - **Specialize agents** β€” models trained for specific domains - **Hybrid solutions** β€” combine cloud and local models #### Multi-model architecture ``` β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ TASK NODE β”‚ β”‚ "Analyze the market" β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β” β–Ό β–Ό β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ ANALYST β”‚ β”‚ COORDINATOR β”‚ β”‚ │──▢│ β”‚ β”‚ GPT-4 β”‚ β”‚ GPT-4o-mini β”‚ β”‚ temp: 0.0 β”‚ β”‚ temp: 0.3 β”‚ β”‚ tokens: 4000 β”‚ β”‚ tokens: 1000 β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ ``` --- #### Key components **1. LLMConfig** β€” an agent’s LLM configuration ```python from core.schema import LLMConfig llm_config = LLMConfig( model_name="gpt-4", # Model name base_url="https://api.openai.com/v1", # API endpoint api_key="$OPENAI_API_KEY", # Key (or $ENV_VAR) max_tokens=2000, # Max tokens in the response temperature=0.7, # Generation temperature timeout=60.0, # Request timeout top_p=0.9, # Nucleus sampling stop_sequences=["END"], # Stop sequences ) # Validate configuration if llm_config.is_configured(): params = llm_config.to_generation_params() print(f"Generation params: {params}") # Merge configurations (fallback) default_config = LLMConfig(model_name="gpt-4o-mini", temperature=0.5) final_config = llm_config.merge_with(default_config) ``` **2. AgentLLMConfig** β€” an immutable configuration for AgentProfile ```python from core.agent import AgentLLMConfig agent_llm_config = AgentLLMConfig( model_name="gpt-4", base_url="https://api.openai.com/v1", api_key="sk-...", temperature=0.7, max_tokens=2000, ) # Convert to LLMConfig llm_config = agent_llm_config.to_llm_config() ``` **3. LLMCallerFactory** β€” a factory for creating LLM callers ```python from execution import LLMCallerFactory # Create a factory for OpenAI-compatible APIs factory = LLMCallerFactory.create_openai_factory( default_model="gpt-4o-mini", default_base_url="https://api.openai.com/v1", default_api_key="sk-...", default_temperature=0.7, default_max_tokens=2000, ) # The factory automatically creates callers based on AgentLLMConfig # when used with MACPRunner ``` **4. 
Caller factory helpers** Three ready-made functions cover the most common setups: | Function | Interface | Use with | |---|---|---| | `create_openai_caller()` | `(str) -> str` | Legacy `llm_caller` | | `create_openai_structured_caller()` | `(list[dict]) -> str` | `structured_llm_caller` βœ… **recommended** | | `create_openai_async_structured_caller()` | `async (list[dict]) -> str` | `async_structured_llm_caller` βœ… parallel | ```python from execution import ( create_openai_caller, create_openai_structured_caller, create_openai_async_structured_caller, ) # ── Legacy flat-string caller ──────────────────────────────────────────────── caller = create_openai_caller( model="gpt-4", base_url="https://api.openai.com/v1", api_key="sk-...", temperature=0.7, max_tokens=2000, ) response = caller("What is 2+2?") # (str) -> str # ── Structured sync caller (recommended for chat LLMs) ────────────────────── sync_caller = create_openai_structured_caller( api_key="sk-...", model="gpt-4o", temperature=0.7, max_tokens=1024, ) # Use as: MACPRunner(structured_llm_caller=sync_caller) # ── Structured async caller (required for parallel astream) ───────────────── async_caller = create_openai_async_structured_caller( api_key="sk-...", model="gpt-4o", temperature=0.7, max_tokens=1024, ) # Use as: MACPRunner(async_structured_llm_caller=async_caller) # ── Full parallel setup ────────────────────────────────────────────────────── from execution import MACPRunner, RunnerConfig runner = MACPRunner( structured_llm_caller=sync_caller, async_structured_llm_caller=async_caller, config=RunnerConfig(enable_parallel=True), ) # Sequential graphs β†’ stream() uses sync_caller for event in runner.stream(graph): ... # Parallel graphs β†’ astream() uses async_caller for concurrent groups import asyncio async def run(): async for event in runner.astream(graph): ... 
asyncio.run(run()) ``` --- #### Ways to configure multi-model support ##### Method 1: Via GraphBuilder (recommended) ```python from builder import GraphBuilder from execution import MACPRunner, LLMCallerFactory builder = GraphBuilder() # Agent 1: strong model for analysis builder.add_agent( agent_id="analyst", display_name="Senior Analyst", persona="Expert data analyst with deep domain knowledge", llm_backbone="gpt-4", # Or model_name base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.0, # Strict analysis max_tokens=4000, timeout=120.0, ) # Agent 2: weaker model for formatting builder.add_agent( agent_id="formatter", display_name="Report Formatter", persona="Formats data into readable reports", llm_backbone="gpt-4o-mini", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.3, max_tokens=1000, timeout=30.0, ) # Agent 3: local model for confidential data builder.add_agent( agent_id="privacy_checker", display_name="Privacy Checker", llm_backbone="llama3:70b", base_url="http://localhost:11434/v1", # Ollama api_key="not-needed", temperature=0.1, max_tokens=500, ) builder.add_workflow_edge("analyst", "formatter") builder.add_workflow_edge("analyst", "privacy_checker") graph = builder.build() # The factory will automatically create callers for each agent factory = LLMCallerFactory.create_openai_factory() runner = MACPRunner(llm_factory=factory) result = runner.run_round(graph) print(f"Final answer: {result.final_answer}") ``` ##### Method 2: Explicit LLMConfig ```python from core.schema import LLMConfig # Predefined configurations gpt4_config = LLMConfig( model_name="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.7, max_tokens=2000, ) gpt4_mini_config = LLMConfig( model_name="gpt-4o-mini", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.5, max_tokens=1000, ) builder = GraphBuilder() builder.add_agent( "researcher", display_name="Researcher", llm_config=gpt4_config, # Pass a ready configuration ) builder.add_agent( "writer", display_name="Writer", llm_config=gpt4_mini_config, ) graph = builder.build() ``` ##### Method 3: llm_callers dictionary ```python from execution import create_openai_caller # Create callers manually callers = { "analyst": create_openai_caller( model="gpt-4", temperature=0.0, max_tokens=4000, ), "formatter": create_openai_caller( model="gpt-4o-mini", temperature=0.3, max_tokens=1000, ), "privacy_checker": create_openai_caller( model="llama3:70b", base_url="http://localhost:11434/v1", api_key="not-needed", ), } # Pass directly into the runner runner = MACPRunner(llm_callers=callers) result = runner.run_round(graph) ``` ##### Method 4: Combined approach ```python # Use the factory as default, but override for some agents factory = LLMCallerFactory.create_openai_factory( default_model="gpt-4o-mini", # Default ) # Create a custom caller for a specific agent specialized_caller = create_openai_caller( model="gpt-4", temperature=0.0, max_tokens=4000, ) runner = MACPRunner( llm_factory=factory, # For all agents llm_callers={"analyst": specialized_caller}, # Override for analyst ) ``` --- #### LLM caller resolution priority ``` 1. llm_callers[agent_id] ← Explicitly provided caller ↓ 2. llm_factory.get_caller() ← Factory creates based on agent.llm_config ↓ 3. llm_caller ← Default caller for all agents ↓ 4. 
Exception ← Error: no caller specified ``` --- #### Usage examples ##### Example 1: Cost optimization ```python # Cheap model for routine operations, expensive one for complex tasks builder = GraphBuilder() # 5 simple analysts (cheap model) for i in range(5): builder.add_agent( f"analyst_{i}", display_name=f"Junior Analyst {i}", llm_backbone="gpt-4o-mini", temperature=0.3, max_tokens=500, ) builder.add_workflow_edge(f"analyst_{i}", "senior") # 1 senior analyst (expensive model) builder.add_agent( "senior", display_name="Senior Analyst", llm_backbone="gpt-4", temperature=0.7, max_tokens=4000, ) graph = builder.build() # Savings: ~80% of tokens use the cheap model ``` ##### Example 2: Hybrid solution (cloud + local model) ```python builder = GraphBuilder() # Public data β†’ cloud model builder.add_agent( "public_analyzer", llm_backbone="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", ) # Confidential data β†’ local model builder.add_agent( "private_analyzer", llm_backbone="llama3:70b", base_url="http://localhost:11434/v1", api_key="not-needed", ) # Aggregator β†’ cheap cloud model builder.add_agent( "aggregator", llm_backbone="gpt-4o-mini", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", ) builder.add_workflow_edge("public_analyzer", "aggregator") builder.add_workflow_edge("private_analyzer", "aggregator") graph = builder.build() ``` ##### Example 3: Specialized models ```python builder = GraphBuilder() # Medical expert β†’ a model trained on medical data builder.add_agent( "medical_expert", llm_backbone="medical-llm-v2", base_url="https://medical-api.example.com/v1", api_key="$MEDICAL_API_KEY", temperature=0.0, # Strict medical recommendations ) # Legal expert β†’ a model trained on legal texts builder.add_agent( "legal_expert", llm_backbone="legal-llm-v3", base_url="https://legal-api.example.com/v1", api_key="$LEGAL_API_KEY", temperature=0.0, ) # Coordinator β†’ general model builder.add_agent( "coordinator", llm_backbone="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.5, ) builder.add_workflow_edge("medical_expert", "coordinator") builder.add_workflow_edge("legal_expert", "coordinator") graph = builder.build() ``` ##### Example 4: Different temperatures for different styles ```python builder = GraphBuilder() # Creative writer (high temperature) builder.add_agent( "creative_writer", llm_backbone="gpt-4", temperature=0.9, # Creativity max_tokens=2000, ) # Strict editor (low temperature) builder.add_agent( "strict_editor", llm_backbone="gpt-4", temperature=0.1, # Precision max_tokens=1500, ) # Final formatter (medium temperature) builder.add_agent( "formatter", llm_backbone="gpt-4o-mini", temperature=0.5, # Balance max_tokens=1000, ) builder.add_workflow_edge("creative_writer", "strict_editor") builder.add_workflow_edge("strict_editor", "formatter") graph = builder.build() ``` --- #### Supported providers The framework supports **any OpenAI-compatible API**: | Provider | Base URL | Notes | |----------|----------|-------| | **OpenAI** | `https://api.openai.com/v1` | GPT-4, GPT-4o-mini, GPT-3.5-turbo | | **Anthropic** | via wrapper | Claude (requires an adapter) | | **Ollama** | `http://localhost:11434/v1` | Local models (llama3, mistral, etc.) 
| | **vLLM** | custom | Self-hosted models | | **LiteLLM** | custom | Unified API for all providers | | **Azure OpenAI** | `https://.openai.azure.com/` | Azure-hosted models | | **GigaChat** | custom | Sber models | | **Cloudflare Tunnels** | custom | Via Cloudflare tunnels | ```python # Examples for different providers # OpenAI builder.add_agent("agent1", llm_backbone="gpt-4", base_url="https://api.openai.com/v1") # Ollama (local) builder.add_agent("agent2", llm_backbone="llama3:70b", base_url="http://localhost:11434/v1") # Azure OpenAI builder.add_agent("agent3", llm_backbone="gpt-4", base_url="https://myresource.openai.azure.com/") # GigaChat builder.add_agent("agent4", llm_backbone="GigaChat-Lightning", base_url="https://gigachat-api.trycloudflare.com/v1") # vLLM builder.add_agent("agent5", llm_backbone="./models/Qwen3-80B", base_url="https://my-vllm-server.com/v1") ``` --- #### Async and streaming support ```python from execution import create_openai_caller # Async caller per agent async_callers = { "agent1": create_openai_caller(model="gpt-4", is_async=True), "agent2": create_openai_caller(model="gpt-4o-mini", is_async=True), } runner = MACPRunner(async_llm_callers=async_callers) result = await runner.arun_round(graph) # Streaming callers streaming_callers = { "agent1": create_openai_caller(model="gpt-4", is_streaming=True), "agent2": create_openai_caller(model="gpt-4o-mini", is_streaming=True), } runner = MACPRunner(streaming_llm_callers=streaming_callers) for event in runner.stream(graph): if event.event_type == StreamEventType.TOKEN: print(f"[{event.agent_id}] {event.token}", end="") ``` --- #### API key handling ```python # 1. Direct builder.add_agent("agent", api_key="sk-...") # 2. From an environment variable (recommended) builder.add_agent("agent", api_key="$OPENAI_API_KEY") # When parsing, it is automatically resolved as os.getenv("OPENAI_API_KEY") # 3. From a file import os os.environ["OPENAI_API_KEY"] = open("keys/openai.key").read().strip() builder.add_agent("agent", api_key="$OPENAI_API_KEY") ``` --- #### Monitoring multi-model execution ```python from core.metrics import MetricsTracker tracker = MetricsTracker() runner = MACPRunner( llm_factory=factory, metrics_tracker=tracker, ) result = runner.run_round(graph) # Per-model analysis for agent_id in graph.node_ids: agent = graph.get_agent_by_id(agent_id) model = agent.llm_config.model_name if agent.llm_config else "default" metrics = tracker.get_node_metrics(agent_id) print(f"\n{agent_id} ({model}):") print(f" Latency: {metrics.avg_latency_ms:.0f}ms") print(f" Tokens: {metrics.total_cost_tokens}") print(f" Reliability: {metrics.reliability:.2%}") ``` --- #### Backward compatibility Old code **continues to work** without changes: ```python # Old approach (one LLM for all agents) runner = MACPRunner(llm_caller=my_llm) result = runner.run_round(graph) # βœ… Works as before # New approach (multi-model) runner = MACPRunner(llm_factory=factory) result = runner.run_round(graph) # βœ… Uses per-agent models ``` --- ### Structured Prompt β€” modern chat LLMs (recommended) > **TL;DR** β€” if you use OpenAI, GigaChat, Anthropic, or any other > chat-completion API, pass `structured_llm_caller` instead of the > legacy `llm_caller`. The runner will send proper `system` / `user` > roles to the LLM instead of one flat string. This produces shorter, > more focused responses and saves tokens β€” especially in long agent chains. 
#### The problem with the legacy `llm_caller` The classic `llm_caller: Callable[[str], str]` interface passes the entire prompt as a **single flat string**, combining persona, description, task and messages from other agents: ``` "You are a mathematician.\n\nSolve step by step.\n\nTask: ...\n\nMessages from other agents:\n..." ``` Modern chat LLMs (OpenAI GPT-4, GigaChat, Claude, Gemini…) expect messages to be split into **roles** (`system`, `user`, `assistant`). When everything arrives in one blob the model has to re-parse it, which leads to: - πŸ”΄ **Verbose, padded responses** β€” the model does not know how strictly to follow the system instruction - πŸ”΄ **Token accumulation** β€” long chains accumulate more and more context - πŸ”΄ **Lower instruction-following quality** β€” especially for role-specific behaviour #### The fix: `structured_llm_caller` `MACPRunner` now supports a second caller interface that receives a `list[dict[str, str]]` β€” exactly what the OpenAI chat completions API expects: The full message list produced by `_build_prompt` is: ```python [ # 1. system β€” persona, description, tools hint, output_schema instruction {"role": "system", "content": "You are a mathematician. Solve step by step.\n\nAvailable tools: calculator.\n\nRespond with JSON matching: {\"type\":\"object\",...}"}, # 2..N-1. agent.state β€” previous conversation turns replayed with correct roles {"role": "assistant", "content": "Previous answer turn 1…"}, {"role": "user", "content": "Follow-up question turn 2…"}, # … (as many entries as agent.state contains) # N. user β€” current task, input_schema hint, memory context, incoming agent messages {"role": "user", "content": "Task: 3xΒ² - 7x + 2 = 0\n\nInput format: {...}\n\nMessages from other agents: ..."}, ] ``` The runner builds this automatically inside `_build_prompt` β†’ `StructuredPrompt` and dispatches via `_call_llm`. No parsing, no heuristics, no hacks. --- #### How it works internally ``` _build_prompt() β”‚ └─► StructuredPrompt β”œβ”€β”€ .text β†’ flat string (used by legacy llm_caller) └── .messages β†’ list[dict] (used by structured_llm_caller) MACPRunner._call_llm(caller, prompt) β”œβ”€β”€ if structured_llm_caller is set β†’ calls structured_llm_caller(prompt.messages) └── else β†’ calls caller(prompt.text) # backward compat ``` Both representations are always built β€” switching between interfaces requires **zero changes** to graph/agent code. 
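In plain Python the dispatch described above boils down to a few lines. The sketch below is illustrative only (the function name `call_llm_sketch` and its signature are assumptions, not the runner's internal code); `prompt` stands for the `StructuredPrompt` the runner builds, with its documented `.messages` and `.text` attributes:

```python
# Illustrative sketch of the caller dispatch -- not the framework's actual internals.
# `prompt` is a StructuredPrompt exposing `.messages` (list[dict]) and `.text` (str).
def call_llm_sketch(prompt, structured_llm_caller=None, llm_caller=None) -> str:
    if structured_llm_caller is not None:
        # Chat-style APIs receive the role-separated messages
        return structured_llm_caller(prompt.messages)
    # Legacy callers receive the flat string representation
    return llm_caller(prompt.text)
```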
> **What goes where in `messages`:** > > | Source field | Role | Note | > |---|---|---| > | `persona` + `description` | `system` | Always first message | > | tool names (`has_tools()`) | `system` | Appended to system content | > | `output_schema` | `system` | `"Respond with JSON matching: …"` | > | `agent.state` entries | `assistant`/`user` | Replayed in order between system and final user | > | query + `input_schema` + memory + incoming msgs | `user` | Always last message | --- #### Built-in factory helpers (recommended, zero boilerplate) The framework ships ready-made factory functions so you don't need to write any boilerplate caller code yourself: ```python from execution import ( MACPRunner, RunnerConfig, create_openai_structured_caller, # sync β€” for stream() / run_round() create_openai_async_structured_caller, # async β€” for astream() / arun_round() ) # ── Sequential graphs (chains, single agent) ──────────────────────────────── runner = MACPRunner( structured_llm_caller=create_openai_structured_caller( api_key="sk-...", base_url="https://api.openai.com/v1", model="gpt-4o", temperature=0.7, max_tokens=1024, ), ) for event in runner.stream(graph): ... # ── Parallel graphs (fan-in, fan-out) ────────────────────────────────────── runner = MACPRunner( structured_llm_caller=create_openai_structured_caller( api_key="sk-...", model="gpt-4o" ), async_structured_llm_caller=create_openai_async_structured_caller( api_key="sk-...", model="gpt-4o" ), config=RunnerConfig(enable_parallel=True), ) async for event in runner.astream(graph): ... ``` > **Why two callers for parallel mode?** `stream()` is synchronous and > uses `structured_llm_caller`. `astream()` with `enable_parallel=True` > runs independent agents concurrently via `asyncio.gather` and therefore > requires `async_structured_llm_caller`. For purely sequential graphs > only the sync caller is needed. --- #### Quick start (manual caller) If you need custom logic (retries, logging, token tracking), write the caller yourself β€” the interface is a simple function: ```python from openai import OpenAI from execution import MACPRunner, RunnerConfig client = OpenAI(api_key="sk-...") def my_structured_caller(messages: list[dict[str, str]]) -> str: """Drop-in replacement for any str->str llm_caller.""" resp = client.chat.completions.create( model="gpt-4o", messages=messages, # passed through as-is max_tokens=1024, temperature=0.7, ) return resp.choices[0].message.content or "" runner = MACPRunner( structured_llm_caller=my_structured_caller, config=RunnerConfig(timeout=60.0), ) result = runner.run_round(graph) print(result.final_answer) ``` #### Async variant (manual caller) ```python import asyncio from openai import AsyncOpenAI aclient = AsyncOpenAI(api_key="sk-...") async def my_async_structured_caller(messages: list[dict[str, str]]) -> str: resp = await aclient.chat.completions.create( model="gpt-4o", messages=messages, max_tokens=1024, ) return resp.choices[0].message.content or "" runner = MACPRunner(async_structured_llm_caller=my_async_structured_caller) result = await runner.arun_round(graph) ``` --- #### Tracking tokens (benchmark pattern) When you need to count tokens across many agents (e.g. 
for benchmarks), wrap the OpenAI client to intercept `usage`: ```python from openai import OpenAI class TrackedLLM: def __init__(self, api_key, base_url, model): self._client = OpenAI(api_key=api_key, base_url=base_url) self._model = model self.total_tokens = 0 self.call_count = 0 def reset(self): self.total_tokens = 0 self.call_count = 0 def chat(self, system: str, user: str, max_tokens: int = 1024) -> str: messages = [] if system: messages.append({"role": "system", "content": system}) messages.append({"role": "user", "content": user}) resp = self._client.chat.completions.create( model=self._model, messages=messages, temperature=0.7, max_tokens=max_tokens, ) self.total_tokens += resp.usage.total_tokens if resp.usage else 0 self.call_count += 1 return resp.choices[0].message.content or "" def as_structured_caller(self, max_tokens: int = 1024): """Return a structured_llm_caller for MACPRunner.""" def _caller(messages: list[dict[str, str]]) -> str: system = next((m["content"] for m in messages if m["role"] == "system"), "") user = next((m["content"] for m in messages if m["role"] == "user"), "") return self.chat(system, user, max_tokens=max_tokens) return _caller llm = TrackedLLM(api_key="...", base_url="...", model="gpt-4o") runner = MACPRunner( structured_llm_caller=llm.as_structured_caller(max_tokens=1024), ) result = runner.run_round(graph) print(f"Tokens used: {llm.total_tokens}, calls: {llm.call_count}") ``` --- #### Caller priority All caller types can coexist. The resolution priority is: ``` structured_llm_caller ← Used for ALL plain agent calls when set β”‚ β”‚ (automatic strβ†’str wrapper also registered as llm_caller β”‚ for internal checks β€” no code change needed) β–Ό llm_callers[agent_id] ← Per-agent override (always takes precedence) β–Ό llm_factory ← Factory by AgentLLMConfig β–Ό llm_caller ← Legacy default ``` You can mix `structured_llm_caller` (global default) with per-agent `llm_callers` overrides β€” the structured caller will be used for all agents that don't have an explicit override. --- #### Providers comparison | Provider | Recommended interface | Notes | |---|---|---| | **OpenAI** (GPT-4o, GPT-4, …) | `structured_llm_caller` βœ… | Native chat completions | | **GigaChat / Sber** | `structured_llm_caller` βœ… | OpenAI-compatible API | | **Anthropic Claude** | `structured_llm_caller` βœ… | Via adapter or LiteLLM | | **Ollama** (local) | `structured_llm_caller` βœ… | OpenAI-compatible `/v1/chat/completions` | | **vLLM** | `structured_llm_caller` βœ… | OpenAI-compatible server | | **Azure OpenAI** | `structured_llm_caller` βœ… | Same API, different base URL | | **Custom / non-chat API** | `llm_caller` (legacy) | Falls back to flat string | --- #### Benchmark results (gMAS vs LangGraph) The table below was measured with `examples/benchmark_vs_langgraph.py --runs 10` using `structured_llm_caller`. LangGraph uses an equivalent explicit `system` / `user` split on its side. | Test topology | LangGraph time | gMAS time | Token Ξ” | |---|---|---|---| | Single agent (1) | baseline | ~+10% | ~+10% | | Chain of 3 (3) | baseline | **βˆ’18 %** | **βˆ’11 %** | | Fan-in 2β†’1 (3) | baseline | **βˆ’30 %** | **βˆ’22 %** | | Chain of 7 (7) | baseline | **βˆ’10 %** | **βˆ’17 %** | | Fan-out 1β†’3β†’1 (5) | baseline | **βˆ’19 %** | **βˆ’13 %** | > Single-agent test is slightly slower in gMAS due to protocol overhead; > this overhead amortises quickly as the number of agents grows. 
--- #### Migration from `llm_caller` to `structured_llm_caller` No changes to graph or agent code are required. Only the runner instantiation changes: ```python # Before (legacy) runner = MACPRunner(llm_caller=lambda prompt: my_model(prompt)) # After (recommended) runner = MACPRunner( structured_llm_caller=lambda messages: my_model_chat(messages) ) ``` Both interfaces are fully supported. The legacy `llm_caller` is not deprecated and will not be removed. --- ### Dynamic Topology #### Static graph modification Modify the graph structure before execution: ```python # Add a new agent new_agent = AgentProfile(agent_id="expert", display_name="Expert") graph.add_node(new_agent, connections_to=["checker"]) # Change connections graph.add_edge("solver", "expert", weight=0.9) graph.remove_edge("solver", "checker") # Disable nodes (without deletion) graph.disable("expensive_agent") # Will not run, but remains in the graph # Full topology update from a matrix import torch new_adjacency = torch.tensor([ [0, 1, 0], [0, 0, 1], [0, 0, 0], ], dtype=torch.float32) graph.update_communication( new_adjacency, s_tilde=score_matrix, # Connection quality scores p_matrix=probability_matrix # Transition probabilities ) ``` #### Runtime modification (during execution) A powerful feature for modifying the graph **during a round** based on intermediate results: ##### Early stopping (Early Stopping) ```python from execution import EarlyStopCondition, RunnerConfig # 1. By keyword in the response stop_on_answer = EarlyStopCondition.on_keyword( "FINAL ANSWER", reason="Answer found" ) # 2. By token limit stop_on_tokens = EarlyStopCondition.on_token_limit( max_tokens=5000, reason="Token budget exceeded" ) # 3. By number of executed agents stop_on_count = EarlyStopCondition.on_agent_count( max_agents=5, reason="Sufficient agents executed" ) # 4. By a metadata value (for RL, metrics) stop_on_quality = EarlyStopCondition.on_metadata( "quality_score", 0.95, comparator=lambda v, threshold: v > threshold, reason="Quality threshold reached" ) # 5. Custom condition stop_custom = EarlyStopCondition.on_custom( condition=lambda ctx: my_rl_agent.should_stop(ctx.messages), reason="RL agent decided to stop", min_agents_executed=2 # At least 2 agents before checking ) # 6. Combine conditions (OR) stop_any = EarlyStopCondition.combine_any([ EarlyStopCondition.on_keyword("DONE"), EarlyStopCondition.on_token_limit(10000), stop_on_quality, ]) # 7. Combine conditions (AND) stop_all = EarlyStopCondition.combine_all([ EarlyStopCondition.on_keyword("answer"), stop_on_quality, ]) # Usage config = RunnerConfig( early_stop_conditions=[stop_on_answer, stop_on_tokens] ) runner = MACPRunner(llm_caller=my_llm, config=config) result = runner.run_round(graph) if result.early_stopped: print(f"Stopped: {result.early_stop_reason}") print(f"Saved: {len(graph.node_ids) - len(result.execution_order)} agents") ``` ##### Topology Hooks (on-the-fly graph modification) ```python from execution import TopologyAction, StepContext, RunnerConfig def my_topology_hook(ctx: StepContext, graph) -> TopologyAction: """Called after each execution step. StepContext contains: - agent_id: current agent - response: its response - messages: all responses so far - execution_order: execution order - remaining_agents: remaining agents - total_tokens: tokens used - metadata: arbitrary data """ # 1. Early stopping based on custom logic if "TASK_COMPLETE" in (ctx.response or ""): return TopologyAction( early_stop=True, early_stop_reason="Task marked as complete" ) # 2. 
Add an edge if quality is low if ctx.metadata.get("quality", 1.0) < 0.5: return TopologyAction( add_edges=[ (ctx.agent_id, "reviewer_agent", 1.0), ], trigger_rebuild=True # Re-plan remaining steps ) # 3. Remove an edge if some_condition: return TopologyAction( remove_edges=[ ("agent1", "agent2"), ] ) # 4. Skip upcoming agents if ctx.total_tokens > 8000: return TopologyAction( skip_agents=["expensive_agent1", "expensive_agent2"] ) # 5. Force execution of agents if needs_expert_review: return TopologyAction( force_agents=["expert_reviewer"] ) # 6. Change the final agent if early_finish: return TopologyAction( new_end_agent="quick_finalizer" ) return None # No changes # Async hook for integration with RL, APIs, etc. async def rl_topology_hook(ctx: StepContext, graph) -> TopologyAction: """Async hook for more complex logic.""" # You can call async APIs, RL models, etc. decision = await my_rl_agent.get_topology_decision( messages=ctx.messages, graph_state=graph.to_dict() ) if decision.add_connection: return TopologyAction( add_edges=[(decision.from_node, decision.to_node, decision.weight)] ) return None # Usage config = RunnerConfig( enable_dynamic_topology=True, topology_hooks=[my_topology_hook], async_topology_hooks=[rl_topology_hook], ) runner = MACPRunner(llm_caller=my_llm, config=config) result = runner.run_round(graph) print(f"Topology modifications: {result.topology_modifications}") ``` ##### Example: RL-controlled topology ```python import torch from your_rl_agent import RLAgent class TopologyRL: def __init__(self): self.rl_agent = RLAgent() def should_stop(self, ctx: StepContext) -> bool: """RL-agent decision for early stopping.""" state = self.encode_state(ctx) action = self.rl_agent.predict(state) return action == "STOP" def get_topology_action(self, ctx: StepContext) -> TopologyAction | None: """RL agent decides how to change topology.""" state = self.encode_state(ctx) action = self.rl_agent.predict(state) if action == "ADD_REVIEWER": return TopologyAction( add_edges=[(ctx.agent_id, "reviewer", 1.0)], trigger_rebuild=True ) elif action == "SKIP_EXPENSIVE": return TopologyAction( skip_agents=["expensive_model"] ) return None def encode_state(self, ctx: StepContext) -> torch.Tensor: # Encode state for RL return torch.tensor([ len(ctx.messages), ctx.total_tokens, len(ctx.remaining_agents), ]) # Usage rl_controller = TopologyRL() config = RunnerConfig( enable_dynamic_topology=True, early_stop_conditions=[ EarlyStopCondition.on_custom( rl_controller.should_stop, reason="RL decided to stop" ) ], topology_hooks=[rl_controller.get_topology_action], ) ``` ##### Full example: adaptive system ```python from execution import ( GraphBuilder, MACPRunner, RunnerConfig, EarlyStopCondition, TopologyAction, StepContext ) # Build the graph builder = GraphBuilder() builder.add_agent("input", persona="Input processor") builder.add_agent("solver", persona="Problem solver") builder.add_agent("checker", persona="Solution checker") builder.add_agent("expensive_expert", persona="Expert (expensive)") builder.add_agent("output", persona="Output formatter") builder.add_workflow_edge("input", "solver") builder.add_workflow_edge("solver", "checker") builder.add_workflow_edge("checker", "output") # expensive_expert is connected dynamically builder.set_start_node("input") builder.set_end_node("output") builder.add_task(query="Solve the complex problem") builder.connect_task_to_agents() graph = builder.build() # Hooks for adaptation def adaptive_hook(ctx: StepContext, graph) -> TopologyAction: # If checker found an 
issue β€” add expert if ctx.agent_id == "checker" and "ERROR" in (ctx.response or ""): return TopologyAction( add_edges=[("checker", "expensive_expert", 1.0), ("expensive_expert", "output", 1.0)], trigger_rebuild=True ) # If solver produced a good answer β€” skip checker if ctx.agent_id == "solver" and ctx.metadata.get("confidence", 0) > 0.95: return TopologyAction( skip_agents=["checker"], reason="High confidence, skipping validation" ) return None # Configure runner config = RunnerConfig( adaptive=True, enable_dynamic_topology=True, topology_hooks=[adaptive_hook], early_stop_conditions=[ EarlyStopCondition.on_keyword("FINAL_ANSWER"), EarlyStopCondition.on_token_limit(10000), ], ) runner = MACPRunner(llm_caller=my_llm, config=config) result = runner.run_round( graph, filter_unreachable=True # Exclude isolated nodes ) # Result print(f"Executed: {result.execution_order}") print(f"Early stopped: {result.early_stopped}") print(f"Topology mods: {result.topology_modifications}") print(f"Tokens saved: calculated from pruned_agents") ``` --- ### GNN Routing (Graph Neural Networks for Routing) Using graph neural networks for **learnable** optimal routing based on execution history. #### Overview of GNN models | Model | Description | When to use | |------|-------------|-------------| | **GCN** (Graph Convolutional Network) | Classic convolution for graphs | Homogeneous graphs, simple tasks | | **GAT** (Graph Attention Network) | Uses an attention mechanism | Edge importance varies | | **GraphSAGE** | Neighbor sampling for large graphs | Large graphs, inductive learning | | **GIN** (Graph Isomorphism Network) | Maximally expressive architecture | Complex patterns, small graphs | --- #### Full example: training a GNN router ```python from core.gnn import ( create_gnn_router, GNNTrainer, GNNRouterInference, GNNModelType, TrainingConfig, FeatureConfig, RoutingStrategy, DefaultFeatureGenerator, ) from core.metrics import MetricsTracker import torch from torch_geometric.data import Data # ========== STEP 1: Collect execution data ========== tracker = MetricsTracker() # Run multiple rounds to accumulate metrics for i in range(100): result = runner.run_round(graph) # Record per-node metrics for agent_id in result.execution_order: response = result.messages[agent_id] tracker.record_node_execution( node_id=agent_id, success=True, latency_ms=response["latency"], cost_tokens=response["tokens"], quality=evaluate_quality(response["content"]), ) # Record edge traversal metrics for i, agent_id in enumerate(result.execution_order[:-1]): next_agent = result.execution_order[i + 1] tracker.record_edge_traversal( source=agent_id, target=next_agent, weight=graph.get_edge_weight(agent_id, next_agent), success=True, latency_ms=50, ) # ========== STEP 2: Feature generation ========== feature_config = FeatureConfig( include_degree=True, # Node degrees include_centrality=True, # Centrality (betweenness, closeness) include_embeddings=True, # Agent embeddings include_metrics=True, # Performance metrics include_structural=True, # Structural features (clustering coef) normalize=True, # Feature normalization ) feature_gen = DefaultFeatureGenerator(config=feature_config) node_features = feature_gen.generate_node_features( graph, graph.node_ids, tracker, ) # Shape: (num_nodes, feature_dim) edge_features = feature_gen.generate_edge_features( graph, tracker, ) # Shape: (num_edges, edge_feature_dim) print(f"Node features shape: {node_features.shape}") print(f"Edge features shape: {edge_features.shape}") # ========== STEP 3: Prepare 
the dataset ========== # Create PyTorch Geometric Data objects train_data_list = [] val_data_list = [] for sample in dataset: # Your dataset with execution history data = Data( x=sample['node_features'], # Node features edge_index=sample['edge_index'], # Edge connections (2, E) edge_attr=sample['edge_features'], # Edge features y=sample['labels'], # Labels (optimal next node, quality score, etc.) ) if sample['is_train']: train_data_list.append(data) else: val_data_list.append(data) # ========== STEP 4: Training configuration ========== training_config = TrainingConfig( # Hyperparameters learning_rate=1e-3, hidden_dim=64, num_layers=3, dropout=0.2, # Training epochs=100, batch_size=32, patience=10, # Early stopping # Task task="node_classification", # or "link_prediction", "graph_regression" num_classes=2, # For classification # Optimization optimizer="adam", # adam, sgd, adamw weight_decay=1e-5, scheduler="reduce_on_plateau", # step, cosine, reduce_on_plateau # Device device="cuda" if torch.cuda.is_available() else "cpu", # Logging log_interval=10, save_best=True, ) # ========== STEP 5: Create the model ========== # 5.1. GCN (Graph Convolutional Network) model_gcn = create_gnn_router( model_type=GNNModelType.GCN, in_channels=node_features.shape[1], out_channels=training_config.num_classes, config=training_config, ) # 5.2. GAT (Graph Attention Network) model_gat = create_gnn_router( model_type=GNNModelType.GAT, in_channels=node_features.shape[1], out_channels=training_config.num_classes, config=training_config, heads=4, # Number of attention heads concat=True, # Concatenate heads or average ) # 5.3. GraphSAGE model_sage = create_gnn_router( model_type=GNNModelType.GraphSAGE, in_channels=node_features.shape[1], out_channels=training_config.num_classes, config=training_config, aggr="mean", # mean, max, lstm ) # 5.4. GIN (Graph Isomorphism Network) model_gin = create_gnn_router( model_type=GNNModelType.GIN, in_channels=node_features.shape[1], out_channels=training_config.num_classes, config=training_config, train_eps=True, # Trainable epsilon ) # ========== STEP 6: Train ========== trainer = GNNTrainer(model_gat, training_config) training_result = trainer.train( train_data_list, val_data_list, verbose=True, ) print(f"Best validation accuracy: {training_result['best_val_acc']:.3f}") print(f"Best epoch: {training_result['best_epoch']}") print(f"Training time: {training_result['training_time']:.2f}s") # Save the model trainer.save("gnn_router.pt") # Load the model trainer.load("gnn_router.pt") # ========== STEP 7: Inference ========== router = GNNRouterInference( model=model_gat, feature_generator=feature_gen, ) # 7.1. Predict the next node (node selection) prediction = router.predict( graph, source="coordinator", candidates=["researcher", "analyst", "writer"], metrics_tracker=tracker, strategy=RoutingStrategy.ARGMAX, # ARGMAX, TOP_K, SAMPLING, THRESHOLD ) print(f"Recommended nodes: {prediction.recommended_nodes}") print(f"Scores: {prediction.scores}") print(f"Confidence: {prediction.confidence:.3f}") # 7.2. Top-K prediction prediction_topk = router.predict( graph, source="coordinator", candidates=["a", "b", "c", "d"], strategy=RoutingStrategy.TOP_K, k=2, # Return top 2 ) print(f"Top 2: {prediction_topk.recommended_nodes}") # 7.3. Probabilistic sampling prediction_sample = router.predict( graph, source="coordinator", candidates=candidates, strategy=RoutingStrategy.SAMPLING, temperature=0.8, # Sampling temperature ) # 7.4. 
Threshold filtering prediction_threshold = router.predict( graph, source="coordinator", candidates=candidates, strategy=RoutingStrategy.THRESHOLD, threshold=0.7, # Only nodes with prob > 0.7 ) # ========== STEP 8: Integrate with AdaptiveScheduler ========== from execution import AdaptiveScheduler, RoutingPolicy scheduler = AdaptiveScheduler( policy=RoutingPolicy.GNN_BASED, gnn_router=router, gnn_threshold=0.6, # Min confidence to use the GNN fallback_policy=RoutingPolicy.WEIGHTED_TOPO # Fallback on low confidence ) plan = scheduler.build_plan( graph.A_com, graph.node_ids, metrics_tracker=tracker, ) # ========== STEP 9: Monitoring and fine-tuning ========== # Collect new data after deployment new_data = [] for i in range(20): result = runner.run_round(graph) # ... record data ... new_data.append(create_data_sample(result)) # Fine-tune trainer.fine_tune( new_data, epochs=10, learning_rate=1e-4, ) trainer.save("gnn_router_finetuned.pt") # ========== Evaluation ========== from core.gnn import evaluate_router metrics = evaluate_router( router, test_data_list, metrics=["accuracy", "f1", "precision", "recall"], ) print(f"Test accuracy: {metrics['accuracy']:.3f}") print(f"F1 score: {metrics['f1']:.3f}") ``` --- #### Comparing GNN models ```python # Experiment: compare performance across models models = { "GCN": create_gnn_router(GNNModelType.GCN, in_channels, out_channels, config), "GAT": create_gnn_router(GNNModelType.GAT, in_channels, out_channels, config), "GraphSAGE": create_gnn_router(GNNModelType.GraphSAGE, in_channels, out_channels, config), "GIN": create_gnn_router(GNNModelType.GIN, in_channels, out_channels, config), } results = {} for name, model in models.items(): trainer = GNNTrainer(model, training_config) result = trainer.train(train_data, val_data) results[name] = result # Comparison import pandas as pd df = pd.DataFrame([ { "Model": name, "Val Acc": res["best_val_acc"], "Train Time": res["training_time"], "Params": sum(p.numel() for p in models[name].parameters()), } for name, res in results.items() ]) print(df) # Output: # | Model | Val Acc | Train Time | Params | # |-----------|---------|------------|---------| # | GCN | 0.853 | 12.5s | 45123 | # | GAT | 0.891 | 18.3s | 67891 | # | GraphSAGE | 0.874 | 15.2s | 52341 | # | GIN | 0.867 | 14.8s | 48976 | ``` --- #### Production usage ```python # Load a trained model router = GNNRouterInference.load("gnn_router.pt", feature_gen) # Integrate with the runner config = RunnerConfig( adaptive=True, routing_policy=RoutingPolicy.GNN_BASED, ) runner = MACPRunner( llm_caller=my_llm, config=config, gnn_router=router, metrics_tracker=tracker, ) # Execute with GNN routing result = runner.run_round(graph) # Monitor GNN predictions print(f"GNN predictions used: {result.gnn_prediction_count}") print(f"Fallback to heuristic: {result.fallback_to_heuristic_count}") ``` --- ### Hidden Channels Hidden channels allow passing **implicit information** between agents as vector representations, bypassing text prompts. 
This is especially useful for: - Passing contextual information without increasing prompt length - Preserving semantic embeddings for downstream tasks - Implementing attention mechanisms between agents - Integrating with a GNN to predict next steps #### Hidden channel architecture ``` β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” hidden_state β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ Agent A β”‚ ──────────────────> β”‚ Agent B β”‚ β”‚ (embedding) β”‚ embedding β”‚ (receives β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ combined) β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ ``` Each agent owns its: - **`embedding`** β€” vector representation of the agent description - **`hidden_state`** β€” hidden state updated after execution The runner combines predecessor `hidden_state` and `embedding` and passes them to the next agent. #### Using hidden channels ```python from execution import RunnerConfig, MACPRunner, HiddenState from core import NodeEncoder # 1. Create an encoder for embeddings encoder = NodeEncoder(model_name="sentence-transformers/all-MiniLM-L6-v2") # 2. Hidden-channel configuration config = RunnerConfig( enable_hidden_channels=True, hidden_combine_strategy="mean", # Combine strategy pass_embeddings=True, # Pass embeddings too hidden_dim=384, # Hidden state dimensionality ) runner = MACPRunner(llm_caller=my_llm, config=config) # 3. Compute agent embeddings texts = [agent.to_text() for agent in graph.agents] embeddings = encoder.encode(texts) for agent, emb in zip(graph.agents, embeddings): agent = agent.with_embedding(emb) graph.update_agent(agent.agent_id, agent) # 4. Execute with hidden channels result = runner.run_round_with_hidden( graph, hidden_encoder=encoder, # To create hidden_state from responses ) # 5. Access hidden states after execution for agent_id, hidden in result.hidden_states.items(): print(f"{agent_id}:") print(f" Hidden state: {hidden.tensor.shape}") # (hidden_dim,) print(f" Embedding: {hidden.embedding.shape}") # (embedding_dim,) print(f" Combined: {hidden.combined.shape}") # (hidden_dim + embedding_dim,) # 6. Use hidden states for downstream tasks hidden_states_matrix = torch.stack([ result.hidden_states[aid].tensor for aid in graph.node_ids ]) # Shape: (num_agents, hidden_dim) # For example, cluster agents by semantics from sklearn.cluster import KMeans kmeans = KMeans(n_clusters=3) clusters = kmeans.fit_predict(hidden_states_matrix.cpu().numpy()) ``` #### Combine strategies (combine_strategy) When an agent has multiple predecessors, their hidden states are combined: ```python # 1. "mean" β€” average (default) # hidden_combined = mean([h1, h2, h3]) config.hidden_combine_strategy = "mean" # 2. "sum" β€” sum # hidden_combined = h1 + h2 + h3 config.hidden_combine_strategy = "sum" # 3. "concat" β€” concatenation # hidden_combined = concat([h1, h2, h3]) # dimensionality increases config.hidden_combine_strategy = "concat" # 4. "attention" β€” weighted attention (weights from adjacency) # hidden_combined = w1*h1 + w2*h2 + w3*h3, where wi = edge_weight(i -> current) config.hidden_combine_strategy = "attention" # 5. 
"max" β€” elementwise max # hidden_combined = max(h1, h2, h3) config.hidden_combine_strategy = "max" ``` #### Advanced: custom hidden-state processing ```python from utils.memory import HiddenChannel # Create a custom HiddenChannel channel = HiddenChannel( node_id="agent_id", hidden_dim=384, ) # Set hidden state import torch channel.set_hidden(torch.randn(384)) channel.set_embedding(torch.randn(384)) # Get combined representation combined = channel.get_combined(strategy="attention", edge_weights=torch.tensor([0.8, 0.2])) # Reset channel.reset() # Integration with agent memory from utils.memory import AgentMemory memory = AgentMemory("agent_id") memory.hidden_state = torch.randn(384) memory.embedding = torch.randn(384) # Get what to pass to the next agent hidden_to_pass = memory.hidden_state embedding_to_pass = memory.embedding ``` #### Using with a GNN ```python from core.gnn import GNNRouterInference, DefaultFeatureGenerator # 1. Hidden states as features for a GNN feature_gen = DefaultFeatureGenerator() # Include hidden states into node features node_features = feature_gen.generate_node_features( graph, graph.node_ids, metrics_tracker, include_hidden_states=True, # Add hidden_state to features ) # 2. GNN predicts the next agent based on hidden states router = GNNRouterInference(model, feature_gen) prediction = router.predict( graph, source="current_agent", candidates=["next1", "next2"], metrics_tracker=tracker, hidden_states=result.hidden_states, # Pass current hidden states ) # 3. Update the graph based on GNN predictions if prediction.confidence > 0.8: next_agent = prediction.recommended_nodes[0] graph.add_edge("current_agent", next_agent, weight=prediction.confidence) ``` #### Example: multi-hop reasoning with hidden channels ```python # Task: multi-hop reasoning where each agent accumulates context agents = [ AgentProfile(agent_id="reader", display_name="Document Reader"), AgentProfile(agent_id="analyzer", display_name="Analyzer"), AgentProfile(agent_id="reasoner", display_name="Reasoner"), AgentProfile(agent_id="answerer", display_name="Final Answerer"), ] edges = [ ("reader", "analyzer"), ("analyzer", "reasoner"), ("reasoner", "answerer"), ] graph = build_property_graph(agents, edges, query="Complex question") # Enable hidden channels for context passing config = RunnerConfig( enable_hidden_channels=True, hidden_combine_strategy="attention", pass_embeddings=True, ) encoder = NodeEncoder(model_name="sentence-transformers/all-MiniLM-L6-v2") runner = MACPRunner(llm_caller=my_llm, config=config) result = runner.run_round_with_hidden(graph, hidden_encoder=encoder) # After each step, hidden_state contains the "accumulated context" # answerer receives a weighted combination of all previous hidden states ``` --- ### Adaptive execution Full control over adaptive execution: ```python from execution import ( MACPRunner, RunnerConfig, RoutingPolicy, PruningConfig, BudgetConfig, ErrorPolicy, ) config = RunnerConfig( adaptive=True, enable_parallel=True, max_parallel_size=5, routing_policy=RoutingPolicy.BEAM_SEARCH, pruning_config=PruningConfig( min_weight_threshold=0.1, token_budget=10000, enable_fallback=True, max_fallback_attempts=2, quality_scorer=lambda response: evaluate_quality(response), min_quality_threshold=0.5, ), budget_config=BudgetConfig( total_token_limit=50000, max_prompt_length=4000, node_token_limit=2000, ), error_policy=ErrorPolicy( on_timeout=ErrorAction.RETRY, on_retry_exhausted=ErrorAction.PRUNE, on_budget_exceeded=ErrorAction.ABORT, ), ) runner = 
MACPRunner(llm_caller=my_llm, config=config) result = runner.run_round(graph) print(f"Topology changes: {result.topology_changed_count}") print(f"Fallbacks: {result.fallback_count}") print(f"Pruned agents: {result.pruned_agents}") ``` --- ## Configuration ### Environment variables ```bash # API key (required) export RWXF_API_KEY="sk-your-api-key" # or via file export RWXF_API_KEY_FILE=/secure/rwxf.key # LLM service URL export RWXF_BASE_URL="https://api.openai.com/v1" # Models export RWXF_MODEL_NAME="gpt-4o-mini" export RWXF_EMBEDDING_MODEL="sentence-transformers/all-MiniLM-L6-v2" # Logging export RWXF_LOG_LEVEL="INFO" export RWXF_LOG_FILE="./logs/framework.log" # Network settings export RWXF_DEFAULT_TIMEOUT=60 export RWXF_MAX_RETRIES=3 ``` ### Programmatic configuration ```python from config import FrameworkSettings, load_settings # Load from environment settings = FrameworkSettings() # Load from a .env file settings = load_settings(".env") # Access settings api_key = settings.resolved_api_key model = settings.model_name timeout = settings.default_timeout ``` --- ## Usage examples ### Example 1: Simple pipeline ```python from execution import AgentProfile, MACPRunner from builder import build_property_graph agents = [ AgentProfile(agent_id="researcher", display_name="Researcher"), AgentProfile(agent_id="writer", display_name="Writer"), AgentProfile(agent_id="editor", display_name="Editor"), ] graph = build_property_graph( agents, workflow_edges=[("researcher", "writer"), ("writer", "editor")], query="Write an article about quantum computers", ) runner = MACPRunner(llm_caller=my_llm) result = runner.run_round(graph) print(result.final_answer) ``` ### Example 2: Parallel processing ```python # Agents work in parallel, then results are aggregated agents = [ AgentProfile(agent_id="analyst_1", display_name="Financial Analyst"), AgentProfile(agent_id="analyst_2", display_name="Market Analyst"), AgentProfile(agent_id="analyst_3", display_name="Risk Analyst"), AgentProfile(agent_id="aggregator", display_name="Report Aggregator"), ] edges = [ ("analyst_1", "aggregator"), ("analyst_2", "aggregator"), ("analyst_3", "aggregator"), ] graph = build_property_graph(agents, workflow_edges=edges, query="Analyze company X") config = RunnerConfig( enable_parallel=True, max_parallel_size=3, ) runner = MACPRunner(llm_caller=my_llm, config=config) result = await runner.arun_round(graph) ``` ### Example 3: Streaming with a callback ```python def on_event(event): if event.event_type == StreamEventType.AGENT_OUTPUT: save_to_db(event.agent_id, event.content) notify_frontend(event) runner = MACPRunner(llm_caller=my_llm) for event in runner.stream(graph): on_event(event) if event.event_type == StreamEventType.TOKEN: yield event.token # For SSE or WebSocket ``` ### Example 4: Working with memory ```python from execution import MACPRunner, RunnerConfig, MemoryConfig config = RunnerConfig( enable_memory=True, memory_config=MemoryConfig( working_max_entries=20, long_term_max_entries=100, ), memory_context_limit=5, # Include last 5 entries in the prompt ) runner = MACPRunner(llm_caller=my_llm, config=config) # First round result1 = runner.run_round(graph) # Second round β€” agents remember context graph.query = "Continue the previous task" result2 = runner.run_round(graph) # Access agent memory agent_memory = runner.get_agent_memory("solver") entries = agent_memory.get_messages() ``` ### Example 5: Graph visualization ```python from core import AgentProfile from core.visualization import ( GraphVisualizer, 
VisualizationStyle, MermaidDirection, NodeStyle, NodeShape, # Convenience functions to_mermaid, to_ascii, to_dot, print_graph, render_to_image, ) from builder import build_property_graph # Create a graph agents = [ AgentProfile( agent_id="input", display_name="Input Handler", tools=["api_reader"], ), AgentProfile( agent_id="processor", display_name="Data Processor", tools=["pandas", "torch"], ), AgentProfile( agent_id="output", display_name="Output Formatter", tools=["json", "csv"], ), ] graph = build_property_graph( agents, workflow_edges=[("input", "processor"), ("processor", "output")], query="Process data pipeline", include_task_node=True, ) # Option 1: Quick visualization (convenience functions) print("=== MERMAID ===") mermaid = to_mermaid(graph, direction=MermaidDirection.LEFT_RIGHT) print(mermaid) print("\n=== ASCII ===") ascii_art = to_ascii(graph, show_edges=True) print(ascii_art) print("\n=== COLORED (if Rich is installed) ===") print_graph(graph, format="auto") # Automatically chooses colored or ascii # Option 2: Advanced visualization with custom styles (Pydantic models) # Create a style (Pydantic model with validation) custom_style = VisualizationStyle( direction=MermaidDirection.LEFT_RIGHT, agent_style=NodeStyle( shape=NodeShape.ROUND, fill_color="#e3f2fd", stroke_color="#1976d2", icon="πŸ€–", ), task_style=NodeStyle( shape=NodeShape.DIAMOND, fill_color="#fff3e0", stroke_color="#f57c00", icon="πŸ“‹", ), show_weights=True, show_tools=True, max_label_length=30, ) # Create a visualizer with the custom style viz = GraphVisualizer(graph, custom_style) # Mermaid with a title mermaid_styled = viz.to_mermaid(title="Data Pipeline") print("\n=== STYLED MERMAID ===") print(mermaid_styled) # Save to files viz.save_mermaid("pipeline.md", title="Data Pipeline") # Markdown with ```mermaid``` viz.save_dot("pipeline.dot", graph_name="DataPipeline") # Render to images (requires system Graphviz) try: render_to_image(graph, "pipeline.png", format="png", dpi=150, style=custom_style) render_to_image(graph, "pipeline.svg", format="svg", style=custom_style) print("\nβœ… Images created: pipeline.png, pipeline.svg") except Exception as e: print(f"\n⚠️ Image rendering failed: {e}") print(" Install system Graphviz to render images") # Adjacency matrix (text representation) print("\n=== ADJACENCY MATRIX ===") matrix = viz.to_adjacency_matrix(show_labels=True) print(matrix) # Rich Console output with trees and tables print("\n=== RICH CONSOLE ===") viz.print_colored() ``` ### Example 6: Conditional routing ```python from builder import GraphBuilder from execution.scheduler import ConditionContext # Define conditions def is_high_quality(context: ConditionContext) -> bool: return context.state.get("quality", 0) > 0.8 def needs_review(context: ConditionContext) -> bool: return context.state.get("word_count", 0) > 1000 # Build a graph with conditional edges builder = GraphBuilder() builder.add_agent(agent_id="writer", display_name="Content Writer") builder.add_agent(agent_id="editor", display_name="Quick Editor") builder.add_agent(agent_id="reviewer", display_name="Senior Reviewer") builder.add_agent(agent_id="publisher", display_name="Publisher") # Conditional transitions builder.add_conditional_edge("writer", "editor", condition=is_high_quality) builder.add_conditional_edge("writer", "reviewer", condition=needs_review) builder.add_workflow_edge("editor", "publisher") builder.add_workflow_edge("reviewer", "publisher") graph = builder.build() # Run runner = MACPRunner(llm_caller=my_llm) result = 
runner.run_round(graph) ``` ### Example 7: Monitoring with events ```python from core.events import ( global_event_bus, EventType, MetricsEventHandler, ) # Configure event handlers bus = global_event_bus() metrics_handler = MetricsEventHandler() # Subscribe to events bus.subscribe(None, metrics_handler) # Listen to all events @bus.subscribe(EventType.STEP_COMPLETED) def on_step_completed(event): print(f"βœ… {event.agent_id} completed in {event.duration_ms:.0f}ms") @bus.subscribe(EventType.BUDGET_WARNING) def on_budget_warning(event): print(f"⚠️ Budget {event.budget_type}: {event.ratio:.1%}") # Run with monitoring runner = MACPRunner(llm_caller=my_llm) result = runner.run_round(graph) # Get aggregated metrics metrics = metrics_handler.get_metrics() print(f"Total tokens: {metrics['total_tokens']}") print(f"Errors: {metrics['errors_count']}") print(f"Avg step duration: {metrics['avg_step_duration_ms']:.1f}ms") ``` ### Example 8: GNN routing with training ```python from core.gnn import ( create_gnn_router, GNNTrainer, GNNRouterInference, GNNModelType, TrainingConfig, DefaultFeatureGenerator, ) from core.metrics import MetricsTracker import torch # Collect execution data for training tracker = MetricsTracker() # ... run several rounds with different queries ... for i in range(100): result = runner.run_round(graph) # Record metrics for agent_id, response in result.messages.items(): tracker.record_node_execution( node_id=agent_id, success=True, latency_ms=response["latency"], cost_tokens=response["tokens"], quality=evaluate_quality(response["content"]), ) # Feature generation feature_gen = DefaultFeatureGenerator() node_features = feature_gen.generate_node_features( graph, graph.node_ids, tracker, ) # Create dataset # ... prepare train_data, val_data in PyG Data format ... 
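# --- Hypothetical sketch (not a framework API): one way to assemble the PyG
# dataset from the features generated above. Assumptions: torch_geometric is
# installed, `node_features` is a torch tensor, `workflow_edges` is the same
# (source, target) list used when building the graph, and labels are derived
# from the quality recorded in the tracker. Adapt to your own labeling scheme.
from torch_geometric.data import Data

id_to_index = {node_id: i for i, node_id in enumerate(graph.node_ids)}
edge_index = torch.tensor(
    [[id_to_index[src] for src, _ in workflow_edges],
     [id_to_index[dst] for _, dst in workflow_edges]],
    dtype=torch.long,
)
labels = torch.tensor(
    [int(tracker.get_node_metrics(nid).avg_quality > 0.8) for nid in graph.node_ids],
    dtype=torch.long,
)
full_data = Data(x=node_features, edge_index=edge_index, y=labels)
train_data, val_data = full_data, full_data  # toy split; use separate graphs or masks in practice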
# Train the model
config = TrainingConfig(
    learning_rate=1e-3,
    hidden_dim=64,
    num_layers=2,
    epochs=50,
    task="node_classification",
)

model = create_gnn_router(
    model_type=GNNModelType.GAT,
    in_channels=node_features.shape[1],
    out_channels=2,
    config=config,
)

trainer = GNNTrainer(model, config)
result = trainer.train(train_data, val_data)

print(f"Best validation accuracy: {result['best_val_acc']:.3f}")
trainer.save("gnn_router.pt")

# Use the trained model for routing
router = GNNRouterInference(model, feature_gen)
prediction = router.predict(
    graph,
    source="coordinator",
    candidates=["agent1", "agent2", "agent3"],
    metrics_tracker=tracker,
)

print(f"Recommended: {prediction.recommended_nodes[0]}")
print(f"Confidence: {prediction.confidence:.3f}")
```

### Example 9: Adaptive execution with a budget

```python
from execution import (
    MACPRunner,
    RunnerConfig,
    RoutingPolicy,
    PruningConfig,
)
from execution.budget import BudgetConfig
from execution.errors import BudgetExceededError, ExecutionError

# Configure adaptive execution
config = RunnerConfig(
    adaptive=True,
    enable_parallel=True,
    max_parallel_size=3,
    routing_policy=RoutingPolicy.WEIGHTED_TOPO,
    pruning_config=PruningConfig(
        min_weight_threshold=0.1,
        token_budget=5000,
        enable_fallback=True,
        max_fallback_attempts=2,
    ),
    budget_config=BudgetConfig(
        total_token_limit=10000,
        node_token_limit=2000,
        max_prompt_length=3000,
        warn_at_usage_ratio=0.8,
    ),
    timeout=60.0,
    max_retries=2,
)

runner = MACPRunner(llm_caller=my_llm, config=config)

# Execute
try:
    result = runner.run_round(graph)

    print(f"Executed agents: {len(result.execution_order)}")
    print(f"Pruned agents: {result.pruned_agents}")
    print(f"Topology changes: {result.topology_changed_count}")
    print(f"Fallback count: {result.fallback_count}")
    print(f"Total tokens: {result.total_tokens}")

except BudgetExceededError as e:
    print(f"Budget exceeded: {e}")
except ExecutionError as e:
    print(f"Execution failed: {e}")
```

### Example 10: Graph analysis with algorithms

```python
from core.algorithms import (
    GraphAlgorithms,
    CentralityType,
    PathMetric,
)

# Wrap the graph in the algorithms service
algo = GraphAlgorithms(graph)

# Find critical nodes
centrality = algo.centrality(CentralityType.BETWEENNESS, normalized=True)
print(f"Most critical agents: {centrality.top_nodes[:3]}")

# Find alternative paths
paths = algo.k_shortest_paths(
    source="input",
    target="output",
    k=3,
    metric=PathMetric.WEIGHTED,
)
print(f"Found {len(paths)} alternative paths:")
for i, path in enumerate(paths, 1):
    print(f"  Path {i}: {' -> '.join(path.nodes)} (cost: {path.cost:.2f})")

# Detect communities
communities = algo.detect_communities(algorithm="louvain")
print(f"Communities found: {len(communities.communities)}")
for i, community in enumerate(communities.communities):
    print(f"  Community {i}: {community}")

# Cycle check
cycles = algo.find_cycles(max_length=5)
if cycles.has_cycles:
    print(f"⚠️ Graph has {len(cycles.cycles)} cycles!")
else:
    print("✓ Graph is acyclic (DAG)")
```

### Example 11: Multi-model system with cost optimization

```python
from builder import GraphBuilder
from execution import MACPRunner, RunnerConfig, LLMCallerFactory
from callbacks import StdoutCallbackHandler

# Build a graph with different models for different tasks
builder = GraphBuilder()

# Stage 1: Data collection (5 parallel agents, cheap model)
for i in range(5):
    builder.add_agent(
        f"collector_{i}",
        display_name=f"Data Collector {i}",
        persona="Collects and formats raw data",
        llm_backbone="gpt-4o-mini",
        base_url="https://api.openai.com/v1",
        api_key="$OPENAI_API_KEY",
        temperature=0.2,
        max_tokens=500,
    )
    builder.add_workflow_edge(f"collector_{i}", "analyst")

# Stage 2: Deep analysis (1 agent, strong model)
builder.add_agent( "analyst", display_name="Senior Data Analyst", persona="Expert analyst with deep statistical knowledge", llm_backbone="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.0, max_tokens=4000, ) builder.add_workflow_edge("analyst", "privacy_checker") # Stage 3: Privacy compliance check (local model) builder.add_agent( "privacy_checker", display_name="Privacy Compliance Checker", persona="Ensures data privacy and compliance", llm_backbone="llama3:70b", base_url="http://localhost:11434/v1", api_key="not-needed", temperature=0.0, max_tokens=1000, ) builder.add_workflow_edge("privacy_checker", "reporter") # Stage 4: Report generation (cheap model) builder.add_agent( "reporter", display_name="Report Generator", persona="Formats analysis into readable reports", llm_backbone="gpt-4o-mini", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.5, max_tokens=2000, ) builder.set_task( query="Analyze Q4 sales data and generate a compliance report", description="Full pipeline from data collection to the final report", ) graph = builder.build() # Print configuration print("=== Multi-Model Pipeline Configuration ===\n") for agent in graph.agents: if hasattr(agent, 'llm_config') and agent.llm_config: config = agent.llm_config print(f"{agent.display_name}:") print(f" Model: {config.model_name}") print(f" Endpoint: {config.base_url}") print(f" Temp: {config.temperature}, Max tokens: {config.max_tokens}") print() # Create factory and runner factory = LLMCallerFactory.create_openai_factory() config = RunnerConfig( enable_parallel=True, max_parallel_size=5, # Collectors run in parallel timeout=120.0, callbacks=[StdoutCallbackHandler()], # Execution monitoring ) runner = MACPRunner( llm_factory=factory, config=config, ) # Execute print("=== Executing Multi-Model Pipeline ===\n") result = runner.run_round(graph) print(f"\n=== Results ===") print(f"Execution order: {' β†’ '.join(result.execution_order)}") print(f"Total time: {result.total_time:.2f}s") print(f"Total tokens: {result.total_tokens}") print(f"\nFinal report:\n{result.final_answer}") # Token usage analysis by model from collections import defaultdict costs_by_model = defaultdict(int) for agent_id in result.execution_order: agent = graph.get_agent_by_id(agent_id) model = agent.llm_config.model_name if agent.llm_config else "default" tokens = result.messages.get(agent_id, {}).get("tokens", 0) costs_by_model[model] += tokens print(f"\n=== Token Usage by Model ===") for model, tokens in costs_by_model.items(): print(f"{model}: {tokens} tokens") # Savings calculation # gpt-4: $30/$60 per 1M tokens (input/output) # gpt-4o-mini: $0.15/$0.60 per 1M tokens # llama3 (local): $0 gpt4_tokens = costs_by_model.get("gpt-4", 0) mini_tokens = costs_by_model.get("gpt-4o-mini", 0) llama_tokens = costs_by_model.get("llama3:70b", 0) actual_cost = (gpt4_tokens * 45 / 1_000_000) + (mini_tokens * 0.375 / 1_000_000) if_all_gpt4_cost = (gpt4_tokens + mini_tokens + llama_tokens) * 45 / 1_000_000 print(f"\n=== Cost Analysis ===") print(f"Actual cost: ${actual_cost:.4f}") print(f"Cost if all GPT-4: ${if_all_gpt4_cost:.4f}") print(f"Savings: ${if_all_gpt4_cost - actual_cost:.4f} ({((1 - actual_cost/if_all_gpt4_cost)*100):.1f}%)") ``` --- ### Token budget (Budget System) Resource management for execution (tokens, requests, time). 
```python from execution.budget import ( Budget, BudgetConfig, NodeBudget, BudgetTracker, ) # Budget β€” tracks a single resource (tokens, requests, or time) token_budget = Budget(limit=50000) print(f"Available: {token_budget.available}") print(f"Usage ratio: {token_budget.usage_ratio:.1%}") can_spend = token_budget.can_spend(100) # Check before using token_budget.spend(100) # Record usage # Per-node budget (composed of Budget objects) node_budget = NodeBudget( node_id="solver", tokens=Budget(limit=2000), requests=Budget(limit=10), time_seconds=Budget(limit=60), ) # Budget tracker β€” configured via BudgetConfig config = BudgetConfig( total_token_limit=50000, # Global token limit total_request_limit=100, # Global request limit total_time_limit_seconds=600, # Global time limit (10 min) node_token_limit=2000, # Per-node token limit max_prompt_length=4000, # Max chars in a prompt max_response_length=2000, # Max chars in a response warn_at_usage_ratio=0.8, # Warn at 80% ) tracker = BudgetTracker(config=config) tracker.start() # Start the timer # Availability check can_run, reason = tracker.can_execute("solver", estimated_tokens=100) if can_run: # Record usage after execution tracker.record_usage( node_id="solver", prompt_tokens=80, completion_tokens=120, latency_seconds=1.5, ) # Prompt/response truncation when exceeding limits prompt = "a very long prompt..." truncated = tracker.truncate_prompt(prompt) # Budget summary summary = tracker.get_summary() print(f"Tokens used: {summary['global']['tokens']['used']}") print(f"Time elapsed: {summary['global']['elapsed_seconds']:.1f}s") # Reset tracker.reset() ``` #### Integration with RunnerConfig ```python from execution import RunnerConfig, BudgetConfig config = RunnerConfig( budget_config=BudgetConfig( total_token_limit=50000, node_token_limit=2000, max_prompt_length=4000, warn_at_usage_ratio=0.8, ), ) ``` --- ### Error handling (Error Handling) Structured exceptions and error-handling policies. ```python from execution.errors import ( ExecutionError, TimeoutError, RetryExhaustedError, BudgetExceededError, AgentNotFoundError, ValidationError, ErrorPolicy, ErrorAction, ExecutionMetrics, ) # Error policy error_policy = ErrorPolicy( on_timeout=ErrorAction.RETRY, # retry, skip, prune, fallback, rollback, abort on_retry_exhausted=ErrorAction.PRUNE, on_budget_exceeded=ErrorAction.ABORT, on_validation_error=ErrorAction.ABORT, on_agent_not_found=ErrorAction.SKIP, on_unknown_error=ErrorAction.SKIP, max_skipped_agents=5, abort_on_critical_path=True, ) # Apply in configuration config = RunnerConfig( error_policy=error_policy, max_retries=3, timeout=60.0, ) # Error handling try: result = runner.run_round(graph) except TimeoutError as e: print(f"Timeout: {e}") except RetryExhaustedError as e: print(f"Retries exhausted: {e}") except BudgetExceededError as e: print(f"Budget exceeded: {e}") except ExecutionError as e: print(f"Execution error: {e}") # Access metrics metrics: ExecutionMetrics = e.metrics print(f"Retries: {metrics.retry_count}") print(f"Fallbacks: {metrics.fallback_count}") # Get metrics from the result if result.errors: for error in result.errors: print(f"{error['agent_id']}: {error['type']} - {error['message']}") ``` --- ### Graph algorithms (Graph Algorithms) A service layer for graph analysis using `rustworkx` algorithms. 
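The service layer builds on `rustworkx`'s native algorithms. For orientation, a minimal sketch of the equivalent raw call, assuming the `RoleGraph` exposes its underlying `rustworkx.PyDiGraph` (shown here via a hypothetical `graph.graph` attribute):

```python
import rustworkx as rx

# Hypothetical attribute: the underlying rustworkx digraph behind the RoleGraph
rx_graph = graph.graph

# Raw rustworkx betweenness centrality, roughly what CentralityType.BETWEENNESS builds on
scores = rx.betweenness_centrality(rx_graph, normalized=True)
most_central = max(rx_graph.node_indices(), key=lambda i: scores[i])
print(f"Most central node index: {most_central}, score: {scores[most_central]:.3f}")
```

The `GraphAlgorithms` helpers below work with agent IDs rather than raw node indices and return typed result objects: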
```python from core.algorithms import ( GraphAlgorithms, CentralityType, PathMetric, SubgraphFilter, ) algo = GraphAlgorithms(graph) # K shortest paths paths = algo.k_shortest_paths( source="researcher", target="writer", k=3, metric=PathMetric.HOP_COUNT, # HOP_COUNT, WEIGHTED, RELIABILITY edge_weights=None, # or custom weights ) for i, path in enumerate(paths): print(f"Path {i+1}: {path.nodes} (cost={path.cost:.2f})") # Node centrality centrality = algo.centrality( centrality_type=CentralityType.BETWEENNESS, # DEGREE, BETWEENNESS, CLOSENESS, EIGENVECTOR, PAGERANK normalized=True, ) print(f"Most central node: {centrality.top_nodes[0]}") print(f"Scores: {centrality.scores}") # Community detection communities = algo.detect_communities(algorithm="louvain") # louvain, label_propagation print(f"Communities found: {len(communities.communities)}") print(f"Modularity: {communities.modularity:.3f}") # Cycle search cycles = algo.find_cycles(max_length=5) if cycles.has_cycles: print(f"Cycles found: {len(cycles.cycles)}") for cycle in cycles.cycles: print(f" {cycle}") # Subgraph filtering subgraph_filter = SubgraphFilter( include_node_ids=["a", "b", "c"], min_edge_weight=0.5, max_hop_distance=2, from_node="a", ) subgraph = algo.filter_subgraph(subgraph_filter) print(f"Nodes in subgraph: {len(subgraph.node_ids)}") # Reachability analysis reachable = algo.get_reachable_nodes("start", max_distance=3) print(f"Reachable nodes: {reachable}") # Topological order if algo.is_dag(): topo_order = algo.topological_sort() print(f"Topological order: {topo_order}") ``` --- ### Metrics Tracker Collects and aggregates performance metrics for nodes and edges. ```python from core.metrics import ( MetricsTracker, NodeMetrics, EdgeMetrics, MetricAggregator, ExponentialMovingAverage, SlidingWindowAverage, ) tracker = MetricsTracker() # Record node metrics tracker.record_node_execution( node_id="solver", success=True, latency_ms=150, cost_tokens=200, quality=0.95, ) # Record edge metrics tracker.record_edge_traversal( source="solver", target="checker", weight=0.9, success=True, latency_ms=50, ) # Get node metrics metrics: NodeMetrics = tracker.get_node_metrics("solver") print(f"Reliability: {metrics.reliability:.3f}") print(f"Avg latency: {metrics.avg_latency_ms:.1f}ms") print(f"Total cost: {metrics.total_cost_tokens}") print(f"Avg quality: {metrics.avg_quality:.3f}") print(f"Executions: {metrics.execution_count}") # Get edge metrics edge_metrics: EdgeMetrics = tracker.get_edge_metrics("solver", "checker") print(f"Edge reliability: {edge_metrics.reliability:.3f}") print(f"Traversals: {edge_metrics.traversal_count}") # Snapshot of all metrics snapshot = tracker.snapshot() print(f"Timestamp: {snapshot.timestamp}") print(f"Node metrics: {snapshot.node_metrics}") print(f"Edge metrics: {snapshot.edge_metrics}") # Metrics history (if enabled) tracker = MetricsTracker(keep_history=True, history_window=100) # ... records ... history = tracker.get_history(node_id="solver") for snapshot in history.snapshots: print(f"{snapshot.timestamp}: {snapshot.metrics}") # Custom aggregators ema = ExponentialMovingAverage(alpha=0.1) tracker.set_aggregator("solver", "latency", ema) swa = SlidingWindowAverage(window_size=10) tracker.set_aggregator("checker", "quality", swa) # Export metrics data = tracker.to_dict() tracker.save("metrics.json") # Load metrics tracker = MetricsTracker.load("metrics.json") ``` --- ### Visualization Tools for visualizing graphs in different formats. 
All visualization styles are based on **Pydantic models** for validation and type safety. #### Core classes ```python from core.visualization import ( GraphVisualizer, VisualizationStyle, MermaidDirection, NodeShape, NodeStyle, EdgeStyle, # Convenience functions to_mermaid, to_ascii, to_dot, print_graph, render_to_image, show_graph_interactive, ) ``` #### 1. Quick usage (convenience functions) ```python # Simple Mermaid mermaid_code = to_mermaid(graph, direction=MermaidDirection.LEFT_RIGHT) print(mermaid_code) # Simple ASCII ascii_art = to_ascii(graph, show_edges=True) print(ascii_art) # Simple DOT dot_code = to_dot(graph, graph_name="MyGraph") print(dot_code) # Print to console (auto-selects Rich or ASCII) print_graph(graph, format="auto") # "auto", "colored", "ascii", "mermaid" # Render to image (requires system Graphviz) render_to_image(graph, "output.png", format="png", dpi=300) render_to_image(graph, "output.svg", format="svg") # Interactive view (opens in system viewer) show_graph_interactive(graph, graph_name="MyWorkflow") ``` #### 2. Advanced usage (GraphVisualizer with custom styles) **VisualizationStyle**, **NodeStyle**, **EdgeStyle** are Pydantic models with field validation. ```python # Create custom node styles (Pydantic models) agent_style = NodeStyle( shape=NodeShape.ROUND, # RECTANGLE, ROUND, STADIUM, CIRCLE, DIAMOND, etc. fill_color="#e3f2fd", # Fill color stroke_color="#1976d2", # Border color text_color="#000000", # Text color icon="πŸ€–", # Emoji icon ) task_style = NodeStyle( shape=NodeShape.DIAMOND, fill_color="#fff3e0", stroke_color="#f57c00", icon="πŸ“‹", ) # Edge styles (Pydantic models) workflow_edge = EdgeStyle( line_style="solid", # solid, dashed, dotted arrow_head="normal", # normal, none, diamond color="#1976d2", label_color="#333333", ) task_edge = EdgeStyle( line_style="dashed", color="#f57c00", ) # Global visualization style (Pydantic model) style = VisualizationStyle( direction=MermaidDirection.LEFT_RIGHT, # TOP_BOTTOM, BOTTOM_TOP, LEFT_RIGHT, RIGHT_LEFT agent_style=agent_style, task_style=task_style, workflow_edge_style=workflow_edge, task_edge_style=task_edge, show_weights=True, # Show edge weights show_probabilities=False, # Show probabilities show_tools=True, # Show agent tools show_descriptions=False, # Show descriptions max_label_length=30, # Max label length ) # Create a visualizer with custom style viz = GraphVisualizer(graph, style) # Mermaid diagrams mermaid = viz.to_mermaid( direction=MermaidDirection.TOP_BOTTOM, # Can override style title="Agent Workflow", # Diagram title ) print(mermaid) # Save Mermaid to a file viz.save_mermaid("graph.md", title="My Workflow") # Wraps in ```mermaid``` viz.save_mermaid("graph.mmd", title="My Workflow") # Raw .mmd without wrapper # ASCII art for terminal ascii_art = viz.to_ascii( show_edges=True, box_width=20, ) print(ascii_art) # Graphviz DOT dot = viz.to_dot( graph_name="AgentGraph", rankdir="LR", # TB, LR, BT, RL ) viz.save_dot("graph.dot", graph_name="AgentGraph") # Render to image (requires installed Graphviz) viz.render_image( "output.png", format="png", # png, svg, pdf, jpg dpi=300, # For raster formats graph_name="MyGraph", ) # Interactive view viz.show_interactive(graph_name="MyGraph") # Opens system viewer # Adjacency matrix (text representation) matrix = viz.to_adjacency_matrix(show_labels=True) print(matrix) ``` #### 3. 
Colored terminal output (Rich Console) ```python # Automatic colored output (if Rich is installed) print_graph(graph, format="colored") # Or via visualizer viz = GraphVisualizer(graph) viz.print_colored() # Pretty output with trees, tables, and colors ``` #### 4. Full configuration example ```python from core.visualization import ( GraphVisualizer, VisualizationStyle, NodeStyle, EdgeStyle, NodeShape, MermaidDirection, ) # Fully configured style custom_style = VisualizationStyle( direction=MermaidDirection.LEFT_RIGHT, agent_style=NodeStyle( shape=NodeShape.ROUND, fill_color="#bbdefb", stroke_color="#0d47a1", icon="πŸ€–", ), task_style=NodeStyle( shape=NodeShape.DIAMOND, fill_color="#ffe0b2", stroke_color="#e65100", icon="πŸ“‹", ), workflow_edge_style=EdgeStyle( line_style="solid", color="#1976d2", ), task_edge_style=EdgeStyle( line_style="dashed", color="#f57c00", ), show_weights=True, show_tools=True, max_label_length=40, ) viz = GraphVisualizer(graph, custom_style) # Generate all formats viz.save_mermaid("docs/graph.md", title="Workflow") viz.save_dot("docs/graph.dot") viz.render_image("docs/graph.png", format="png", dpi=150) viz.render_image("docs/graph.svg", format="svg") print(viz.to_ascii()) ``` #### 5. Installing Graphviz for image rendering For `render_image()` and `render_to_image()` you need: 1. Python library: `pip install graphviz` 2. System Graphviz: - Ubuntu/Debian: `sudo apt install graphviz` - macOS: `brew install graphviz` - Windows: `winget install graphviz` or https://graphviz.org/download/ --- ### Schema System A complete system of **Pydantic schemas** for type-safe validation, serialization, and migration of graph data. All schemas inherit from `pydantic.BaseModel` and provide automatic type validation, default values, and data conversion. #### Core schema classes ```python from core.schema import ( # Versioning SCHEMA_VERSION, SchemaVersion, # Node and edge types NodeType, EdgeType, # Node schemas (Pydantic BaseModel) BaseNodeSchema, AgentNodeSchema, TaskNodeSchema, # Edge schemas (Pydantic BaseModel) BaseEdgeSchema, WorkflowEdgeSchema, CostMetrics, # Graph schema (Pydantic BaseModel) GraphSchema, # LLM configuration (Pydantic BaseModel) LLMConfig, # Validation (Pydantic BaseModel) ValidationResult, SchemaValidator, # Migrations SchemaMigration, MigrationRegistry, migrate_schema, ) ``` #### 1. 
Creating node schemas (Pydantic models) ```python # Agent with a full LLM configuration agent_node = AgentNodeSchema( id="solver", type=NodeType.AGENT, display_name="Math Solver", persona="You are an expert mathematician", description="Solves complex math problems step by step", tools=["calculator", "wolfram_alpha"], # LLM configuration (Pydantic model) llm_backbone="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.0, max_tokens=2000, # Metrics and state trust_score=0.95, quality_score=0.9, success_rate=1.0, total_calls=0, total_tokens_used=0, # Pydantic validates embedding automatically embedding=[0.1, 0.2, 0.3], # Can be a list or torch.Tensor embedding_dim=3, # Auto-filled if None # Metadata (arbitrary data) metadata={"priority": "high", "category": "math"}, tags={"solver", "math", "primary"}, ) # Task task_node = TaskNodeSchema( id="main_task", type=NodeType.TASK, query="Solve: x^2 + 5x + 6 = 0", description="Main mathematical task", expected_output="Two solutions: x1, x2", max_iterations=10, status="pending", # pending, running, completed, failed ) # Extract LLM configuration from the agent llm_config: LLMConfig = agent_node.get_llm_config() print(f"Model: {llm_config.model_name}") print(f"Configured: {llm_config.is_configured()}") print(f"Generation params: {llm_config.to_generation_params()}") # Check whether an LLM configuration exists if agent_node.has_llm_config(): print("Agent has LLM configuration") ``` #### 2. Creating edge schemas (Pydantic models) ```python # Base edge with cost metrics (Pydantic model) edge = BaseEdgeSchema( source="solver", target="checker", type=EdgeType.WORKFLOW, weight=1.0, probability=0.95, bidirectional=False, # Cost metrics (Pydantic model) cost=CostMetrics( estimated_tokens=500, actual_tokens=None, latency_ms=150.0, timeout_ms=5000.0, trust=0.9, reliability=0.95, cost_usd=0.01, custom={"priority": 1.0}, ), # Pydantic validates attr automatically attr=[1.0, 0.95, 0.9], # Can be a list or torch.Tensor attr_dim=3, # Auto-filled if None metadata={"route": "primary"}, ) # Workflow edge with conditional routing conditional_edge = WorkflowEdgeSchema( source="solver", target="checker", type=EdgeType.WORKFLOW, weight=0.9, probability=1.0, # Conditional routing condition="source_success", # Name of a built-in or registered condition priority=1, # Priority (higher = checked earlier) transform="extract_answer", # Optional data transform is_conditional=True, # Auto-set if condition is provided ) # Get edge features feature_vector = edge.get_feature_vector(feature_names=["trust", "reliability"]) print(f"Features: {feature_vector}") # Convert to torch.Tensor attr_tensor = edge.to_attr_tensor() print(f"Attr tensor: {attr_tensor}") ``` #### 3. Full graph schema (Pydantic model) ```python from datetime import datetime # GraphSchema - the main Pydantic model schema = GraphSchema( schema_version=SCHEMA_VERSION, # "2.0.0" name="Math Pipeline", description="A workflow for solving mathematical problems", created_at=datetime.now(), updated_at=datetime.now(), # nodes is dict[str, BaseNodeSchema], not a list! 
nodes={ "solver": AgentNodeSchema( id="solver", display_name="Math Solver", description="Solves math problems", tools=["calculator"], llm_backbone="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", ), "checker": AgentNodeSchema( id="checker", display_name="Answer Checker", description="Validates solutions", llm_backbone="gpt-4o-mini", ), "__task__": TaskNodeSchema( id="__task__", query="Solve: x^2 + 5x + 6 = 0", ), }, edges=[ WorkflowEdgeSchema( source="solver", target="checker", weight=0.9, type=EdgeType.WORKFLOW, ), ], # Feature names for feature extraction node_feature_names=["trust_score", "quality_score"], edge_feature_names=["trust", "reliability"], # Metadata metadata={ "created_by": "user@example.com", "purpose": "math_pipeline", "version": "1.0", }, ) # Add nodes and edges new_agent = AgentNodeSchema( id="reviewer", display_name="Reviewer", ) schema.add_node(new_agent) new_edge = BaseEdgeSchema( source="checker", target="reviewer", ) schema.add_edge(new_edge) # Retrieve nodes and edges solver_node = schema.get_node("solver") edges_from_solver = schema.get_edges(source="solver") edges_to_checker = schema.get_edges(target="checker") # Compute feature dimensionalities schema.compute_feature_dims() print(f"Node feature dim: {schema.node_feature_dim}") print(f"Edge feature dim: {schema.edge_feature_dim}") ``` #### 4. Serialization and validation (Pydantic) ```python # Serialization (Pydantic methods) schema_dict = schema.model_dump() # Dict[str, Any] schema_json = schema.model_dump_json(indent=2) # JSON string # Or a specialized method schema_data = schema.to_dict() # Deserialization (Pydantic methods) loaded_schema = GraphSchema.model_validate(schema_dict) loaded_from_json = GraphSchema.model_validate_json(schema_json) # Schema validation (returns ValidationResult - Pydantic model) validator = SchemaValidator( check_cycles=True, check_duplicates=True, check_orphans=True, check_connectivity=False, ) result: ValidationResult = validator.validate(schema) if result.valid: print("βœ“ Schema is valid") else: print("βœ— Validation errors:") for error in result.errors: print(f" - {error}") if result.warnings: print("⚠ Warnings:") for warning in result.warnings: print(f" - {warning}") ``` #### 5. Schema migration between versions ```python # Automatic migration of legacy data old_data = { "schema_version": "1.0.0", "agents": [ # Old format (agents list) {"agent_id": "solver", "display_name": "Solver"}, ], "edges": [ {"source": "solver", "target": "checker"}, ], } # Migrate to the current version (2.0.0) migrated_data = migrate_schema(old_data) print(f"Migrated to version: {migrated_data['schema_version']}") # Create a custom migration from core.schema import SchemaMigration, register_migration class MyCustomMigration(SchemaMigration): from_version = "1.5.0" to_version = "2.0.0" def migrate(self, data: dict) -> dict: # Your migration logic data["new_field"] = "default_value" return data # Register migration register_migration(MyCustomMigration()) ``` #### 6. Versioning ```python # Check schema version current_version = SchemaVersion.parse(SCHEMA_VERSION) # "2.0.0" print(f"Current: {current_version}") old_version = SchemaVersion.parse("1.5.0") print(f"Compatible: {current_version.is_schema_compatible(old_version)}") # False (different major versions) print(f"Newer: {current_version > old_version}") # True ``` #### Benefits of Pydantic schemas 1. **Automatic type validation** β€” Pydantic checks types when creating objects 2. 
**Default values** β€” fields are auto-populated 3. **Type conversion** β€” automatic conversion (torch.Tensor β†’ list) 4. **Serialization/deserialization** β€” built-in `.model_dump()`, `.model_validate()` 5. **Extensibility** β€” `extra="allow"` enables arbitrary fields 6. **Immutability** β€” `frozen=True` for immutable models 7. **Documentation** β€” automatic JSON Schema generation --- #### 7. Agent input/output validation **New:** Each agent can have **input_schema** and **output_schema** to validate incoming data and outputs. This allows you to: - πŸ”’ Guarantee data correctness - πŸ“ Automatically parse structured outputs - 🚫 Catch invalid LLM outputs - πŸ“‹ Generate JSON Schema for prompts > **Prompt injection:** `_build_prompt` automatically injects schemas into the LLM prompt. > - `output_schema` β†’ system message: `"Respond with JSON matching: {schema}"` > - `input_schema` β†’ user message: `"Input format: {schema}"` > > The schemas are serialised as compact JSON (no extra whitespace) to minimise token usage. > No manual prompt engineering is required. ##### Imports ```python from pydantic import BaseModel from core.schema import ( AgentNodeSchema, SchemaValidationResult, # Validation result ) from builder import GraphBuilder ``` ##### 7.1. Create an agent with Pydantic schemas ```python # Define input/output schemas as Pydantic models class SolverInput(BaseModel): question: str context: str | None = None difficulty: int = 1 class SolverOutput(BaseModel): answer: str confidence: float # 0.0 - 1.0 explanation: str | None = None # Create an agent with validation builder = GraphBuilder() builder.add_agent( "solver", display_name="Math Solver", persona="Expert mathematician", description="Solves mathematical problems", # Schemas for validation input_schema=SolverInput, output_schema=SolverOutput, # LLM configuration llm_backbone="gpt-4", temperature=0.0, ) graph = builder.build() ``` ##### 7.2. Using JSON Schema (without Pydantic) You can pass a plain dict with JSON Schema: ```python # JSON Schema directly (without Pydantic models) input_schema = { "type": "object", "properties": { "question": {"type": "string"}, "context": {"type": "string"}, }, "required": ["question"] } output_schema = { "type": "object", "properties": { "answer": {"type": "string"}, "confidence": {"type": "number"}, }, "required": ["answer", "confidence"] } builder.add_agent( "solver", input_schema=input_schema, # JSON Schema dict output_schema=output_schema, # JSON Schema dict ) ``` ##### 7.3. Validation via RoleGraph ```python # Check whether schemas exist has_input = graph.has_input_schema("solver") # True has_output = graph.has_output_schema("solver") # True # Validate input data result: SchemaValidationResult = graph.validate_agent_input( "solver", {"question": "Solve x^2 + 5x + 6 = 0"} ) if result.valid: print("βœ… Input is valid") print(f"Validated data: {result.validated_data}") else: print("❌ Input validation failed") print(f"Errors: {result.errors}") # Validate output data (JSON string or dict) response = '{"answer": "x1=-2, x2=-3", "confidence": 0.95}' result = graph.validate_agent_output("solver", response) if result.valid: parsed = result.validated_data print(f"Answer: {parsed['answer']}") print(f"Confidence: {parsed['confidence']}") else: print(f"Invalid output: {result.errors}") # You can raise an exception result.raise_if_invalid() # -> ValueError ``` ##### 7.4. 
Getting JSON Schema for prompts ```python # Get JSON Schema for LLM instructions input_schema_json = graph.get_input_schema_json("solver") output_schema_json = graph.get_output_schema_json("solver") # Use in the prompt prompt = f"""You are a math solver. INPUT FORMAT: {json.dumps(input_schema_json, indent=2)} You MUST respond in the following JSON format: {json.dumps(output_schema_json, indent=2)} Now solve: {{question}} """ ``` ##### 7.5. Validation directly via AgentNodeSchema ```python # Create an agent with schemas agent = AgentNodeSchema( id="solver", display_name="Math Solver", input_schema=SolverInput, output_schema=SolverOutput, ) # Validate result = agent.validate_input({"question": "2+2=?"}) print(f"Valid: {result.valid}") result = agent.validate_output('{"answer": "4", "confidence": 0.99}') print(f"Valid: {result.valid}, data: {result.validated_data}") # Check schema presence if agent.has_input_schema(): print("Agent has input schema") if agent.has_output_schema(): print("Agent has output schema") ``` ##### 7.6. Handling invalid LLM outputs ```python # Scenario: the LLM responds in the wrong format response = llm_call(prompt) result = graph.validate_agent_output("solver", response) if not result.valid: # Option 1: Retry with a stricter prompt retry_prompt = f"{prompt}\n\n⚠️ IMPORTANT: You MUST respond with valid JSON!" response = llm_call(retry_prompt) result = graph.validate_agent_output("solver", response) if not result.valid: # Option 2: Fallback to default values parsed = { "answer": response, "confidence": 0.5, "explanation": "LLM failed to format correctly" } else: parsed = result.validated_data else: parsed = result.validated_data print(f"Final answer: {parsed['answer']}") ``` ##### 7.7. SchemaValidationResult API ```python class SchemaValidationResult(BaseModel): """Schema validation result.""" valid: bool # True if data is valid schema_type: str # "input" or "output" errors: list[str] # Validation errors warnings: list[str] # Validation warnings validated_data: dict[str, Any] | None # Validated data message: str # Additional message # Methods result.raise_if_invalid() # Raise ValueError if invalid ``` ##### 7.8. Serialization support When saving a graph: - **Pydantic models** (`input_schema`/`output_schema`) are **NOT** serialized (exclude=True) - **JSON Schema** (`input_schema_json`/`output_schema_json`) **is** serialized ```python # When creating an agent with a Pydantic model agent = AgentNodeSchema( id="solver", input_schema=SolverInput, # Not serialized output_schema=SolverOutput, # Not serialized ) # JSON Schema is extracted automatically print(agent.input_schema_json) # {'type': 'object', 'properties': {...}} print(agent.output_schema_json) # {'type': 'object', 'properties': {...}} # When deserializing a graph from JSON # Pydantic models are lost, but JSON Schema remains # Validation works via basic type checks ``` ##### When should you use input/output schemas? | Scenario | Recommendation | |----------|----------------| | **Structured data** | βœ… Use Pydantic schemas | | **JSON outputs from an LLM** | βœ… Required! 
Parsing and validation | | **Free-form text** | ❌ Not needed | | **API integration** | βœ… Guarantees correct data | | **Debugging** | βœ… Quickly surfaces issues | ##### Performance impact - βœ… **Validation does not consume tokens** β€” it is pure Python - ⚠️ **Prompt instructions consume tokens** β€” embedding JSON Schema into prompts increases token usage - ⚑ **Validation is fast** β€” Pydantic is optimized for speed ##### Validation FAQ **Q: Is this required?** A: No, it is fully optional. If schemas are not set, validation is skipped. **Q: What if the LLM cannot respond in the required format?** A: `validate_output()` returns `valid=False` plus errors. Options: retry/fallback/ignore. **Q: Can I pass plain JSON Schema?** A: Yes. Pass a dict with JSON Schema instead of a Pydantic model. **Q: Does token usage increase?** A: Validation does not consume tokens. But including JSON Schema in prompts does increase token usage. --- ### Builder API (Detailed) Different ways to construct graphs. #### 1. build_property_graph (quick construction) ```python from builder import build_property_graph graph = build_property_graph( agents=[agent1, agent2, agent3], workflow_edges=[("agent1", "agent2"), ("agent2", "agent3")], context_edges=[("agent1", "agent3")], # Additional connections query="Solve this task", include_task_node=True, # Add a task node task_node_id="__task__", # Task node ID connect_task_to_all=False, # Connect task to all agents edge_weights=None, # Custom edge weights default_weight=1.0, # Default weight bidirectional=False, # Bidirectional edges encoder=None, # NodeEncoder for embeddings compute_embeddings=False, # Compute embeddings immediately ) ``` #### 2. GraphBuilder (fluent API) ```python from builder import GraphBuilder builder = GraphBuilder() # Add agents (basic) builder.add_agent( agent_id="researcher", display_name="Researcher", description="Does research", tools=["search", "read"], ) # Add an agent with multi-model configuration builder.add_agent( agent_id="analyst", display_name="Senior Analyst", persona="Expert data analyst", # LLM configuration llm_backbone="gpt-4", # Model name base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", # Or $ENV_VAR temperature=0.7, max_tokens=2000, timeout=60.0, top_p=0.9, stop_sequences=["END", "STOP"], ) # Or via an LLMConfig object from core.schema import LLMConfig llm_config = LLMConfig( model_name="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.7, max_tokens=2000, ) builder.add_agent( agent_id="writer", display_name="Writer", llm_config=llm_config, # Pass a ready configuration ) # Add edges builder.add_workflow_edge("researcher", "writer", weight=0.9) builder.add_context_edge("researcher", "writer", weight=0.5) # Add a task builder.set_task(query="Write a report", description="Main task") # Conditional edges def quality_check(state: dict) -> bool: return state.get("quality_score", 0) > 0.8 builder.add_conditional_edge( source="writer", target="editor", condition=quality_check, weight=0.9, ) # Set execution bounds (new!) builder.set_start_node("researcher") # Start node builder.set_end_node("writer") # End node # Or both at once: builder.set_execution_bounds("researcher", "writer") # Build the graph graph = builder.build(compute_embeddings=True, encoder=my_encoder) # Validate before building is_valid, errors = builder.validate() if not is_valid: print(f"Errors: {errors}") ``` #### 3. 
build_from_adjacency (from a matrix) ```python from builder import build_from_adjacency import torch adjacency = torch.tensor([ [0, 1, 0], [0, 0, 1], [0, 0, 0], ], dtype=torch.float32) graph = build_from_adjacency( adjacency_matrix=adjacency, agents=[agent1, agent2, agent3], query="Task", threshold=0.1, # Ignore edges with weight < threshold ) ``` #### 4. build_from_schema (from a schema) ```python from builder import build_from_schema graph = build_from_schema( schema=my_schema, compute_embeddings=True, encoder=my_encoder, validate=True, # Validate before building ) ``` --- ### Event System Subscribe to events for monitoring and debugging. ```python from core.events import ( EventBus, global_event_bus, EventType, LoggingEventHandler, MetricsEventHandler, on_event, # Events NodeAddedEvent, EdgeAddedEvent, StepCompletedEvent, BudgetWarningEvent, ) # Get the global event bus bus = global_event_bus() # 1. Subscribe via a handler logging_handler = LoggingEventHandler( log_level="INFO", include_metadata=True, ) bus.subscribe(EventType.STEP_COMPLETED, logging_handler) # 2. Subscribe via a function def on_step_completed(event): if isinstance(event, StepCompletedEvent): print(f"Agent {event.agent_id} completed: {event.tokens_used} tokens") bus.subscribe(EventType.STEP_COMPLETED, on_step_completed) # 3. Subscribe via a decorator @on_event(EventType.BUDGET_WARNING) def handle_budget_warning(event: BudgetWarningEvent): print(f"⚠️ Budget warning: {event.budget_type} at {event.ratio:.1%}") # 4. Global subscription (all events) @on_event(None) def handle_all_events(event): print(f"Event: {event.event_type.value}") # Disable event handling bus.disable() # Enable bus.enable() # Clear all handlers bus.clear() # Aggregate metrics via events metrics_handler = MetricsEventHandler() bus.subscribe(None, metrics_handler) # After execution metrics = metrics_handler.get_metrics() print(f"Total tokens: {metrics['total_tokens']}") print(f"Errors: {metrics['errors_count']}") print(f"Budget warnings: {metrics['budget_warnings']}") ``` --- ### Callback system Monitoring and logging execution via callback handlers. #### Core concepts - **`BaseCallbackHandler`** β€” base class for creating callback handlers - **`AsyncCallbackHandler`** β€” async version for asynchronous operations - **`CallbackManager`** β€” manager that orchestrates and invokes handlers - **Built-in handlers** β€” StdoutCallbackHandler, MetricsCallbackHandler, FileCallbackHandler #### Quick start ```python from execution import MACPRunner from callbacks import ( StdoutCallbackHandler, MetricsCallbackHandler, FileCallbackHandler, ) # 1. Callbacks via RunnerConfig from execution import RunnerConfig config = RunnerConfig( callbacks=[ StdoutCallbackHandler(show_outputs=True), MetricsCallbackHandler(), ] ) runner = MACPRunner(llm_caller=my_llm, config=config) result = runner.run_round(graph) # 2. Per-run callbacks (override config) result = runner.run_round( graph, callbacks=[FileCallbackHandler("execution_log.jsonl")] ) ``` #### Context Manager ```python from callbacks import collect_metrics, trace_as_callback # 1. Collect metrics with collect_metrics() as metrics: runner.run_round(graph) print(f"Total tokens: {metrics.total_tokens}") print(f"Total duration: {metrics.total_duration_ms}ms") print(f"Runs completed: {metrics.runs_completed}") print(f"Runs failed: {metrics.runs_failed}") # Full statistics all_metrics = metrics.get_metrics() print(f"Agent calls: {all_metrics['agent_calls']}") print(f"Errors: {all_metrics['errors_count']}") # 2. 
Tracing with arbitrary handlers from callbacks import StdoutCallbackHandler with trace_as_callback(handlers=[StdoutCallbackHandler()]) as manager: runner.run_round(graph) # Callbacks are automatically applied to this run ``` #### Creating your own CallbackHandler ```python from callbacks import BaseCallbackHandler from uuid import UUID class MySlackAlertHandler(BaseCallbackHandler): """Sends Slack alerts on errors.""" def on_run_start( self, *, run_id: UUID, query: str, num_agents: int = 0, **kwargs, ) -> None: send_slack(f"πŸš€ Started run {run_id}: {num_agents} agents") def on_agent_end( self, *, run_id: UUID, agent_id: str, output: str, tokens_used: int = 0, duration_ms: float = 0.0, **kwargs, ) -> None: print(f"βœ… Agent {agent_id}: {tokens_used} tokens, {duration_ms:.0f}ms") def on_agent_error( self, error: BaseException, *, run_id: UUID, agent_id: str, **kwargs, ) -> None: send_slack_alert( f"❌ Agent {agent_id} failed in run {run_id}: {error}", severity="high" ) def on_run_end( self, *, run_id: UUID, output: str, success: bool = True, total_tokens: int = 0, **kwargs, ) -> None: if not success: send_slack_alert(f"πŸ›‘ Run {run_id} failed!") else: send_slack(f"βœ… Run {run_id} completed: {total_tokens} tokens") # Usage runner = MACPRunner( llm_caller=my_llm, config=RunnerConfig(callbacks=[MySlackAlertHandler()]) ) ``` #### Async Callbacks ```python from callbacks import AsyncCallbackHandler import aiohttp class AsyncWebhookHandler(AsyncCallbackHandler): """Asynchronously sends a webhook on events.""" def __init__(self, webhook_url: str): self.webhook_url = webhook_url async def on_run_start( self, *, run_id: UUID, query: str, **kwargs, ) -> None: async with aiohttp.ClientSession() as session: await session.post( self.webhook_url, json={"event": "run_start", "run_id": str(run_id), "query": query} ) async def on_agent_end( self, *, run_id: UUID, agent_id: str, output: str, tokens_used: int = 0, **kwargs, ) -> None: async with aiohttp.ClientSession() as session: await session.post( self.webhook_url, json={ "event": "agent_end", "run_id": str(run_id), "agent_id": agent_id, "tokens": tokens_used, } ) # Usage with async runner runner = MACPRunner( async_llm_caller=my_async_llm, config=RunnerConfig(callbacks=[AsyncWebhookHandler("https://api.example.com/webhook")]) ) result = await runner.arun_round(graph) ``` #### Built-in handlers ##### 1. StdoutCallbackHandler β€” console output ```python from callbacks import StdoutCallbackHandler handler = StdoutCallbackHandler( color=True, # Colored output show_prompts=False, # Show prompts show_outputs=True, # Show agent outputs truncate_length=200, # Output truncation length ) runner = MACPRunner( llm_caller=my_llm, config=RunnerConfig(callbacks=[handler]) ) # Output example: # πŸš€ Run started: 5 agents # Order: researcher β†’ analyst β†’ writer β†’ editor β†’ publisher # ▢️ [0] Researcher started # πŸ› οΈ Tool 'web_search.search' started with args: {query: "market analysis"} # βœ… Success Tool 'web_search.search' ended (1200ms, 3500 chars) # βœ… [0] Researcher completed: 150 tokens, 1200ms # Output: Market analysis shows strong growth... # ▢️ [1] Analyst started # βœ… [1] Analyst completed: 200 tokens, 1500ms [FINAL] # βœ… Run completed: 350 tokens, 2700ms ``` ##### 2. 
MetricsCallbackHandler β€” metrics aggregation ```python from callbacks import MetricsCallbackHandler metrics_handler = MetricsCallbackHandler() runner = MACPRunner( llm_caller=my_llm, config=RunnerConfig(callbacks=[metrics_handler]) ) result = runner.run_round(graph) # Retrieve metrics metrics = metrics_handler.get_metrics() print(f"Total tokens: {metrics['total_tokens']}") print(f"Total duration: {metrics['total_duration_ms']}ms") print(f"Agent calls: {metrics['agent_calls']}") # {'researcher': 1, 'writer': 1, ...} print(f"Agent tokens: {metrics['agent_tokens']}") # {'researcher': 150, ...} print(f"Errors: {metrics['errors_count']}") print(f"Retries: {metrics['retries']}") print(f"Budget warnings: {metrics['budget_warnings']}") print(f"Runs completed: {metrics['runs_completed']}") # Averages print(f"Avg tokens per agent: {metrics['avg_tokens_per_agent']}") # Tool metrics (WebSearchTool and other tools) print(f"Tool calls: {metrics['tool_calls']}") # {'web_search.search': 3, 'web_search.fetch': 1} print(f"Tool durations: {metrics['tool_durations']}") # {'web_search.search': 3600.0, ...} print(f"Tool errors: {metrics['tool_errors_count']}") # 0 # Last 10 errors for error in metrics['errors']: print(f"Error in {error['agent_id']}: {error['error_message']}") # Last 10 tool errors for error in metrics['tool_errors']: print(f"Tool error: {error['tool_name']}.{error['action']}: {error['error_message']}") # Reset metrics metrics_handler.reset() ``` ##### 3. FileCallbackHandler β€” write to a JSON Lines file ```python from callbacks import FileCallbackHandler handler = FileCallbackHandler( file_path="execution_log.jsonl", append=True, # Append or overwrite flush_every=1, # Flush after each event ) runner = MACPRunner( llm_caller=my_llm, config=RunnerConfig(callbacks=[handler]) ) result = runner.run_round(graph) # Close the file manually (or it is closed automatically via __del__) handler.close() # File format (JSON Lines): # {"event_type": "run_start", "timestamp": "2024-...", "run_id": "...", "query": "...", "num_agents": 5} # {"event_type": "agent_start", "timestamp": "...", "run_id": "...", "agent_id": "researcher", ...} # {"event_type": "agent_end", "timestamp": "...", "run_id": "...", "agent_id": "researcher", "tokens_used": 150, ...} ``` #### Available callback methods | Method | Description | Parameters | |-------|-------------|-----------| | `on_run_start` | Run start | `run_id`, `query`, `num_agents`, `execution_order` | | `on_run_end` | Run end | `run_id`, `output`, `success`, `error`, `total_tokens`, `total_time_ms`, `executed_agents` | | `on_agent_start` | Agent started | `run_id`, `agent_id`, `agent_name`, `step_index`, `prompt`, `predecessors` | | `on_agent_end` | Agent finished | `run_id`, `agent_id`, `output`, `tokens_used`, `duration_ms`, `is_final` | | `on_agent_error` | Agent error | `error`, `run_id`, `agent_id`, `error_type`, `will_retry`, `attempt` | | `on_retry` | Retry attempt | `run_id`, `agent_id`, `attempt`, `max_attempts`, `delay_ms`, `error` | | `on_llm_new_token` | New token (streaming) | `token`, `run_id`, `agent_id`, `token_index`, `is_first`, `is_last` | | `on_plan_created` | Plan created | `run_id`, `num_steps`, `execution_order` | | `on_topology_changed` | Topology changed | `run_id`, `reason`, `old_remaining`, `new_remaining`, `change_count` | | `on_prune` | Agent pruned | `run_id`, `agent_id`, `reason` | | `on_fallback` | Fallback activated | `run_id`, `failed_agent_id`, `fallback_agent_id`, `reason` | | `on_parallel_start` | Parallel group start | `run_id`, 
`agent_ids`, `group_index` | | `on_parallel_end` | Parallel group end | `run_id`, `agent_ids`, `successful`, `failed` | | `on_memory_read` | Memory read | `run_id`, `agent_id`, `entries_count`, `keys` | | `on_memory_write` | Memory write | `run_id`, `agent_id`, `key`, `value_size` | | `on_budget_warning` | Budget warning | `run_id`, `budget_type`, `current`, `limit`, `ratio` | | `on_budget_exceeded` | Budget exceeded | `run_id`, `budget_type`, `current`, `limit`, `action_taken` | | `on_tool_start` | Tool started | `run_id`, `tool_name`, `action`, `arguments` | | `on_tool_end` | Tool finished | `run_id`, `tool_name`, `action`, `success`, `duration_ms`, `output_size`, `result_summary` | | `on_tool_error` | Tool error | `run_id`, `tool_name`, `action`, `error_type`, `error_message` | #### Tool Callback Events Tools emit events via the callback system. This lets you monitor all tool actions without direct logging. **Event types:** | Event | Class | Description | |------|-------|-------------| | `TOOL_START` | `ToolStartEvent` | Tool action started | | `TOOL_END` | `ToolEndEvent` | Tool action successfully completed | | `TOOL_ERROR` | `ToolErrorEvent` | Tool action failed | **Example: handling tool events** ```python from callbacks import BaseCallbackHandler, CallbackManager from tools import WebSearchTool from uuid import UUID class ToolMonitorHandler(BaseCallbackHandler): """Monitor all tool actions.""" def on_tool_start( self, *, run_id: UUID, tool_name: str, action: str, arguments: dict, **kwargs, ) -> None: print(f"[TOOL] {tool_name}.{action} started with {arguments}") def on_tool_end( self, *, run_id: UUID, tool_name: str, action: str, success: bool = True, duration_ms: float = 0.0, output_size: int = 0, result_summary: str = "", **kwargs, ) -> None: status = "OK" if success else "FAIL" print(f"[TOOL] {tool_name}.{action} {status} ({duration_ms:.0f}ms, {output_size} chars)") def on_tool_error( self, error: BaseException = None, *, run_id: UUID, tool_name: str, action: str, error_type: str = "", error_message: str = "", **kwargs, ) -> None: print(f"[TOOL ERROR] {tool_name}.{action}: {error_type} - {error_message}") # Usage cb = CallbackManager(handlers=[ToolMonitorHandler()]) tool = WebSearchTool(callback_manager=cb) tool.execute(query="Python tutorials") # [TOOL] web_search.search started with {'query': 'Python tutorials'} # [TOOL] web_search.search OK (1200ms, 3500 chars) ``` **Built-in handlers already support tool events:** - `StdoutCallbackHandler` β€” prints tool events to console with emoji - `MetricsCallbackHandler` β€” collects metrics for tool_calls, tool_durations, tool_errors #### Ignore flags You can disable specific event types: ```python class MyMinimalHandler(BaseCallbackHandler): # Ignore most events ignore_llm = True # Do not call on_llm_new_token ignore_retry = True # Do not call on_retry ignore_budget = True # Do not call on_budget_* ignore_memory = True # Do not call on_memory_* ignore_tool = True # Do not call on_tool_start/end/error # Handle only errors def on_agent_error(self, error, *, run_id, agent_id, **kwargs): log_critical_error(agent_id, error) ``` #### Combining handlers ```python from callbacks import ( StdoutCallbackHandler, MetricsCallbackHandler, FileCallbackHandler, ) # You can use multiple handlers at the same time runner = MACPRunner( llm_caller=my_llm, config=RunnerConfig(callbacks=[ StdoutCallbackHandler(show_outputs=False), # Only status to console MetricsCallbackHandler(), # Metrics collection FileCallbackHandler("debug.jsonl"), # Full log to file 
MySlackAlertHandler(), # Slack alerts ]) ) ``` --- ### State Storage Persistent storage for node states. ```python from utils.state_storage import ( InMemoryStateStorage, FileStateStorage, ) # 1. In-memory storage storage = InMemoryStateStorage() storage.save("agent_id", {"messages": [...], "context": {...}}) state = storage.load("agent_id") storage.delete("agent_id") all_keys = storage.keys() storage.clear() # 2. File-based storage storage = FileStateStorage(directory="./agent_states") storage.save("researcher", { "messages": [{"role": "user", "content": "Hello"}], "iteration": 5, }) state = storage.load("researcher") if state: print(f"Iteration: {state['iteration']}") storage.delete("researcher") # Get all stored IDs all_agent_ids = storage.keys() # Clear all states storage.clear() ``` --- ### Async Utils Helper functions for asynchronous execution. ```python from utils.async_utils import ( run_sync, gather_with_concurrency, timeout_wrapper, ) # 1. Run a coroutine synchronously async def my_async_function(): return "result" result = run_sync(my_async_function(), context="my_context") # 2. Parallel execution with a concurrency limit async def fetch_data(agent_id: str): # ... async call ... return response async def main(): tasks = [fetch_data(f"agent_{i}") for i in range(20)] # Run no more than 5 at once results = await gather_with_concurrency(5, *tasks) return results # 3. Timeouts async def slow_operation(): await asyncio.sleep(10) return "done" async def main(): try: result = await timeout_wrapper( slow_operation(), timeout=5.0, error_message="Operation took too long", ) except TimeoutError as e: print(f"Timeout: {e}") ``` --- ### Conditional Routing Dynamic selection of the next agent based on conditions. ```python from core.graph import ConditionalEdge from execution.scheduler import ConditionContext, ConditionEvaluator # 1. Define conditional edges def quality_above_threshold(context: ConditionContext) -> bool: """Go to editor only if quality > 0.8""" quality = context.state.get("quality_score", 0) return quality > 0.8 def has_errors(context: ConditionContext) -> bool: """Go to fixer if there are errors""" return "errors" in context.state and len(context.state["errors"]) > 0 # Add conditional edges to the graph graph.add_conditional_edge( source="writer", targets={ "editor": quality_above_threshold, "fixer": has_errors, }, default="reviewer", # Fallback if no condition matches ) # 2. Use via the builder from builder import GraphBuilder builder = GraphBuilder() builder.add_agent(agent_id="writer", display_name="Writer") builder.add_agent(agent_id="editor", display_name="Editor") builder.add_agent(agent_id="fixer", display_name="Fixer") builder.add_conditional_edge( source="writer", target="editor", condition=quality_above_threshold, weight=0.9, ) builder.add_conditional_edge( source="writer", target="fixer", condition=has_errors, weight=0.7, ) graph = builder.build() # 3. Evaluate conditions at runtime evaluator = ConditionEvaluator() context = ConditionContext( current_node="writer", state={"quality_score": 0.85, "errors": []}, history=["researcher", "writer"], metadata={"iteration": 1}, ) # Evaluate a single condition if evaluator.evaluate(quality_above_threshold, context): next_node = "editor" # Evaluate all conditions for a node next_nodes = evaluator.evaluate_all(graph, "writer", context) print(f"Next nodes: {next_nodes}") ``` --- ### Agent Tools (Tools) The `tools` module allows agents to use external tools via Native Function Calling. 
**Key principle:** If an agent has tools specified, they are **ALWAYS** used automatically on every LLM call. **Built-in tools:** - `shell` β€” execute shell commands - `code_interpreter` β€” execute Python code in a sandbox - `file_search` β€” search files and their contents - `web_search` β€” search the web (DuckDuckGo, Serper, Tavily) + Selenium browser for dynamic pages - `function_calling` β€” call custom functions #### Quick start ```python from builder import GraphBuilder from execution import MACPRunner from tools import tool, OpenAIToolsCaller from openai import OpenAI # 1. Register tools via the @tool decorator @tool def fibonacci(n: int) -> str: """Calculate the n-th Fibonacci number.""" a, b = 0, 1 for _ in range(n): a, b = b, a + b return str(a) @tool def is_prime(n: int) -> str: """Check if a number is prime.""" if n < 2: return "False" for i in range(2, int(n**0.5) + 1): if n % i == 0: return "False" return "True" # 2. Create an agent with tools builder = GraphBuilder() builder.add_agent( agent_id="math", display_name="Math Agent", persona="a helpful math assistant", tools=["fibonacci", "is_prime"], # <-- tools are specified here! ) builder.add_task(query="Calculate fibonacci(20) and check if it's prime") builder.connect_task_to_agents(agent_ids=["math"]) # 3. Create caller and runner client = OpenAI(api_key="...") caller = OpenAIToolsCaller(client, model="gpt-4") runner = MACPRunner(llm_caller=caller) # 4. Run β€” tools are used AUTOMATICALLY result = runner.run_round(builder.build()) print(result.final_answer) ``` **Important:** - Tools are set when creating an agent via the `tools` parameter - Runner automatically passes tools to the LLM via the API - No `enable_tools` flags are needed β€” it works automatically #### Two ways to register tools **Method 1: Global `@tool` decorator (recommended)** ```python from tools import tool @tool def calculate(expression: str) -> str: """Evaluate a math expression.""" return str(eval(expression)) @tool def search_web(query: str) -> str: """Search the web for information.""" return f"Results for: {query}" ``` **Method 2: Via ToolRegistry** ```python from tools import ToolRegistry, get_registry # Global registry registry = get_registry() @registry.function def my_tool(arg: str) -> str: """Description for the LLM.""" return arg.upper() # Or create your own registry my_registry = ToolRegistry() @my_registry.function def custom_tool(x: int) -> str: return str(x * 2) ``` #### Passing tools as objects You can pass BaseTool objects directly into AgentProfile: ```python from core.agent import AgentProfile from tools import CodeInterpreterTool, ShellTool # Create an agent with tool objects agent = AgentProfile( agent_id="coder", display_name="Code Agent", persona="a Python programmer", tools=[CodeInterpreterTool(timeout=10), ShellTool()], # <-- objects! 
) # Add to the graph builder = GraphBuilder() builder.add_agent_profile(agent) ``` #### Supported tools | Tool | Description | |------|-------------| | `shell` | Execute shell commands | | `function_calling` | Call registered Python functions (grouped) | | `code_interpreter` | Execute Python code in a sandbox | | `file_search` | Search files and file contents in directories | #### Base classes ```python from tools import ( BaseTool, # Abstract base class for tools ToolCall, # A tool-call request (parsed from LLM output) ToolResult, # Tool execution result ToolRegistry, # Tool registry ShellTool, # Tool for shell commands FunctionTool, # Tool for calling (grouped) functions CodeInterpreterTool, # Tool for executing Python code FileSearchTool, # Tool for searching files ) ``` #### ShellTool β€” executing shell commands ```python from tools import ShellTool, ToolRegistry # Create a ShellTool with safety settings shell_tool = ShellTool( timeout=30, # Timeout in seconds max_output_size=8192, # Max output size working_dir="/path/to/dir", # Working directory (optional) allowed_commands=["echo", "ls", "pwd"], # Command allowlist (optional) ) # Register in a registry registry = ToolRegistry() registry.register(shell_tool) # Execute directly result = shell_tool.execute(command="echo Hello World") print(result.success) # True print(result.output) # "Hello World" # Or via the registry from tools import ToolCall call = ToolCall(name="shell", arguments={"command": "ls -la"}) result = registry.execute(call) ``` #### FunctionTool β€” calling custom functions ```python from tools import FunctionTool, ToolRegistry # Create a FunctionTool func_tool = FunctionTool() # Register functions via decorator @func_tool.register def calculate(expression: str) -> str: """Evaluate a math expression.""" return str(eval(expression)) @func_tool.register def uppercase(text: str) -> str: """Convert text to uppercase.""" return text.upper() @func_tool.register(name="word_count", description="Count words in text") def count_words(text: str) -> int: """Count words.""" return len(text.split()) # Register in the registry registry = ToolRegistry() registry.register(func_tool) # Call a function result = func_tool.execute(function="calculate", expression="2 ** 10") print(result.output) # "1024" # List registered functions print(func_tool.list_functions()) # ['calculate', 'uppercase', 'word_count'] ``` #### Two ways to register functions There are two ways to register functions as tools: **Method 1: Via FunctionTool (grouped functions)** Functions are grouped under a single tool named `function_calling`. The LLM must call them like this: ```json {"name": "function_calling", "arguments": {"function": "calculate", "expression": "2+2"}} ``` ```python func_tool = FunctionTool() @func_tool.register def calculate(expression: str) -> str: return str(eval(expression)) registry.register(func_tool) ``` **Method 2: Via `@registry.function` (separate tools) β€” RECOMMENDED** Each function becomes a separate tool. The LLM calls them directly: ```json {"name": "calculate", "arguments": {"expression": "2+2"}} ``` ```python @registry.function def calculate(expression: str) -> str: return str(eval(expression)) @registry.function def fibonacci(n: int) -> str: """Calculate the n-th Fibonacci number.""" a, b = 0, 1 for _ in range(n): a, b = b, a + b return str(a) ``` **Recommendation:** Use `@registry.function` β€” it is simpler for the LLM and avoids confusion with nested arguments. 
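To make the difference concrete, here is a short end-to-end sketch that executes a parsed call in both shapes. It only reuses the `ToolRegistry`, `FunctionTool`, and `ToolCall` APIs shown in this section; the function bodies are illustrative.

```python
from tools import FunctionTool, ToolCall, ToolRegistry

registry = ToolRegistry()

# Method 2: each function is its own tool, called by name directly
@registry.function
def calculate(expression: str) -> str:
    """Evaluate a math expression."""
    return str(eval(expression))

flat_call = ToolCall(name="calculate", arguments={"expression": "2 + 2"})
print(registry.execute(flat_call).output)  # "4"

# Method 1: functions grouped under the single `function_calling` tool
func_tool = FunctionTool()

@func_tool.register
def double(x: int) -> str:
    """Double a number."""
    return str(x * 2)

registry.register(func_tool)
grouped_call = ToolCall(
    name="function_calling",
    arguments={"function": "double", "x": 21},
)
print(registry.execute(grouped_call).output)  # "42"
```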
#### CodeInterpreterTool β€” executing Python code Allows agents to execute arbitrary Python code in a safe sandbox environment. ```python from tools import CodeInterpreterTool, ToolRegistry, ToolCall # Create a CodeInterpreterTool code_tool = CodeInterpreterTool( timeout=30, # Execution timeout in seconds max_output_size=8192, # Maximum output size safe_mode=True, # Restricted builtins for safety ) # Register registry = ToolRegistry() registry.register(code_tool) # Example 1: Simple computation result = code_tool.execute(code="2 ** 10 + sum(range(5))") print(result.output) # "1034" # Example 2: Multi-line code with functions code = """ def fibonacci(n): a, b = 0, 1 for _ in range(n): a, b = b, a + b return a for i in range(10): print(f"fib({i}) = {fibonacci(i)}") """ result = code_tool.execute(code=code) print(result.output) # fib(0) = 0 # fib(1) = 1 # fib(2) = 1 # ... # Example 3: Using preloaded modules # Available in sandbox: math, statistics, json, re, datetime, # collections, itertools, functools, random result = code_tool.execute(code=""" # Modules are already loaded; no import needed print(f"pi = {math.pi:.6f}") print(f"e = {math.e:.6f}") data = {"name": "Alice", "age": 30} print(json.dumps(data, indent=2)) """) print(result.output) # Example 4: Error handling result = code_tool.execute(code="1 / 0") print(result.success) # False print(result.error) # "ZeroDivisionError: division by zero" ``` **Safety:** - With `safe_mode=True`, built-in functions are restricted - Forbidden: `open`, `exec`, `eval`, `__import__`, `compile` - Only safe modules are available - Timeout prevents infinite loops #### FileSearchTool β€” searching files and contents Allows agents to search files by name, search text within files, and read file contents. ```python from tools import FileSearchTool, ToolRegistry, ToolCall # Create a FileSearchTool file_tool = FileSearchTool( base_directory="./project", # Base directory to search within max_results=50, # Maximum number of results max_depth=10, # Maximum recursion depth max_file_size=100_000, # Max file size for content search max_read_size=10_000, # Max size for reading a file allowed_extensions=[".py", ".txt", ".md"], # Allowed extensions (optional) ) registry = ToolRegistry() registry.register(file_tool) # Example 1: Find all Python files result = file_tool.execute(pattern="*.py") print(result.output) # Found 15 file(s) matching '*.py': # src/main.py (1,234 bytes) # src/utils.py (567 bytes) # ... # Example 2: Search in a specific directory result = file_tool.execute(pattern="test_*.py", directory="tests") print(result.output) # Example 3: Search within file contents result = file_tool.execute(pattern="*.py", query="def main") print(result.output) # Search results for 'def main' in 15 file(s): # Found 3 match(es). # === src/main.py === # 42: def main(): # === src/cli.py === # 15: def main_entry(): # ... # Example 4: Regex search result = file_tool.execute(pattern="*.py", query=r"def \w+_handler", regex=True) # Example 5: Read a specific file result = file_tool.execute(read_file="src/config.py") print(result.output) # === src/config.py === # """Configuration module.""" # import os # ... 
# Example 6: Via ToolCall (how the LLM calls it) call = ToolCall( name="file_search", arguments={"pattern": "*.py", "query": "class Agent"} ) result = registry.execute(call) ``` **Safety:** - Cannot escape outside `base_directory` - Hidden files and directories (starting with `.`) are skipped - File size limits prevent reading huge files #### WebSearchTool β€” searching, reading, and interacting with web pages A tool for working with the internet: search (DuckDuckGo/Serper/Tavily), fetching pages, and full interaction via Selenium (clicks, forms, JS, crawl). > **Install Selenium** (optional): > ```bash > pip install selenium webdriver-manager > ``` ##### Quick start **Method 1 β€” dict config (recommended):** ```python from builder import GraphBuilder from execution import MACPRunner builder = GraphBuilder() builder.add_agent( "researcher", persona="research assistant", # Dict config β€” tool is created automatically with the desired parameters tools=[{"name": "web_search", "use_selenium": True, "fetch_content": True}], ) builder.add_task(query="Find information about Python 3.12") builder.connect_task_to_agents(agent_ids=["researcher"]) graph = builder.build() runner = MACPRunner(llm_caller=my_caller) result = runner.run_round(graph) ``` **Method 2 β€” registry registration:** ```python from tools import WebSearchTool, get_registry registry = get_registry() registry.register(WebSearchTool(use_selenium=True, fetch_content=True)) # Agent references it by name builder.add_agent("researcher", tools=["web_search"]) ``` **Method 3 β€” pass the object directly:** ```python from tools import WebSearchTool builder.add_agent( "researcher", tools=[WebSearchTool(use_selenium=True)], ) ``` ##### Dict config parameters ```python tools=[{ "name": "web_search", # All WebSearchTool constructor parameters: "use_selenium": True, "fetch_content": True, "max_results": 5, "timeout": 15, "max_content_length": 4000, "selenium_config": { "headless": True, "browser": "edge", # "chrome", "firefox", "edge" "extra_wait": 1.0, "disable_images": True, "page_load_timeout": 30, }, # Provider by string: # "provider": "serper", # "duckduckgo", "serper", "tavily" # "api_key": "...", }] ``` The browser is detected automatically. If `webdriver-manager` cannot download a driver (no internet, SSL error), a system driver is used. ##### Actions (the `action` parameter) `action` is a command that defines what to do. All actions run within the same browser session. | action | Description | Required parameters | |--------|-------------|---------------------| | `search` | Web search | `query` | | `fetch` | Open and read a page | `url` | | `click` | Click an element | `selector` | | `fill` | Fill an input | `selector`, `value` | | `extract_links` | Extract links from a page | β€” | | `execute_js` | Execute JavaScript | `js_code` | | `crawl` | Recursive site crawl | `url` | | `get_content` | Text of the current page | β€” | `search` and `fetch` work without Selenium. The rest require `use_selenium=True`. If `action` is not provided, it is inferred automatically: `query` β†’ search, `url` β†’ fetch, `selector` β†’ click, `js_code` β†’ execute_js. 
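Because `action` can be omitted, a minimal sketch (same constructor and context-manager usage as above) that relies purely on this auto-inference looks like this:

```python
from tools import WebSearchTool

with WebSearchTool(use_selenium=True) as tool:
    # No explicit `action`: the argument that is present selects the command
    tool.execute(query="Python 3.12 release notes")  # inferred as action="search"
    tool.execute(url="https://example.com")          # inferred as action="fetch"
    tool.execute(js_code="return document.title")    # inferred as action="execute_js"
```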
##### Action examples ```python from tools import WebSearchTool with WebSearchTool(use_selenium=True) as tool: # Search result = tool.execute(action="search", query="Python tutorials") # Fetch a page (wait for an element) result = tool.execute(action="fetch", url="https://example.com", wait_for_selector="h1") # Click result = tool.execute(action="click", selector="a.nav-link") # Fill a form and submit result = tool.execute(action="fill", selector="input[name=q]", value="Python", submit=True) # Extract links result = tool.execute(action="extract_links", url="https://example.com") # Execute JS result = tool.execute(action="execute_js", js_code="return document.title") # Crawl result = tool.execute(action="crawl", url="https://docs.python.org", max_depth=2, max_pages=5) # Current page text result = tool.execute(action="get_content") ``` ##### Search providers | Provider | API key | Description | |----------|---------|-------------| | `DuckDuckGoProvider` | No | Default, free | | `SerperProvider` | Yes (serper.dev) | Google Search | | `TavilyProvider` | Yes (tavily.com) | With AI summarization | ```python # Via dict config tools=[{"name": "web_search", "provider": "tavily", "api_key": "tvly-..."}] # Or directly from tools import WebSearchTool, TavilyProvider tool = WebSearchTool(provider=TavilyProvider(api_key="tvly-...")) ``` Custom provider: ```python from tools import WebSearchTool, SearchProvider class MyProvider(SearchProvider): def search(self, query: str, max_results: int = 5) -> list[dict[str, str]]: return [{"title": "Result", "url": "https://example.com", "snippet": query}] tool = WebSearchTool(provider=MyProvider()) ``` ##### Constructor parameters | Parameter | Type | Default | Description | |----------|------|---------|-------------| | `provider` | `SearchProvider \| None` | `DuckDuckGoProvider` | Search provider | | `max_results` | `int` | `5` | Max search results | | `max_content_length` | `int` | `4000` | Max page content length | | `fetch_content` | `bool` | `False` | Fetch page contents during search | | `timeout` | `int` | `15` | Request timeout (sec) | | `use_selenium` | `bool` | `False` | Use Selenium | | `selenium_config` | `dict \| None` | `None` | Selenium settings (headless, browser, extra_wait, etc.) | | `selenium_fetcher` | `SeleniumFetcher \| None` | `None` | A pre-built SeleniumFetcher instance | | `callback_manager` | `CallbackManager \| None` | `None` | For events (if None β€” taken from context) | ##### execute() parameters | Parameter | Type | Description | |----------|------|-------------| | `action` | `str` | Action (see table above). 
Auto-inferred if omitted | | `query` | `str` | Search query | | `url` | `str` | Page URL | | `selector` | `str` | CSS selector | | `value` | `str` | Value for fill | | `submit` | `bool` | Submit the form (default: False) | | `js_code` | `str` | JavaScript code | | `max_pages` | `int` | Max pages for crawl (default: 10) | | `max_depth` | `int` | Max crawl depth (default: 2) | | `url_filter` | `str` | Regex filter for crawl URLs | | `fetch_content` | `bool` | Fetch contents (for search) | | `max_results` | `int` | Max results (for search) | | `wait_for_selector` | `str` | CSS selector to wait for page readiness | ##### Callback integration WebSearchTool emits `on_tool_start`/`on_tool_end`/`on_tool_error` events via the callback system: ```python from callbacks import CallbackManager, StdoutCallbackHandler from tools import WebSearchTool cb = CallbackManager(handlers=[StdoutCallbackHandler()]) tool = WebSearchTool(callback_manager=cb, use_selenium=True) tool.execute(action="fetch", url="https://example.com") # πŸ› οΈ Tool 'web_search.fetch' started # βœ… Tool 'web_search.fetch' ended (1200ms) ``` ##### Notes - Two modes: `urllib` (no dependencies) and Selenium (full browser) - Browsers: Chrome, Firefox, Edge (automatic fallback to system driver) - Context manager: `with WebSearchTool(...) as tool:` β€” auto-closes the browser - Built-in HTML parser without external dependencies - `create_tool_from_config()` β€” build from dict config for agent integration #### ToolRegistry β€” tool registry ```python from tools import ToolRegistry, ShellTool, FunctionTool # Create a registry registry = ToolRegistry() # Register tools registry.register(ShellTool(timeout=10)) registry.register(FunctionTool()) # Register functions via the registry decorator (convenient) @registry.function def greet(name: str) -> str: """Greeting.""" return f"Hello, {name}!" @registry.function(name="add", description="Add two numbers") def add_numbers(a: int, b: int) -> int: return a + b # Check tool presence print(registry.has("shell")) # True print(registry.has("greet")) # True # List tools print(registry.list_tools()) # ['shell', 'function_calling', 'greet', 'add'] # Get tools for an agent tools = registry.get_tools_for_agent(["shell", "greet"]) print([t.name for t in tools]) # ['shell', 'greet'] # Format a prompt with tool descriptions prompt = registry.format_tools_prompt(["shell", "greet"]) print(prompt) # Available tools: # - shell: Execute a shell command... # - greet: Greeting. # To use a tool, format your response as: # {"name": "tool_name", "arguments": {...}} ``` #### Parsing tool_call from an LLM response An agent can call a tool by including a special tag in its response: ```python from tools import ToolCall # LLM returns a response with tool calls llm_response = """ I need to compute the result. {"name": "calculate", "arguments": {"expression": "2 + 2"}} And also check the directory: {"name": "shell", "arguments": {"command": "ls"}} """ # Parse all calls calls = ToolCall.parse_from_response(llm_response) print(len(calls)) # 2 print(calls[0].name) # "calculate" print(calls[0].arguments) # {"expression": "2 + 2"} # Execute all calls results = registry.execute_all(calls) for result in results: print(f"{result.tool_name}: {result.output if result.success else result.error}") ``` #### Integration with MACPRunner Tools are used **automatically** β€” it is enough to specify them when creating the agent. 
```python from execution import MACPRunner, RunnerConfig from builder import GraphBuilder from tools import ( tool, get_registry, register_tool, ShellTool, CodeInterpreterTool, FileSearchTool, OpenAIToolsCaller, ) from openai import OpenAI # 1. Register built-in tools register_tool(ShellTool(timeout=10)) register_tool(CodeInterpreterTool(timeout=10, safe_mode=True)) register_tool(FileSearchTool(base_directory=".")) # Register custom functions via @tool @tool def get_current_time() -> str: """Get current date and time.""" from datetime import datetime return datetime.now().strftime("%Y-%m-%d %H:%M:%S") @tool def calculate(expression: str) -> str: """Evaluate math expression safely.""" return str(eval(expression, {"__builtins__": {}}, {})) # 2. Create a graph with agents builder = GraphBuilder() builder.add_agent( "assistant", display_name="AI Assistant", persona="Helpful assistant who uses tools to solve problems.", tools=["shell", "get_current_time"], # <-- tools are used automatically! ) builder.add_agent( "coder", display_name="Python Coder", persona="Python expert who writes and executes code.", tools=["code_interpreter"], ) builder.add_agent( "calculator", display_name="Calculator Agent", persona="Math expert who calculates expressions.", tools=["calculate"], ) builder.add_workflow_edge("assistant", "calculator") builder.add_task(query="What is 25 * 17 and what time is it?") builder.connect_task_to_agents() graph = builder.build() # 3. Create caller and runner client = OpenAI(api_key="...") caller = OpenAIToolsCaller(client, model="gpt-4") runner = MACPRunner(llm_caller=caller) # No extra configuration needed! # 4. Execute β€” tools are used automatically result = runner.run_round(graph) print(result.final_answer) ``` **Note:** The `max_tool_iterations` parameter in `RunnerConfig` limits the number of tool-calling loops (default is 3). 
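For example, a minimal sketch that only raises this limit (all other `RunnerConfig` fields keep their defaults; `caller` is the `OpenAIToolsCaller` from the example above):

```python
from execution import MACPRunner, RunnerConfig

# Allow up to 5 tool-call/response rounds per agent step before the loop stops
config = RunnerConfig(max_tool_iterations=5)
runner = MACPRunner(llm_caller=caller, config=config)
```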
#### Creating a custom tool ```python from tools import BaseTool, ToolResult from typing import Any class WeatherTool(BaseTool): """A tool for getting weather.""" @property def name(self) -> str: return "weather" @property def description(self) -> str: return "Get current weather for a city" @property def parameters_schema(self) -> dict[str, Any]: return { "type": "object", "properties": { "city": { "type": "string", "description": "City name" } }, "required": ["city"] } def execute(self, city: str = "", **kwargs) -> ToolResult: if not city: return ToolResult( tool_name=self.name, success=False, error="City is required" ) # A real API call would go here weather = f"Sunny, 22Β°C in {city}" return ToolResult( tool_name=self.name, success=True, output=weather ) # Usage registry = ToolRegistry() registry.register(WeatherTool()) result = registry.execute(ToolCall(name="weather", arguments={"city": "Moscow"})) print(result.output) # "Sunny, 22Β°C in Moscow" ``` #### Example: full workflow with tools ```python """Full example of using tools in a multi-agent system.""" import math from execution import MACPRunner, RunnerConfig from builder import GraphBuilder from tools import ( ToolRegistry, ShellTool, CodeInterpreterTool, FileSearchTool, ) # Configure tools registry = ToolRegistry() # Shell with allowlist registry.register(ShellTool( timeout=5, allowed_commands=["echo", "date", "pwd", "ls"] )) # Code interpreter to execute Python code registry.register(CodeInterpreterTool(timeout=10, safe_mode=True)) # File search to find files registry.register(FileSearchTool(base_directory=".", max_results=20)) # Math functions β€” register directly via @registry.function # This allows the LLM to call them by name: {"name": "sqrt", "arguments": {"x": 144}} @registry.function def sqrt(x: float) -> float: """Calculate square root.""" return math.sqrt(x) @registry.function def power(base: float, exp: float) -> float: """Calculate base^exp.""" return math.pow(base, exp) @registry.function def factorial(n: int) -> int: """Calculate factorial.""" return math.factorial(n) # Build the graph builder = GraphBuilder() builder.add_agent( "math_solver", persona="Expert mathematician", tools=["sqrt", "power", "factorial"], # Direct access to functions ) builder.add_agent( "coder", persona="Python developer", tools=["code_interpreter"], # Execute Python code ) builder.add_agent( "researcher", persona="Code researcher", tools=["file_search"], # Search files ) builder.add_agent( "coordinator", persona="Task coordinator that combines results", tools=[], # No tools ) builder.add_workflow_edge("math_solver", "coordinator") builder.add_workflow_edge("coder", "coordinator") builder.add_workflow_edge("researcher", "coordinator") builder.add_task(query="Calculate sqrt(144), then write Python to verify") builder.connect_task_to_agents() graph = builder.build() # Execute def mock_llm(prompt: str) -> str: if "mathematician" in prompt: return '''I'll calculate the square root. {"name": "sqrt", "arguments": {"x": 144}} ''' elif "developer" in prompt: return '''Let me verify with Python code. {"name": "code_interpreter", "arguments": {"code": "import math\\nprint(f'sqrt(144) = {math.sqrt(144)}')"}} ''' elif "researcher" in prompt: return '''Let me find Python files. {"name": "file_search", "arguments": {"pattern": "*.py", "directory": "src"}} ''' else: return "Based on the results: sqrt(144) = 12 and we're in the current directory." 
config = RunnerConfig(enable_tools=True, max_tool_iterations=2) runner = MACPRunner(llm_caller=mock_llm, tool_registry=registry, config=config) result = runner.run_round(graph) print("Final:", result.final_answer) ``` #### Running the example ```bash # Run the tools example uv run python examples/tools_example.py # Run tests uv run pytest tests/test_tools.py -v ``` --- ## API Reference ### Core classes | Class | Description | Pydantic | |-------|-------------|----------| | `RoleGraph` | Role/agent graph with adjacency matrices | ❌ | | `AgentProfile` | **Pydantic BaseModel** β€” Immutable agent profile | βœ… | | `TaskNode` | **Pydantic BaseModel** β€” Virtual task node | βœ… | | `NodeEncoder` | Text-to-embeddings encoder | ❌ | | `MACPRunner` | MACP protocol executor | ❌ | | `AdaptiveScheduler` | Adaptive scheduler | ❌ | | `LLMCallerFactory` | Factory for creating LLM callers (multi-model) | ❌ | | `LLMConfig` | **Pydantic BaseModel** β€” LLM configuration for schemas | βœ… | | `AgentLLMConfig` | **Pydantic BaseModel** β€” LLM configuration for AgentProfile | βœ… | | `AgentMemory` | Agent memory manager | ❌ | | `SharedMemoryPool` | Shared memory pool | ❌ | | `BudgetTracker` | Token/request budget tracker | ❌ | | `MetricsTracker` | Performance metrics tracker | ❌ | | `GraphVisualizer` | Graph visualization | ❌ | | `BaseCallbackHandler` | Base callback handler | ❌ | | `AsyncCallbackHandler` | Async callback handler | ❌ | | `CallbackManager` | Callback handlers manager | ❌ | | `AsyncCallbackManager` | Async callbacks manager | ❌ | | `StdoutCallbackHandler` | Console event output | ❌ | | `MetricsCallbackHandler` | Execution metrics aggregation | ❌ | | `FileCallbackHandler` | Write events to JSON Lines file | ❌ | | `EventBus` | Event bus for graph monitoring | ❌ | | `EarlyStopCondition` | Early stopping condition | ❌ | | `StepContext` | **Pydantic BaseModel** β€” Step context for hooks | βœ… | | `TopologyAction` | **Pydantic BaseModel** β€” Topology modification action | βœ… | ### Schemas (Pydantic BaseModel) | Schema class | Description | Usage | |-------------|-------------|-------| | `GraphSchema` | **Pydantic** β€” Full graph schema | Validation, serialization, migration | | `BaseNodeSchema` | **Pydantic** β€” Base node schema | Parent class for nodes | | `AgentNodeSchema` | **Pydantic** β€” Agent node schema | LLM config, tools, metrics, embeddings | | `TaskNodeSchema` | **Pydantic** β€” Task node schema | Query, status, deadline | | `BaseEdgeSchema` | **Pydantic** β€” Base edge schema | Weight, probability, cost | | `WorkflowEdgeSchema` | **Pydantic** β€” Workflow edge | Conditions, priority, transforms | | `CostMetrics` | **Pydantic** β€” Cost metrics | Tokens, latency, trust, reliability | | `ValidationResult` | **Pydantic** β€” Validation result | Errors, warnings | ### Visualization (Pydantic BaseModel) | Class | Description | Usage | |-------|-------------|-------| | `VisualizationStyle` | **Pydantic** β€” Global visualization style | Configure colors, shapes, what to show | | `NodeStyle` | **Pydantic** β€” Node style | Shape, fill_color, stroke_color, icon | | `EdgeStyle` | **Pydantic** β€” Edge style | Line style, arrow, colors | | `NodeShape` | Enum β€” Node shapes | RECTANGLE, ROUND, STADIUM, CIRCLE, DIAMOND, etc. | | `MermaidDirection` | Enum β€” Graph direction | TOP_BOTTOM, LEFT_RIGHT, etc. 
| ### GNN (Pydantic BaseModel) | Class | Description | Usage | |-------|-------------|-------| | `FeatureConfig` | **Pydantic** β€” Feature configuration | Node/edge feature dimensions | | `TrainingConfig` | **Pydantic** β€” Training configuration | Learning rate, epochs, optimizer | ### Graph construction functions | Function | Description | |---------|-------------| | `build_property_graph()` | Main graph builder | | `build_from_schema()` | Build from GraphSchema | | `build_from_adjacency()` | Build from adjacency matrix | | `GraphBuilder` | Fluent graph builder with multi-model support | ### Multi-model functions | Function | Description | |---------|-------------| | `create_openai_caller()` | Create a legacy flat-string `(str) -> str` LLM caller | | `create_openai_structured_caller()` | Create a sync structured caller `(list[dict]) -> str` β€” **recommended** | | `create_openai_async_structured_caller()` | Create an async structured caller β€” required for `astream()` with `enable_parallel=True` | | `LLMCallerFactory.create_openai_factory()` | Create a factory for automatic caller generation | | `LLMConfig.merge_with()` | Merge LLM configurations (fallback) | | `AgentProfile.with_llm_config()` | Set LLM configuration for an agent | | `AgentProfile.has_custom_llm()` | Check whether an agent has a custom LLM config | ### Scheduling functions | Function | Description | |---------|-------------| | `build_execution_order()` | Topological execution order | | `get_parallel_groups()` | Parallel execution groups | | `extract_agent_adjacency()` | Extract the agent adjacency matrix | | `get_incoming_agents()` | Agent predecessors | | `get_outgoing_agents()` | Agent successors | ### Configuration classes | Class | Description | |------|-------------| | `RunnerConfig` | MACPRunner configuration | | `LLMConfig` | LLM configuration for an agent (multi-model) | | `AgentLLMConfig` | Immutable LLM configuration for AgentProfile | | `RoutingPolicy` | Routing policies | | `PruningConfig` | Agent pruning configuration | | `MemoryConfig` | Memory system configuration | | `TrainingConfig` | GNN training configuration | | `ErrorPolicy` | Error-handling policies | | `FrameworkSettings` | Global framework settings | --- ## FAQ ### Why Pydantic? What benefits does it provide? gMAS Framework is built entirely on **Pydantic 2.0+** to ensure type safety, automatic validation, and convenient serialization. Key benefits: 1. **Automatic type validation** β€” errors are caught when objects are created, not later at runtime 2. **Declarative typing** β€” IDE autocompletion, static checking (mypy, pyright) 3. **Automatic serialization** β€” `.model_dump()`, `.model_dump_json()` work out of the box 4. **Default values** β€” no need to write boilerplate 5. **Nested models** β€” automatic validation of nested structures 6. **Migrations** β€” safe schema upgrades between versions 7. 
**Immutability** β€” `frozen=True` prevents accidental mutation ```python from core import AgentProfile from pydantic import ValidationError # βœ… Correct usage β€” Pydantic validates agent = AgentProfile( agent_id="test", display_name="Test Agent", tools=["tool1", "tool2"], ) # ❌ Incorrect β€” Pydantic will raise ValidationError try: bad_agent = AgentProfile( agent_id=123, # Must be str, not int display_name="Test", ) except ValidationError as e: print(e.errors()) # Detailed error info # Automatic serialization (Pydantic v2 API) data = agent.model_dump() # β†’ dict json_str = agent.model_dump_json(indent=2) # β†’ JSON string # Automatic deserialization loaded = AgentProfile.model_validate(data) from_json = AgentProfile.model_validate_json(json_str) ``` ### Which Pydantic version is required? Is it compatible with Pydantic 1.x? **gMAS Framework requires Pydantic 2.0+ and is not compatible with Pydantic 1.x.** Key API differences: - Pydantic 1.x: `.dict()`, `.parse_obj()`, `.json()` - Pydantic 2.x: `.model_dump()`, `.model_validate()`, `.model_dump_json()` If you have Pydantic 1.x installed: ```bash pip install --upgrade "pydantic>=2.0" ``` Version check: ```python import pydantic print(pydantic.VERSION) # Must be >= 2.0.0 ``` ### How do I use different models for different agents? ```python from builder import GraphBuilder from execution import MACPRunner, LLMCallerFactory # Method 1: Via GraphBuilder (recommended) builder = GraphBuilder() builder.add_agent( "analyst", llm_backbone="gpt-4", # Strong model base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.0, max_tokens=4000, ) builder.add_agent( "formatter", llm_backbone="gpt-4o-mini", # Cheaper model base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.3, max_tokens=1000, ) builder.add_workflow_edge("analyst", "formatter") graph = builder.build() # Factory auto-creates callers factory = LLMCallerFactory.create_openai_factory() runner = MACPRunner(llm_factory=factory) result = runner.run_round(graph) ``` ### How do I integrate with OpenAI? ```python import openai # Method 1: Simple integration (one LLM for all) def openai_caller(prompt: str) -> str: response = openai.chat.completions.create( model="gpt-4", messages=[{"role": "user", "content": prompt}], ) return response.choices[0].message.content runner = MACPRunner(llm_caller=openai_caller) # Method 2: Multi-model integration (recommended) from execution import create_openai_caller # Uses the openai SDK automatically runner = MACPRunner( llm_factory=LLMCallerFactory.create_openai_factory( default_api_key="sk-...", default_base_url="https://api.openai.com/v1", ) ) ``` ### How do I use local models (Ollama)? ```python import requests def ollama_caller(prompt: str) -> str: response = requests.post( "http://localhost:11434/api/generate", json={"model": "llama2", "prompt": prompt, "stream": False}, ) return response.json()["response"] runner = MACPRunner(llm_caller=ollama_caller) ``` ### How do I add custom tools? Tools are just strings that are included in the agent prompt: ```python agent = AgentProfile( agent_id="code_executor", display_name="Code Executor", tools=["python_execute", "file_read", "file_write"], ) ``` Tool logic is implemented inside your LLM call. ### How do I visualize the graph? Which formats are supported? gMAS Framework provides a powerful visualization system with **Pydantic styles** and support for multiple formats: **Supported formats:** 1. **Mermaid** β€” for GitHub/docs 2. **ASCII art** β€” for terminals 3. 
**Graphviz DOT** β€” for professional visualization 4. **Rich Console** β€” colored terminal output 5. **PNG/SVG/PDF** β€” image rendering (requires system Graphviz) ```python from core.visualization import ( GraphVisualizer, VisualizationStyle, NodeStyle, NodeShape, MermaidDirection, # Convenience functions to_mermaid, to_ascii, print_graph, render_to_image, ) # Quick visualization (convenience functions) print(to_mermaid(graph, direction=MermaidDirection.LEFT_RIGHT)) print(to_ascii(graph, show_edges=True)) print_graph(graph, format="auto") # Auto-selects colored/ascii # Advanced custom styles (Pydantic models) style = VisualizationStyle( direction=MermaidDirection.LEFT_RIGHT, agent_style=NodeStyle( shape=NodeShape.ROUND, fill_color="#e3f2fd", stroke_color="#1976d2", icon="πŸ€–", ), show_weights=True, show_tools=True, ) viz = GraphVisualizer(graph, style) viz.save_mermaid("graph.md", title="My Workflow") viz.save_dot("graph.dot") # Image rendering (requires: pip install graphviz + system graphviz) try: render_to_image(graph, "output.png", format="png", dpi=150, style=style) render_to_image(graph, "output.svg", format="svg", style=style) print("βœ… Images created") except Exception as e: print(f"⚠️ Install system Graphviz: {e}") # Ubuntu: sudo apt install graphviz # macOS: brew install graphviz ``` **Installing Graphviz for image rendering:** ```bash # Python library pip install graphviz # System Graphviz # Ubuntu/Debian: sudo apt install graphviz # macOS: brew install graphviz # Windows: winget install graphviz ``` ### How do I save and load a graph? ```python import json # Save data = graph.to_dict() with open("graph.json", "w") as f: json.dump(data, f) # Load with open("graph.json", "r") as f: data = json.load(f) graph = RoleGraph.from_dict(data) ``` **Saving via Pydantic schemas (recommended):** ```python from core.schema import GraphSchema # Build a schema from the graph schema = GraphSchema( name="MyGraph", nodes={agent.agent_id: AgentNodeSchema.from_profile(agent) for agent in graph.agents}, edges=[BaseEdgeSchema.from_edge(e) for e in graph.edges], ) # Save (Pydantic auto-serialization) schema_json = schema.model_dump_json(indent=2) with open("graph_schema.json", "w") as f: f.write(schema_json) # Load (Pydantic auto-validation) with open("graph_schema.json", "r") as f: loaded_schema = GraphSchema.model_validate_json(f.read()) # Build a graph from the schema from builder import build_from_schema graph = build_from_schema(loaded_schema) ``` ### How do I handle agent errors? ```python from execution import RunnerConfig, ErrorPolicy config = RunnerConfig( error_policy=ErrorPolicy( on_error="fallback", # skip, retry, fallback, fail max_retries=3, ), pruning_config=PruningConfig( enable_fallback=True, max_fallback_attempts=2, ), ) result = runner.run_round(graph) if result.errors: for error in result.errors: print(f"Error in {error.agent_id}: {error.message}") ``` ### How do I track agent performance? ```python from core.metrics import MetricsTracker tracker = MetricsTracker() # Runner integration runner = MACPRunner(llm_caller=my_llm, metrics_tracker=tracker) result = runner.run_round(graph) # Retrieve metrics for agent_id in graph.node_ids: metrics = tracker.get_node_metrics(agent_id) print(f"{agent_id}:") print(f" Reliability: {metrics.reliability:.2%}") print(f" Avg latency: {metrics.avg_latency_ms:.0f}ms") print(f" Quality: {metrics.avg_quality:.2f}") # Save metrics tracker.save("metrics.json") ``` ### How do I use dynamic topology? 
```python # Modify the graph at runtime graph.add_node(new_agent, connections_to=["existing_agent"]) graph.add_edge("agent1", "new_agent", weight=0.8) # Remove inefficient agents if metrics.get_node_metrics("slow_agent").avg_latency_ms > 5000: graph.remove_node("slow_agent", policy=StateMigrationPolicy.DISCARD) # Update weights based on performance new_weights = compute_weights_from_metrics(tracker) graph.update_communication(new_weights) ``` ### How do I integrate with LangChain? ```python from langchain.chat_models import ChatOpenAI from langchain.schema import HumanMessage llm = ChatOpenAI(model="gpt-4") def langchain_caller(prompt: str) -> str: messages = [HumanMessage(content=prompt)] response = llm(messages) return response.content runner = MACPRunner(llm_caller=langchain_caller) result = runner.run_round(graph) ``` ### How do I implement human-in-the-loop? ```python from execution import StreamEventType def human_approval(agent_id: str, response: str) -> bool: print(f"\n{agent_id} replied: {response}") approval = input("Approve? (y/n): ") return approval.lower() == 'y' def stream_with_approval(graph): for event in runner.stream(graph): if event.event_type == StreamEventType.AGENT_OUTPUT: if not human_approval(event.agent_id, event.content): # Restart the agent with feedback feedback = input("Your feedback: ") # ... restart logic ... yield event ``` ### How do I use a graph with multiple tasks? ```python # Option 1: sequential queries = ["Task 1", "Task 2", "Task 3"] for query in queries: graph.query = query result = runner.run_round(graph) print(f"{query}: {result.final_answer}") # Option 2: parallel (async) async def process_queries(queries): tasks = [] for query in queries: graph_copy = copy.deepcopy(graph) graph_copy.query = query tasks.append(runner.arun_round(graph_copy)) results = await asyncio.gather(*tasks) return results ``` ### How do I combine cloud and local models? ```python from builder import GraphBuilder builder = GraphBuilder() # Cloud model for public data builder.add_agent( "public_analyzer", llm_backbone="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", ) # Local model (Ollama) for confidential data builder.add_agent( "private_analyzer", llm_backbone="llama3:70b", base_url="http://localhost:11434/v1", api_key="not-needed", # Ollama does not require an API key ) builder.add_workflow_edge("public_analyzer", "private_analyzer") graph = builder.build() factory = LLMCallerFactory.create_openai_factory() runner = MACPRunner(llm_factory=factory) ``` ### How do I optimize LLM cost with multi-model routing? ```python # Strategy: cheap models for routine tasks, expensive for complex tasks builder = GraphBuilder() # Steps 1-3: simple operations β†’ cheap model for i in range(3): builder.add_agent( f"processor_{i}", llm_backbone="gpt-4o-mini", # $0.15/$0.60 per 1M tokens max_tokens=500, ) # Step 4: complex analysis β†’ expensive model builder.add_agent( "analyst", llm_backbone="gpt-4", # $30/$60 per 1M tokens max_tokens=2000, ) # Step 5: final formatting β†’ cheap model builder.add_agent( "formatter", llm_backbone="gpt-4o-mini", max_tokens=500, ) # Savings: ~70–80% vs using gpt-4 for all steps ``` ### How do I use API keys safely? 
```python # ❌ DO NOT do this (hardcode keys) builder.add_agent("agent", api_key="sk-1234567890...") # βœ… Correct: use environment variables import os # Method 1: load from a .env file from dotenv import load_dotenv load_dotenv() builder.add_agent("agent", api_key="$OPENAI_API_KEY") # Method 2: set the env var explicitly os.environ["OPENAI_API_KEY"] = open("keys/openai.key").read().strip() builder.add_agent("agent", api_key="$OPENAI_API_KEY") # Method 3: use a factory with a default key factory = LLMCallerFactory.create_openai_factory( default_api_key=os.getenv("OPENAI_API_KEY"), ) ``` ### How do I configure logging? ```python from config import setup_logging # Configure global logging setup_logging( level="DEBUG", log_file="framework.log", rotation="500 MB", retention="10 days", format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", backtrace=True, diagnose=True, ) # Use in code from config import logger logger.info("Starting execution") logger.debug(f"Graph has {graph.num_nodes} nodes") logger.error("Failed to execute agent", exc_info=True) ``` ### How do I export a graph for analysis? ```python # 1. JSON serialization import json graph_data = graph.to_dict() with open("graph.json", "w") as f: json.dump(graph_data, f, indent=2) # 2. PyTorch Geometric format pyg_data = graph.to_pyg_data() torch.save(pyg_data, "graph.pt") # 3. NetworkX format (if needed) import networkx as nx G = nx.DiGraph() for node_id in graph.node_ids: G.add_node(node_id, **graph.get_agent_by_id(node_id).to_dict()) for i, j in zip(*graph.edge_index): src = graph.node_ids[i] tgt = graph.node_ids[j] G.add_edge(src, tgt, weight=graph.A_com[i, j]) nx.write_gexf(G, "graph.gexf") # 4. CSV export import pandas as pd # Nodes nodes_df = pd.DataFrame([ {"id": agent.agent_id, "name": agent.display_name, "tools": ",".join(agent.tools)} for agent in graph.agents ]) nodes_df.to_csv("nodes.csv", index=False) # Edges edges = [] for i in range(graph.num_nodes): for j in range(graph.num_nodes): if graph.A_com[i, j] > 0: edges.append({ "source": graph.node_ids[i], "target": graph.node_ids[j], "weight": graph.A_com[i, j], }) edges_df = pd.DataFrame(edges) edges_df.to_csv("edges.csv", index=False) ``` ### How do I test agents? 
```python
import pytest
from unittest.mock import Mock

from core import AgentProfile
from execution import MACPRunner, RunnerConfig, ErrorPolicy
from builder import build_property_graph


def test_agent_execution():
    # Mock the LLM
    mock_llm = Mock(return_value="Mocked response")

    # Build a graph
    agents = [AgentProfile(agent_id="test", display_name="Test Agent")]
    graph = build_property_graph(agents, [], query="Test query")

    # Run
    runner = MACPRunner(llm_caller=mock_llm)
    result = runner.run_round(graph)

    # Assertions
    assert result.final_answer == "Mocked response"
    assert len(result.execution_order) == 1
    assert result.total_tokens >= 0
    mock_llm.assert_called_once()


def test_error_handling():
    # Mock the LLM with an error
    mock_llm = Mock(side_effect=Exception("LLM error"))
    agent = AgentProfile(agent_id="test", display_name="Test Agent")
    graph = build_property_graph([agent], [], query="Test")

    config = RunnerConfig(
        error_policy=ErrorPolicy(on_error="skip", max_retries=2),
    )
    runner = MACPRunner(llm_caller=mock_llm, config=config)
    result = runner.run_round(graph)

    assert len(result.errors) > 0
    assert result.final_answer is None


def test_parallel_execution():
    mock_llm = Mock(return_value="Mocked response")
    agents = [
        AgentProfile(agent_id=f"agent_{i}", display_name=f"Agent {i}")
        for i in range(3)
    ]
    edges = [("agent_0", "agent_1"), ("agent_0", "agent_2")]
    graph = build_property_graph(agents, edges, query="Test")

    config = RunnerConfig(enable_parallel=True, max_parallel_size=2)
    runner = MACPRunner(llm_caller=mock_llm, config=config)
    result = runner.run_round(graph)

    assert len(result.execution_order) == 3
```

### How do I scale to large graphs?

```python
# 1. Use pruning to cut inefficient paths
config = RunnerConfig(
    pruning_config=PruningConfig(
        min_weight_threshold=0.2,
        min_probability_threshold=0.1,
        token_budget=5000,
    ),
)

# 2. Use parallel execution
config.enable_parallel = True
config.max_parallel_size = 10

# 3. Use beam search to cap paths
config.routing_policy = RoutingPolicy.BEAM_SEARCH
scheduler = AdaptiveScheduler(policy=RoutingPolicy.BEAM_SEARCH, beam_width=5)

# 4. Use subgraph filtering
from core.algorithms import GraphAlgorithms, SubgraphFilter

algo = GraphAlgorithms(graph)
subgraph = algo.filter_subgraph(SubgraphFilter(
    max_hop_distance=3,
    from_node="start",
    min_edge_weight=0.3,
))

# 5. Use async for parallel requests
async def process_large_graph(graph):
    results = await runner.arun_round(graph)
    return results
```

---

## License

---

## Support

- GitHub Issues: [github.com/yourusername/rustworkx-agent-framework/issues](https://github.com/yourusername/rustworkx-agent-framework/issues)
- Documentation: [github.com/yourusername/rustworkx-agent-framework#readme](https://github.com/yourusername/rustworkx-agent-framework#DOCUMENTATION)

---

Made with ❀️ for the multi-agent systems developer community