RustworkX Agent Framework: Full Documentation
A modern graph-based framework for multi-agent systems
A flexible, high-performance alternative to LangGraph with dynamic topology, decentralized memory, and full access to graph structures
Table of Contents
- Introduction
- Installation
- Quick Start
- Key Concepts
- Core Components
- RoleGraph
- AgentProfile
- TaskNode
- NodeEncoder
- MACPRunner
- Scheduler
- Memory System
- Streaming API
- Token Budget
- Error Handling
- Graph Algorithms
- Metrics Tracking
- Visualization
- Graph Schemas
- Builder API
- Event System
- Callback System (LangChain-like)
- State Storage
- Async Utilities
- Conditional Routing
- Agent Tools (Tools)
- Advanced Features
- Configuration
- Usage Examples
- API Reference
- FAQ
Introduction
RustworkX Agent Framework (gMAS) is a framework for building multi-agent systems that uses the rustworkx library for high-performance graph operations. It addresses key limitations of existing solutions such as LangGraph:
Why is gMAS better than LangGraph?
| Feature | LangGraph | gMAS Framework |
|---|---|---|
| Topology | Fixed | Dynamic (runtime changes via hooks) |
| Token optimization | Minimal | Automatic (filtering isolated nodes, disabled nodes, early stopping) |
| Memory | Centralized | Decentralized (agents' local state) |
| Graph | Hidden from the developer | First-class citizen (full access) |
| Representations | Text only | Text + embeddings + hidden states |
| Typing and validation | Minimal | Full Pydantic validation (type safety) |
| Data schemas | Informal | Pydantic BaseModel (auto-validation, serialization) |
| Multi-model | Limited | Full support for different LLMs per agent |
| Parallelism | Limited | Full async/parallel support |
| ML integration | None | PyTorch Geometric, GNN routing, RL hooks |
| Serialization | Manual | Automatic (Pydantic .model_dump()) |
| Runtime adaptation | None | Topology hooks, early stopping, disabled nodes |
| Callbacks | BaseCallbackHandler | Full compatibility (same methods: on_run_start, on_agent_end, on_tool_start/end/error, etc.) |
Installation
Requirements
- Python 3.12+
- PyTorch 2.0+
- Pydantic 2.0+ (required; the framework is fully built on Pydantic)
Via pip (from sources)
git clone https://github.com/yourusername/rustworkx-agent-framework.git
cd rustworkx-agent-framework
pip install -e .
Dependencies
# Core (required)
pip install rustworkx>=0.13 pydantic>=2.0 pydantic-settings>=2.0 torch>=2.0 loguru>=0.7
# For embeddings (optional)
pip install sentence-transformers>=2.0
# For GNN routing (optional)
pip install torch-geometric>=2.0
# For visualization (optional)
pip install rich>=13.0 graphviz>=0.20
Install all optional dependencies
pip install -e ".[all]"
Important: Pydantic 2.0+
gMAS Framework requires Pydantic 2.0+ and is incompatible with Pydantic 1.x. All models (AgentProfile, TaskNode, schemas, configurations) use the Pydantic v2 API:
- .model_dump() instead of .dict()
- .model_validate() instead of .parse_obj()
- .model_dump_json() instead of .json()
If you have Pydantic 1.x installed:
pip install --upgrade "pydantic>=2.0"
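For example, round-tripping an AgentProfile (introduced below) with the v2 API looks like this; the field values are purely illustrative:
from core import AgentProfile
agent = AgentProfile(agent_id="solver", display_name="Math Solver")
data = agent.model_dump()                      # Pydantic v1: agent.dict()
json_str = agent.model_dump_json(indent=2)     # Pydantic v1: agent.json()
restored = AgentProfile.model_validate(data)   # Pydantic v1: AgentProfile.parse_obj(data)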
Quick Start
Minimal example
from core import AgentProfile, RoleGraph
from execution import MACPRunner
from builder import build_property_graph
# 1. Define agents
agents = [
AgentProfile(
agent_id="solver",
display_name="Math Solver",
description="Solves math problems step by step",
tools=["calculator"],
),
AgentProfile(
agent_id="checker",
display_name="Answer Checker",
description="Checks solutions for correctness",
),
]
# 2. Define connections between agents
workflow_edges = [("solver", "checker")]
# 3. Build the graph
graph = build_property_graph(
agents,
workflow_edges=workflow_edges,
query="What is 25 Γ 17?",
)
# 4. Define an LLM call function
def my_llm_caller(prompt: str) -> str:
# Integrate your LLM here (OpenAI, Anthropic, local, etc.)
return call_your_llm(prompt)
# 5. Run execution
runner = MACPRunner(llm_caller=my_llm_caller)
result = runner.run_round(graph)
# 6. Get results
print(f"Answer: {result.final_answer}")
print(f"Execution order: {result.execution_order}")
print(f"Tokens used: {result.total_tokens}")
Quick Start: with monitoring (Callbacks)
from execution import MACPRunner, RunnerConfig
from callbacks import (
StdoutCallbackHandler,
MetricsCallbackHandler,
collect_metrics,
)
# 1. Add callback handlers
config = RunnerConfig(
callbacks=[
StdoutCallbackHandler(show_outputs=True), # Console output
MetricsCallbackHandler(), # Metrics collection
]
)
runner = MACPRunner(llm_caller=my_llm_caller, config=config)
result = runner.run_round(graph)
# 2. Or use a context manager
with collect_metrics() as metrics:
result = runner.run_round(graph)
print(f"Total tokens: {metrics.total_tokens}")
print(f"Execution time: {metrics.total_duration_ms}ms")
print(f"Agent calls: {metrics.get_metrics()['agent_calls']}")
Quick Start: multi-model (different LLM for each agent)
from builder import GraphBuilder
from execution import MACPRunner, LLMCallerFactory
# 1. Create a builder and add agents with different models
builder = GraphBuilder()
# Agent 1: strong model for complex analysis
builder.add_agent(
agent_id="analyst",
display_name="Senior Analyst",
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.0,
max_tokens=2000,
)
# Agent 2: smaller model for formatting
builder.add_agent(
agent_id="formatter",
display_name="Report Formatter",
llm_backbone="gpt-4o-mini",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.3,
max_tokens=500,
)
# 2. Define edges
builder.add_workflow_edge("analyst", "formatter")
# 3. Set the query and build the graph
builder.add_task(query="Analyze Q4 sales")
graph = builder.build()
# 4. Create an LLM factory (automatically creates callers for each agent)
factory = LLMCallerFactory.create_openai_factory()
# 5. Run execution
runner = MACPRunner(llm_factory=factory)
result = runner.run_round(graph)
# 6. Get the result
print(f"Final answer: {result.final_answer}")
print("Savings: use gpt-4 only for analysis, gpt-4o-mini for formatting")
Quick Start: token optimization and dynamic topology
from builder import GraphBuilder
from execution import (
MACPRunner, RunnerConfig, EarlyStopCondition, TopologyAction
)
# 1. Create a graph with explicit boundaries
builder = GraphBuilder()
builder.add_agent("input", persona="Input processor")
builder.add_agent("solver", persona="Problem solver")
builder.add_agent("checker", persona="Solution checker")
builder.add_agent("expert", persona="Expert reviewer (expensive)")
builder.add_agent("output", persona="Output formatter")
builder.add_agent("optional", persona="Optional analyzer")
builder.add_workflow_edge("input", "solver")
builder.add_workflow_edge("solver", "checker")
builder.add_workflow_edge("checker", "output")
# expert is connected dynamically when needed
# Set boundaries (for filtering unreachable nodes)
builder.set_start_node("input")
builder.set_end_node("output")
builder.add_task(query="Solve the problem")
builder.connect_task_to_agents()
graph = builder.build()
# 2. Disable optional nodes
graph.disable("optional") # Will not run, token savings
# 3. Hook for topology adaptation
def adaptive_hook(ctx, graph):
# If checker found an error β add expert
if ctx.agent_id == "checker" and "ERROR" in (ctx.response or ""):
return TopologyAction(
add_edges=[("checker", "expert", 1.0), ("expert", "output", 1.0)],
trigger_rebuild=True
)
# If solver is confident -> skip checker
if ctx.agent_id == "solver" and "CONFIDENT" in (ctx.response or ""):
return TopologyAction(skip_agents=["checker"])
return None
# 4. Configure runner with optimization
config = RunnerConfig(
adaptive=True,
enable_dynamic_topology=True,
topology_hooks=[adaptive_hook],
early_stop_conditions=[
EarlyStopCondition.on_keyword("FINAL_ANSWER"),
EarlyStopCondition.on_token_limit(5000),
],
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# 5. Execute with filtering unreachable nodes
result = runner.run_round(
graph,
filter_unreachable=True # Exclude nodes not on the input->output path
)
# 6. Result
print(f"Executed: {result.execution_order}")
print(f"Pruned: {result.pruned_agents}") # optional + unreachable
print(f"Early stopped: {result.early_stopped}")
print(f"Topology mods: {result.topology_modifications}") # was expert added?
print(f"Tokens: {result.total_tokens}")
Key Concepts
Pydantic-oriented architecture
gMAS Framework is fully built on Pydantic for type safety, validation, and data serialization. All key models inherit from pydantic.BaseModel:
Core Pydantic models in the framework
| Model | Purpose | Notes |
|---|---|---|
| AgentProfile | Agent profile | frozen=True (immutable), arbitrary_types_allowed for torch.Tensor |
| AgentLLMConfig | Agent LLM configuration | Validates model parameters, supports env vars |
| TaskNode | Task node | Stores the query and task context |
| GraphSchema | Schema of the whole graph | Nodes (dict), edges (list), metadata |
| AgentNodeSchema | Agent-node schema | LLM config, tools, metrics, embeddings |
| TaskNodeSchema | Task-node schema | Query, status, deadline |
| BaseEdgeSchema | Base edge schema | Weight, probability, cost metrics |
| WorkflowEdgeSchema | Workflow edge | Conditions, priority, transformations |
| CostMetrics | Cost metrics | Tokens, latency, trust, reliability |
| LLMConfig | Full LLM configuration | Model name, base URL, API key, generation parameters |
| VisualizationStyle | Visualization styles | Settings for colors, shapes, what to show |
| NodeStyle | Node style | Shape, colors, icon |
| EdgeStyle | Edge style | Line style, arrow, colors |
| ValidationResult | Validation result | Errors, warnings |
| FeatureConfig | GNN configuration | Feature dimensions |
| TrainingConfig | Training configuration | Learning rate, epochs, optimizer |
Benefits of Pydantic in gMAS
Automatic type validation
# Pydantic automatically checks types
agent = AgentProfile(
    agent_id="test",              # str - OK
    display_name="Test Agent",    # str - OK
    tools=["search", "calc"],     # list[str] - OK
)
# Validation error for a wrong type
agent = AgentProfile(agent_id=123)  # ValidationError: agent_id must be str
Default values
# Pydantic fills fields with default values
agent = AgentProfile(agent_id="test", display_name="Test")
print(agent.tools)    # [] (empty list by default)
print(agent.persona)  # "" (empty string by default)
Automatic type conversion
# Pydantic validators can automatically convert types
schema = AgentNodeSchema(
    id="test",
    embedding=torch.tensor([0.1, 0.2, 0.3]),  # torch.Tensor -> list[float]
)
print(type(schema.embedding))  # <class 'list'>
Nested models
# Pydantic validates nested models
agent = AgentProfile(
    agent_id="test",
    display_name="Test",
    llm_config=AgentLLMConfig(  # Nested Pydantic model
        model_name="gpt-4",
        temperature=0.7,
    ),
)
Serialization and deserialization
# Built-in Pydantic methods
data = agent.model_dump()                   # -> dict
json_str = agent.model_dump_json(indent=2)  # -> JSON string
# Load from dict/JSON
loaded = AgentProfile.model_validate(data)
loaded_json = AgentProfile.model_validate_json(json_str)
Immutability
# frozen=True for AgentProfile
agent = AgentProfile(agent_id="test", display_name="Test")
agent.agent_id = "new_id"  # ValidationError: frozen model
# Use copy methods for changes
updated = agent.model_copy(update={"display_name": "New Name"})
Extensibility
# extra="allow" enables arbitrary fields
schema = GraphSchema(
    name="MyGraph",
    custom_field="custom_value",  # Additional field
    another_field=123,            # Another one
)
Declarative typing
Thanks to Pydantic, all types are declarative and are checked both statically (mypy, pyright) and dynamically (at runtime):
from core import AgentProfile
from core.schema import AgentNodeSchema, LLMConfig
# Static typing (IDE autocompletion)
agent: AgentProfile = AgentProfile(...)
config: LLMConfig = LLMConfig(...)
schema: AgentNodeSchema = AgentNodeSchema(...)
# Dynamic validation (runtime)
try:
bad_agent = AgentProfile(agent_id=None) # None instead of str
except ValidationError as e:
print(e.errors()) # Detailed error information
Decentralized data storage
Unlike centralized architectures, gMAS uses a decentralized approach:
- Embeddings are stored inside AgentProfile.embedding
- Hidden states are stored inside AgentProfile.hidden_state
- Local memory is stored inside AgentProfile.state
- RoleGraph.embeddings is an accessor that gathers embeddings from all agents into a single tensor
This allows each agent to own its representations and ensures node independence.
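A minimal sketch of how this looks in practice, reusing the graph from the Quick Start (the 384-dimensional size is just an illustrative embedding dimension):
import torch
# Each agent owns its own representation and local history
agent = graph.get_agent_by_id("solver")
agent = agent.with_embedding(torch.randn(384))   # stored on the agent itself
graph.update_agent("solver", agent)
print(agent.embedding.shape)   # torch.Size([384])
print(len(agent.state))        # local message history lives on the agent
# The graph-level accessor only stacks per-agent embeddings on demand
print(graph.embeddings.shape)  # torch.Size([num_agents, 384]) once all agents have embeddings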
System architecture
+--------------------------------------------------------------------+
|                              RoleGraph                             |
|   +----------+   +----------+   +----------+   +----------+        |
|   |  Agent   |---|  Agent   |---|  Agent   |---|  Agent   |        |
|   | Profile  |   | Profile  |   | Profile  |   | Profile  |        |
|   |(embedding|   |(embedding|   |(embedding|   |(embedding|        |
|   |  state)  |   |  state)  |   |  state)  |   |  state)  |        |
|   +----------+   +----------+   +----------+   +----------+        |
|        |              |              |              |              |
|        +--------------+------+-------+--------------+              |
|                   Adjacency matrix (A_com)                         |
+--------------------------------------------------------------------+
                                 |
                                 v
+--------------------------------------------------------------------+
|                             MACPRunner                             |
|    +--------------+    +--------------+    +--------------+        |
|    |  Scheduler   |    |    Memory    |    |    Budget    |        |
|    |              |    |     Pool     |    |   Tracker    |        |
|    +--------------+    +--------------+    +--------------+        |
+--------------------------------------------------------------------+
                                 |
                                 v
                       +-------------------+
                       |    MACPResult     |
                       |  - messages       |
                       |  - final_answer   |
                       |  - metrics        |
                       +-------------------+
Data flow
- Create agents: AgentProfile describes the role, capabilities, and tools
- Build the graph: build_property_graph creates a RoleGraph with topology
- Planning: Scheduler determines the execution order
- Execution: MACPRunner runs agents sequentially/in parallel
- Result: MACPResult contains all agents' responses and metrics
Core Components
RoleGraph
RoleGraph is the central data structure representing the agent graph.
from core import RoleGraph
# === Graph properties ===
graph.num_nodes # Number of nodes
graph.num_edges # Number of edges
graph.agents # List of AgentProfile objects
graph.node_ids # List of node IDs ["agent1", "agent2", ...]
graph.role_sequence # Role order (legacy)
graph.A_com # Adjacency matrix (torch.Tensor, N x N)
graph.edge_index # Edge index in PyG format (torch.Tensor, 2 x E)
graph.edge_attr # Edge attributes (torch.Tensor, E x feature_dim)
graph.embeddings # Accessor: gathers agent embeddings into a tensor (N x dim)
graph.graph # Internal rustworkx.PyDiGraph object
graph.task_node # TaskNode if enabled, otherwise None
graph.query # Task query (string)
# === Node operations ===
# Add a node
graph.add_node(
agent, # AgentProfile
connections_to=["other"], # List of IDs for outgoing edges
connections_from=["prev"], # List of IDs for incoming edges
weight=1.0, # Default edge weight
)
# Remove a node with a state migration policy
graph.remove_node(
"agent_id",
policy=StateMigrationPolicy.ARCHIVE, # DISCARD, COPY, ARCHIVE
)
# Replace a node
graph.replace_node(
old_node_id="old",
new_agent=new_agent_profile,
policy=StateMigrationPolicy.COPY, # Copy state
keep_connections=True, # Preserve edges
)
# Get an agent
agent = graph.get_agent_by_id("agent_id")
# Get node index in the matrix
idx = graph.get_node_index("agent_id") # -> int
# Existence check
if "agent_id" in graph.node_ids:
...
# === Edge operations ===
# Add an edge
graph.add_edge(
source="agent1",
target="agent2",
weight=0.8,
edge_type="workflow", # Edge type (optional)
metadata={"priority": 1}, # Additional data
)
# Remove an edge
graph.remove_edge("agent1", "agent2")
# Update edge weight
graph.update_edge_weight("agent1", "agent2", new_weight=0.9)
# Get neighbors
out_neighbors = graph.get_neighbors("agent_id", direction="out") # Outgoing
in_neighbors = graph.get_neighbors("agent_id", direction="in") # Incoming
all_neighbors = graph.get_neighbors("agent_id", direction="both") # All
# Check whether an edge exists
has_edge = graph.has_edge("agent1", "agent2")
# Get edge weight
weight = graph.get_edge_weight("agent1", "agent2")
# === Execution bounds (start/end nodes) ===
# Set start and end nodes for optimization
graph.set_start_node("input_agent")
graph.set_end_node("output_agent")
# Or set both at once
graph.set_execution_bounds("input_agent", "output_agent")
# Inspect bounds
print(f"Start: {graph.start_node}, End: {graph.end_node}")
# === Disabled nodes ===
# Disable nodes (they remain in the graph but will not be executed)
graph.disable("agent1") # One node
graph.disable(["agent2", "agent3"]) # Multiple nodes
# Enable back
graph.enable("agent1") # One node
graph.enable(["agent2", "agent3"]) # Multiple nodes
graph.enable() # All disabled nodes
# Check status
graph.is_enabled("agent1") # -> bool
graph.get_enabled() # -> ["agent1", ...]
graph.get_disabled() # -> ["agent2", ...]
# Use case: token savings based on algorithms
if rl_model.predict(graph_state) < threshold:
graph.disable("expensive_agent")
# === Reachability analysis ===
# Get nodes reachable from start_node
reachable = graph.get_reachable_from("input_agent")
# Get nodes that can reach end_node
reaching = graph.get_nodes_reaching("output_agent")
# Get relevant nodes (on the path start -> end)
relevant = graph.get_relevant_nodes()
# Automatically uses graph.start_node and graph.end_node
# Get isolated nodes (not on the path start -> end)
isolated = graph.get_isolated_nodes()
# Optimized execution order (without isolated nodes)
order = graph.get_optimized_execution_order()
# === Conditional edges ===
# Add an edge with a condition
from execution.scheduler import ConditionContext
def condition_func(context: ConditionContext) -> bool:
    # Default to 0.0 so a missing "quality" key does not raise on comparison
    return context.state.get("quality", 0.0) > 0.8
graph.add_conditional_edge(
source="writer",
target="editor",
condition=condition_func,
weight=0.9,
)
# === Dynamic topology updates ===
# Full update of the adjacency matrix
graph.update_communication(
a_new, # New adjacency matrix (torch.Tensor)
s_tilde=scores, # Quality score matrix (optional)
p_matrix=probabilities, # Transition probability matrix (optional)
)
# === Conversion and export ===
# Serialize to a dictionary
data = graph.to_dict()
# {
# "agents": [...],
# "adjacency": [[...]],
# "query": "...",
# "task_node": {...},
# }
# Convert to PyTorch Geometric Data
pyg_data = graph.to_pyg_data()
# Data(x=node_features, edge_index=edges, edge_attr=weights)
# Extract a subgraph
subgraph = graph.subgraph(["agent1", "agent2", "agent3"])
# Copy the graph
graph_copy = graph.copy()
# === Integrity checks ===
# Verify consistency of internal structures
graph.verify_integrity(raise_on_error=True)
# Quick check
is_valid = graph.is_consistent()
# === Graph analysis ===
# Check whether it is a DAG (directed acyclic graph)
is_dag = graph.is_dag()
# Get topological order (if DAG)
if graph.is_dag():
topo_order = graph.topological_sort()
# === Agent updates ===
# Update an agent's embedding
agent = graph.get_agent_by_id("solver")
agent = agent.with_embedding(new_embedding)
graph.update_agent("solver", agent)
# Update an agent's state
agent = agent.append_state({"role": "assistant", "content": "Response"})
graph.update_agent("solver", agent)
# === Batch operations ===
# Update multiple agents
updates = {
"agent1": updated_agent1,
"agent2": updated_agent2,
}
graph.batch_update_agents(updates)
# Add multiple edges
edges = [
("a", "b", 0.8),
("b", "c", 0.9),
("c", "d", 0.7),
]
graph.batch_add_edges(edges)
State migration policies
When removing or replacing a node, you can specify a migration policy:
from core.graph import StateMigrationPolicy
# DISCARD β state is removed
graph.remove_node("agent_id", policy=StateMigrationPolicy.DISCARD)
# COPY β state is copied into the new node
graph.replace_node("old_id", new_agent, policy=StateMigrationPolicy.COPY)
# ARCHIVE β state is saved to external storage
graph.remove_node("agent_id", policy=StateMigrationPolicy.ARCHIVE)
AgentProfile
AgentProfile is an immutable Pydantic model (BaseModel with frozen=True) representing an agent profile with description, tools, state, and LLM configuration.
Important:
- AgentProfile inherits from pydantic.BaseModel, providing automatic type validation and type safety
- Embeddings and hidden states are stored at the agent level, not at the graph level
- Multi-model support: each agent can have its own LLM configuration
- Immutability (frozen=True): methods return new objects
AgentProfile structure (Pydantic model)
| Field | Type | Description |
|---|---|---|
| agent_id | str | Unique agent identifier (required) |
| display_name | str | Display name (required) |
| persona | str | Agent role/persona (e.g., "Expert analyst") |
| description | str | Textual description of agent capabilities |
| llm_backbone | str \| None | LLM model identifier (legacy; use llm_config) |
| llm_config | AgentLLMConfig \| None | Pydantic model for the agent's LLM configuration |
| tools | list[str] | List of available tools (shell, code_interpreter, file_search, web_search, custom) |
| raw | Mapping[str, Any] | Arbitrary extra data |
| embedding | torch.Tensor \| None | Agent vector representation (arbitrary_types_allowed) |
| state | list[dict[str, Any]] | Local state / message history |
| hidden_state | torch.Tensor \| None | Hidden state passed between agents |
AgentLLMConfig (Pydantic model)
from core.agent import AgentLLMConfig
# AgentLLMConfig - a Pydantic model for LLM configuration
llm_config = AgentLLMConfig(
model_name="gpt-4", # Model name
base_url="https://api.openai.com/v1", # API endpoint
api_key="$OPENAI_API_KEY", # Key (or $ENV_VAR)
max_tokens=2000, # Max tokens
temperature=0.7, # Temperature
timeout=60.0, # Timeout in seconds
top_p=0.9, # Top-p sampling
stop_sequences=["END", "STOP"], # Stop sequences
extra_params={"frequency_penalty": 0.5}, # Extra parameters
)
# AgentLLMConfig methods
api_key = llm_config.resolve_api_key() # Resolve $ENV_VAR
is_set = llm_config.is_configured() # Check whether configured
params = llm_config.to_generation_params() # Build params for the LLM
Creating and working with AgentProfile
from core import AgentProfile
from core.agent import AgentLLMConfig
# 1. Basic creation (Pydantic validates types)
agent = AgentProfile(
agent_id="analyzer", # Unique ID (str, required)
display_name="Data Analyzer", # Display name (str, required)
persona="Expert data analyst", # Role/persona (str, default="")
description="Analyzes data and produces insights", # Description (str, default="")
tools=["python", "sql"], # Available tools (list[str], default=[])
)
# 2. Creation with LLM config (Pydantic model)
llm_config = AgentLLMConfig(
model_name="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY", # Resolved from environment
temperature=0.7,
max_tokens=2000,
)
agent = AgentProfile(
agent_id="researcher",
display_name="Researcher",
llm_config=llm_config, # Pydantic validates the nested model
tools=["web_search"],
)
# 3. State operations (immutable β returns a NEW object)
agent = agent.append_state({"role": "user", "content": "Hello!"})
agent = agent.with_state([{"role": "system", "content": "You are helpful"}])
agent = agent.clear_state()
# 4. Embeddings (arbitrary_types_allowed for torch.Tensor)
import torch
embedding = torch.randn(384)
agent = agent.with_embedding(embedding)
hidden_state = torch.randn(768)
agent = agent.with_hidden_state(hidden_state)
# 5. LLM config operations
agent = agent.with_llm_config(llm_config)
# Get the agent model name (priority: llm_config.model_name -> llm_backbone)
model_name = agent.get_model_name() # "gpt-4"
# Check if a custom LLM configuration is set
if agent.has_custom_llm():
print(f"Agent uses custom LLM: {agent.llm_config.model_name}")
print(f"Base URL: {agent.llm_config.base_url}")
print(f"Generation params: {agent.llm_config.to_generation_params()}")
# 6. Serialization (Pydantic methods)
# For encoder (text)
text = agent.to_text()
# For persistence (dict, includes llm_config)
data = agent.to_dict()
# Pydantic serialization methods
agent_dict = agent.model_dump() # Dict[str, Any]
agent_json = agent.model_dump_json(indent=2) # JSON string
# 7. Deserialization (Pydantic methods)
loaded_agent = AgentProfile.model_validate(agent_dict)
loaded_from_json = AgentProfile.model_validate_json(agent_json)
Example: agents with different LLMs
from core import AgentProfile
from core.agent import AgentLLMConfig
# Agent 1: strong model for analysis
analyst = AgentProfile(
agent_id="analyst",
display_name="Senior Analyst",
persona="Expert data analyst with 10 years experience",
description="Performs deep analysis of complex data",
llm_config=AgentLLMConfig(
model_name="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.0, # Deterministic for analysis
max_tokens=2000,
),
tools=["python", "sql", "visualization"],
)
# Agent 2: cheaper model for formatting
formatter = AgentProfile(
agent_id="formatter",
display_name="Report Formatter",
persona="Technical writer",
description="Formats analysis results into readable reports",
llm_config=AgentLLMConfig(
model_name="gpt-4o-mini", # Cheaper for simple tasks
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.3,
max_tokens=500,
),
tools=["markdown", "latex"],
)
# Agent 3: local model
local_agent = AgentProfile(
agent_id="local_llm",
display_name="Local Assistant",
llm_config=AgentLLMConfig(
model_name="llama3:70b",
base_url="http://localhost:11434/v1", # Ollama
temperature=0.5,
),
)
Benefits of Pydantic validation
- Automatic type checking when creating objects
- Default values for optional fields
- Immutability (frozen=True) prevents accidental changes
- Nested models (AgentLLMConfig is validated automatically)
- Serialization/deserialization via .model_dump() and .model_validate()
- Support for arbitrary types (arbitrary_types_allowed) for torch.Tensor
TaskNode
TaskNode is an immutable Pydantic model (BaseModel with frozen=True) representing a virtual task node that stores the task query and can be connected to all agents.
Important:
TaskNode inherits from pydantic.BaseModel, providing automatic type validation and immutability (just like AgentProfile).
TaskNode structure (Pydantic model)
| Field | Type | Description |
|---|---|---|
| agent_id (id) | str | Task node identifier (default __task__) |
| type | str | Node type ("task", set automatically) |
| query | str | Task statement / query |
| description | str | Additional context description |
| embedding | torch.Tensor \| None | Task embedding (arbitrary_types_allowed) |
| display_name | str | Display name (default "Task") |
| persona | str | Task persona/role (default empty) |
| llm_backbone | str \| None | Model identifier, if needed |
| tools | list[str] | Tools available to the task node (default=[]) |
| state | list[dict[str, Any]] | Local task state / message history (default=[]) |
from core import TaskNode
# Pydantic validates types on creation
task = TaskNode(
agent_id="__task__", # can be overridden (str)
query="Draft a market research plan", # required (str)
description="A task for the whole team of agents", # optional (str, default="")
)
# Task embedding (optional, arbitrary_types_allowed for torch.Tensor)
import torch
task_embedding = torch.randn(384)
task = task.with_embedding(task_embedding)
# TaskNode is immutable (frozen=True), use copy methods
updated_task = task.model_copy(update={"description": "New description"})
# Pydantic serialization
task_dict = task.model_dump()
task_json = task.model_dump_json(indent=2)
# Deserialization
loaded = TaskNode.model_validate(task_dict)
When using build_property_graph(..., include_task_node=True), the task node is created automatically and connected to agents via context/update edges.
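A minimal sketch (agents and workflow_edges as in the Quick Start; only include_task_node is new here):
graph = build_property_graph(
    agents,
    workflow_edges=[("solver", "checker")],
    query="What is 25 × 17?",
    include_task_node=True,   # creates and connects the virtual task node
)
print(graph.task_node.query)  # the TaskNode carries the query
print(graph.query)            # the same query is exposed on the graph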
TaskNode methods (immutable)
# Embedding operations (returns a new object)
task = task.with_embedding(embedding_tensor)
# State operations (returns a new object)
task = task.append_state({"role": "system", "content": "Context"})
task = task.with_state([{"role": "user", "content": "Query"}])
task = task.clear_state()
# Convert to text
task_text = task.to_text() # For encoder
# Convert to dict
task_data = task.to_dict() # For persistence
NodeEncoder
NodeEncoder converts textual agent descriptions into vector representations.
from core import NodeEncoder
# sentence-transformers (recommended)
encoder = NodeEncoder(
model_name="sentence-transformers/all-MiniLM-L6-v2",
normalize_embeddings=True,
)
# hash fallback (fast, no model required)
encoder = NodeEncoder(model_name="hash:256")
# Encode texts
texts = [agent.to_text() for agent in agents]
embeddings = encoder.encode(texts) # torch.Tensor (N x dim)
# Get dimensionality
dim = encoder.embedding_dim
MACPRunner
MACPRunner is the executor of the Multi-Agent Communication Protocol.
from execution import MACPRunner, RunnerConfig
# Recommended for modern chat LLMs (OpenAI, GigaChat, etc.)
# Sends proper system/user roles; no flat-string workaround needed.
from openai import OpenAI
client = OpenAI(api_key="sk-...")
def my_structured_caller(messages: list[dict]) -> str:
resp = client.chat.completions.create(model="gpt-4o", messages=messages)
return resp.choices[0].message.content or ""
runner = MACPRunner(structured_llm_caller=my_structured_caller)
# Legacy setup β one flat-string LLM for all agents (still supported)
runner = MACPRunner(
llm_caller=sync_llm_function, # Callable[[str], str]
async_llm_caller=async_llm_function, # Callable[[str], Awaitable[str]]
token_counter=my_token_counter, # Token counting
)
# Multi-model setup (different LLMs for different agents)
from execution import LLMCallerFactory, create_openai_caller
# Option 1: Use a factory (recommended)
factory = LLMCallerFactory.create_openai_factory(
default_model="gpt-4o-mini",
default_base_url="https://api.openai.com/v1",
)
runner = MACPRunner(llm_factory=factory)
# Option 2: A dictionary of callers per agent
runner = MACPRunner(
llm_callers={
"analyst": create_openai_caller(model="gpt-4", temperature=0.0),
"writer": create_openai_caller(model="gpt-4o-mini", temperature=0.7),
},
async_llm_callers={
"analyst": create_openai_caller(model="gpt-4", is_async=True),
"writer": create_openai_caller(model="gpt-4o-mini", is_async=True),
},
)
# Option 3: Combined (factory + overrides for specific agents)
runner = MACPRunner(
llm_factory=factory, # Default for everyone
llm_callers={"critical_agent": specialized_caller}, # Override for critical_agent
)
# Advanced configuration
config = RunnerConfig(
timeout=60.0, # Per-agent timeout
adaptive=True, # Adaptive mode
enable_parallel=True, # Parallel execution
max_parallel_size=5, # Max parallel agents
max_retries=2, # Retries on errors
update_states=True, # Update agent states
enable_memory=True, # Enable memory
callbacks=[StdoutCallbackHandler()], # Callbacks for logging
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# Synchronous execution
result = runner.run_round(graph)
# With explicit execution bounds and filtering
result = runner.run_round(
graph,
start_agent_id="input", # Start agent (overrides graph.start_node)
final_agent_id="output", # Final agent (overrides graph.end_node)
filter_unreachable=True, # Exclude isolated nodes (token savings)
update_states=True, # Update agent states
)
# Asynchronous execution
result = await runner.arun_round(
graph,
start_agent_id="input",
final_agent_id="output",
filter_unreachable=True,
)
# Execution with hidden channels
result = runner.run_round_with_hidden(graph, hidden_encoder=encoder)
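The token_counter argument above expects a function that maps text to a token count; its exact signature is not spelled out here, so the following is a sketch assuming Callable[[str], int] and using tiktoken:
import tiktoken

_enc = tiktoken.get_encoding("cl100k_base")

def my_token_counter(text: str) -> int:
    # Count tokens with the same tokenizer family as the target model
    return len(_enc.encode(text))

runner = MACPRunner(llm_caller=sync_llm_function, token_counter=my_token_counter)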
RunnerConfig (full specification)
from execution import (
    RunnerConfig, RoutingPolicy, PruningConfig, BudgetConfig,
    ErrorPolicy, ErrorAction, EarlyStopCondition,
)
from utils.memory import MemoryConfig
from callbacks import StdoutCallbackHandler, MetricsCallbackHandler, FileCallbackHandler
config = RunnerConfig(
# === Basic parameters ===
timeout=60.0, # Per-agent timeout (sec)
max_retries=3, # Max attempts on errors
update_states=True, # Update AgentProfile.state
# === Adaptive mode ===
# adaptive controls conditional edges, pruning, fallback, and routing
# policies. It does NOT affect whether agents run in parallel.
adaptive=True, # Enable conditional routing & pruning
routing_policy=RoutingPolicy.WEIGHTED_TOPO, # Routing policy
# === Parallel execution ===
# enable_parallel works independently of adaptive: when True,
# independent agents (those with all predecessors done) are executed
# concurrently via asyncio.gather. Works with both astream() and
# arun_round(), regardless of the adaptive flag.
enable_parallel=True, # Parallel group execution
max_parallel_size=5, # Max agents in a parallel group
# === Pruning ===
pruning_config=PruningConfig(
min_weight_threshold=0.1, # Min edge weight
min_probability_threshold=0.05, # Min transition probability
max_consecutive_errors=3, # Max consecutive errors
token_budget=10000, # Token budget for pruning
enable_fallback=True, # Use fallback agents
max_fallback_attempts=2, # Max fallback attempts
quality_scorer=None, # Quality scoring function
min_quality_threshold=0.3, # Min quality to continue
),
# === Budget ===
budget_config=BudgetConfig(
total_token_limit=50000,
node_token_limit=2000,
max_prompt_length=4000,
max_response_length=2000,
warn_at_usage_ratio=0.8,
total_time_limit_seconds=600,
total_request_limit=100,
),
# === Memory ===
enable_memory=True, # Enable memory system
memory_config=MemoryConfig(
working_max_entries=20,
long_term_max_entries=100,
working_default_ttl=3600.0,
auto_compress=True,
promote_after_accesses=3,
),
memory_context_limit=5, # Memory entries injected into the prompt
# === Hidden channels ===
enable_hidden_channels=True, # Passing hidden_state
hidden_combine_strategy="mean", # mean, sum, concat, attention
pass_embeddings=True, # Pass embeddings
# === Task query broadcast ===
broadcast_task_to_all=True, # True: task query is sent to all agents
# False: only to agents connected to the task node
# === Dynamic topology (runtime modification) ===
enable_dynamic_topology=True, # Enable runtime graph modifications
topology_hooks=[my_hook_func], # Sync hooks for topology modification
async_topology_hooks=[async_hook], # Async hooks for topology modification
early_stop_conditions=[ # Early stopping conditions
EarlyStopCondition.on_keyword("FINAL ANSWER"),
EarlyStopCondition.on_token_limit(10000),
EarlyStopCondition.on_custom(lambda ctx: my_logic(ctx)),
],
# === Callbacks (monitoring and logging) ===
callbacks=[ # Callback handlers
StdoutCallbackHandler( # Console output
show_prompts=False,
show_outputs=True,
),
MetricsCallbackHandler(), # Metrics aggregation
FileCallbackHandler("run.jsonl"), # File logging
],
# === Error handling ===
error_policy=ErrorPolicy(
on_timeout=ErrorAction.RETRY,
on_retry_exhausted=ErrorAction.PRUNE,
on_budget_exceeded=ErrorAction.ABORT,
on_validation_error=ErrorAction.ABORT,
),
# === Streaming ===
enable_token_streaming=False, # Enable token-level streaming if LLM supports it
)
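The quality_scorer slot in PruningConfig above takes a scoring function. Its exact signature is an assumption here; the sketch below supposes it receives an agent's response text and returns a score in [0, 1], which is compared against min_quality_threshold:
def my_quality_scorer(response: str) -> float:
    # Naive heuristic: penalize empty or very short answers
    if not response or not response.strip():
        return 0.0
    return min(1.0, len(response.split()) / 50.0)

pruning = PruningConfig(
    quality_scorer=my_quality_scorer,  # hypothetical scorer; signature assumed
    min_quality_threshold=0.3,
)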
Execution result (MACPResult)
result.messages # Dict[agent_id -> response]
result.final_answer # Final agent answer
result.final_agent_id # Final agent ID
result.execution_order # Execution order
result.agent_states # Updated agent states
result.total_tokens # Total tokens
result.total_time # Execution time (sec)
result.topology_changed_count # Number of topology changes
result.fallback_count # Number of fallbacks
result.pruned_agents # Pruned agents (including disabled and isolated)
result.errors # List of errors
result.hidden_states # Agents' hidden states
result.metrics # ExecutionMetrics with detailed statistics
# New fields (dynamic topology)
result.early_stopped # bool: whether early stopping occurred
result.early_stop_reason # str: early stop reason
result.topology_modifications # int: number of topology modifications
Scheduler
The scheduler determines the agent execution order.
from execution import (
build_execution_order,
get_parallel_groups,
AdaptiveScheduler,
RoutingPolicy,
PruningConfig,
)
# Simple topological order
order = build_execution_order(graph.A_com, agent_ids)
# Parallel execution groups
groups = get_parallel_groups(graph.A_com, agent_ids)
# Result: [["a", "b"], ["c"], ["d", "e"]]
# Adaptive scheduler
scheduler = AdaptiveScheduler(
policy=RoutingPolicy.WEIGHTED_TOPO, # Routing policy
pruning_config=PruningConfig(
min_weight_threshold=0.1, # Min edge weight
min_probability_threshold=0.05, # Min probability
max_consecutive_errors=3, # Max consecutive errors
token_budget=10000, # Token budget
enable_fallback=True, # Enable fallback
max_fallback_attempts=2, # Max fallback attempts
),
beam_width=3, # Beam search width
)
# Build a plan
plan = scheduler.build_plan(
a_agents, # Agent adjacency matrix
agent_ids, # List of IDs
p_matrix=probs, # Probability matrix
end_agent="final", # Final agent
)
# Working with the plan
step = plan.get_current_step()
plan.mark_completed("agent_id", tokens=100)
plan.mark_failed("agent_id")
plan.mark_skipped("agent_id")
Routing policies (detailed)
from execution import RoutingPolicy, AdaptiveScheduler
# ========== 1. TOPOLOGICAL (Topological sort) ==========
# Description: Classic topological sort for a DAG
# Use case: Simple pipelines without adaptivity
# Complexity: O(V + E)
scheduler = AdaptiveScheduler(policy=RoutingPolicy.TOPOLOGICAL)
plan = scheduler.build_plan(adjacency, agent_ids)
# Example:
# A -> B -> C -> D
# Order: [A, B, C, D]
# ========== 2. WEIGHTED_TOPO (Weighted topological) ==========
# Description: Topological sort with priority based on edge weights
# Use case: When you need to account for connection importance
# Complexity: O(V + E log V)
scheduler = AdaptiveScheduler(policy=RoutingPolicy.WEIGHTED_TOPO)
plan = scheduler.build_plan(adjacency, agent_ids)
# Example:
#        +--(0.9)--> B --+
#   A ---+               +--> D
#        +--(0.3)--> C --+
# Order: [A, B, C, D] (B runs before C because 0.9 > 0.3)
# ========== 3. GREEDY (Greedy selection) ==========
# Description: At each step, selects the agent with the maximum edge weight
# Use case: Optimize for connection quality
# Complexity: O(V^2)
scheduler = AdaptiveScheduler(policy=RoutingPolicy.GREEDY)
plan = scheduler.build_plan(
adjacency,
agent_ids,
start_node="coordinator",
end_node="final",
)
# Example:
# Start -> A(0.9) -> B(0.8) -> End
# Start -> C(0.5) -> D(0.7) -> End
# Selected: Start -> A -> B -> End (higher total weight)
# ========== 4. BEAM_SEARCH (Beam search) ==========
# Description: Keeps beam_width best paths and selects the optimal one
# Use case: Balance between quality and speed
# Complexity: O(V * beam_width * E)
scheduler = AdaptiveScheduler(
policy=RoutingPolicy.BEAM_SEARCH,
beam_width=3, # Keep 3 best paths
)
plan = scheduler.build_plan(
adjacency,
agent_ids,
p_matrix=probability_matrix, # Transition probabilities
)
# Example with beam_width=2:
# Start --+-> A(0.8) --+-> B(0.9) -> End   [path 1: 0.72]
#         |            +-> C(0.6) -> End   [path 2: 0.48]
#         +-> D(0.7) ----> E(0.8) -> End   [path 3: 0.56]
# Beam keeps paths 1 and 3, drops path 2
# Final choice: path 1
# ========== 5. K_SHORTEST (K shortest paths) ==========
# Description: Finds K shortest paths and selects the best by a criterion
# Use case: When alternative routes are required
# Complexity: O(K * (V + E) log V)
scheduler = AdaptiveScheduler(
policy=RoutingPolicy.K_SHORTEST,
k_paths=5, # Find 5 shortest paths
)
plan = scheduler.build_plan(
adjacency,
agent_ids,
start_node="input",
end_node="output",
path_metric=PathMetric.WEIGHTED, # HOP_COUNT, WEIGHTED, RELIABILITY
)
# Example:
# Found paths:
# 1. input -> A -> B -> output (cost=3, hops=3)
# 2. input -> C -> output (cost=4, hops=2)
# 3. input -> A -> D -> output (cost=5, hops=3)
# 4. input -> E -> F -> output (cost=6, hops=3)
# 5. input -> G -> output (cost=7, hops=2)
# Selection by metric: path 1 (minimum cost)
# ========== 6. GNN_BASED (GNN-based) ==========
# Description: Uses a trained GNN to predict the optimal route
# Use case: Adaptive routing based on history
# Requires: A trained GNN model
from core.gnn import GNNRouterInference
scheduler = AdaptiveScheduler(
policy=RoutingPolicy.GNN_BASED,
gnn_router=gnn_inference, # GNNRouterInference object
gnn_threshold=0.7, # Min confidence to use the GNN
)
# If confidence < threshold, fallback policy is used
scheduler.set_fallback_policy(RoutingPolicy.WEIGHTED_TOPO)
plan = scheduler.build_plan(
adjacency,
agent_ids,
metrics_tracker=tracker, # For GNN features
)
# ========== Policy comparison ==========
# | Policy         | Adaptivity | Complexity    | Quality | Use case                 |
# |----------------|------------|---------------|---------|--------------------------|
# | TOPOLOGICAL    | No         | O(V+E)        | *       | Simple pipelines         |
# | WEIGHTED_TOPO  | Low        | O(V+E log V)  | **      | Priority-based pipelines |
# | GREEDY         | Medium     | O(V^2)        | ***     | Weight-optimized routing |
# | BEAM_SEARCH    | High       | O(V*k*E)      | ****    | Quality/speed balance    |
# | K_SHORTEST     | High       | O(K*V*log V)  | ****    | Alternative route search |
# | GNN_BASED      | Very high  | O(GNN)        | *****   | Trained systems          |
# ========== Choosing a policy based on the task ==========
# Simple linear pipeline
config = RunnerConfig(routing_policy=RoutingPolicy.TOPOLOGICAL)
# Graph with different agent priorities
config = RunnerConfig(routing_policy=RoutingPolicy.WEIGHTED_TOPO)
# Optimize route quality
config = RunnerConfig(routing_policy=RoutingPolicy.GREEDY)
# Balance exploration vs exploitation
config = RunnerConfig(
routing_policy=RoutingPolicy.BEAM_SEARCH,
adaptive=True,
)
scheduler = AdaptiveScheduler(policy=RoutingPolicy.BEAM_SEARCH, beam_width=3)
# Need fallback alternatives
config = RunnerConfig(routing_policy=RoutingPolicy.K_SHORTEST)
scheduler = AdaptiveScheduler(policy=RoutingPolicy.K_SHORTEST, k_paths=3)
# Advanced trained system
config = RunnerConfig(routing_policy=RoutingPolicy.GNN_BASED)
scheduler = AdaptiveScheduler(
policy=RoutingPolicy.GNN_BASED,
gnn_router=trained_router,
)
Memory System
A stratified memory system with working and long-term levels, supporting TTL, tags, priorities, and automatic compression.
Memory architecture
+----------------------------------------------------------------+
|                          AgentMemory                           |
|   +---------------------+           +------------------------+ |
|   |   Working Memory    |           |   Long-term Memory     | |
|   |   (TTL: 1 hour)     |           |   (TTL: unlimited)     | |
|   |   Max: 20 entries   |           |   Max: 100 entries     | |
|   |                     |           |                        | |
|   |  - Recent messages  |---------->|  - Important facts     | |
|   |  - Temp context     | promotion |  - Key insights        | |
|   |  - Active tasks     | (after N  |  - Historical data     | |
|   +---------------------+ accesses) +------------------------+ |
+----------------------------------------------------------------+
                                |
                                | sharing
                                v
+----------------------------------------------------------------+
|                        SharedMemoryPool                        |
|                Memory sharing between agents                   |
|                - Broadcast: one -> all                         |
|                - Share: one -> selected                        |
|                - Query: search by tags                         |
+----------------------------------------------------------------+
Basic usage of AgentMemory
from utils.memory import (
AgentMemory,
MemoryConfig,
MemoryLevel,
MemoryEntry,
)
# 1. Memory configuration
config = MemoryConfig(
# Working memory (short-term)
working_max_entries=20, # Max entries
working_default_ttl=3600.0, # TTL: 1 hour
# Long-term memory
long_term_max_entries=100, # Max entries
long_term_default_ttl=None, # No expiration
# Automatic management
auto_compress=True, # Auto-compress on limit overflow
compress_strategy="truncate", # truncate, summarize
promote_after_accesses=3, # Promote to long-term after N accesses
# Prioritization
use_priority=True, # Consider priorities when evicting
priority_weight=0.3, # Priority weight vs recency
)
# 2. Create an agent memory
memory = AgentMemory("researcher", config)
# 3. Add entries
# 3.1. Add messages (the simplest way)
memory.add_message(role="user", content="Analyze the dataset")
memory.add_message(role="assistant", content="I will analyze it")
# 3.2. Add with parameters
memory.add(
content={"type": "insight", "text": "Pattern detected in data"},
level=MemoryLevel.WORKING, # WORKING or LONG_TERM
priority=5, # 0-10 (higher = more important)
tags={"insight", "data"}, # Tags for search
ttl=7200.0, # Custom TTL (2 hours)
metadata={"source": "analysis", "confidence": 0.95},
)
# 3.3. Add directly into long-term
memory.add(
content="Critical finding: correlation coefficient = 0.87",
level=MemoryLevel.LONG_TERM,
priority=10,
tags={"critical", "finding"},
)
# 4. Retrieve entries
# 4.1. Get recent messages
messages = memory.get_messages(limit=5)
for msg in messages:
print(f"{msg['role']}: {msg['content']}")
# 4.2. Get from working memory
working_entries = memory.get(level=MemoryLevel.WORKING, limit=10)
for entry in working_entries:
print(f"[{entry.priority}] {entry.content}")
# 4.3. Get from long-term memory
longterm_entries = memory.get(level=MemoryLevel.LONG_TERM)
# 4.4. Search by tags
insights = memory.search_by_tags({"insight"}, level=MemoryLevel.WORKING)
critical = memory.search_by_tags({"critical"}, level=MemoryLevel.LONG_TERM)
# 4.5. Get all entries
all_entries = memory.get_all()
# 5. Memory management
# 5.1. Remove an entry
memory.remove(entry_key)
# 5.2. Clear a level
memory.clear(level=MemoryLevel.WORKING)
# 5.3. Force compression
memory.compress(level=MemoryLevel.WORKING)
# 5.4. Promote an entry to long-term
memory.promote(entry_key)
# 5.5. Update an entry
memory.update(entry_key, new_content={"updated": "data"})
# 6. Stats
stats = memory.get_stats()
print(f"Working: {stats['working_count']}/{stats['working_max']}")
print(f"Long-term: {stats['longterm_count']}/{stats['longterm_max']}")
print(f"Total accesses: {stats['total_accesses']}")
print(f"Promotions: {stats['promotion_count']}")
SharedMemoryPool β memory sharing between agents
from utils.memory import SharedMemoryPool
# 1. Create a pool
pool = SharedMemoryPool(max_shared_entries=1000)
# 2. Register agents
memory_a = AgentMemory("agent_a", config)
memory_b = AgentMemory("agent_b", config)
memory_c = AgentMemory("agent_c", config)
pool.register(memory_a)
pool.register(memory_b)
pool.register(memory_c)
# 3. Broadcast: send to everyone
pool.broadcast(
from_agent="agent_a",
entry={
"content": "Important discovery: X correlates with Y",
"priority": 8,
"tags": {"discovery", "shared"},
},
)
# All agents will receive this entry in working memory
# 4. Share: send to specific agents
pool.share(
from_agent="agent_a",
entry={"content": "Secret info", "priority": 9},
to_agents=["agent_b", "agent_c"],
)
# Only agent_b and agent_c receive the entry
# 5. Query: request information from the pool
results = pool.query(
tags={"discovery"},
min_priority=5,
limit=10,
)
for result in results:
print(f"From {result['source_agent']}: {result['content']}")
# 6. Subscribe to updates (callback)
def on_shared_entry(entry, from_agent, to_agents):
print(f"{from_agent} shared: {entry['content']}")
pool.subscribe("agent_b", on_shared_entry)
# 7. Remove from the pool
pool.unregister("agent_c")
# 8. Clear the pool
pool.clear()
Memory compression
from utils.memory import (
TruncateCompressor,
SummaryCompressor,
)
# 1. Truncate: simple removal of old entries
compressor = TruncateCompressor(keep_ratio=0.5) # Keep 50%
memory = AgentMemory("agent", config)
memory.set_compressor(compressor)
# When over the limit, 50% of old entries are removed automatically
# 2. Summary: summarization using an LLM
def summarize_llm(entries: list[MemoryEntry]) -> str:
texts = [e.content for e in entries]
combined = "\n".join(texts)
return my_llm(f"Summarize these entries: {combined}")
compressor = SummaryCompressor(
summarizer=summarize_llm,
chunk_size=10, # Summarize in chunks of 10 entries
)
memory.set_compressor(compressor)
# On compression, 10 entries are replaced with 1 summarized entry
# 3. Custom compressor
from utils.memory import MemoryCompressor
class SmartCompressor(MemoryCompressor):
def compress(self, entries: list[MemoryEntry], target_count: int) -> list[MemoryEntry]:
# Remove low-priority and old entries
sorted_entries = sorted(
entries,
key=lambda e: (e.priority, e.timestamp),
reverse=True,
)
return sorted_entries[:target_count]
memory.set_compressor(SmartCompressor())
Integrating memory with the Runner
from execution import MACPRunner, RunnerConfig
# 1. Configuration with memory enabled
config = RunnerConfig(
enable_memory=True,
memory_config=MemoryConfig(
working_max_entries=20,
long_term_max_entries=100,
auto_compress=True,
promote_after_accesses=3,
),
memory_context_limit=5, # How many entries to inject into the prompt
enable_shared_memory=True, # Enable SharedMemoryPool
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# 2. Run: memory is updated automatically
result1 = runner.run_round(graph)
# 3. Access an agent's memory
memory = runner.get_agent_memory("researcher")
entries = memory.get_messages(limit=10)
print(f"Researcher memory: {entries}")
# 4. Manually add to memory
runner.add_to_memory(
"researcher",
content="External knowledge: XYZ",
level=MemoryLevel.LONG_TERM,
priority=8,
)
# 5. Second round: agents retain context
graph.query = "Continue analysis from previous round"
result2 = runner.run_round(graph)
# 6. Export memories
memory_export = runner.export_memories()
# {
# "agent_a": {"working": [...], "long_term": [...]},
# "agent_b": {"working": [...], "long_term": [...]},
# }
# 7. Import memories (restore state)
runner.import_memories(memory_export)
# 8. Clear memory for all agents
runner.clear_all_memories()
Advanced usage: Semantic memory search
from utils.memory import SemanticMemoryIndex
from core import NodeEncoder
# 1. Create a semantic index
encoder = NodeEncoder(model_name="sentence-transformers/all-MiniLM-L6-v2")
semantic_index = SemanticMemoryIndex(encoder)
# 2. Add entries to the index
memory = AgentMemory("agent", config)
for entry in memory.get_all():
semantic_index.add(entry.key, entry.content, entry.tags)
# 3. Semantic search
query = "findings about correlation"
results = semantic_index.search(
query,
top_k=5,
min_similarity=0.7,
filter_tags={"finding"},
)
for result in results:
print(f"[{result['similarity']:.3f}] {result['content']}")
# 4. Integration with AgentMemory
memory.enable_semantic_search(encoder)
# Now you can search semantically
results = memory.semantic_search(
query="data patterns",
top_k=3,
level=MemoryLevel.LONG_TERM,
)
Practical example: Multi-round conversation with memory
# Create a graph with memory
agents = [
AgentProfile(agent_id="analyzer", display_name="Data Analyzer"),
AgentProfile(agent_id="reporter", display_name="Report Writer"),
]
graph = build_property_graph(
agents,
workflow_edges=[("analyzer", "reporter")],
query="Analyze dataset.csv",
)
# Memory-enabled configuration
config = RunnerConfig(
enable_memory=True,
memory_config=MemoryConfig(
working_max_entries=15,
long_term_max_entries=50,
auto_compress=True,
promote_after_accesses=2,
),
memory_context_limit=5,
enable_shared_memory=True,
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# Round 1: Initial analysis
graph.query = "Analyze the dataset and find key patterns"
result1 = runner.run_round(graph)
print(f"Round 1 answer: {result1.final_answer}")
# Analyzer saved findings to memory
analyzer_memory = runner.get_agent_memory("analyzer")
print(f"Analyzer memory entries: {len(analyzer_memory.get_all())}")
# Round 2: Deeper analysis (agents remember the previous round)
graph.query = "Based on previous findings, analyze correlations"
result2 = runner.run_round(graph)
print(f"Round 2 answer: {result2.final_answer}")
# Round 3: Report generation
graph.query = "Generate final report summarizing all findings"
result3 = runner.run_round(graph)
print(f"Round 3 answer: {result3.final_answer}")
# Reporter used accumulated memory for a complete report
reporter_memory = runner.get_agent_memory("reporter")
# Export full history
history = {
"round_1": result1.to_dict(),
"round_2": result2.to_dict(),
"round_3": result3.to_dict(),
"memories": runner.export_memories(),
}
import json
with open("conversation_history.json", "w") as f:
json.dump(history, f, indent=2)
Streaming API
LangGraph-like streaming for real-time output.
from execution import (
MACPRunner,
StreamEventType,
StreamBuffer,
format_event,
print_stream,
)
runner = MACPRunner(llm_caller=my_llm)
# Synchronous streaming
for event in runner.stream(graph):
if event.event_type == StreamEventType.AGENT_OUTPUT:
print(f"{event.agent_id}: {event.content}")
elif event.event_type == StreamEventType.TOKEN:
print(event.token, end="", flush=True)
# Asynchronous streaming
async for event in runner.astream(graph):
print(format_event(event))
# Using a buffer
buffer = StreamBuffer()
for event in runner.stream(graph):
buffer.add(event)
# ... handle the event
print(f"Final answer: {buffer.final_answer}")
print(f"Agent outputs: {buffer.agent_outputs}")
# Convenience printing
answer = print_stream(runner.stream(graph), show_tokens=True)
Event types (full specification)
from execution.streaming import StreamEventType, StreamEvent
# === Execution lifecycle ===
StreamEventType.RUN_START
# Fields: run_id, query, num_agents, config
StreamEventType.RUN_END
# Fields: run_id, success, total_time, total_tokens, execution_order, final_answer
# === Agent events ===
StreamEventType.AGENT_START
# Fields: agent_id, step_index, predecessors, prompt_preview
StreamEventType.AGENT_OUTPUT
# Fields: agent_id, step_index, content, tokens_used, latency_ms
StreamEventType.AGENT_ERROR
# Fields: agent_id, step_index, error_type, error_message, will_retry
# === Token streaming ===
StreamEventType.TOKEN
# Fields: agent_id, token (str), token_index
# === Adaptive execution ===
StreamEventType.TOPOLOGY_CHANGED
# Fields: reason, old_plan, new_plan, remaining_steps
StreamEventType.PRUNE
# Fields: agent_id, reason (low_weight/low_probability/budget/quality)
StreamEventType.FALLBACK
# Fields: original_agent, fallback_agent, reason, attempt
# === Parallel execution ===
StreamEventType.PARALLEL_START
# Fields: group_agents (list), group_index
StreamEventType.PARALLEL_END
# Fields: group_agents, completed_count, failed_count, duration_ms
# === Budget ===
StreamEventType.BUDGET_WARNING
# Fields: budget_type (tokens/requests/time), current, limit, ratio
StreamEventType.BUDGET_EXCEEDED
# Fields: budget_type, current, limit, action_taken
# === Memory ===
StreamEventType.MEMORY_WRITE
# Fields: agent_id, memory_level (working/long_term), entry_key
StreamEventType.MEMORY_READ
# Fields: agent_id, memory_level, entry_key, found
StreamEventType.MEMORY_PROMOTED
# Fields: agent_id, entry_key, from_level, to_level
# === Metrics ===
StreamEventType.METRICS_UPDATE
# Fields: agent_id, metrics (dict with reliability, latency, quality, cost)
# Example: handling all event types
for event in runner.stream(graph):
match event.event_type:
case StreamEventType.RUN_START:
print(f"Starting run {event.run_id} with {event.num_agents} agents")
case StreamEventType.AGENT_START:
print(f"Agent {event.agent_id} starting (step {event.step_index})")
case StreamEventType.AGENT_OUTPUT:
print(f"Agent {event.agent_id}: {event.content[:100]}...")
print(f" Tokens: {event.tokens_used}, Latency: {event.latency_ms}ms")
case StreamEventType.TOKEN:
print(event.token, end="", flush=True)
case StreamEventType.TOPOLOGY_CHANGED:
print(f"β³ Topology changed: {event.reason}")
print(f" New plan: {event.new_plan}")
case StreamEventType.PRUNE:
print(f"β Pruned {event.agent_id}: {event.reason}")
case StreamEventType.FALLBACK:
print(f"β€· Fallback: {event.original_agent} β {event.fallback_agent}")
case StreamEventType.PARALLEL_START:
print(f"β«Έ Starting parallel group: {event.group_agents}")
case StreamEventType.PARALLEL_END:
print(f"β«· Parallel group done: {event.completed_count}/{len(event.group_agents)}")
case StreamEventType.BUDGET_WARNING:
print(f"β Budget warning: {event.budget_type} at {event.ratio:.1%}")
case StreamEventType.BUDGET_EXCEEDED:
print(f"β Budget exceeded: {event.budget_type}")
case StreamEventType.RUN_END:
print(f"β Execution completed in {event.total_time:.2f}s")
print(f" Total tokens: {event.total_tokens}")
print(f" Final answer: {event.final_answer[:100]}...")
Advanced Features
Execution optimization and token savings
The framework provides several mechanisms to optimize execution and reduce token usage:
1. Filtering isolated nodes
Automatically exclude nodes that are not on the path from start to end:
# Set execution bounds
graph.set_execution_bounds("input", "output")
# Filter isolated nodes during execution
result = runner.run_round(
graph,
filter_unreachable=True # Exclude nodes not on the input->output path
)
# Nodes unrelated to the input->output path will not be executed
print(f"Agents excluded: {len(result.pruned_agents or [])}")
Example:
builder = GraphBuilder()
builder.add_agent("a1")
builder.add_agent("a2")
builder.add_agent("a3")
builder.add_agent("isolated") # Not connected to a1->a3
builder.add_workflow_edge("a1", "a2")
builder.add_workflow_edge("a2", "a3")
builder.set_execution_bounds("a1", "a3")
graph = builder.build()
# Reachability analysis
relevant = graph.get_relevant_nodes() # {"a1", "a2", "a3"}
isolated = graph.get_isolated_nodes() # {"isolated"}
result = runner.run_round(graph, filter_unreachable=True)
# "isolated" will not run β token savings
2. Node deactivation (Disabled Nodes)
Temporarily deactivate nodes without removing them from the graph:
# Deactivate based on metrics/RL
if quality_score < threshold:
graph.disable("expensive_agent")
# Or multiple nodes
graph.disable(["agent1", "agent2"])
# Check
if graph.is_enabled("agent1"):
...
# Re-enable
graph.enable("agent1")
graph.enable() # All
result = runner.run_round(graph)
# Deactivated nodes appear in result.pruned_agents
Use case: RL control
# An RL agent decides which nodes to deactivate
for agent_id in graph.node_ids:
rl_score = rl_model.predict(graph_state, agent_id)
if rl_score < 0.3:
graph.disable(agent_id)
result = runner.run_round(graph)
3. Early stopping
Stop execution when a condition is met:
from execution import EarlyStopCondition, RunnerConfig
# By keyword
stop1 = EarlyStopCondition.on_keyword("FINAL ANSWER")
# By token limit
stop2 = EarlyStopCondition.on_token_limit(5000)
# By number of agents
stop3 = EarlyStopCondition.on_agent_count(3)
# By metadata (for RL/metrics)
stop4 = EarlyStopCondition.on_metadata(
"quality", 0.95,
comparator=lambda v, t: v > t
)
# Custom logic
stop5 = EarlyStopCondition.on_custom(
lambda ctx: my_evaluator.is_done(ctx.messages),
reason="Evaluator decided task is done",
min_agents_executed=2 # At least 2 agents before checking
)
# Combination (OR)
stop_any = EarlyStopCondition.combine_any([
EarlyStopCondition.on_keyword("DONE"),
EarlyStopCondition.on_token_limit(10000),
])
config = RunnerConfig(
early_stop_conditions=[stop1, stop2, stop5]
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(graph)
if result.early_stopped:
print(f"Reason: {result.early_stop_reason}")
saved = len(graph.node_ids) - len(result.execution_order)
print(f"Agents saved: {saved}")
4. Runtime topology (Topology Hooks)
Modify the graph during execution based on intermediate results:
from execution import TopologyAction, StepContext
def adaptive_topology(ctx: StepContext, graph) -> TopologyAction:
"""Hook is called after each agent."""
# ctx.agent_id - current agent
# ctx.response - its response
# ctx.messages - all responses
# ctx.execution_order - execution order
# ctx.remaining_agents - remaining agents
# ctx.total_tokens - tokens used
# Add an edge if review is needed
if "uncertain" in (ctx.response or "").lower():
return TopologyAction(
add_edges=[(ctx.agent_id, "reviewer", 1.0)],
trigger_rebuild=True
)
# Remove an edge
if confident:
return TopologyAction(
remove_edges=[("agent1", "checker")]
)
# Skip agents
if ctx.total_tokens > 8000:
return TopologyAction(
skip_agents=["expensive_agent"]
)
# Early stop
if "DONE" in (ctx.response or ""):
return TopologyAction(
early_stop=True,
early_stop_reason="Task completed"
)
return None
config = RunnerConfig(
enable_dynamic_topology=True,
topology_hooks=[adaptive_topology]
)
5. Combined optimization
Use all mechanisms together for maximum optimization:
from execution import (
GraphBuilder, MACPRunner, RunnerConfig,
EarlyStopCondition, TopologyAction, StepContext,
PruningConfig,
)
# Build a graph
builder = GraphBuilder()
builder.add_agent("input")
builder.add_agent("solver")
builder.add_agent("checker")
builder.add_agent("expert") # Expensive agent
builder.add_agent("formatter")
builder.add_agent("optional") # Optional
builder.add_workflow_edge("input", "solver")
builder.add_workflow_edge("solver", "checker")
builder.add_workflow_edge("checker", "formatter")
# Set execution bounds
builder.set_execution_bounds("input", "formatter")
graph = builder.build()
# Disable optional nodes
graph.disable("optional")
# Adaptation hooks
def smart_topology(ctx: StepContext, graph) -> TopologyAction:
# If solver is confident, skip checker
if ctx.agent_id == "solver" and ctx.metadata.get("confidence", 0) > 0.95:
return TopologyAction(skip_agents=["checker"])
# If checker found an issue, add expert
if ctx.agent_id == "checker" and "ERROR" in (ctx.response or ""):
return TopologyAction(
add_edges=[("checker", "expert", 1.0), ("expert", "formatter", 1.0)],
trigger_rebuild=True
)
return None
# Configure runner with optimization
config = RunnerConfig(
adaptive=True,
enable_dynamic_topology=True,
topology_hooks=[smart_topology],
early_stop_conditions=[
EarlyStopCondition.on_keyword("FINAL_ANSWER"),
EarlyStopCondition.on_token_limit(10000),
],
pruning_config=PruningConfig(token_budget=15000),
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(
graph,
filter_unreachable=True # Exclude isolated nodes
)
# Optimization analysis
print(f"Agents executed: {len(result.execution_order)}")
print(f"Pruned: {len(result.pruned_agents or [])}")
print(f"Early stopped: {result.early_stopped}")
print(f"Modifications: {result.topology_modifications}")
print(f"Tokens: {result.total_tokens}")
Multi-Model Support
Each agent in the graph can use its own LLM model with individual settings. This makes it possible to:
- Optimize costs: use expensive models only for complex tasks
- Balance performance: fast models for simple operations
- Specialize agents: models trained for specific domains
- Build hybrid solutions: combine cloud and local models
Multi-model architecture
                +-------------------------------+
                |           TASK NODE           |
                |     "Analyze the market"      |
                +---------------+---------------+
                                |
                +---------------+---------------+
                v                               v
        +-----------------+             +-----------------+
        |     ANALYST     |  ---------> |   COORDINATOR   |
        |      GPT-4      |             |   GPT-4o-mini   |
        |    temp: 0.0    |             |    temp: 0.3    |
        |  tokens: 4000   |             |  tokens: 1000   |
        +-----------------+             +-----------------+
Key components
1. LLMConfig: an agent's LLM configuration
from core.schema import LLMConfig
llm_config = LLMConfig(
model_name="gpt-4", # Model name
base_url="https://api.openai.com/v1", # API endpoint
api_key="$OPENAI_API_KEY", # Key (or $ENV_VAR)
max_tokens=2000, # Max tokens in the response
temperature=0.7, # Generation temperature
timeout=60.0, # Request timeout
top_p=0.9, # Nucleus sampling
stop_sequences=["END"], # Stop sequences
)
# Validate configuration
if llm_config.is_configured():
params = llm_config.to_generation_params()
print(f"Generation params: {params}")
# Merge configurations (fallback)
default_config = LLMConfig(model_name="gpt-4o-mini", temperature=0.5)
final_config = llm_config.merge_with(default_config)
2. AgentLLMConfig: an immutable configuration for AgentProfile
from core.agent import AgentLLMConfig
agent_llm_config = AgentLLMConfig(
model_name="gpt-4",
base_url="https://api.openai.com/v1",
api_key="sk-...",
temperature=0.7,
max_tokens=2000,
)
# Convert to LLMConfig
llm_config = agent_llm_config.to_llm_config()
3. LLMCallerFactory: a factory for creating LLM callers
from execution import LLMCallerFactory
# Create a factory for OpenAI-compatible APIs
factory = LLMCallerFactory.create_openai_factory(
default_model="gpt-4o-mini",
default_base_url="https://api.openai.com/v1",
default_api_key="sk-...",
default_temperature=0.7,
default_max_tokens=2000,
)
# The factory automatically creates callers based on AgentLLMConfig
# when used with MACPRunner
4. Caller factory helpers
Three ready-made functions cover the most common setups:
| Function | Interface | Use with |
|---|---|---|
| create_openai_caller() | (str) -> str | Legacy llm_caller |
| create_openai_structured_caller() | (list[dict]) -> str | structured_llm_caller (recommended) |
| create_openai_async_structured_caller() | async (list[dict]) -> str | async_structured_llm_caller (parallel) |
from execution import (
create_openai_caller,
create_openai_structured_caller,
create_openai_async_structured_caller,
)
# -- Legacy flat-string caller -----------------------------------------------
caller = create_openai_caller(
model="gpt-4",
base_url="https://api.openai.com/v1",
api_key="sk-...",
temperature=0.7,
max_tokens=2000,
)
response = caller("What is 2+2?") # (str) -> str
# -- Structured sync caller (recommended for chat LLMs) ----------------------
sync_caller = create_openai_structured_caller(
api_key="sk-...",
model="gpt-4o",
temperature=0.7,
max_tokens=1024,
)
# Use as: MACPRunner(structured_llm_caller=sync_caller)
# -- Structured async caller (required for parallel astream) -----------------
async_caller = create_openai_async_structured_caller(
api_key="sk-...",
model="gpt-4o",
temperature=0.7,
max_tokens=1024,
)
# Use as: MACPRunner(async_structured_llm_caller=async_caller)
# -- Full parallel setup ------------------------------------------------------
from execution import MACPRunner, RunnerConfig
runner = MACPRunner(
structured_llm_caller=sync_caller,
async_structured_llm_caller=async_caller,
config=RunnerConfig(enable_parallel=True),
)
# Sequential graphs: stream() uses sync_caller
for event in runner.stream(graph):
...
# Parallel graphs: astream() uses async_caller for concurrent groups
import asyncio
async def run():
async for event in runner.astream(graph):
...
asyncio.run(run())
Ways to configure multi-model support
Method 1: Via GraphBuilder (recommended)
from builder import GraphBuilder
from execution import MACPRunner, LLMCallerFactory
builder = GraphBuilder()
# Agent 1: strong model for analysis
builder.add_agent(
agent_id="analyst",
display_name="Senior Analyst",
persona="Expert data analyst with deep domain knowledge",
llm_backbone="gpt-4", # Or model_name
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.0, # Strict analysis
max_tokens=4000,
timeout=120.0,
)
# Agent 2: weaker model for formatting
builder.add_agent(
agent_id="formatter",
display_name="Report Formatter",
persona="Formats data into readable reports",
llm_backbone="gpt-4o-mini",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.3,
max_tokens=1000,
timeout=30.0,
)
# Agent 3: local model for confidential data
builder.add_agent(
agent_id="privacy_checker",
display_name="Privacy Checker",
llm_backbone="llama3:70b",
base_url="http://localhost:11434/v1", # Ollama
api_key="not-needed",
temperature=0.1,
max_tokens=500,
)
builder.add_workflow_edge("analyst", "formatter")
builder.add_workflow_edge("analyst", "privacy_checker")
graph = builder.build()
# The factory will automatically create callers for each agent
factory = LLMCallerFactory.create_openai_factory()
runner = MACPRunner(llm_factory=factory)
result = runner.run_round(graph)
print(f"Final answer: {result.final_answer}")
Method 2: Explicit LLMConfig
from core.schema import LLMConfig
# Predefined configurations
gpt4_config = LLMConfig(
model_name="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.7,
max_tokens=2000,
)
gpt4_mini_config = LLMConfig(
model_name="gpt-4o-mini",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.5,
max_tokens=1000,
)
builder = GraphBuilder()
builder.add_agent(
"researcher",
display_name="Researcher",
llm_config=gpt4_config, # Pass a ready configuration
)
builder.add_agent(
"writer",
display_name="Writer",
llm_config=gpt4_mini_config,
)
graph = builder.build()
Method 3: llm_callers dictionary
from execution import create_openai_caller
# Create callers manually
callers = {
"analyst": create_openai_caller(
model="gpt-4",
temperature=0.0,
max_tokens=4000,
),
"formatter": create_openai_caller(
model="gpt-4o-mini",
temperature=0.3,
max_tokens=1000,
),
"privacy_checker": create_openai_caller(
model="llama3:70b",
base_url="http://localhost:11434/v1",
api_key="not-needed",
),
}
# Pass directly into the runner
runner = MACPRunner(llm_callers=callers)
result = runner.run_round(graph)
Method 4: Combined approach
# Use the factory as default, but override for some agents
factory = LLMCallerFactory.create_openai_factory(
default_model="gpt-4o-mini", # Default
)
# Create a custom caller for a specific agent
specialized_caller = create_openai_caller(
model="gpt-4",
temperature=0.0,
max_tokens=4000,
)
runner = MACPRunner(
llm_factory=factory, # For all agents
llm_callers={"analyst": specialized_caller}, # Override for analyst
)
LLM caller resolution priority
1. llm_callers[agent_id]     - explicitly provided caller
   ↓
2. llm_factory.get_caller()  - factory creates a caller from agent.llm_config
   ↓
3. llm_caller                - default caller for all agents
   ↓
4. Exception                 - error: no caller specified
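For illustration, here is a minimal sketch that combines all three caller sources in one runner, using only the constructor arguments documented above (the model names and agent id are placeholders):
from execution import MACPRunner, LLMCallerFactory, create_openai_caller

runner = MACPRunner(
    # 1. Explicit per-agent caller: always wins for "analyst"
    llm_callers={"analyst": create_openai_caller(model="gpt-4")},
    # 2. Factory: builds a caller from each agent's llm_config when present
    llm_factory=LLMCallerFactory.create_openai_factory(default_model="gpt-4o-mini"),
    # 3. Legacy default: used for any agent not covered above
    llm_caller=create_openai_caller(model="gpt-4o-mini"),
)
result = runner.run_round(graph)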
Usage examples
Example 1: Cost optimization
# Cheap model for routine operations, expensive one for complex tasks
builder = GraphBuilder()
# 5 simple analysts (cheap model)
for i in range(5):
builder.add_agent(
f"analyst_{i}",
display_name=f"Junior Analyst {i}",
llm_backbone="gpt-4o-mini",
temperature=0.3,
max_tokens=500,
)
builder.add_workflow_edge(f"analyst_{i}", "senior")
# 1 senior analyst (expensive model)
builder.add_agent(
"senior",
display_name="Senior Analyst",
llm_backbone="gpt-4",
temperature=0.7,
max_tokens=4000,
)
graph = builder.build()
# Savings: ~80% of tokens use the cheap model
Example 2: Hybrid solution (cloud + local model)
builder = GraphBuilder()
# Public data: cloud model
builder.add_agent(
"public_analyzer",
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
)
# Confidential data: local model
builder.add_agent(
"private_analyzer",
llm_backbone="llama3:70b",
base_url="http://localhost:11434/v1",
api_key="not-needed",
)
# Aggregator: cheap cloud model
builder.add_agent(
"aggregator",
llm_backbone="gpt-4o-mini",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
)
builder.add_workflow_edge("public_analyzer", "aggregator")
builder.add_workflow_edge("private_analyzer", "aggregator")
graph = builder.build()
Example 3: Specialized models
builder = GraphBuilder()
# Medical expert: a model trained on medical data
builder.add_agent(
"medical_expert",
llm_backbone="medical-llm-v2",
base_url="https://medical-api.example.com/v1",
api_key="$MEDICAL_API_KEY",
temperature=0.0, # Strict medical recommendations
)
# Legal expert: a model trained on legal texts
builder.add_agent(
"legal_expert",
llm_backbone="legal-llm-v3",
base_url="https://legal-api.example.com/v1",
api_key="$LEGAL_API_KEY",
temperature=0.0,
)
# Coordinator: general model
builder.add_agent(
"coordinator",
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.5,
)
builder.add_workflow_edge("medical_expert", "coordinator")
builder.add_workflow_edge("legal_expert", "coordinator")
graph = builder.build()
Example 4: Different temperatures for different styles
builder = GraphBuilder()
# Creative writer (high temperature)
builder.add_agent(
"creative_writer",
llm_backbone="gpt-4",
temperature=0.9, # Creativity
max_tokens=2000,
)
# Strict editor (low temperature)
builder.add_agent(
"strict_editor",
llm_backbone="gpt-4",
temperature=0.1, # Precision
max_tokens=1500,
)
# Final formatter (medium temperature)
builder.add_agent(
"formatter",
llm_backbone="gpt-4o-mini",
temperature=0.5, # Balance
max_tokens=1000,
)
builder.add_workflow_edge("creative_writer", "strict_editor")
builder.add_workflow_edge("strict_editor", "formatter")
graph = builder.build()
Supported providers
The framework supports any OpenAI-compatible API:
| Provider | Base URL | Notes |
|---|---|---|
| OpenAI | https://api.openai.com/v1 | GPT-4, GPT-4o-mini, GPT-3.5-turbo |
| Anthropic | via wrapper | Claude (requires an adapter) |
| Ollama | http://localhost:11434/v1 | Local models (llama3, mistral, etc.) |
| vLLM | custom | Self-hosted models |
| LiteLLM | custom | Unified API for all providers |
| Azure OpenAI | https://<resource>.openai.azure.com/ | Azure-hosted models |
| GigaChat | custom | Sber models |
| Cloudflare Tunnels | custom | Via Cloudflare tunnels |
# Examples for different providers
# OpenAI
builder.add_agent("agent1", llm_backbone="gpt-4",
base_url="https://api.openai.com/v1")
# Ollama (local)
builder.add_agent("agent2", llm_backbone="llama3:70b",
base_url="http://localhost:11434/v1")
# Azure OpenAI
builder.add_agent("agent3", llm_backbone="gpt-4",
base_url="https://myresource.openai.azure.com/")
# GigaChat
builder.add_agent("agent4", llm_backbone="GigaChat-Lightning",
base_url="https://gigachat-api.trycloudflare.com/v1")
# vLLM
builder.add_agent("agent5", llm_backbone="./models/Qwen3-80B",
base_url="https://my-vllm-server.com/v1")
Async and streaming support
from execution import create_openai_caller
# Async caller per agent
async_callers = {
"agent1": create_openai_caller(model="gpt-4", is_async=True),
"agent2": create_openai_caller(model="gpt-4o-mini", is_async=True),
}
runner = MACPRunner(async_llm_callers=async_callers)
result = await runner.arun_round(graph)
# Streaming callers
streaming_callers = {
"agent1": create_openai_caller(model="gpt-4", is_streaming=True),
"agent2": create_openai_caller(model="gpt-4o-mini", is_streaming=True),
}
runner = MACPRunner(streaming_llm_callers=streaming_callers)
for event in runner.stream(graph):
if event.event_type == StreamEventType.TOKEN:
print(f"[{event.agent_id}] {event.token}", end="")
API key handling
# 1. Direct
builder.add_agent("agent", api_key="sk-...")
# 2. From an environment variable (recommended)
builder.add_agent("agent", api_key="$OPENAI_API_KEY")
# When parsing, it is automatically resolved as os.getenv("OPENAI_API_KEY")
# 3. From a file
import os
os.environ["OPENAI_API_KEY"] = open("keys/openai.key").read().strip()
builder.add_agent("agent", api_key="$OPENAI_API_KEY")
Monitoring multi-model execution
from core.metrics import MetricsTracker
tracker = MetricsTracker()
runner = MACPRunner(
llm_factory=factory,
metrics_tracker=tracker,
)
result = runner.run_round(graph)
# Per-model analysis
for agent_id in graph.node_ids:
agent = graph.get_agent_by_id(agent_id)
model = agent.llm_config.model_name if agent.llm_config else "default"
metrics = tracker.get_node_metrics(agent_id)
print(f"\n{agent_id} ({model}):")
print(f" Latency: {metrics.avg_latency_ms:.0f}ms")
print(f" Tokens: {metrics.total_cost_tokens}")
print(f" Reliability: {metrics.reliability:.2%}")
Backward compatibility
Old code continues to work without changes:
# Old approach (one LLM for all agents)
runner = MACPRunner(llm_caller=my_llm)
result = runner.run_round(graph)
# Works as before
# New approach (multi-model)
runner = MACPRunner(llm_factory=factory)
result = runner.run_round(graph)
# Uses per-agent models
Structured Prompt for modern chat LLMs (recommended)
TL;DR: if you use OpenAI, GigaChat, Anthropic, or any other chat-completion API, pass structured_llm_caller instead of the legacy llm_caller. The runner will send proper system/user roles to the LLM instead of one flat string. This produces shorter, more focused responses and saves tokens, especially in long agent chains.
The problem with the legacy llm_caller
The classic llm_caller: Callable[[str], str] interface passes the entire
prompt as a single flat string, combining persona, description, task and
messages from other agents:
"You are a mathematician.\n\nSolve step by step.\n\nTask: ...\n\nMessages from other agents:\n..."
Modern chat LLMs (OpenAI GPT-4, GigaChat, Claude, Gemini, etc.) expect messages
to be split into roles (system, user, assistant). When everything
arrives in one blob the model has to re-parse it, which leads to:
- Verbose, padded responses: the model does not know how strictly to follow the system instruction
- Token accumulation: long chains accumulate more and more context
- Lower instruction-following quality: especially for role-specific behaviour
The fix: structured_llm_caller
MACPRunner now supports a second caller interface that receives a
list[dict[str, str]], exactly what the OpenAI chat completions API expects:
The full message list produced by _build_prompt is:
[
# 1. system - persona, description, tools hint, output_schema instruction
{"role": "system", "content": "You are a mathematician. Solve step by step.\n\nAvailable tools: calculator.\n\nRespond with JSON matching: {\"type\":\"object\",...}"},
# 2..N-1. agent.state - previous conversation turns replayed with correct roles
{"role": "assistant", "content": "Previous answer turn 1..."},
{"role": "user", "content": "Follow-up question turn 2..."},
# ... (as many entries as agent.state contains)
# N. user - current task, input_schema hint, memory context, incoming agent messages
{"role": "user", "content": "Task: 3x² - 7x + 2 = 0\n\nInput format: {...}\n\nMessages from other agents: ..."},
]
The runner builds this automatically inside _build_prompt -> StructuredPrompt
and dispatches via _call_llm. No parsing, no heuristics, no hacks.
How it works internally
_build_prompt()
    |
    +--> StructuredPrompt
         +-- .text     -> flat string (used by legacy llm_caller)
         +-- .messages -> list[dict] (used by structured_llm_caller)
MACPRunner._call_llm(caller, prompt)
    +-- if structured_llm_caller is set -> calls structured_llm_caller(prompt.messages)
    +-- else -> calls caller(prompt.text)  # backward compat
Both representations are always built, so switching between interfaces requires zero changes to graph/agent code.
What goes where in messages:
| Source field | Role | Note |
|---|---|---|
| persona + description | system | Always the first message |
| tool names (if has_tools()) | system | Appended to the system content |
| output_schema | system | "Respond with JSON matching: ..." |
| agent.state entries | assistant/user | Replayed in order between system and the final user message |
| query + input_schema + memory + incoming msgs | user | Always the last message |
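To see this layout in practice, you can plug in a throwaway caller that just prints the roles it receives. A minimal sketch (debug_structured_caller is an illustrative name, not framework API):
from execution import MACPRunner

def debug_structured_caller(messages: list[dict[str, str]]) -> str:
    """Print the role layout produced by the runner, then return a stub answer."""
    for m in messages:
        print(f"[{m['role']}] {m['content'][:60]}")
    return "stub answer"

runner = MACPRunner(structured_llm_caller=debug_structured_caller)
result = runner.run_round(graph)
# Expected per agent: one system message, any replayed state turns, one final user message.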
Built-in factory helpers (recommended, zero boilerplate)
The framework ships ready-made factory functions so you don't need to write any boilerplate caller code yourself:
from execution import (
MACPRunner,
RunnerConfig,
create_openai_structured_caller, # sync: for stream() / run_round()
create_openai_async_structured_caller, # async: for astream() / arun_round()
)
# -- Sequential graphs (chains, single agent) --------------------------------
runner = MACPRunner(
structured_llm_caller=create_openai_structured_caller(
api_key="sk-...",
base_url="https://api.openai.com/v1",
model="gpt-4o",
temperature=0.7,
max_tokens=1024,
),
)
for event in runner.stream(graph):
...
# -- Parallel graphs (fan-in, fan-out) ---------------------------------------
runner = MACPRunner(
structured_llm_caller=create_openai_structured_caller(
api_key="sk-...", model="gpt-4o"
),
async_structured_llm_caller=create_openai_async_structured_caller(
api_key="sk-...", model="gpt-4o"
),
config=RunnerConfig(enable_parallel=True),
)
async for event in runner.astream(graph):
...
Why two callers for parallel mode?
stream() is synchronous and uses structured_llm_caller. astream() with enable_parallel=True runs independent agents concurrently via asyncio.gather and therefore requires async_structured_llm_caller. For purely sequential graphs only the sync caller is needed.
Quick start (manual caller)
If you need custom logic (retries, logging, token tracking), write the caller yourself; the interface is a simple function:
from openai import OpenAI
from execution import MACPRunner, RunnerConfig
client = OpenAI(api_key="sk-...")
def my_structured_caller(messages: list[dict[str, str]]) -> str:
"""Drop-in replacement for any str->str llm_caller."""
resp = client.chat.completions.create(
model="gpt-4o",
messages=messages, # passed through as-is
max_tokens=1024,
temperature=0.7,
)
return resp.choices[0].message.content or ""
runner = MACPRunner(
structured_llm_caller=my_structured_caller,
config=RunnerConfig(timeout=60.0),
)
result = runner.run_round(graph)
print(result.final_answer)
Async variant (manual caller)
import asyncio
from openai import AsyncOpenAI
aclient = AsyncOpenAI(api_key="sk-...")
async def my_async_structured_caller(messages: list[dict[str, str]]) -> str:
resp = await aclient.chat.completions.create(
model="gpt-4o",
messages=messages,
max_tokens=1024,
)
return resp.choices[0].message.content or ""
runner = MACPRunner(async_structured_llm_caller=my_async_structured_caller)
result = await runner.arun_round(graph)
Tracking tokens (benchmark pattern)
When you need to count tokens across many agents (e.g. for benchmarks), wrap
the OpenAI client to intercept usage:
from openai import OpenAI
class TrackedLLM:
def __init__(self, api_key, base_url, model):
self._client = OpenAI(api_key=api_key, base_url=base_url)
self._model = model
self.total_tokens = 0
self.call_count = 0
def reset(self):
self.total_tokens = 0
self.call_count = 0
def chat(self, system: str, user: str, max_tokens: int = 1024) -> str:
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": user})
resp = self._client.chat.completions.create(
model=self._model, messages=messages,
temperature=0.7, max_tokens=max_tokens,
)
self.total_tokens += resp.usage.total_tokens if resp.usage else 0
self.call_count += 1
return resp.choices[0].message.content or ""
def as_structured_caller(self, max_tokens: int = 1024):
"""Return a structured_llm_caller for MACPRunner."""
def _caller(messages: list[dict[str, str]]) -> str:
system = next((m["content"] for m in messages if m["role"] == "system"), "")
user = next((m["content"] for m in messages if m["role"] == "user"), "")
return self.chat(system, user, max_tokens=max_tokens)
return _caller
llm = TrackedLLM(api_key="...", base_url="...", model="gpt-4o")
runner = MACPRunner(
structured_llm_caller=llm.as_structured_caller(max_tokens=1024),
)
result = runner.run_round(graph)
print(f"Tokens used: {llm.total_tokens}, calls: {llm.call_count}")
Caller priority
All caller types can coexist. The resolution priority is:
structured_llm_caller       - used for ALL plain agent calls when set
   ↓  (an automatic str->str wrapper is also registered as llm_caller
      for internal checks; no code change needed)
llm_callers[agent_id]       - per-agent override (always takes precedence)
   ↓
llm_factory                 - factory by AgentLLMConfig
   ↓
llm_caller                  - legacy default
You can mix structured_llm_caller (global default) with per-agent
llm_callers overrides β the structured caller will be used for all agents
that don't have an explicit override.
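A short sketch of such a mixed setup, using only arguments already shown in this section (model names and the agent id are placeholders):
from execution import (
    MACPRunner,
    create_openai_caller,
    create_openai_structured_caller,
)

runner = MACPRunner(
    # Global default: structured chat messages for every agent...
    structured_llm_caller=create_openai_structured_caller(api_key="sk-...", model="gpt-4o"),
    # ...except "legacy_agent", which keeps an explicit flat-string caller.
    llm_callers={"legacy_agent": create_openai_caller(model="gpt-4o-mini")},
)
result = runner.run_round(graph)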
Providers comparison
| Provider | Recommended interface | Notes |
|---|---|---|
| OpenAI (GPT-4o, GPT-4, ...) | structured_llm_caller | Native chat completions |
| GigaChat / Sber | structured_llm_caller | OpenAI-compatible API |
| Anthropic Claude | structured_llm_caller | Via adapter or LiteLLM |
| Ollama (local) | structured_llm_caller | OpenAI-compatible /v1/chat/completions |
| vLLM | structured_llm_caller | OpenAI-compatible server |
| Azure OpenAI | structured_llm_caller | Same API, different base URL |
| Custom / non-chat API | llm_caller (legacy) | Falls back to flat string |
Benchmark results (gMAS vs LangGraph)
The table below was measured with examples/benchmark_vs_langgraph.py --runs 10
using structured_llm_caller. LangGraph uses an equivalent explicit
system / user split on its side.
| Test topology | LangGraph time | gMAS time | Token Δ |
|---|---|---|---|
| Single agent (1) | baseline | ~+10% | ~+10% |
| Chain of 3 (3) | baseline | -18% | -11% |
| Fan-in 2->1 (3) | baseline | -30% | -22% |
| Chain of 7 (7) | baseline | -10% | -17% |
| Fan-out 1->3->1 (5) | baseline | -19% | -13% |
Single-agent test is slightly slower in gMAS due to protocol overhead; this overhead amortises quickly as the number of agents grows.
Migration from llm_caller to structured_llm_caller
No changes to graph or agent code are required. Only the runner instantiation changes:
# Before (legacy)
runner = MACPRunner(llm_caller=lambda prompt: my_model(prompt))
# After (recommended)
runner = MACPRunner(
structured_llm_caller=lambda messages: my_model_chat(messages)
)
Both interfaces are fully supported. The legacy llm_caller is not
deprecated and will not be removed.
Dynamic Topology
Static graph modification
Modify the graph structure before execution:
# Add a new agent
new_agent = AgentProfile(agent_id="expert", display_name="Expert")
graph.add_node(new_agent, connections_to=["checker"])
# Change connections
graph.add_edge("solver", "expert", weight=0.9)
graph.remove_edge("solver", "checker")
# Disable nodes (without deletion)
graph.disable("expensive_agent") # Will not run, but remains in the graph
# Full topology update from a matrix
import torch
new_adjacency = torch.tensor([
[0, 1, 0],
[0, 0, 1],
[0, 0, 0],
], dtype=torch.float32)
graph.update_communication(
new_adjacency,
s_tilde=score_matrix, # Connection quality scores
p_matrix=probability_matrix # Transition probabilities
)
Runtime modification (during execution)
A powerful feature for modifying the graph during a round based on intermediate results:
Early stopping
from execution import EarlyStopCondition, RunnerConfig
# 1. By keyword in the response
stop_on_answer = EarlyStopCondition.on_keyword(
"FINAL ANSWER",
reason="Answer found"
)
# 2. By token limit
stop_on_tokens = EarlyStopCondition.on_token_limit(
max_tokens=5000,
reason="Token budget exceeded"
)
# 3. By number of executed agents
stop_on_count = EarlyStopCondition.on_agent_count(
max_agents=5,
reason="Sufficient agents executed"
)
# 4. By a metadata value (for RL, metrics)
stop_on_quality = EarlyStopCondition.on_metadata(
"quality_score",
0.95,
comparator=lambda v, threshold: v > threshold,
reason="Quality threshold reached"
)
# 5. Custom condition
stop_custom = EarlyStopCondition.on_custom(
condition=lambda ctx: my_rl_agent.should_stop(ctx.messages),
reason="RL agent decided to stop",
min_agents_executed=2 # At least 2 agents before checking
)
# 6. Combine conditions (OR)
stop_any = EarlyStopCondition.combine_any([
EarlyStopCondition.on_keyword("DONE"),
EarlyStopCondition.on_token_limit(10000),
stop_on_quality,
])
# 7. Combine conditions (AND)
stop_all = EarlyStopCondition.combine_all([
EarlyStopCondition.on_keyword("answer"),
stop_on_quality,
])
# Usage
config = RunnerConfig(
early_stop_conditions=[stop_on_answer, stop_on_tokens]
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(graph)
if result.early_stopped:
print(f"Stopped: {result.early_stop_reason}")
print(f"Saved: {len(graph.node_ids) - len(result.execution_order)} agents")
Topology Hooks (on-the-fly graph modification)
from execution import TopologyAction, StepContext, RunnerConfig
def my_topology_hook(ctx: StepContext, graph) -> TopologyAction:
"""Called after each execution step.
StepContext contains:
- agent_id: current agent
- response: its response
- messages: all responses so far
- execution_order: execution order
- remaining_agents: remaining agents
- total_tokens: tokens used
- metadata: arbitrary data
"""
# 1. Early stopping based on custom logic
if "TASK_COMPLETE" in (ctx.response or ""):
return TopologyAction(
early_stop=True,
early_stop_reason="Task marked as complete"
)
# 2. Add an edge if quality is low
if ctx.metadata.get("quality", 1.0) < 0.5:
return TopologyAction(
add_edges=[
(ctx.agent_id, "reviewer_agent", 1.0),
],
trigger_rebuild=True # Re-plan remaining steps
)
# 3. Remove an edge
if some_condition:
return TopologyAction(
remove_edges=[
("agent1", "agent2"),
]
)
# 4. Skip upcoming agents
if ctx.total_tokens > 8000:
return TopologyAction(
skip_agents=["expensive_agent1", "expensive_agent2"]
)
# 5. Force execution of agents
if needs_expert_review:
return TopologyAction(
force_agents=["expert_reviewer"]
)
# 6. Change the final agent
if early_finish:
return TopologyAction(
new_end_agent="quick_finalizer"
)
return None # No changes
# Async hook for integration with RL, APIs, etc.
async def rl_topology_hook(ctx: StepContext, graph) -> TopologyAction:
"""Async hook for more complex logic."""
# You can call async APIs, RL models, etc.
decision = await my_rl_agent.get_topology_decision(
messages=ctx.messages,
graph_state=graph.to_dict()
)
if decision.add_connection:
return TopologyAction(
add_edges=[(decision.from_node, decision.to_node, decision.weight)]
)
return None
# Usage
config = RunnerConfig(
enable_dynamic_topology=True,
topology_hooks=[my_topology_hook],
async_topology_hooks=[rl_topology_hook],
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(graph)
print(f"Topology modifications: {result.topology_modifications}")
Example: RL-controlled topology
import torch
from your_rl_agent import RLAgent
class TopologyRL:
def __init__(self):
self.rl_agent = RLAgent()
def should_stop(self, ctx: StepContext) -> bool:
"""RL-agent decision for early stopping."""
state = self.encode_state(ctx)
action = self.rl_agent.predict(state)
return action == "STOP"
def get_topology_action(self, ctx: StepContext) -> TopologyAction | None:
"""RL agent decides how to change topology."""
state = self.encode_state(ctx)
action = self.rl_agent.predict(state)
if action == "ADD_REVIEWER":
return TopologyAction(
add_edges=[(ctx.agent_id, "reviewer", 1.0)],
trigger_rebuild=True
)
elif action == "SKIP_EXPENSIVE":
return TopologyAction(
skip_agents=["expensive_model"]
)
return None
def encode_state(self, ctx: StepContext) -> torch.Tensor:
# Encode state for RL
return torch.tensor([
len(ctx.messages),
ctx.total_tokens,
len(ctx.remaining_agents),
])
# Usage
rl_controller = TopologyRL()
config = RunnerConfig(
enable_dynamic_topology=True,
early_stop_conditions=[
EarlyStopCondition.on_custom(
rl_controller.should_stop,
reason="RL decided to stop"
)
],
topology_hooks=[rl_controller.get_topology_action],
)
Full example: adaptive system
from execution import (
GraphBuilder, MACPRunner, RunnerConfig,
EarlyStopCondition, TopologyAction, StepContext
)
# Build the graph
builder = GraphBuilder()
builder.add_agent("input", persona="Input processor")
builder.add_agent("solver", persona="Problem solver")
builder.add_agent("checker", persona="Solution checker")
builder.add_agent("expensive_expert", persona="Expert (expensive)")
builder.add_agent("output", persona="Output formatter")
builder.add_workflow_edge("input", "solver")
builder.add_workflow_edge("solver", "checker")
builder.add_workflow_edge("checker", "output")
# expensive_expert is connected dynamically
builder.set_start_node("input")
builder.set_end_node("output")
builder.add_task(query="Solve the complex problem")
builder.connect_task_to_agents()
graph = builder.build()
# Hooks for adaptation
def adaptive_hook(ctx: StepContext, graph) -> TopologyAction:
# If checker found an issue, add expert
if ctx.agent_id == "checker" and "ERROR" in (ctx.response or ""):
return TopologyAction(
add_edges=[("checker", "expensive_expert", 1.0),
("expensive_expert", "output", 1.0)],
trigger_rebuild=True
)
# If solver produced a good answer, skip checker
if ctx.agent_id == "solver" and ctx.metadata.get("confidence", 0) > 0.95:
return TopologyAction(
skip_agents=["checker"],
reason="High confidence, skipping validation"
)
return None
# Configure runner
config = RunnerConfig(
adaptive=True,
enable_dynamic_topology=True,
topology_hooks=[adaptive_hook],
early_stop_conditions=[
EarlyStopCondition.on_keyword("FINAL_ANSWER"),
EarlyStopCondition.on_token_limit(10000),
],
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(
graph,
filter_unreachable=True # Exclude isolated nodes
)
# Result
print(f"Executed: {result.execution_order}")
print(f"Early stopped: {result.early_stopped}")
print(f"Topology mods: {result.topology_modifications}")
print(f"Tokens saved: calculated from pruned_agents")
GNN Routing (Graph Neural Networks for Routing)
Using graph neural networks for learnable optimal routing based on execution history.
Overview of GNN models
| Model | Description | When to use |
|---|---|---|
| GCN (Graph Convolutional Network) | Classic convolution for graphs | Homogeneous graphs, simple tasks |
| GAT (Graph Attention Network) | Uses an attention mechanism | Edge importance varies |
| GraphSAGE | Neighbor sampling for large graphs | Large graphs, inductive learning |
| GIN (Graph Isomorphism Network) | Maximally expressive architecture | Complex patterns, small graphs |
Full example: training a GNN router
from core.gnn import (
create_gnn_router,
GNNTrainer,
GNNRouterInference,
GNNModelType,
TrainingConfig,
FeatureConfig,
RoutingStrategy,
DefaultFeatureGenerator,
)
from core.metrics import MetricsTracker
import torch
from torch_geometric.data import Data
# ========== STEP 1: Collect execution data ==========
tracker = MetricsTracker()
# Run multiple rounds to accumulate metrics
for i in range(100):
result = runner.run_round(graph)
# Record per-node metrics
for agent_id in result.execution_order:
response = result.messages[agent_id]
tracker.record_node_execution(
node_id=agent_id,
success=True,
latency_ms=response["latency"],
cost_tokens=response["tokens"],
quality=evaluate_quality(response["content"]),
)
# Record edge traversal metrics
for step_idx, agent_id in enumerate(result.execution_order[:-1]):  # separate name so the outer loop's i is not shadowed
next_agent = result.execution_order[step_idx + 1]
tracker.record_edge_traversal(
source=agent_id,
target=next_agent,
weight=graph.get_edge_weight(agent_id, next_agent),
success=True,
latency_ms=50,
)
# ========== STEP 2: Feature generation ==========
feature_config = FeatureConfig(
include_degree=True, # Node degrees
include_centrality=True, # Centrality (betweenness, closeness)
include_embeddings=True, # Agent embeddings
include_metrics=True, # Performance metrics
include_structural=True, # Structural features (clustering coef)
normalize=True, # Feature normalization
)
feature_gen = DefaultFeatureGenerator(config=feature_config)
node_features = feature_gen.generate_node_features(
graph,
graph.node_ids,
tracker,
) # Shape: (num_nodes, feature_dim)
edge_features = feature_gen.generate_edge_features(
graph,
tracker,
) # Shape: (num_edges, edge_feature_dim)
print(f"Node features shape: {node_features.shape}")
print(f"Edge features shape: {edge_features.shape}")
# ========== STEP 3: Prepare the dataset ==========
# Create PyTorch Geometric Data objects
train_data_list = []
val_data_list = []
for sample in dataset: # Your dataset with execution history
data = Data(
x=sample['node_features'], # Node features
edge_index=sample['edge_index'], # Edge connections (2, E)
edge_attr=sample['edge_features'], # Edge features
y=sample['labels'], # Labels (optimal next node, quality score, etc.)
)
if sample['is_train']:
train_data_list.append(data)
else:
val_data_list.append(data)
# ========== STEP 4: Training configuration ==========
training_config = TrainingConfig(
# Hyperparameters
learning_rate=1e-3,
hidden_dim=64,
num_layers=3,
dropout=0.2,
# Training
epochs=100,
batch_size=32,
patience=10, # Early stopping
# Task
task="node_classification", # or "link_prediction", "graph_regression"
num_classes=2, # For classification
# Optimization
optimizer="adam", # adam, sgd, adamw
weight_decay=1e-5,
scheduler="reduce_on_plateau", # step, cosine, reduce_on_plateau
# Device
device="cuda" if torch.cuda.is_available() else "cpu",
# Logging
log_interval=10,
save_best=True,
)
# ========== STEP 5: Create the model ==========
# 5.1. GCN (Graph Convolutional Network)
model_gcn = create_gnn_router(
model_type=GNNModelType.GCN,
in_channels=node_features.shape[1],
out_channels=training_config.num_classes,
config=training_config,
)
# 5.2. GAT (Graph Attention Network)
model_gat = create_gnn_router(
model_type=GNNModelType.GAT,
in_channels=node_features.shape[1],
out_channels=training_config.num_classes,
config=training_config,
heads=4, # Number of attention heads
concat=True, # Concatenate heads or average
)
# 5.3. GraphSAGE
model_sage = create_gnn_router(
model_type=GNNModelType.GraphSAGE,
in_channels=node_features.shape[1],
out_channels=training_config.num_classes,
config=training_config,
aggr="mean", # mean, max, lstm
)
# 5.4. GIN (Graph Isomorphism Network)
model_gin = create_gnn_router(
model_type=GNNModelType.GIN,
in_channels=node_features.shape[1],
out_channels=training_config.num_classes,
config=training_config,
train_eps=True, # Trainable epsilon
)
# ========== STEP 6: Train ==========
trainer = GNNTrainer(model_gat, training_config)
training_result = trainer.train(
train_data_list,
val_data_list,
verbose=True,
)
print(f"Best validation accuracy: {training_result['best_val_acc']:.3f}")
print(f"Best epoch: {training_result['best_epoch']}")
print(f"Training time: {training_result['training_time']:.2f}s")
# Save the model
trainer.save("gnn_router.pt")
# Load the model
trainer.load("gnn_router.pt")
# ========== STEP 7: Inference ==========
router = GNNRouterInference(
model=model_gat,
feature_generator=feature_gen,
)
# 7.1. Predict the next node (node selection)
prediction = router.predict(
graph,
source="coordinator",
candidates=["researcher", "analyst", "writer"],
metrics_tracker=tracker,
strategy=RoutingStrategy.ARGMAX, # ARGMAX, TOP_K, SAMPLING, THRESHOLD
)
print(f"Recommended nodes: {prediction.recommended_nodes}")
print(f"Scores: {prediction.scores}")
print(f"Confidence: {prediction.confidence:.3f}")
# 7.2. Top-K prediction
prediction_topk = router.predict(
graph,
source="coordinator",
candidates=["a", "b", "c", "d"],
strategy=RoutingStrategy.TOP_K,
k=2, # Return top 2
)
print(f"Top 2: {prediction_topk.recommended_nodes}")
# 7.3. Probabilistic sampling
prediction_sample = router.predict(
graph,
source="coordinator",
candidates=candidates,
strategy=RoutingStrategy.SAMPLING,
temperature=0.8, # Sampling temperature
)
# 7.4. Threshold filtering
prediction_threshold = router.predict(
graph,
source="coordinator",
candidates=candidates,
strategy=RoutingStrategy.THRESHOLD,
threshold=0.7, # Only nodes with prob > 0.7
)
# ========== STEP 8: Integrate with AdaptiveScheduler ==========
from execution import AdaptiveScheduler, RoutingPolicy
scheduler = AdaptiveScheduler(
policy=RoutingPolicy.GNN_BASED,
gnn_router=router,
gnn_threshold=0.6, # Min confidence to use the GNN
fallback_policy=RoutingPolicy.WEIGHTED_TOPO # Fallback on low confidence
)
plan = scheduler.build_plan(
graph.A_com,
graph.node_ids,
metrics_tracker=tracker,
)
# ========== STEP 9: Monitoring and fine-tuning ==========
# Collect new data after deployment
new_data = []
for i in range(20):
result = runner.run_round(graph)
# ... record data ...
new_data.append(create_data_sample(result))
# Fine-tune
trainer.fine_tune(
new_data,
epochs=10,
learning_rate=1e-4,
)
trainer.save("gnn_router_finetuned.pt")
# ========== Evaluation ==========
from core.gnn import evaluate_router
metrics = evaluate_router(
router,
test_data_list,
metrics=["accuracy", "f1", "precision", "recall"],
)
print(f"Test accuracy: {metrics['accuracy']:.3f}")
print(f"F1 score: {metrics['f1']:.3f}")
Comparing GNN models
# Experiment: compare performance across models
models = {
"GCN": create_gnn_router(GNNModelType.GCN, in_channels, out_channels, config),
"GAT": create_gnn_router(GNNModelType.GAT, in_channels, out_channels, config),
"GraphSAGE": create_gnn_router(GNNModelType.GraphSAGE, in_channels, out_channels, config),
"GIN": create_gnn_router(GNNModelType.GIN, in_channels, out_channels, config),
}
results = {}
for name, model in models.items():
trainer = GNNTrainer(model, training_config)
result = trainer.train(train_data, val_data)
results[name] = result
# Comparison
import pandas as pd
df = pd.DataFrame([
{
"Model": name,
"Val Acc": res["best_val_acc"],
"Train Time": res["training_time"],
"Params": sum(p.numel() for p in models[name].parameters()),
}
for name, res in results.items()
])
print(df)
# Output:
# | Model | Val Acc | Train Time | Params |
# |-----------|---------|------------|---------|
# | GCN | 0.853 | 12.5s | 45123 |
# | GAT | 0.891 | 18.3s | 67891 |
# | GraphSAGE | 0.874 | 15.2s | 52341 |
# | GIN | 0.867 | 14.8s | 48976 |
Production usage
# Load a trained model
router = GNNRouterInference.load("gnn_router.pt", feature_gen)
# Integrate with the runner
config = RunnerConfig(
adaptive=True,
routing_policy=RoutingPolicy.GNN_BASED,
)
runner = MACPRunner(
llm_caller=my_llm,
config=config,
gnn_router=router,
metrics_tracker=tracker,
)
# Execute with GNN routing
result = runner.run_round(graph)
# Monitor GNN predictions
print(f"GNN predictions used: {result.gnn_prediction_count}")
print(f"Fallback to heuristic: {result.fallback_to_heuristic_count}")
Hidden Channels
Hidden channels allow passing implicit information between agents as vector representations, bypassing text prompts. This is especially useful for:
- Passing contextual information without increasing prompt length
- Preserving semantic embeddings for downstream tasks
- Implementing attention mechanisms between agents
- Integrating with a GNN to predict next steps
Hidden channel architecture
+---------------+     hidden_state      +---------------+
|    Agent A    | --------------------> |    Agent B    |
|  (embedding)  |      embedding        |   (receives   |
+---------------+                       |   combined)   |
                                        +---------------+
Each agent owns its:
- embedding: vector representation of the agent description
- hidden_state: hidden state updated after execution
The runner combines a predecessor's hidden_state and embedding and passes them to the next agent.
Using hidden channels
from execution import RunnerConfig, MACPRunner, HiddenState
from core import NodeEncoder
# 1. Create an encoder for embeddings
encoder = NodeEncoder(model_name="sentence-transformers/all-MiniLM-L6-v2")
# 2. Hidden-channel configuration
config = RunnerConfig(
enable_hidden_channels=True,
hidden_combine_strategy="mean", # Combine strategy
pass_embeddings=True, # Pass embeddings too
hidden_dim=384, # Hidden state dimensionality
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# 3. Compute agent embeddings
texts = [agent.to_text() for agent in graph.agents]
embeddings = encoder.encode(texts)
for agent, emb in zip(graph.agents, embeddings):
agent = agent.with_embedding(emb)
graph.update_agent(agent.agent_id, agent)
# 4. Execute with hidden channels
result = runner.run_round_with_hidden(
graph,
hidden_encoder=encoder, # To create hidden_state from responses
)
# 5. Access hidden states after execution
for agent_id, hidden in result.hidden_states.items():
print(f"{agent_id}:")
print(f" Hidden state: {hidden.tensor.shape}") # (hidden_dim,)
print(f" Embedding: {hidden.embedding.shape}") # (embedding_dim,)
print(f" Combined: {hidden.combined.shape}") # (hidden_dim + embedding_dim,)
# 6. Use hidden states for downstream tasks
hidden_states_matrix = torch.stack([
result.hidden_states[aid].tensor for aid in graph.node_ids
]) # Shape: (num_agents, hidden_dim)
# For example, cluster agents by semantics
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=3)
clusters = kmeans.fit_predict(hidden_states_matrix.cpu().numpy())
Combine strategies (combine_strategy)
When an agent has multiple predecessors, their hidden states are combined:
# 1. "mean" β average (default)
# hidden_combined = mean([h1, h2, h3])
config.hidden_combine_strategy = "mean"
# 2. "sum" β sum
# hidden_combined = h1 + h2 + h3
config.hidden_combine_strategy = "sum"
# 3. "concat" β concatenation
# hidden_combined = concat([h1, h2, h3]) # dimensionality increases
config.hidden_combine_strategy = "concat"
# 4. "attention" β weighted attention (weights from adjacency)
# hidden_combined = w1*h1 + w2*h2 + w3*h3, where wi = edge_weight(i -> current)
config.hidden_combine_strategy = "attention"
# 5. "max" β elementwise max
# hidden_combined = max(h1, h2, h3)
config.hidden_combine_strategy = "max"
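For intuition, a tiny standalone PyTorch sketch of what each strategy computes for two predecessor hidden states (purely illustrative; the framework applies the chosen combination internally):
import torch

h1 = torch.tensor([1.0, 0.0])
h2 = torch.tensor([0.0, 1.0])
edge_weights = torch.tensor([0.8, 0.2])  # e.g. weights of the incoming edges

mean_combined = torch.stack([h1, h2]).mean(dim=0)                 # [0.5, 0.5]
sum_combined = h1 + h2                                            # [1.0, 1.0]
concat_combined = torch.cat([h1, h2])                             # [1.0, 0.0, 0.0, 1.0]
attention_combined = edge_weights[0] * h1 + edge_weights[1] * h2  # [0.8, 0.2]
max_combined = torch.maximum(h1, h2)                              # [1.0, 1.0]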
Advanced: custom hidden-state processing
from utils.memory import HiddenChannel
# Create a custom HiddenChannel
channel = HiddenChannel(
node_id="agent_id",
hidden_dim=384,
)
# Set hidden state
import torch
channel.set_hidden(torch.randn(384))
channel.set_embedding(torch.randn(384))
# Get combined representation
combined = channel.get_combined(strategy="attention", edge_weights=torch.tensor([0.8, 0.2]))
# Reset
channel.reset()
# Integration with agent memory
from utils.memory import AgentMemory
memory = AgentMemory("agent_id")
memory.hidden_state = torch.randn(384)
memory.embedding = torch.randn(384)
# Get what to pass to the next agent
hidden_to_pass = memory.hidden_state
embedding_to_pass = memory.embedding
Using with a GNN
from core.gnn import GNNRouterInference, DefaultFeatureGenerator
# 1. Hidden states as features for a GNN
feature_gen = DefaultFeatureGenerator()
# Include hidden states into node features
node_features = feature_gen.generate_node_features(
graph,
graph.node_ids,
metrics_tracker,
include_hidden_states=True, # Add hidden_state to features
)
# 2. GNN predicts the next agent based on hidden states
router = GNNRouterInference(model, feature_gen)
prediction = router.predict(
graph,
source="current_agent",
candidates=["next1", "next2"],
metrics_tracker=tracker,
hidden_states=result.hidden_states, # Pass current hidden states
)
# 3. Update the graph based on GNN predictions
if prediction.confidence > 0.8:
next_agent = prediction.recommended_nodes[0]
graph.add_edge("current_agent", next_agent, weight=prediction.confidence)
Example: multi-hop reasoning with hidden channels
# Task: multi-hop reasoning where each agent accumulates context
agents = [
AgentProfile(agent_id="reader", display_name="Document Reader"),
AgentProfile(agent_id="analyzer", display_name="Analyzer"),
AgentProfile(agent_id="reasoner", display_name="Reasoner"),
AgentProfile(agent_id="answerer", display_name="Final Answerer"),
]
edges = [
("reader", "analyzer"),
("analyzer", "reasoner"),
("reasoner", "answerer"),
]
graph = build_property_graph(agents, edges, query="Complex question")
# Enable hidden channels for context passing
config = RunnerConfig(
enable_hidden_channels=True,
hidden_combine_strategy="attention",
pass_embeddings=True,
)
encoder = NodeEncoder(model_name="sentence-transformers/all-MiniLM-L6-v2")
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round_with_hidden(graph, hidden_encoder=encoder)
# After each step, hidden_state contains the "accumulated context"
# answerer receives a weighted combination of all previous hidden states
Adaptive execution
Full control over adaptive execution:
from execution import (
MACPRunner,
RunnerConfig,
RoutingPolicy,
PruningConfig,
BudgetConfig,
ErrorPolicy,
ErrorAction,
)
config = RunnerConfig(
adaptive=True,
enable_parallel=True,
max_parallel_size=5,
routing_policy=RoutingPolicy.BEAM_SEARCH,
pruning_config=PruningConfig(
min_weight_threshold=0.1,
token_budget=10000,
enable_fallback=True,
max_fallback_attempts=2,
quality_scorer=lambda response: evaluate_quality(response),
min_quality_threshold=0.5,
),
budget_config=BudgetConfig(
total_token_limit=50000,
max_prompt_length=4000,
node_token_limit=2000,
),
error_policy=ErrorPolicy(
on_timeout=ErrorAction.RETRY,
on_retry_exhausted=ErrorAction.PRUNE,
on_budget_exceeded=ErrorAction.ABORT,
),
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(graph)
print(f"Topology changes: {result.topology_changed_count}")
print(f"Fallbacks: {result.fallback_count}")
print(f"Pruned agents: {result.pruned_agents}")
Configuration
Environment variables
# API key (required)
export RWXF_API_KEY="sk-your-api-key"
# or via file
export RWXF_API_KEY_FILE=/secure/rwxf.key
# LLM service URL
export RWXF_BASE_URL="https://api.openai.com/v1"
# Models
export RWXF_MODEL_NAME="gpt-4o-mini"
export RWXF_EMBEDDING_MODEL="sentence-transformers/all-MiniLM-L6-v2"
# Logging
export RWXF_LOG_LEVEL="INFO"
export RWXF_LOG_FILE="./logs/framework.log"
# Network settings
export RWXF_DEFAULT_TIMEOUT=60
export RWXF_MAX_RETRIES=3
Programmatic configuration
from config import FrameworkSettings, load_settings
# Load from environment
settings = FrameworkSettings()
# Load from a .env file
settings = load_settings(".env")
# Access settings
api_key = settings.resolved_api_key
model = settings.model_name
timeout = settings.default_timeout
Usage examples
Example 1: Simple pipeline
from core import AgentProfile
from execution import MACPRunner
from builder import build_property_graph
agents = [
AgentProfile(agent_id="researcher", display_name="Researcher"),
AgentProfile(agent_id="writer", display_name="Writer"),
AgentProfile(agent_id="editor", display_name="Editor"),
]
graph = build_property_graph(
agents,
workflow_edges=[("researcher", "writer"), ("writer", "editor")],
query="Write an article about quantum computers",
)
runner = MACPRunner(llm_caller=my_llm)
result = runner.run_round(graph)
print(result.final_answer)
Example 2: Parallel processing
# Agents work in parallel, then results are aggregated
agents = [
AgentProfile(agent_id="analyst_1", display_name="Financial Analyst"),
AgentProfile(agent_id="analyst_2", display_name="Market Analyst"),
AgentProfile(agent_id="analyst_3", display_name="Risk Analyst"),
AgentProfile(agent_id="aggregator", display_name="Report Aggregator"),
]
edges = [
("analyst_1", "aggregator"),
("analyst_2", "aggregator"),
("analyst_3", "aggregator"),
]
graph = build_property_graph(agents, workflow_edges=edges, query="Analyze company X")
config = RunnerConfig(
enable_parallel=True,
max_parallel_size=3,
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = await runner.arun_round(graph)
Example 3: Streaming with a callback
def on_event(event):
if event.event_type == StreamEventType.AGENT_OUTPUT:
save_to_db(event.agent_id, event.content)
notify_frontend(event)
runner = MACPRunner(llm_caller=my_llm)
for event in runner.stream(graph):
on_event(event)
if event.event_type == StreamEventType.TOKEN:
yield event.token # For SSE or WebSocket
Example 4: Working with memory
from execution import MACPRunner, RunnerConfig, MemoryConfig
config = RunnerConfig(
enable_memory=True,
memory_config=MemoryConfig(
working_max_entries=20,
long_term_max_entries=100,
),
memory_context_limit=5, # Include last 5 entries in the prompt
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# First round
result1 = runner.run_round(graph)
# Second round β agents remember context
graph.query = "Continue the previous task"
result2 = runner.run_round(graph)
# Access agent memory
agent_memory = runner.get_agent_memory("solver")
entries = agent_memory.get_messages()
Example 5: Graph visualization
from core import AgentProfile
from core.visualization import (
GraphVisualizer,
VisualizationStyle,
MermaidDirection,
NodeStyle,
NodeShape,
# Convenience functions
to_mermaid,
to_ascii,
to_dot,
print_graph,
render_to_image,
)
from builder import build_property_graph
# Create a graph
agents = [
AgentProfile(
agent_id="input",
display_name="Input Handler",
tools=["api_reader"],
),
AgentProfile(
agent_id="processor",
display_name="Data Processor",
tools=["pandas", "torch"],
),
AgentProfile(
agent_id="output",
display_name="Output Formatter",
tools=["json", "csv"],
),
]
graph = build_property_graph(
agents,
workflow_edges=[("input", "processor"), ("processor", "output")],
query="Process data pipeline",
include_task_node=True,
)
# Option 1: Quick visualization (convenience functions)
print("=== MERMAID ===")
mermaid = to_mermaid(graph, direction=MermaidDirection.LEFT_RIGHT)
print(mermaid)
print("\n=== ASCII ===")
ascii_art = to_ascii(graph, show_edges=True)
print(ascii_art)
print("\n=== COLORED (if Rich is installed) ===")
print_graph(graph, format="auto") # Automatically chooses colored or ascii
# Option 2: Advanced visualization with custom styles (Pydantic models)
# Create a style (Pydantic model with validation)
custom_style = VisualizationStyle(
direction=MermaidDirection.LEFT_RIGHT,
agent_style=NodeStyle(
shape=NodeShape.ROUND,
fill_color="#e3f2fd",
stroke_color="#1976d2",
icon="π€",
),
task_style=NodeStyle(
shape=NodeShape.DIAMOND,
fill_color="#fff3e0",
stroke_color="#f57c00",
icon="π",
),
show_weights=True,
show_tools=True,
max_label_length=30,
)
# Create a visualizer with the custom style
viz = GraphVisualizer(graph, custom_style)
# Mermaid with a title
mermaid_styled = viz.to_mermaid(title="Data Pipeline")
print("\n=== STYLED MERMAID ===")
print(mermaid_styled)
# Save to files
viz.save_mermaid("pipeline.md", title="Data Pipeline") # Markdown with ```mermaid```
viz.save_dot("pipeline.dot", graph_name="DataPipeline")
# Render to images (requires system Graphviz)
try:
render_to_image(graph, "pipeline.png", format="png", dpi=150, style=custom_style)
render_to_image(graph, "pipeline.svg", format="svg", style=custom_style)
print("\nβ
Images created: pipeline.png, pipeline.svg")
except Exception as e:
print(f"\nβ οΈ Image rendering failed: {e}")
print(" Install system Graphviz to render images")
# Adjacency matrix (text representation)
print("\n=== ADJACENCY MATRIX ===")
matrix = viz.to_adjacency_matrix(show_labels=True)
print(matrix)
# Rich Console output with trees and tables
print("\n=== RICH CONSOLE ===")
viz.print_colored()
Example 6: Conditional routing
from builder import GraphBuilder
from execution.scheduler import ConditionContext
# Define conditions
def is_high_quality(context: ConditionContext) -> bool:
return context.state.get("quality", 0) > 0.8
def needs_review(context: ConditionContext) -> bool:
return context.state.get("word_count", 0) > 1000
# Build a graph with conditional edges
builder = GraphBuilder()
builder.add_agent(agent_id="writer", display_name="Content Writer")
builder.add_agent(agent_id="editor", display_name="Quick Editor")
builder.add_agent(agent_id="reviewer", display_name="Senior Reviewer")
builder.add_agent(agent_id="publisher", display_name="Publisher")
# Conditional transitions
builder.add_conditional_edge("writer", "editor", condition=is_high_quality)
builder.add_conditional_edge("writer", "reviewer", condition=needs_review)
builder.add_workflow_edge("editor", "publisher")
builder.add_workflow_edge("reviewer", "publisher")
graph = builder.build()
# Run
runner = MACPRunner(llm_caller=my_llm)
result = runner.run_round(graph)
Example 7: Monitoring with events
from core.events import (
global_event_bus,
EventType,
MetricsEventHandler,
)
# Configure event handlers
bus = global_event_bus()
metrics_handler = MetricsEventHandler()
# Subscribe to events
bus.subscribe(None, metrics_handler) # Listen to all events
@bus.subscribe(EventType.STEP_COMPLETED)
def on_step_completed(event):
print(f"β
{event.agent_id} completed in {event.duration_ms:.0f}ms")
@bus.subscribe(EventType.BUDGET_WARNING)
def on_budget_warning(event):
print(f"β οΈ Budget {event.budget_type}: {event.ratio:.1%}")
# Run with monitoring
runner = MACPRunner(llm_caller=my_llm)
result = runner.run_round(graph)
# Get aggregated metrics
metrics = metrics_handler.get_metrics()
print(f"Total tokens: {metrics['total_tokens']}")
print(f"Errors: {metrics['errors_count']}")
print(f"Avg step duration: {metrics['avg_step_duration_ms']:.1f}ms")
Example 8: GNN routing with training
from core.gnn import (
create_gnn_router,
GNNTrainer,
GNNRouterInference,
GNNModelType,
TrainingConfig,
DefaultFeatureGenerator,
)
from core.metrics import MetricsTracker
import torch
# Collect execution data for training
tracker = MetricsTracker()
# ... run several rounds with different queries ...
for i in range(100):
result = runner.run_round(graph)
# Record metrics
for agent_id, response in result.messages.items():
tracker.record_node_execution(
node_id=agent_id,
success=True,
latency_ms=response["latency"],
cost_tokens=response["tokens"],
quality=evaluate_quality(response["content"]),
)
# Feature generation
feature_gen = DefaultFeatureGenerator()
node_features = feature_gen.generate_node_features(
graph,
graph.node_ids,
tracker,
)
# Create dataset
# ... prepare train_data, val_data in PyG Data format ...
# Train the model
config = TrainingConfig(
learning_rate=1e-3,
hidden_dim=64,
num_layers=2,
epochs=50,
task="node_classification",
)
model = create_gnn_router(
model_type=GNNModelType.GAT,
in_channels=node_features.shape[1],
out_channels=2,
config=config,
)
trainer = GNNTrainer(model, config)
result = trainer.train(train_data, val_data)
print(f"Best validation accuracy: {result['best_val_acc']:.3f}")
trainer.save("gnn_router.pt")
# Use the trained model for routing
router = GNNRouterInference(model, feature_gen)
prediction = router.predict(
graph,
source="coordinator",
candidates=["agent1", "agent2", "agent3"],
metrics_tracker=tracker,
)
print(f"Recommended: {prediction.recommended_nodes[0]}")
print(f"Confidence: {prediction.confidence:.3f}")
Example 9: Adaptive execution with a budget
from execution import (
MACPRunner,
RunnerConfig,
RoutingPolicy,
PruningConfig,
)
from execution.budget import BudgetConfig
from execution.errors import BudgetExceededError, ExecutionError
# Configure adaptive execution
config = RunnerConfig(
adaptive=True,
enable_parallel=True,
max_parallel_size=3,
routing_policy=RoutingPolicy.WEIGHTED_TOPO,
pruning_config=PruningConfig(
min_weight_threshold=0.1,
token_budget=5000,
enable_fallback=True,
max_fallback_attempts=2,
),
budget_config=BudgetConfig(
total_token_limit=10000,
node_token_limit=2000,
max_prompt_length=3000,
warn_at_usage_ratio=0.8,
),
timeout=60.0,
max_retries=2,
)
runner = MACPRunner(llm_caller=my_llm, config=config)
# Execute
try:
result = runner.run_round(graph)
print(f"Executed agents: {len(result.execution_order)}")
print(f"Pruned agents: {result.pruned_agents}")
print(f"Topology changes: {result.topology_changed_count}")
print(f"Fallback count: {result.fallback_count}")
print(f"Total tokens: {result.total_tokens}")
except BudgetExceededError as e:
print(f"Budget exceeded: {e}")
except ExecutionError as e:
print(f"Execution failed: {e}")
Example 10: Graph analysis with algorithms
from core.algorithms import (
GraphAlgorithms,
CentralityType,
PathMetric,
)
# Create a complex graph
algo = GraphAlgorithms(graph)
# Find critical nodes
centrality = algo.centrality(CentralityType.BETWEENNESS, normalized=True)
print(f"Most critical agents: {centrality.top_nodes[:3]}")
# Find alternative paths
paths = algo.k_shortest_paths(
source="input",
target="output",
k=3,
metric=PathMetric.WEIGHTED,
)
print(f"Found {len(paths)} alternative paths:")
for i, path in enumerate(paths, 1):
print(f" Path {i}: {' -> '.join(path.nodes)} (cost: {path.cost:.2f})")
# Detect communities
communities = algo.detect_communities(algorithm="louvain")
print(f"Communities found: {len(communities.communities)}")
for i, community in enumerate(communities.communities):
print(f" Community {i}: {community}")
# Cycle check
cycles = algo.find_cycles(max_length=5)
if cycles.has_cycles:
print(f"β οΈ Graph has {len(cycles.cycles)} cycles!")
else:
print("β Graph is acyclic (DAG)")
Example 11: Multi-model system with cost optimization
from builder import GraphBuilder
from execution import MACPRunner, LLMCallerFactory, RunnerConfig
from callbacks import StdoutCallbackHandler
# Build a graph with different models for different tasks
builder = GraphBuilder()
# Stage 1: Data collection (5 parallel agents, cheap model)
for i in range(5):
builder.add_agent(
f"collector_{i}",
display_name=f"Data Collector {i}",
persona="Collects and formats raw data",
llm_backbone="gpt-4o-mini",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.2,
max_tokens=500,
)
builder.add_workflow_edge(f"collector_{i}", "analyst")
# Stage 2: Deep analysis (1 agent, strong model)
builder.add_agent(
"analyst",
display_name="Senior Data Analyst",
persona="Expert analyst with deep statistical knowledge",
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.0,
max_tokens=4000,
)
builder.add_workflow_edge("analyst", "privacy_checker")
# Stage 3: Privacy compliance check (local model)
builder.add_agent(
"privacy_checker",
display_name="Privacy Compliance Checker",
persona="Ensures data privacy and compliance",
llm_backbone="llama3:70b",
base_url="http://localhost:11434/v1",
api_key="not-needed",
temperature=0.0,
max_tokens=1000,
)
builder.add_workflow_edge("privacy_checker", "reporter")
# Stage 4: Report generation (cheap model)
builder.add_agent(
"reporter",
display_name="Report Generator",
persona="Formats analysis into readable reports",
llm_backbone="gpt-4o-mini",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.5,
max_tokens=2000,
)
builder.set_task(
query="Analyze Q4 sales data and generate a compliance report",
description="Full pipeline from data collection to the final report",
)
graph = builder.build()
# Print configuration
print("=== Multi-Model Pipeline Configuration ===\n")
for agent in graph.agents:
if hasattr(agent, 'llm_config') and agent.llm_config:
config = agent.llm_config
print(f"{agent.display_name}:")
print(f" Model: {config.model_name}")
print(f" Endpoint: {config.base_url}")
print(f" Temp: {config.temperature}, Max tokens: {config.max_tokens}")
print()
# Create factory and runner
factory = LLMCallerFactory.create_openai_factory()
config = RunnerConfig(
enable_parallel=True,
max_parallel_size=5, # Collectors run in parallel
timeout=120.0,
callbacks=[StdoutCallbackHandler()], # Execution monitoring
)
runner = MACPRunner(
llm_factory=factory,
config=config,
)
# Execute
print("=== Executing Multi-Model Pipeline ===\n")
result = runner.run_round(graph)
print(f"\n=== Results ===")
print(f"Execution order: {' β '.join(result.execution_order)}")
print(f"Total time: {result.total_time:.2f}s")
print(f"Total tokens: {result.total_tokens}")
print(f"\nFinal report:\n{result.final_answer}")
# Token usage analysis by model
from collections import defaultdict
costs_by_model = defaultdict(int)
for agent_id in result.execution_order:
agent = graph.get_agent_by_id(agent_id)
model = agent.llm_config.model_name if agent.llm_config else "default"
tokens = result.messages.get(agent_id, {}).get("tokens", 0)
costs_by_model[model] += tokens
print(f"\n=== Token Usage by Model ===")
for model, tokens in costs_by_model.items():
print(f"{model}: {tokens} tokens")
# Savings calculation
# gpt-4: $30/$60 per 1M tokens (input/output)
# gpt-4o-mini: $0.15/$0.60 per 1M tokens
# llama3 (local): $0
gpt4_tokens = costs_by_model.get("gpt-4", 0)
mini_tokens = costs_by_model.get("gpt-4o-mini", 0)
llama_tokens = costs_by_model.get("llama3:70b", 0)
# 45 and 0.375 are blended input/output rates (averages of the per-1M prices listed above)
actual_cost = (gpt4_tokens * 45 / 1_000_000) + (mini_tokens * 0.375 / 1_000_000)
if_all_gpt4_cost = (gpt4_tokens + mini_tokens + llama_tokens) * 45 / 1_000_000
print(f"\n=== Cost Analysis ===")
print(f"Actual cost: ${actual_cost:.4f}")
print(f"Cost if all GPT-4: ${if_all_gpt4_cost:.4f}")
print(f"Savings: ${if_all_gpt4_cost - actual_cost:.4f} ({((1 - actual_cost/if_all_gpt4_cost)*100):.1f}%)")
Token budget (Budget System)
Resource management for execution (tokens, requests, time).
from execution.budget import (
Budget,
BudgetConfig,
NodeBudget,
BudgetTracker,
)
# Budget β tracks a single resource (tokens, requests, or time)
token_budget = Budget(limit=50000)
print(f"Available: {token_budget.available}")
print(f"Usage ratio: {token_budget.usage_ratio:.1%}")
can_spend = token_budget.can_spend(100) # Check before using
token_budget.spend(100) # Record usage
# Per-node budget (composed of Budget objects)
node_budget = NodeBudget(
node_id="solver",
tokens=Budget(limit=2000),
requests=Budget(limit=10),
time_seconds=Budget(limit=60),
)
# Budget tracker β configured via BudgetConfig
config = BudgetConfig(
total_token_limit=50000, # Global token limit
total_request_limit=100, # Global request limit
total_time_limit_seconds=600, # Global time limit (10 min)
node_token_limit=2000, # Per-node token limit
max_prompt_length=4000, # Max chars in a prompt
max_response_length=2000, # Max chars in a response
warn_at_usage_ratio=0.8, # Warn at 80%
)
tracker = BudgetTracker(config=config)
tracker.start() # Start the timer
# Availability check
can_run, reason = tracker.can_execute("solver", estimated_tokens=100)
if can_run:
# Record usage after execution
tracker.record_usage(
node_id="solver",
prompt_tokens=80,
completion_tokens=120,
latency_seconds=1.5,
)
# Prompt/response truncation when exceeding limits
prompt = "a very long prompt..."
truncated = tracker.truncate_prompt(prompt)
# Budget summary
summary = tracker.get_summary()
print(f"Tokens used: {summary['global']['tokens']['used']}")
print(f"Time elapsed: {summary['global']['elapsed_seconds']:.1f}s")
# Reset
tracker.reset()
Integration with RunnerConfig
from execution import RunnerConfig, BudgetConfig
config = RunnerConfig(
budget_config=BudgetConfig(
total_token_limit=50000,
node_token_limit=2000,
max_prompt_length=4000,
warn_at_usage_ratio=0.8,
),
)
Error handling (Error Handling)
Structured exceptions and error-handling policies.
from execution.errors import (
ExecutionError,
TimeoutError,
RetryExhaustedError,
BudgetExceededError,
AgentNotFoundError,
ValidationError,
ErrorPolicy,
ErrorAction,
ExecutionMetrics,
)
# Error policy
error_policy = ErrorPolicy(
on_timeout=ErrorAction.RETRY, # retry, skip, prune, fallback, rollback, abort
on_retry_exhausted=ErrorAction.PRUNE,
on_budget_exceeded=ErrorAction.ABORT,
on_validation_error=ErrorAction.ABORT,
on_agent_not_found=ErrorAction.SKIP,
on_unknown_error=ErrorAction.SKIP,
max_skipped_agents=5,
abort_on_critical_path=True,
)
# Apply in configuration
config = RunnerConfig(
error_policy=error_policy,
max_retries=3,
timeout=60.0,
)
# Error handling
try:
result = runner.run_round(graph)
except TimeoutError as e:
print(f"Timeout: {e}")
except RetryExhaustedError as e:
print(f"Retries exhausted: {e}")
except BudgetExceededError as e:
print(f"Budget exceeded: {e}")
except ExecutionError as e:
print(f"Execution error: {e}")
# Access metrics
metrics: ExecutionMetrics = e.metrics
print(f"Retries: {metrics.retry_count}")
print(f"Fallbacks: {metrics.fallback_count}")
# Get metrics from the result
if result.errors:
for error in result.errors:
print(f"{error['agent_id']}: {error['type']} - {error['message']}")
Graph algorithms (Graph Algorithms)
A service layer for graph analysis using rustworkx algorithms.
from core.algorithms import (
GraphAlgorithms,
CentralityType,
PathMetric,
SubgraphFilter,
)
algo = GraphAlgorithms(graph)
# K shortest paths
paths = algo.k_shortest_paths(
source="researcher",
target="writer",
k=3,
metric=PathMetric.HOP_COUNT, # HOP_COUNT, WEIGHTED, RELIABILITY
edge_weights=None, # or custom weights
)
for i, path in enumerate(paths):
print(f"Path {i+1}: {path.nodes} (cost={path.cost:.2f})")
# Node centrality
centrality = algo.centrality(
centrality_type=CentralityType.BETWEENNESS, # DEGREE, BETWEENNESS, CLOSENESS, EIGENVECTOR, PAGERANK
normalized=True,
)
print(f"Most central node: {centrality.top_nodes[0]}")
print(f"Scores: {centrality.scores}")
# Community detection
communities = algo.detect_communities(algorithm="louvain") # louvain, label_propagation
print(f"Communities found: {len(communities.communities)}")
print(f"Modularity: {communities.modularity:.3f}")
# Cycle search
cycles = algo.find_cycles(max_length=5)
if cycles.has_cycles:
print(f"Cycles found: {len(cycles.cycles)}")
for cycle in cycles.cycles:
print(f" {cycle}")
# Subgraph filtering
subgraph_filter = SubgraphFilter(
include_node_ids=["a", "b", "c"],
min_edge_weight=0.5,
max_hop_distance=2,
from_node="a",
)
subgraph = algo.filter_subgraph(subgraph_filter)
print(f"Nodes in subgraph: {len(subgraph.node_ids)}")
# Reachability analysis
reachable = algo.get_reachable_nodes("start", max_distance=3)
print(f"Reachable nodes: {reachable}")
# Topological order
if algo.is_dag():
topo_order = algo.topological_sort()
print(f"Topological order: {topo_order}")
Metrics Tracker
Collects and aggregates performance metrics for nodes and edges.
from core.metrics import (
MetricsTracker,
NodeMetrics,
EdgeMetrics,
MetricAggregator,
ExponentialMovingAverage,
SlidingWindowAverage,
)
tracker = MetricsTracker()
# Record node metrics
tracker.record_node_execution(
node_id="solver",
success=True,
latency_ms=150,
cost_tokens=200,
quality=0.95,
)
# Record edge metrics
tracker.record_edge_traversal(
source="solver",
target="checker",
weight=0.9,
success=True,
latency_ms=50,
)
# Get node metrics
metrics: NodeMetrics = tracker.get_node_metrics("solver")
print(f"Reliability: {metrics.reliability:.3f}")
print(f"Avg latency: {metrics.avg_latency_ms:.1f}ms")
print(f"Total cost: {metrics.total_cost_tokens}")
print(f"Avg quality: {metrics.avg_quality:.3f}")
print(f"Executions: {metrics.execution_count}")
# Get edge metrics
edge_metrics: EdgeMetrics = tracker.get_edge_metrics("solver", "checker")
print(f"Edge reliability: {edge_metrics.reliability:.3f}")
print(f"Traversals: {edge_metrics.traversal_count}")
# Snapshot of all metrics
snapshot = tracker.snapshot()
print(f"Timestamp: {snapshot.timestamp}")
print(f"Node metrics: {snapshot.node_metrics}")
print(f"Edge metrics: {snapshot.edge_metrics}")
# Metrics history (if enabled)
tracker = MetricsTracker(keep_history=True, history_window=100)
# ... records ...
history = tracker.get_history(node_id="solver")
for snapshot in history.snapshots:
print(f"{snapshot.timestamp}: {snapshot.metrics}")
# Custom aggregators
ema = ExponentialMovingAverage(alpha=0.1)
tracker.set_aggregator("solver", "latency", ema)
swa = SlidingWindowAverage(window_size=10)
tracker.set_aggregator("checker", "quality", swa)
# Export metrics
data = tracker.to_dict()
tracker.save("metrics.json")
# Load metrics
tracker = MetricsTracker.load("metrics.json")
Visualization
Tools for visualizing graphs in different formats. All visualization styles are based on Pydantic models for validation and type safety.
Core classes
from core.visualization import (
GraphVisualizer,
VisualizationStyle,
MermaidDirection,
NodeShape,
NodeStyle,
EdgeStyle,
# Convenience functions
to_mermaid,
to_ascii,
to_dot,
print_graph,
render_to_image,
show_graph_interactive,
)
1. Quick usage (convenience functions)
# Simple Mermaid
mermaid_code = to_mermaid(graph, direction=MermaidDirection.LEFT_RIGHT)
print(mermaid_code)
# Simple ASCII
ascii_art = to_ascii(graph, show_edges=True)
print(ascii_art)
# Simple DOT
dot_code = to_dot(graph, graph_name="MyGraph")
print(dot_code)
# Print to console (auto-selects Rich or ASCII)
print_graph(graph, format="auto") # "auto", "colored", "ascii", "mermaid"
# Render to image (requires system Graphviz)
render_to_image(graph, "output.png", format="png", dpi=300)
render_to_image(graph, "output.svg", format="svg")
# Interactive view (opens in system viewer)
show_graph_interactive(graph, graph_name="MyWorkflow")
2. Advanced usage (GraphVisualizer with custom styles)
VisualizationStyle, NodeStyle, EdgeStyle are Pydantic models with field validation.
# Create custom node styles (Pydantic models)
agent_style = NodeStyle(
shape=NodeShape.ROUND, # RECTANGLE, ROUND, STADIUM, CIRCLE, DIAMOND, etc.
fill_color="#e3f2fd", # Fill color
stroke_color="#1976d2", # Border color
text_color="#000000", # Text color
icon="π€", # Emoji icon
)
task_style = NodeStyle(
shape=NodeShape.DIAMOND,
fill_color="#fff3e0",
stroke_color="#f57c00",
icon="π",
)
# Edge styles (Pydantic models)
workflow_edge = EdgeStyle(
line_style="solid", # solid, dashed, dotted
arrow_head="normal", # normal, none, diamond
color="#1976d2",
label_color="#333333",
)
task_edge = EdgeStyle(
line_style="dashed",
color="#f57c00",
)
# Global visualization style (Pydantic model)
style = VisualizationStyle(
direction=MermaidDirection.LEFT_RIGHT, # TOP_BOTTOM, BOTTOM_TOP, LEFT_RIGHT, RIGHT_LEFT
agent_style=agent_style,
task_style=task_style,
workflow_edge_style=workflow_edge,
task_edge_style=task_edge,
show_weights=True, # Show edge weights
show_probabilities=False, # Show probabilities
show_tools=True, # Show agent tools
show_descriptions=False, # Show descriptions
max_label_length=30, # Max label length
)
# Create a visualizer with custom style
viz = GraphVisualizer(graph, style)
# Mermaid diagrams
mermaid = viz.to_mermaid(
direction=MermaidDirection.TOP_BOTTOM, # Can override style
title="Agent Workflow", # Diagram title
)
print(mermaid)
# Save Mermaid to a file
viz.save_mermaid("graph.md", title="My Workflow") # Wraps in ```mermaid```
viz.save_mermaid("graph.mmd", title="My Workflow") # Raw .mmd without wrapper
# ASCII art for terminal
ascii_art = viz.to_ascii(
show_edges=True,
box_width=20,
)
print(ascii_art)
# Graphviz DOT
dot = viz.to_dot(
graph_name="AgentGraph",
rankdir="LR", # TB, LR, BT, RL
)
viz.save_dot("graph.dot", graph_name="AgentGraph")
# Render to image (requires installed Graphviz)
viz.render_image(
"output.png",
format="png", # png, svg, pdf, jpg
dpi=300, # For raster formats
graph_name="MyGraph",
)
# Interactive view
viz.show_interactive(graph_name="MyGraph") # Opens system viewer
# Adjacency matrix (text representation)
matrix = viz.to_adjacency_matrix(show_labels=True)
print(matrix)
3. Colored terminal output (Rich Console)
# Automatic colored output (if Rich is installed)
print_graph(graph, format="colored")
# Or via visualizer
viz = GraphVisualizer(graph)
viz.print_colored() # Pretty output with trees, tables, and colors
4. Full configuration example
from core.visualization import (
GraphVisualizer,
VisualizationStyle,
NodeStyle,
EdgeStyle,
NodeShape,
MermaidDirection,
)
# Fully configured style
custom_style = VisualizationStyle(
direction=MermaidDirection.LEFT_RIGHT,
agent_style=NodeStyle(
shape=NodeShape.ROUND,
fill_color="#bbdefb",
stroke_color="#0d47a1",
icon="π€",
),
task_style=NodeStyle(
shape=NodeShape.DIAMOND,
fill_color="#ffe0b2",
stroke_color="#e65100",
icon="π",
),
workflow_edge_style=EdgeStyle(
line_style="solid",
color="#1976d2",
),
task_edge_style=EdgeStyle(
line_style="dashed",
color="#f57c00",
),
show_weights=True,
show_tools=True,
max_label_length=40,
)
viz = GraphVisualizer(graph, custom_style)
# Generate all formats
viz.save_mermaid("docs/graph.md", title="Workflow")
viz.save_dot("docs/graph.dot")
viz.render_image("docs/graph.png", format="png", dpi=150)
viz.render_image("docs/graph.svg", format="svg")
print(viz.to_ascii())
5. Installing Graphviz for image rendering
For render_image() and render_to_image() you need:
- Python library: pip install graphviz
- System Graphviz:
  - Ubuntu/Debian: sudo apt install graphviz
  - macOS: brew install graphviz
  - Windows: winget install graphviz or https://graphviz.org/download/
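If you want to check at runtime whether the system binary is present before rendering, a small standard-library sketch like the following can help (graph is assumed to be built as in the examples above):
import shutil
from core.visualization import render_to_image
# "dot" is the Graphviz executable that the Python graphviz bindings call
if shutil.which("dot") is None:
    print("System Graphviz not found - install it as described above")
else:
    render_to_image(graph, "pipeline.png", format="png", dpi=150)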
Schema System
A complete system of Pydantic schemas for type-safe validation, serialization, and migration of graph data. All schemas inherit from pydantic.BaseModel and provide automatic type validation, default values, and data conversion.
Core schema classes
from core.schema import (
# Versioning
SCHEMA_VERSION,
SchemaVersion,
# Node and edge types
NodeType,
EdgeType,
# Node schemas (Pydantic BaseModel)
BaseNodeSchema,
AgentNodeSchema,
TaskNodeSchema,
# Edge schemas (Pydantic BaseModel)
BaseEdgeSchema,
WorkflowEdgeSchema,
CostMetrics,
# Graph schema (Pydantic BaseModel)
GraphSchema,
# LLM configuration (Pydantic BaseModel)
LLMConfig,
# Validation (Pydantic BaseModel)
ValidationResult,
SchemaValidator,
# Migrations
SchemaMigration,
MigrationRegistry,
migrate_schema,
)
1. Creating node schemas (Pydantic models)
# Agent with a full LLM configuration
agent_node = AgentNodeSchema(
id="solver",
type=NodeType.AGENT,
display_name="Math Solver",
persona="You are an expert mathematician",
description="Solves complex math problems step by step",
tools=["calculator", "wolfram_alpha"],
# LLM configuration (Pydantic model)
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.0,
max_tokens=2000,
# Metrics and state
trust_score=0.95,
quality_score=0.9,
success_rate=1.0,
total_calls=0,
total_tokens_used=0,
# Pydantic validates embedding automatically
embedding=[0.1, 0.2, 0.3], # Can be a list or torch.Tensor
embedding_dim=3, # Auto-filled if None
# Metadata (arbitrary data)
metadata={"priority": "high", "category": "math"},
tags={"solver", "math", "primary"},
)
# Task
task_node = TaskNodeSchema(
id="main_task",
type=NodeType.TASK,
query="Solve: x^2 + 5x + 6 = 0",
description="Main mathematical task",
expected_output="Two solutions: x1, x2",
max_iterations=10,
status="pending", # pending, running, completed, failed
)
# Extract LLM configuration from the agent
llm_config: LLMConfig = agent_node.get_llm_config()
print(f"Model: {llm_config.model_name}")
print(f"Configured: {llm_config.is_configured()}")
print(f"Generation params: {llm_config.to_generation_params()}")
# Check whether an LLM configuration exists
if agent_node.has_llm_config():
print("Agent has LLM configuration")
2. Creating edge schemas (Pydantic models)
# Base edge with cost metrics (Pydantic model)
edge = BaseEdgeSchema(
source="solver",
target="checker",
type=EdgeType.WORKFLOW,
weight=1.0,
probability=0.95,
bidirectional=False,
# Cost metrics (Pydantic model)
cost=CostMetrics(
estimated_tokens=500,
actual_tokens=None,
latency_ms=150.0,
timeout_ms=5000.0,
trust=0.9,
reliability=0.95,
cost_usd=0.01,
custom={"priority": 1.0},
),
# Pydantic validates attr automatically
attr=[1.0, 0.95, 0.9], # Can be a list or torch.Tensor
attr_dim=3, # Auto-filled if None
metadata={"route": "primary"},
)
# Workflow edge with conditional routing
conditional_edge = WorkflowEdgeSchema(
source="solver",
target="checker",
type=EdgeType.WORKFLOW,
weight=0.9,
probability=1.0,
# Conditional routing
condition="source_success", # Name of a built-in or registered condition
priority=1, # Priority (higher = checked earlier)
transform="extract_answer", # Optional data transform
is_conditional=True, # Auto-set if condition is provided
)
# Get edge features
feature_vector = edge.get_feature_vector(feature_names=["trust", "reliability"])
print(f"Features: {feature_vector}")
# Convert to torch.Tensor
attr_tensor = edge.to_attr_tensor()
print(f"Attr tensor: {attr_tensor}")
3. Full graph schema (Pydantic model)
from datetime import datetime
# GraphSchema - the main Pydantic model
schema = GraphSchema(
schema_version=SCHEMA_VERSION, # "2.0.0"
name="Math Pipeline",
description="A workflow for solving mathematical problems",
created_at=datetime.now(),
updated_at=datetime.now(),
# nodes is dict[str, BaseNodeSchema], not a list!
nodes={
"solver": AgentNodeSchema(
id="solver",
display_name="Math Solver",
description="Solves math problems",
tools=["calculator"],
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
),
"checker": AgentNodeSchema(
id="checker",
display_name="Answer Checker",
description="Validates solutions",
llm_backbone="gpt-4o-mini",
),
"__task__": TaskNodeSchema(
id="__task__",
query="Solve: x^2 + 5x + 6 = 0",
),
},
edges=[
WorkflowEdgeSchema(
source="solver",
target="checker",
weight=0.9,
type=EdgeType.WORKFLOW,
),
],
# Feature names for feature extraction
node_feature_names=["trust_score", "quality_score"],
edge_feature_names=["trust", "reliability"],
# Metadata
metadata={
"created_by": "user@example.com",
"purpose": "math_pipeline",
"version": "1.0",
},
)
# Add nodes and edges
new_agent = AgentNodeSchema(
id="reviewer",
display_name="Reviewer",
)
schema.add_node(new_agent)
new_edge = BaseEdgeSchema(
source="checker",
target="reviewer",
)
schema.add_edge(new_edge)
# Retrieve nodes and edges
solver_node = schema.get_node("solver")
edges_from_solver = schema.get_edges(source="solver")
edges_to_checker = schema.get_edges(target="checker")
# Compute feature dimensionalities
schema.compute_feature_dims()
print(f"Node feature dim: {schema.node_feature_dim}")
print(f"Edge feature dim: {schema.edge_feature_dim}")
4. Serialization and validation (Pydantic)
# Serialization (Pydantic methods)
schema_dict = schema.model_dump() # Dict[str, Any]
schema_json = schema.model_dump_json(indent=2) # JSON string
# Or a specialized method
schema_data = schema.to_dict()
# Deserialization (Pydantic methods)
loaded_schema = GraphSchema.model_validate(schema_dict)
loaded_from_json = GraphSchema.model_validate_json(schema_json)
# Schema validation (returns ValidationResult - Pydantic model)
validator = SchemaValidator(
check_cycles=True,
check_duplicates=True,
check_orphans=True,
check_connectivity=False,
)
result: ValidationResult = validator.validate(schema)
if result.valid:
print("β Schema is valid")
else:
print("β Validation errors:")
for error in result.errors:
print(f" - {error}")
if result.warnings:
print("β Warnings:")
for warning in result.warnings:
print(f" - {warning}")
5. Schema migration between versions
# Automatic migration of legacy data
old_data = {
"schema_version": "1.0.0",
"agents": [ # Old format (agents list)
{"agent_id": "solver", "display_name": "Solver"},
],
"edges": [
{"source": "solver", "target": "checker"},
],
}
# Migrate to the current version (2.0.0)
migrated_data = migrate_schema(old_data)
print(f"Migrated to version: {migrated_data['schema_version']}")
# Create a custom migration
from core.schema import SchemaMigration, register_migration
class MyCustomMigration(SchemaMigration):
from_version = "1.5.0"
to_version = "2.0.0"
def migrate(self, data: dict) -> dict:
# Your migration logic
data["new_field"] = "default_value"
return data
# Register migration
register_migration(MyCustomMigration())
6. Versioning
# Check schema version
current_version = SchemaVersion.parse(SCHEMA_VERSION) # "2.0.0"
print(f"Current: {current_version}")
old_version = SchemaVersion.parse("1.5.0")
print(f"Compatible: {current_version.is_schema_compatible(old_version)}") # False (different major versions)
print(f"Newer: {current_version > old_version}") # True
Benefits of Pydantic schemas
- Automatic type validation β Pydantic checks types when creating objects
- Default values β fields are auto-populated
- Type conversion β automatic conversion (torch.Tensor β list; see the sketch after this list)
- Serialization/deserialization β built-in .model_dump(), .model_validate()
- Extensibility β extra="allow" enables arbitrary fields
- Immutability β frozen=True for immutable models
- Documentation β automatic JSON Schema generation
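A quick sketch of the type-conversion point above (field names follow the AgentNodeSchema example from section 1; per the list, a torch.Tensor embedding should come back as a plain list after model_dump()):
import torch
from core.schema import AgentNodeSchema
node = AgentNodeSchema(
    id="demo",
    display_name="Demo Agent",
    embedding=torch.tensor([0.1, 0.2, 0.3]),  # passed in as a tensor
)
dumped = node.model_dump()
print(type(dumped["embedding"]))  # expected: list, per the type-conversion point above
restored = AgentNodeSchema.model_validate(dumped)  # round-trip via the standard Pydantic v2 API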
7. Agent input/output validation
New: Each agent can have input_schema and output_schema to validate incoming data and outputs. This allows you to:
- π Guarantee data correctness
- π Automatically parse structured outputs
- π« Catch invalid LLM outputs
- π Generate JSON Schema for prompts
Prompt injection: _build_prompt automatically injects schemas into the LLM prompt.
- output_schema β system message: "Respond with JSON matching: {schema}"
- input_schema β user message: "Input format: {schema}"
The schemas are serialised as compact JSON (no extra whitespace) to minimise token usage. No manual prompt engineering is required.
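For illustration only (the exact wording produced by _build_prompt is internal to the framework), the injected instruction looks roughly like this, with the schema serialised compactly:
import json
# Hypothetical sketch of the injection described above, not the framework's actual code
output_schema_json = {
    "type": "object",
    "properties": {"answer": {"type": "string"}, "confidence": {"type": "number"}},
    "required": ["answer", "confidence"],
}
compact = json.dumps(output_schema_json, separators=(",", ":"))  # no extra whitespace
system_message = f"Respond with JSON matching: {compact}"
print(system_message)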
Imports
from pydantic import BaseModel
from core.schema import (
AgentNodeSchema,
SchemaValidationResult, # Validation result
)
from builder import GraphBuilder
7.1. Create an agent with Pydantic schemas
# Define input/output schemas as Pydantic models
class SolverInput(BaseModel):
question: str
context: str | None = None
difficulty: int = 1
class SolverOutput(BaseModel):
answer: str
confidence: float # 0.0 - 1.0
explanation: str | None = None
# Create an agent with validation
builder = GraphBuilder()
builder.add_agent(
"solver",
display_name="Math Solver",
persona="Expert mathematician",
description="Solves mathematical problems",
# Schemas for validation
input_schema=SolverInput,
output_schema=SolverOutput,
# LLM configuration
llm_backbone="gpt-4",
temperature=0.0,
)
graph = builder.build()
7.2. Using JSON Schema (without Pydantic)
You can pass a plain dict with JSON Schema:
# JSON Schema directly (without Pydantic models)
input_schema = {
"type": "object",
"properties": {
"question": {"type": "string"},
"context": {"type": "string"},
},
"required": ["question"]
}
output_schema = {
"type": "object",
"properties": {
"answer": {"type": "string"},
"confidence": {"type": "number"},
},
"required": ["answer", "confidence"]
}
builder.add_agent(
"solver",
input_schema=input_schema, # JSON Schema dict
output_schema=output_schema, # JSON Schema dict
)
7.3. Validation via RoleGraph
# Check whether schemas exist
has_input = graph.has_input_schema("solver") # True
has_output = graph.has_output_schema("solver") # True
# Validate input data
result: SchemaValidationResult = graph.validate_agent_input(
"solver",
{"question": "Solve x^2 + 5x + 6 = 0"}
)
if result.valid:
print("β
Input is valid")
print(f"Validated data: {result.validated_data}")
else:
print("β Input validation failed")
print(f"Errors: {result.errors}")
# Validate output data (JSON string or dict)
response = '{"answer": "x1=-2, x2=-3", "confidence": 0.95}'
result = graph.validate_agent_output("solver", response)
if result.valid:
parsed = result.validated_data
print(f"Answer: {parsed['answer']}")
print(f"Confidence: {parsed['confidence']}")
else:
print(f"Invalid output: {result.errors}")
# You can raise an exception
result.raise_if_invalid() # -> ValueError
7.4. Getting JSON Schema for prompts
import json

# Get JSON Schema for LLM instructions
input_schema_json = graph.get_input_schema_json("solver")
output_schema_json = graph.get_output_schema_json("solver")
# Use in the prompt
prompt = f"""You are a math solver.
INPUT FORMAT:
{json.dumps(input_schema_json, indent=2)}
You MUST respond in the following JSON format:
{json.dumps(output_schema_json, indent=2)}
Now solve: {{question}}
"""
7.5. Validation directly via AgentNodeSchema
# Create an agent with schemas
agent = AgentNodeSchema(
id="solver",
display_name="Math Solver",
input_schema=SolverInput,
output_schema=SolverOutput,
)
# Validate
result = agent.validate_input({"question": "2+2=?"})
print(f"Valid: {result.valid}")
result = agent.validate_output('{"answer": "4", "confidence": 0.99}')
print(f"Valid: {result.valid}, data: {result.validated_data}")
# Check schema presence
if agent.has_input_schema():
print("Agent has input schema")
if agent.has_output_schema():
print("Agent has output schema")
7.6. Handling invalid LLM outputs
# Scenario: the LLM responds in the wrong format
response = llm_call(prompt)
result = graph.validate_agent_output("solver", response)
if not result.valid:
# Option 1: Retry with a stricter prompt
retry_prompt = f"{prompt}\n\nβ οΈ IMPORTANT: You MUST respond with valid JSON!"
response = llm_call(retry_prompt)
result = graph.validate_agent_output("solver", response)
if not result.valid:
# Option 2: Fallback to default values
parsed = {
"answer": response,
"confidence": 0.5,
"explanation": "LLM failed to format correctly"
}
else:
parsed = result.validated_data
else:
parsed = result.validated_data
print(f"Final answer: {parsed['answer']}")
7.7. SchemaValidationResult API
class SchemaValidationResult(BaseModel):
"""Schema validation result."""
valid: bool # True if data is valid
schema_type: str # "input" or "output"
errors: list[str] # Validation errors
warnings: list[str] # Validation warnings
validated_data: dict[str, Any] | None # Validated data
message: str # Additional message
# Methods
result.raise_if_invalid() # Raise ValueError if invalid
7.8. Serialization support
When saving a graph:
- Pydantic models (input_schema / output_schema) are NOT serialized (exclude=True)
- JSON Schema (input_schema_json / output_schema_json) is serialized
# When creating an agent with a Pydantic model
agent = AgentNodeSchema(
id="solver",
input_schema=SolverInput, # Not serialized
output_schema=SolverOutput, # Not serialized
)
# JSON Schema is extracted automatically
print(agent.input_schema_json) # {'type': 'object', 'properties': {...}}
print(agent.output_schema_json) # {'type': 'object', 'properties': {...}}
# When deserializing a graph from JSON
# Pydantic models are lost, but JSON Schema remains
# Validation works via basic type checks
When should you use input/output schemas?
| Scenario | Recommendation |
|---|---|
| Structured data | β Use Pydantic schemas |
| JSON outputs from an LLM | β Required! Parsing and validation |
| Free-form text | β Not needed |
| API integration | β Guarantees correct data |
| Debugging | β Quickly surfaces issues |
Performance impact
- β Validation does not consume tokens β it is pure Python
- β οΈ Prompt instructions consume tokens β embedding JSON Schema into prompts increases token usage (a rough estimate is sketched after this list)
- β‘ Validation is fast β Pydantic is optimized for speed
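To get a rough feel for that overhead for a specific schema, you can measure its compact JSON length; the ~4 characters per token figure below is a general rule of thumb, not something the framework reports:
import json
from pydantic import BaseModel
class SolverOutput(BaseModel):
    answer: str
    confidence: float
schema_json = SolverOutput.model_json_schema()  # standard Pydantic v2 API
compact = json.dumps(schema_json, separators=(",", ":"))
print(f"~{len(compact)} chars (~{len(compact) // 4} tokens) added to the prompt")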
Validation FAQ
Q: Is this required? A: No, it is fully optional. If schemas are not set, validation is skipped.
Q: What if the LLM cannot respond in the required format?
A: validate_output() returns valid=False plus errors. Options: retry/fallback/ignore.
Q: Can I pass plain JSON Schema? A: Yes. Pass a dict with JSON Schema instead of a Pydantic model.
Q: Does token usage increase? A: Validation does not consume tokens. But including JSON Schema in prompts does increase token usage.
Builder API (Detailed)
Different ways to construct graphs.
1. build_property_graph (quick construction)
from builder import build_property_graph
graph = build_property_graph(
agents=[agent1, agent2, agent3],
workflow_edges=[("agent1", "agent2"), ("agent2", "agent3")],
context_edges=[("agent1", "agent3")], # Additional connections
query="Solve this task",
include_task_node=True, # Add a task node
task_node_id="__task__", # Task node ID
connect_task_to_all=False, # Connect task to all agents
edge_weights=None, # Custom edge weights
default_weight=1.0, # Default weight
bidirectional=False, # Bidirectional edges
encoder=None, # NodeEncoder for embeddings
compute_embeddings=False, # Compute embeddings immediately
)
2. GraphBuilder (fluent API)
from builder import GraphBuilder
builder = GraphBuilder()
# Add agents (basic)
builder.add_agent(
agent_id="researcher",
display_name="Researcher",
description="Does research",
tools=["search", "read"],
)
# Add an agent with multi-model configuration
builder.add_agent(
agent_id="analyst",
display_name="Senior Analyst",
persona="Expert data analyst",
# LLM configuration
llm_backbone="gpt-4", # Model name
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY", # Or $ENV_VAR
temperature=0.7,
max_tokens=2000,
timeout=60.0,
top_p=0.9,
stop_sequences=["END", "STOP"],
)
# Or via an LLMConfig object
from core.schema import LLMConfig
llm_config = LLMConfig(
model_name="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.7,
max_tokens=2000,
)
builder.add_agent(
agent_id="writer",
display_name="Writer",
llm_config=llm_config, # Pass a ready configuration
)
# Add edges
builder.add_workflow_edge("researcher", "writer", weight=0.9)
builder.add_context_edge("researcher", "writer", weight=0.5)
# Add a task
builder.set_task(query="Write a report", description="Main task")
# Conditional edges
def quality_check(state: dict) -> bool:
return state.get("quality_score", 0) > 0.8
builder.add_conditional_edge(
source="writer",
target="editor",
condition=quality_check,
weight=0.9,
)
# Set execution bounds (new!)
builder.set_start_node("researcher") # Start node
builder.set_end_node("writer") # End node
# Or both at once:
builder.set_execution_bounds("researcher", "writer")
# Build the graph
graph = builder.build(compute_embeddings=True, encoder=my_encoder)
# Validate before building
is_valid, errors = builder.validate()
if not is_valid:
print(f"Errors: {errors}")
3. build_from_adjacency (from a matrix)
from builder import build_from_adjacency
import torch
adjacency = torch.tensor([
[0, 1, 0],
[0, 0, 1],
[0, 0, 0],
], dtype=torch.float32)
graph = build_from_adjacency(
adjacency_matrix=adjacency,
agents=[agent1, agent2, agent3],
query="Task",
threshold=0.1, # Ignore edges with weight < threshold
)
4. build_from_schema (from a schema)
from builder import build_from_schema
graph = build_from_schema(
schema=my_schema,
compute_embeddings=True,
encoder=my_encoder,
validate=True, # Validate before building
)
Event System
Subscribe to events for monitoring and debugging.
from core.events import (
EventBus,
global_event_bus,
EventType,
LoggingEventHandler,
MetricsEventHandler,
on_event,
# Events
NodeAddedEvent,
EdgeAddedEvent,
StepCompletedEvent,
BudgetWarningEvent,
)
# Get the global event bus
bus = global_event_bus()
# 1. Subscribe via a handler
logging_handler = LoggingEventHandler(
log_level="INFO",
include_metadata=True,
)
bus.subscribe(EventType.STEP_COMPLETED, logging_handler)
# 2. Subscribe via a function
def on_step_completed(event):
if isinstance(event, StepCompletedEvent):
print(f"Agent {event.agent_id} completed: {event.tokens_used} tokens")
bus.subscribe(EventType.STEP_COMPLETED, on_step_completed)
# 3. Subscribe via a decorator
@on_event(EventType.BUDGET_WARNING)
def handle_budget_warning(event: BudgetWarningEvent):
print(f"β οΈ Budget warning: {event.budget_type} at {event.ratio:.1%}")
# 4. Global subscription (all events)
@on_event(None)
def handle_all_events(event):
print(f"Event: {event.event_type.value}")
# Disable event handling
bus.disable()
# Enable
bus.enable()
# Clear all handlers
bus.clear()
# Aggregate metrics via events
metrics_handler = MetricsEventHandler()
bus.subscribe(None, metrics_handler)
# After execution
metrics = metrics_handler.get_metrics()
print(f"Total tokens: {metrics['total_tokens']}")
print(f"Errors: {metrics['errors_count']}")
print(f"Budget warnings: {metrics['budget_warnings']}")
Callback system
Monitoring and logging execution via callback handlers.
Core concepts
- BaseCallbackHandler β base class for creating callback handlers
- AsyncCallbackHandler β async version for asynchronous operations
- CallbackManager β manager that orchestrates and invokes handlers
- Built-in handlers β StdoutCallbackHandler, MetricsCallbackHandler, FileCallbackHandler
Quick start
from execution import MACPRunner
from callbacks import (
StdoutCallbackHandler,
MetricsCallbackHandler,
FileCallbackHandler,
)
# 1. Callbacks via RunnerConfig
from execution import RunnerConfig
config = RunnerConfig(
callbacks=[
StdoutCallbackHandler(show_outputs=True),
MetricsCallbackHandler(),
]
)
runner = MACPRunner(llm_caller=my_llm, config=config)
result = runner.run_round(graph)
# 2. Per-run callbacks (override config)
result = runner.run_round(
graph,
callbacks=[FileCallbackHandler("execution_log.jsonl")]
)
Context Manager
from callbacks import collect_metrics, trace_as_callback
# 1. Collect metrics
with collect_metrics() as metrics:
runner.run_round(graph)
print(f"Total tokens: {metrics.total_tokens}")
print(f"Total duration: {metrics.total_duration_ms}ms")
print(f"Runs completed: {metrics.runs_completed}")
print(f"Runs failed: {metrics.runs_failed}")
# Full statistics
all_metrics = metrics.get_metrics()
print(f"Agent calls: {all_metrics['agent_calls']}")
print(f"Errors: {all_metrics['errors_count']}")
# 2. Tracing with arbitrary handlers
from callbacks import StdoutCallbackHandler
with trace_as_callback(handlers=[StdoutCallbackHandler()]) as manager:
runner.run_round(graph)
# Callbacks are automatically applied to this run
Creating your own CallbackHandler
from callbacks import BaseCallbackHandler
from uuid import UUID
class MySlackAlertHandler(BaseCallbackHandler):
"""Sends Slack alerts on errors."""
def on_run_start(
self,
*,
run_id: UUID,
query: str,
num_agents: int = 0,
**kwargs,
) -> None:
send_slack(f"π Started run {run_id}: {num_agents} agents")
def on_agent_end(
self,
*,
run_id: UUID,
agent_id: str,
output: str,
tokens_used: int = 0,
duration_ms: float = 0.0,
**kwargs,
) -> None:
print(f"β
Agent {agent_id}: {tokens_used} tokens, {duration_ms:.0f}ms")
def on_agent_error(
self,
error: BaseException,
*,
run_id: UUID,
agent_id: str,
**kwargs,
) -> None:
send_slack_alert(
f"β Agent {agent_id} failed in run {run_id}: {error}",
severity="high"
)
def on_run_end(
self,
*,
run_id: UUID,
output: str,
success: bool = True,
total_tokens: int = 0,
**kwargs,
) -> None:
if not success:
send_slack_alert(f"π Run {run_id} failed!")
else:
send_slack(f"β
Run {run_id} completed: {total_tokens} tokens")
# Usage
runner = MACPRunner(
llm_caller=my_llm,
config=RunnerConfig(callbacks=[MySlackAlertHandler()])
)
Async Callbacks
from callbacks import AsyncCallbackHandler
import aiohttp
class AsyncWebhookHandler(AsyncCallbackHandler):
"""Asynchronously sends a webhook on events."""
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
async def on_run_start(
self,
*,
run_id: UUID,
query: str,
**kwargs,
) -> None:
async with aiohttp.ClientSession() as session:
await session.post(
self.webhook_url,
json={"event": "run_start", "run_id": str(run_id), "query": query}
)
async def on_agent_end(
self,
*,
run_id: UUID,
agent_id: str,
output: str,
tokens_used: int = 0,
**kwargs,
) -> None:
async with aiohttp.ClientSession() as session:
await session.post(
self.webhook_url,
json={
"event": "agent_end",
"run_id": str(run_id),
"agent_id": agent_id,
"tokens": tokens_used,
}
)
# Usage with async runner
runner = MACPRunner(
async_llm_caller=my_async_llm,
config=RunnerConfig(callbacks=[AsyncWebhookHandler("https://api.example.com/webhook")])
)
result = await runner.arun_round(graph)
Built-in handlers
1. StdoutCallbackHandler β console output
from callbacks import StdoutCallbackHandler
handler = StdoutCallbackHandler(
color=True, # Colored output
show_prompts=False, # Show prompts
show_outputs=True, # Show agent outputs
truncate_length=200, # Output truncation length
)
runner = MACPRunner(
llm_caller=my_llm,
config=RunnerConfig(callbacks=[handler])
)
# Output example:
# π Run started: 5 agents
# Order: researcher β analyst β writer β editor β publisher
# βΆοΈ [0] Researcher started
# π οΈ Tool 'web_search.search' started with args: {query: "market analysis"}
# ✅ Success Tool 'web_search.search' ended (1200ms, 3500 chars)
# ✅ [0] Researcher completed: 150 tokens, 1200ms
# Output: Market analysis shows strong growth...
# βΆοΈ [1] Analyst started
# ✅ [1] Analyst completed: 200 tokens, 1500ms [FINAL]
# ✅ Run completed: 350 tokens, 2700ms
2. MetricsCallbackHandler β metrics aggregation
from callbacks import MetricsCallbackHandler
metrics_handler = MetricsCallbackHandler()
runner = MACPRunner(
llm_caller=my_llm,
config=RunnerConfig(callbacks=[metrics_handler])
)
result = runner.run_round(graph)
# Retrieve metrics
metrics = metrics_handler.get_metrics()
print(f"Total tokens: {metrics['total_tokens']}")
print(f"Total duration: {metrics['total_duration_ms']}ms")
print(f"Agent calls: {metrics['agent_calls']}") # {'researcher': 1, 'writer': 1, ...}
print(f"Agent tokens: {metrics['agent_tokens']}") # {'researcher': 150, ...}
print(f"Errors: {metrics['errors_count']}")
print(f"Retries: {metrics['retries']}")
print(f"Budget warnings: {metrics['budget_warnings']}")
print(f"Runs completed: {metrics['runs_completed']}")
# Averages
print(f"Avg tokens per agent: {metrics['avg_tokens_per_agent']}")
# Tool metrics (WebSearchTool and other tools)
print(f"Tool calls: {metrics['tool_calls']}") # {'web_search.search': 3, 'web_search.fetch': 1}
print(f"Tool durations: {metrics['tool_durations']}") # {'web_search.search': 3600.0, ...}
print(f"Tool errors: {metrics['tool_errors_count']}") # 0
# Last 10 errors
for error in metrics['errors']:
print(f"Error in {error['agent_id']}: {error['error_message']}")
# Last 10 tool errors
for error in metrics['tool_errors']:
print(f"Tool error: {error['tool_name']}.{error['action']}: {error['error_message']}")
# Reset metrics
metrics_handler.reset()
3. FileCallbackHandler β write to a JSON Lines file
from callbacks import FileCallbackHandler
handler = FileCallbackHandler(
file_path="execution_log.jsonl",
append=True, # Append or overwrite
flush_every=1, # Flush after each event
)
runner = MACPRunner(
llm_caller=my_llm,
config=RunnerConfig(callbacks=[handler])
)
result = runner.run_round(graph)
# Close the file manually (or it is closed automatically via __del__)
handler.close()
# File format (JSON Lines):
# {"event_type": "run_start", "timestamp": "2024-...", "run_id": "...", "query": "...", "num_agents": 5}
# {"event_type": "agent_start", "timestamp": "...", "run_id": "...", "agent_id": "researcher", ...}
# {"event_type": "agent_end", "timestamp": "...", "run_id": "...", "agent_id": "researcher", "tokens_used": 150, ...}
Available callback methods
| Method | Description | Parameters |
|---|---|---|
| on_run_start | Run start | run_id, query, num_agents, execution_order |
| on_run_end | Run end | run_id, output, success, error, total_tokens, total_time_ms, executed_agents |
| on_agent_start | Agent started | run_id, agent_id, agent_name, step_index, prompt, predecessors |
| on_agent_end | Agent finished | run_id, agent_id, output, tokens_used, duration_ms, is_final |
| on_agent_error | Agent error | error, run_id, agent_id, error_type, will_retry, attempt |
| on_retry | Retry attempt | run_id, agent_id, attempt, max_attempts, delay_ms, error |
| on_llm_new_token | New token (streaming) | token, run_id, agent_id, token_index, is_first, is_last |
| on_plan_created | Plan created | run_id, num_steps, execution_order |
| on_topology_changed | Topology changed | run_id, reason, old_remaining, new_remaining, change_count |
| on_prune | Agent pruned | run_id, agent_id, reason |
| on_fallback | Fallback activated | run_id, failed_agent_id, fallback_agent_id, reason |
| on_parallel_start | Parallel group start | run_id, agent_ids, group_index |
| on_parallel_end | Parallel group end | run_id, agent_ids, successful, failed |
| on_memory_read | Memory read | run_id, agent_id, entries_count, keys |
| on_memory_write | Memory write | run_id, agent_id, key, value_size |
| on_budget_warning | Budget warning | run_id, budget_type, current, limit, ratio |
| on_budget_exceeded | Budget exceeded | run_id, budget_type, current, limit, action_taken |
| on_tool_start | Tool started | run_id, tool_name, action, arguments |
| on_tool_end | Tool finished | run_id, tool_name, action, success, duration_ms, output_size, result_summary |
| on_tool_error | Tool error | run_id, tool_name, action, error_type, error_message |
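As a sketch, here is a handler that uses some of the routing-related hooks from the table (parameter names follow the table; any remaining parameters are absorbed by **kwargs):
from uuid import UUID
from callbacks import BaseCallbackHandler
class RoutingAuditHandler(BaseCallbackHandler):
    """Logs retries, pruning, and topology changes."""
    def on_retry(self, *, run_id: UUID, agent_id: str, attempt: int = 0, max_attempts: int = 0, **kwargs) -> None:
        print(f"Retry {attempt}/{max_attempts} for {agent_id}")
    def on_prune(self, *, run_id: UUID, agent_id: str, reason: str = "", **kwargs) -> None:
        print(f"Pruned {agent_id}: {reason}")
    def on_topology_changed(self, *, run_id: UUID, reason: str = "", **kwargs) -> None:
        print(f"Topology changed: {reason}")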
Tool Callback Events
Tools emit events via the callback system. This lets you monitor all tool actions without direct logging.
Event types:
| Event | Class | Description |
|---|---|---|
| TOOL_START | ToolStartEvent | Tool action started |
| TOOL_END | ToolEndEvent | Tool action successfully completed |
| TOOL_ERROR | ToolErrorEvent | Tool action failed |
Example: handling tool events
from callbacks import BaseCallbackHandler, CallbackManager
from tools import WebSearchTool
from uuid import UUID
class ToolMonitorHandler(BaseCallbackHandler):
"""Monitor all tool actions."""
def on_tool_start(
self,
*,
run_id: UUID,
tool_name: str,
action: str,
arguments: dict,
**kwargs,
) -> None:
print(f"[TOOL] {tool_name}.{action} started with {arguments}")
def on_tool_end(
self,
*,
run_id: UUID,
tool_name: str,
action: str,
success: bool = True,
duration_ms: float = 0.0,
output_size: int = 0,
result_summary: str = "",
**kwargs,
) -> None:
status = "OK" if success else "FAIL"
print(f"[TOOL] {tool_name}.{action} {status} ({duration_ms:.0f}ms, {output_size} chars)")
def on_tool_error(
self,
error: BaseException = None,
*,
run_id: UUID,
tool_name: str,
action: str,
error_type: str = "",
error_message: str = "",
**kwargs,
) -> None:
print(f"[TOOL ERROR] {tool_name}.{action}: {error_type} - {error_message}")
# Usage
cb = CallbackManager(handlers=[ToolMonitorHandler()])
tool = WebSearchTool(callback_manager=cb)
tool.execute(query="Python tutorials")
# [TOOL] web_search.search started with {'query': 'Python tutorials'}
# [TOOL] web_search.search OK (1200ms, 3500 chars)
Built-in handlers already support tool events:
- StdoutCallbackHandler β prints tool events to console with emoji
- MetricsCallbackHandler β collects metrics for tool_calls, tool_durations, tool_errors
Ignore flags
You can disable specific event types:
class MyMinimalHandler(BaseCallbackHandler):
# Ignore most events
ignore_llm = True # Do not call on_llm_new_token
ignore_retry = True # Do not call on_retry
ignore_budget = True # Do not call on_budget_*
ignore_memory = True # Do not call on_memory_*
ignore_tool = True # Do not call on_tool_start/end/error
# Handle only errors
def on_agent_error(self, error, *, run_id, agent_id, **kwargs):
log_critical_error(agent_id, error)
Combining handlers
from callbacks import (
StdoutCallbackHandler,
MetricsCallbackHandler,
FileCallbackHandler,
)
# You can use multiple handlers at the same time
runner = MACPRunner(
llm_caller=my_llm,
config=RunnerConfig(callbacks=[
StdoutCallbackHandler(show_outputs=False), # Only status to console
MetricsCallbackHandler(), # Metrics collection
FileCallbackHandler("debug.jsonl"), # Full log to file
MySlackAlertHandler(), # Slack alerts
])
)
State Storage
Persistent storage for node states.
from utils.state_storage import (
InMemoryStateStorage,
FileStateStorage,
)
# 1. In-memory storage
storage = InMemoryStateStorage()
storage.save("agent_id", {"messages": [...], "context": {...}})
state = storage.load("agent_id")
storage.delete("agent_id")
all_keys = storage.keys()
storage.clear()
# 2. File-based storage
storage = FileStateStorage(directory="./agent_states")
storage.save("researcher", {
"messages": [{"role": "user", "content": "Hello"}],
"iteration": 5,
})
state = storage.load("researcher")
if state:
print(f"Iteration: {state['iteration']}")
storage.delete("researcher")
# Get all stored IDs
all_agent_ids = storage.keys()
# Clear all states
storage.clear()
Async Utils
Helper functions for asynchronous execution.
import asyncio

from utils.async_utils import (
run_sync,
gather_with_concurrency,
timeout_wrapper,
)
# 1. Run a coroutine synchronously
async def my_async_function():
return "result"
result = run_sync(my_async_function(), context="my_context")
# 2. Parallel execution with a concurrency limit
async def fetch_data(agent_id: str):
# ... async call ...
return response
async def main():
tasks = [fetch_data(f"agent_{i}") for i in range(20)]
# Run no more than 5 at once
results = await gather_with_concurrency(5, *tasks)
return results
# 3. Timeouts
async def slow_operation():
await asyncio.sleep(10)
return "done"
async def main():
try:
result = await timeout_wrapper(
slow_operation(),
timeout=5.0,
error_message="Operation took too long",
)
except TimeoutError as e:
print(f"Timeout: {e}")
Conditional Routing
Dynamic selection of the next agent based on conditions.
from core.graph import ConditionalEdge
from execution.scheduler import ConditionContext, ConditionEvaluator
# 1. Define conditional edges
def quality_above_threshold(context: ConditionContext) -> bool:
"""Go to editor only if quality > 0.8"""
quality = context.state.get("quality_score", 0)
return quality > 0.8
def has_errors(context: ConditionContext) -> bool:
"""Go to fixer if there are errors"""
return "errors" in context.state and len(context.state["errors"]) > 0
# Add conditional edges to the graph
graph.add_conditional_edge(
source="writer",
targets={
"editor": quality_above_threshold,
"fixer": has_errors,
},
default="reviewer", # Fallback if no condition matches
)
# 2. Use via the builder
from builder import GraphBuilder
builder = GraphBuilder()
builder.add_agent(agent_id="writer", display_name="Writer")
builder.add_agent(agent_id="editor", display_name="Editor")
builder.add_agent(agent_id="fixer", display_name="Fixer")
builder.add_conditional_edge(
source="writer",
target="editor",
condition=quality_above_threshold,
weight=0.9,
)
builder.add_conditional_edge(
source="writer",
target="fixer",
condition=has_errors,
weight=0.7,
)
graph = builder.build()
# 3. Evaluate conditions at runtime
evaluator = ConditionEvaluator()
context = ConditionContext(
current_node="writer",
state={"quality_score": 0.85, "errors": []},
history=["researcher", "writer"],
metadata={"iteration": 1},
)
# Evaluate a single condition
if evaluator.evaluate(quality_above_threshold, context):
next_node = "editor"
# Evaluate all conditions for a node
next_nodes = evaluator.evaluate_all(graph, "writer", context)
print(f"Next nodes: {next_nodes}")
Agent Tools (Tools)
The tools module allows agents to use external tools via Native Function Calling.
Key principle: If an agent has tools specified, they are ALWAYS used automatically on every LLM call.
Built-in tools:
- shell β execute shell commands
- code_interpreter β execute Python code in a sandbox
- file_search β search files and their contents
- web_search β search the web (DuckDuckGo, Serper, Tavily) + Selenium browser for dynamic pages
- function_calling β call custom functions
Quick start
from builder import GraphBuilder
from execution import MACPRunner
from tools import tool, OpenAIToolsCaller
from openai import OpenAI
# 1. Register tools via the @tool decorator
@tool
def fibonacci(n: int) -> str:
"""Calculate the n-th Fibonacci number."""
a, b = 0, 1
for _ in range(n):
a, b = b, a + b
return str(a)
@tool
def is_prime(n: int) -> str:
"""Check if a number is prime."""
if n < 2:
return "False"
for i in range(2, int(n**0.5) + 1):
if n % i == 0:
return "False"
return "True"
# 2. Create an agent with tools
builder = GraphBuilder()
builder.add_agent(
agent_id="math",
display_name="Math Agent",
persona="a helpful math assistant",
tools=["fibonacci", "is_prime"], # <-- tools are specified here!
)
builder.add_task(query="Calculate fibonacci(20) and check if it's prime")
builder.connect_task_to_agents(agent_ids=["math"])
# 3. Create caller and runner
client = OpenAI(api_key="...")
caller = OpenAIToolsCaller(client, model="gpt-4")
runner = MACPRunner(llm_caller=caller)
# 4. Run β tools are used AUTOMATICALLY
result = runner.run_round(builder.build())
print(result.final_answer)
Important:
- Tools are set when creating an agent via the tools parameter
- The runner automatically passes tools to the LLM via the API
- No enable_tools flags are needed β it works automatically
Two ways to register tools
Method 1: Global @tool decorator (recommended)
from tools import tool
@tool
def calculate(expression: str) -> str:
"""Evaluate a math expression."""
return str(eval(expression))
@tool
def search_web(query: str) -> str:
"""Search the web for information."""
return f"Results for: {query}"
Method 2: Via ToolRegistry
from tools import ToolRegistry, get_registry
# Global registry
registry = get_registry()
@registry.function
def my_tool(arg: str) -> str:
"""Description for the LLM."""
return arg.upper()
# Or create your own registry
my_registry = ToolRegistry()
@my_registry.function
def custom_tool(x: int) -> str:
return str(x * 2)
Passing tools as objects
You can pass BaseTool objects directly into AgentProfile:
from core.agent import AgentProfile
from tools import CodeInterpreterTool, ShellTool
# Create an agent with tool objects
agent = AgentProfile(
agent_id="coder",
display_name="Code Agent",
persona="a Python programmer",
tools=[CodeInterpreterTool(timeout=10), ShellTool()], # <-- objects!
)
# Add to the graph
builder = GraphBuilder()
builder.add_agent_profile(agent)
Supported tools
| Tool | Description |
|---|---|
| shell | Execute shell commands |
| function_calling | Call registered Python functions (grouped) |
| code_interpreter | Execute Python code in a sandbox |
| file_search | Search files and file contents in directories |
Base classes
from tools import (
BaseTool, # Abstract base class for tools
ToolCall, # A tool-call request (parsed from LLM output)
ToolResult, # Tool execution result
ToolRegistry, # Tool registry
ShellTool, # Tool for shell commands
FunctionTool, # Tool for calling (grouped) functions
CodeInterpreterTool, # Tool for executing Python code
FileSearchTool, # Tool for searching files
)
ShellTool – executing shell commands
from tools import ShellTool, ToolRegistry
# Create a ShellTool with safety settings
shell_tool = ShellTool(
timeout=30, # Timeout in seconds
max_output_size=8192, # Max output size
working_dir="/path/to/dir", # Working directory (optional)
allowed_commands=["echo", "ls", "pwd"], # Command allowlist (optional)
)
# Register in a registry
registry = ToolRegistry()
registry.register(shell_tool)
# Execute directly
result = shell_tool.execute(command="echo Hello World")
print(result.success) # True
print(result.output) # "Hello World"
# Or via the registry
from tools import ToolCall
call = ToolCall(name="shell", arguments={"command": "ls -la"})
result = registry.execute(call)
FunctionTool – calling custom functions
from tools import FunctionTool, ToolRegistry
# Create a FunctionTool
func_tool = FunctionTool()
# Register functions via decorator
@func_tool.register
def calculate(expression: str) -> str:
"""Evaluate a math expression."""
return str(eval(expression))
@func_tool.register
def uppercase(text: str) -> str:
"""Convert text to uppercase."""
return text.upper()
@func_tool.register(name="word_count", description="Count words in text")
def count_words(text: str) -> int:
"""Count words."""
return len(text.split())
# Register in the registry
registry = ToolRegistry()
registry.register(func_tool)
# Call a function
result = func_tool.execute(function="calculate", expression="2 ** 10")
print(result.output) # "1024"
# List registered functions
print(func_tool.list_functions()) # ['calculate', 'uppercase', 'word_count']
Two ways to register functions
There are two ways to register functions as tools:
Method 1: Via FunctionTool (grouped functions)
Functions are grouped under a single tool named function_calling. The LLM must call them like this:
{"name": "function_calling", "arguments": {"function": "calculate", "expression": "2+2"}}
func_tool = FunctionTool()
@func_tool.register
def calculate(expression: str) -> str:
return str(eval(expression))
registry.register(func_tool)
Method 2: Via `@registry.function` (separate tools) – RECOMMENDED
Each function becomes a separate tool. The LLM calls them directly:
{"name": "calculate", "arguments": {"expression": "2+2"}}
@registry.function
def calculate(expression: str) -> str:
return str(eval(expression))
@registry.function
def fibonacci(n: int) -> str:
"""Calculate the n-th Fibonacci number."""
a, b = 0, 1
for _ in range(n):
a, b = b, a + b
return str(a)
Recommendation: Use `@registry.function` – it is simpler for the LLM and avoids confusion with nested arguments.
CodeInterpreterTool – executing Python code
Allows agents to execute arbitrary Python code in a safe sandbox environment.
from tools import CodeInterpreterTool, ToolRegistry, ToolCall
# Create a CodeInterpreterTool
code_tool = CodeInterpreterTool(
timeout=30, # Execution timeout in seconds
max_output_size=8192, # Maximum output size
safe_mode=True, # Restricted builtins for safety
)
# Register
registry = ToolRegistry()
registry.register(code_tool)
# Example 1: Simple computation
result = code_tool.execute(code="2 ** 10 + sum(range(5))")
print(result.output) # "1034"
# Example 2: Multi-line code with functions
code = """
def fibonacci(n):
a, b = 0, 1
for _ in range(n):
a, b = b, a + b
return a
for i in range(10):
print(f"fib({i}) = {fibonacci(i)}")
"""
result = code_tool.execute(code=code)
print(result.output)
# fib(0) = 0
# fib(1) = 1
# fib(2) = 1
# ...
# Example 3: Using preloaded modules
# Available in sandbox: math, statistics, json, re, datetime,
# collections, itertools, functools, random
result = code_tool.execute(code="""
# Modules are already loaded; no import needed
print(f"pi = {math.pi:.6f}")
print(f"e = {math.e:.6f}")
data = {"name": "Alice", "age": 30}
print(json.dumps(data, indent=2))
""")
print(result.output)
# Example 4: Error handling
result = code_tool.execute(code="1 / 0")
print(result.success) # False
print(result.error) # "ZeroDivisionError: division by zero"
Safety:
- With `safe_mode=True`, built-in functions are restricted
- Forbidden: `open`, `exec`, `eval`, `__import__`, `compile` (see the quick check below)
- Only safe modules are available
- Timeout prevents infinite loops
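A quick way to sanity-check the sandbox restrictions (a minimal sketch reusing the code_tool created above; the exact error text depends on the sandbox implementation):
# `open` is forbidden in safe mode, so this call is expected to fail
result = code_tool.execute(code="open('/etc/passwd').read()")
print(result.success)  # False
print(result.error)    # error mentioning the forbidden builtin (exact wording may vary)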
FileSearchTool – searching files and contents
Allows agents to search files by name, search text within files, and read file contents.
from tools import FileSearchTool, ToolRegistry, ToolCall
# Create a FileSearchTool
file_tool = FileSearchTool(
base_directory="./project", # Base directory to search within
max_results=50, # Maximum number of results
max_depth=10, # Maximum recursion depth
max_file_size=100_000, # Max file size for content search
max_read_size=10_000, # Max size for reading a file
allowed_extensions=[".py", ".txt", ".md"], # Allowed extensions (optional)
)
registry = ToolRegistry()
registry.register(file_tool)
# Example 1: Find all Python files
result = file_tool.execute(pattern="*.py")
print(result.output)
# Found 15 file(s) matching '*.py':
# src/main.py (1,234 bytes)
# src/utils.py (567 bytes)
# ...
# Example 2: Search in a specific directory
result = file_tool.execute(pattern="test_*.py", directory="tests")
print(result.output)
# Example 3: Search within file contents
result = file_tool.execute(pattern="*.py", query="def main")
print(result.output)
# Search results for 'def main' in 15 file(s):
# Found 3 match(es).
# === src/main.py ===
# 42: def main():
# === src/cli.py ===
# 15: def main_entry():
# ...
# Example 4: Regex search
result = file_tool.execute(pattern="*.py", query=r"def \w+_handler", regex=True)
# Example 5: Read a specific file
result = file_tool.execute(read_file="src/config.py")
print(result.output)
# === src/config.py ===
# """Configuration module."""
# import os
# ...
# Example 6: Via ToolCall (how the LLM calls it)
call = ToolCall(
name="file_search",
arguments={"pattern": "*.py", "query": "class Agent"}
)
result = registry.execute(call)
Safety:
- Cannot escape outside `base_directory` (see the check below)
- Hidden files and directories (starting with `.`) are skipped
- File size limits prevent reading huge files
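For instance, an attempted read outside the base directory is expected to fail (a small sketch reusing the file_tool created above; the exact error message may differ):
# Paths are resolved relative to base_directory and may not escape it
result = file_tool.execute(read_file="../outside_project.txt")
print(result.success)  # False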
WebSearchTool – searching, reading, and interacting with web pages
A tool for working with the internet: search (DuckDuckGo/Serper/Tavily), fetching pages, and full interaction via Selenium (clicks, forms, JS, crawl).
Install Selenium (optional):
pip install selenium webdriver-manager
Quick start
Method 1 – dict config (recommended):
from builder import GraphBuilder
from execution import MACPRunner
builder = GraphBuilder()
builder.add_agent(
"researcher",
persona="research assistant",
# Dict config – tool is created automatically with the desired parameters
tools=[{"name": "web_search", "use_selenium": True, "fetch_content": True}],
)
builder.add_task(query="Find information about Python 3.12")
builder.connect_task_to_agents(agent_ids=["researcher"])
graph = builder.build()
runner = MACPRunner(llm_caller=my_caller)
result = runner.run_round(graph)
Method 2 – registry registration:
from tools import WebSearchTool, get_registry
registry = get_registry()
registry.register(WebSearchTool(use_selenium=True, fetch_content=True))
# Agent references it by name
builder.add_agent("researcher", tools=["web_search"])
Method 3 – pass the object directly:
from tools import WebSearchTool
builder.add_agent(
"researcher",
tools=[WebSearchTool(use_selenium=True)],
)
Dict config parameters
tools=[{
"name": "web_search",
# All WebSearchTool constructor parameters:
"use_selenium": True,
"fetch_content": True,
"max_results": 5,
"timeout": 15,
"max_content_length": 4000,
"selenium_config": {
"headless": True,
"browser": "edge", # "chrome", "firefox", "edge"
"extra_wait": 1.0,
"disable_images": True,
"page_load_timeout": 30,
},
# Provider by string:
# "provider": "serper", # "duckduckgo", "serper", "tavily"
# "api_key": "...",
}]
The browser is detected automatically. If webdriver-manager cannot download a driver (no internet, SSL error), a system driver is used.
Actions (the action parameter)
action is a command that defines what to do. All actions run within the same browser session.
| action | Description | Required parameters |
|---|---|---|
| `search` | Web search | `query` |
| `fetch` | Open and read a page | `url` |
| `click` | Click an element | `selector` |
| `fill` | Fill an input | `selector`, `value` |
| `extract_links` | Extract links from a page | – |
| `execute_js` | Execute JavaScript | `js_code` |
| `crawl` | Recursive site crawl | `url` |
| `get_content` | Text of the current page | – |
`search` and `fetch` work without Selenium. The rest require `use_selenium=True`.
If action is not provided, it is inferred automatically: `query` → `search`, `url` → `fetch`, `selector` → `click`, `js_code` → `execute_js`.
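A minimal illustration of this auto-inference (based only on the rules above; no behaviour beyond them is assumed):
from tools import WebSearchTool

tool = WebSearchTool()
# action is omitted – passing `query` implies action="search", which works without Selenium
result = tool.execute(query="Python tutorials")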
Action examples
from tools import WebSearchTool
with WebSearchTool(use_selenium=True) as tool:
# Search
result = tool.execute(action="search", query="Python tutorials")
# Fetch a page (wait for an element)
result = tool.execute(action="fetch", url="https://example.com", wait_for_selector="h1")
# Click
result = tool.execute(action="click", selector="a.nav-link")
# Fill a form and submit
result = tool.execute(action="fill", selector="input[name=q]", value="Python", submit=True)
# Extract links
result = tool.execute(action="extract_links", url="https://example.com")
# Execute JS
result = tool.execute(action="execute_js", js_code="return document.title")
# Crawl
result = tool.execute(action="crawl", url="https://docs.python.org", max_depth=2, max_pages=5)
# Current page text
result = tool.execute(action="get_content")
Search providers
| Provider | API key | Description |
|---|---|---|
| `DuckDuckGoProvider` | No | Default, free |
| `SerperProvider` | Yes (serper.dev) | Google Search |
| `TavilyProvider` | Yes (tavily.com) | With AI summarization |
# Via dict config
tools=[{"name": "web_search", "provider": "tavily", "api_key": "tvly-..."}]
# Or directly
from tools import WebSearchTool, TavilyProvider
tool = WebSearchTool(provider=TavilyProvider(api_key="tvly-..."))
Custom provider:
from tools import WebSearchTool, SearchProvider
class MyProvider(SearchProvider):
def search(self, query: str, max_results: int = 5) -> list[dict[str, str]]:
return [{"title": "Result", "url": "https://example.com", "snippet": query}]
tool = WebSearchTool(provider=MyProvider())
Constructor parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `provider` | `SearchProvider \| None` | `DuckDuckGoProvider` | Search provider |
| `max_results` | `int` | `5` | Max search results |
| `max_content_length` | `int` | `4000` | Max page content length |
| `fetch_content` | `bool` | `False` | Fetch page contents during search |
| `timeout` | `int` | `15` | Request timeout (sec) |
| `use_selenium` | `bool` | `False` | Use Selenium |
| `selenium_config` | `dict \| None` | `None` | Selenium settings (headless, browser, extra_wait, etc.) |
| `selenium_fetcher` | `SeleniumFetcher \| None` | `None` | A pre-built SeleniumFetcher instance |
| `callback_manager` | `CallbackManager \| None` | `None` | For events (if None – taken from context) |
execute() parameters
| Parameter | Type | Description |
|---|---|---|
| `action` | `str` | Action (see table above). Auto-inferred if omitted |
| `query` | `str` | Search query |
| `url` | `str` | Page URL |
| `selector` | `str` | CSS selector |
| `value` | `str` | Value for fill |
| `submit` | `bool` | Submit the form (default: False) |
| `js_code` | `str` | JavaScript code |
| `max_pages` | `int` | Max pages for crawl (default: 10) |
| `max_depth` | `int` | Max crawl depth (default: 2) |
| `url_filter` | `str` | Regex filter for crawl URLs |
| `fetch_content` | `bool` | Fetch contents (for search) |
| `max_results` | `int` | Max results (for search) |
| `wait_for_selector` | `str` | CSS selector to wait for page readiness |
Callback integration
WebSearchTool emits on_tool_start/on_tool_end/on_tool_error events via the callback system:
from callbacks import CallbackManager, StdoutCallbackHandler
from tools import WebSearchTool
cb = CallbackManager(handlers=[StdoutCallbackHandler()])
tool = WebSearchTool(callback_manager=cb, use_selenium=True)
tool.execute(action="fetch", url="https://example.com")
# 🛠️ Tool 'web_search.fetch' started
# ✅ Tool 'web_search.fetch' ended (1200ms)
Notes
- Two modes: `urllib` (no dependencies) and Selenium (full browser)
- Browsers: Chrome, Firefox, Edge (automatic fallback to system driver)
- Context manager: `with WebSearchTool(...) as tool:` – auto-closes the browser
- Built-in HTML parser without external dependencies
- `create_tool_from_config()` – builds a tool from a dict config for agent integration (see the sketch below)
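A minimal sketch of building a tool from a dict config. The exact signature of `create_tool_from_config()` is not shown in this document; this assumes it accepts the same dict format as the "Dict config parameters" section above and is importable from `tools` like the other helpers:
from tools import create_tool_from_config, get_registry

# Hypothetical usage – dict keys mirror the WebSearchTool constructor parameters
tool = create_tool_from_config({"name": "web_search", "use_selenium": True, "max_results": 3})
get_registry().register(tool)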
ToolRegistry – tool registry
from tools import ToolRegistry, ShellTool, FunctionTool
# Create a registry
registry = ToolRegistry()
# Register tools
registry.register(ShellTool(timeout=10))
registry.register(FunctionTool())
# Register functions via the registry decorator (convenient)
@registry.function
def greet(name: str) -> str:
"""Greeting."""
return f"Hello, {name}!"
@registry.function(name="add", description="Add two numbers")
def add_numbers(a: int, b: int) -> int:
return a + b
# Check tool presence
print(registry.has("shell")) # True
print(registry.has("greet")) # True
# List tools
print(registry.list_tools()) # ['shell', 'function_calling', 'greet', 'add']
# Get tools for an agent
tools = registry.get_tools_for_agent(["shell", "greet"])
print([t.name for t in tools]) # ['shell', 'greet']
# Format a prompt with tool descriptions
prompt = registry.format_tools_prompt(["shell", "greet"])
print(prompt)
# Available tools:
# - shell: Execute a shell command...
# - greet: Greeting.
# To use a tool, format your response as:
# <tool_call>{"name": "tool_name", "arguments": {...}}</tool_call>
Parsing tool_call from an LLM response
An agent can call a tool by including a special tag in its response:
from tools import ToolCall
# LLM returns a response with tool calls
llm_response = """
I need to compute the result.
<tool_call>
{"name": "calculate", "arguments": {"expression": "2 + 2"}}
</tool_call>
And also check the directory:
<tool_call>
{"name": "shell", "arguments": {"command": "ls"}}
</tool_call>
"""
# Parse all calls
calls = ToolCall.parse_from_response(llm_response)
print(len(calls)) # 2
print(calls[0].name) # "calculate"
print(calls[0].arguments) # {"expression": "2 + 2"}
# Execute all calls
results = registry.execute_all(calls)
for result in results:
print(f"{result.tool_name}: {result.output if result.success else result.error}")
Integration with MACPRunner
Tools are used automatically – it is enough to specify them when creating the agent.
from execution import MACPRunner, RunnerConfig
from builder import GraphBuilder
from tools import (
tool, get_registry, register_tool,
ShellTool, CodeInterpreterTool, FileSearchTool,
OpenAIToolsCaller,
)
from openai import OpenAI
# 1. Register built-in tools
register_tool(ShellTool(timeout=10))
register_tool(CodeInterpreterTool(timeout=10, safe_mode=True))
register_tool(FileSearchTool(base_directory="."))
# Register custom functions via @tool
@tool
def get_current_time() -> str:
"""Get current date and time."""
from datetime import datetime
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@tool
def calculate(expression: str) -> str:
"""Evaluate math expression safely."""
return str(eval(expression, {"__builtins__": {}}, {}))
# 2. Create a graph with agents
builder = GraphBuilder()
builder.add_agent(
"assistant",
display_name="AI Assistant",
persona="Helpful assistant who uses tools to solve problems.",
tools=["shell", "get_current_time"], # <-- tools are used automatically!
)
builder.add_agent(
"coder",
display_name="Python Coder",
persona="Python expert who writes and executes code.",
tools=["code_interpreter"],
)
builder.add_agent(
"calculator",
display_name="Calculator Agent",
persona="Math expert who calculates expressions.",
tools=["calculate"],
)
builder.add_workflow_edge("assistant", "calculator")
builder.add_task(query="What is 25 * 17 and what time is it?")
builder.connect_task_to_agents()
graph = builder.build()
# 3. Create caller and runner
client = OpenAI(api_key="...")
caller = OpenAIToolsCaller(client, model="gpt-4")
runner = MACPRunner(llm_caller=caller) # No extra configuration needed!
# 4. Execute – tools are used automatically
result = runner.run_round(graph)
print(result.final_answer)
Note: The `max_tool_iterations` parameter in `RunnerConfig` limits the number of tool-calling loops (default is 3).
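For example, to allow more tool-call rounds per agent before the loop is cut off (a short sketch reusing the caller defined above):
from execution import MACPRunner, RunnerConfig

config = RunnerConfig(max_tool_iterations=5)  # default is 3
runner = MACPRunner(llm_caller=caller, config=config)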
Creating a custom tool
from tools import BaseTool, ToolResult
from typing import Any
class WeatherTool(BaseTool):
"""A tool for getting weather."""
@property
def name(self) -> str:
return "weather"
@property
def description(self) -> str:
return "Get current weather for a city"
@property
def parameters_schema(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "City name"
}
},
"required": ["city"]
}
def execute(self, city: str = "", **kwargs) -> ToolResult:
if not city:
return ToolResult(
tool_name=self.name,
success=False,
error="City is required"
)
# A real API call would go here
weather = f"Sunny, 22Β°C in {city}"
return ToolResult(
tool_name=self.name,
success=True,
output=weather
)
# Usage
registry = ToolRegistry()
registry.register(WeatherTool())
result = registry.execute(ToolCall(name="weather", arguments={"city": "Moscow"}))
print(result.output) # "Sunny, 22Β°C in Moscow"
Example: full workflow with tools
"""Full example of using tools in a multi-agent system."""
import math
from execution import MACPRunner, RunnerConfig
from builder import GraphBuilder
from tools import (
ToolRegistry,
ShellTool,
CodeInterpreterTool,
FileSearchTool,
)
# Configure tools
registry = ToolRegistry()
# Shell with allowlist
registry.register(ShellTool(
timeout=5,
allowed_commands=["echo", "date", "pwd", "ls"]
))
# Code interpreter to execute Python code
registry.register(CodeInterpreterTool(timeout=10, safe_mode=True))
# File search to find files
registry.register(FileSearchTool(base_directory=".", max_results=20))
# Math functions – register directly via @registry.function
# This allows the LLM to call them by name: {"name": "sqrt", "arguments": {"x": 144}}
@registry.function
def sqrt(x: float) -> float:
"""Calculate square root."""
return math.sqrt(x)
@registry.function
def power(base: float, exp: float) -> float:
"""Calculate base^exp."""
return math.pow(base, exp)
@registry.function
def factorial(n: int) -> int:
"""Calculate factorial."""
return math.factorial(n)
# Build the graph
builder = GraphBuilder()
builder.add_agent(
"math_solver",
persona="Expert mathematician",
tools=["sqrt", "power", "factorial"], # Direct access to functions
)
builder.add_agent(
"coder",
persona="Python developer",
tools=["code_interpreter"], # Execute Python code
)
builder.add_agent(
"researcher",
persona="Code researcher",
tools=["file_search"], # Search files
)
builder.add_agent(
"coordinator",
persona="Task coordinator that combines results",
tools=[], # No tools
)
builder.add_workflow_edge("math_solver", "coordinator")
builder.add_workflow_edge("coder", "coordinator")
builder.add_workflow_edge("researcher", "coordinator")
builder.add_task(query="Calculate sqrt(144), then write Python to verify")
builder.connect_task_to_agents()
graph = builder.build()
# Execute
def mock_llm(prompt: str) -> str:
if "mathematician" in prompt:
return '''I'll calculate the square root.
<tool_call>
{"name": "sqrt", "arguments": {"x": 144}}
</tool_call>
'''
elif "developer" in prompt:
return '''Let me verify with Python code.
<tool_call>
{"name": "code_interpreter", "arguments": {"code": "import math\\nprint(f'sqrt(144) = {math.sqrt(144)}')"}}
</tool_call>
'''
elif "researcher" in prompt:
return '''Let me find Python files.
<tool_call>
{"name": "file_search", "arguments": {"pattern": "*.py", "directory": "src"}}
</tool_call>
'''
else:
return "Based on the results: sqrt(144) = 12 and we're in the current directory."
config = RunnerConfig(enable_tools=True, max_tool_iterations=2)
runner = MACPRunner(llm_caller=mock_llm, tool_registry=registry, config=config)
result = runner.run_round(graph)
print("Final:", result.final_answer)
Running the example
# Run the tools example
uv run python examples/tools_example.py
# Run tests
uv run pytest tests/test_tools.py -v
API Reference
Core classes
| Class | Description | Pydantic |
|---|---|---|
| `RoleGraph` | Role/agent graph with adjacency matrices | ❌ |
| `AgentProfile` | Pydantic BaseModel – Immutable agent profile | ✅ |
| `TaskNode` | Pydantic BaseModel – Virtual task node | ✅ |
| `NodeEncoder` | Text-to-embeddings encoder | ❌ |
| `MACPRunner` | MACP protocol executor | ❌ |
| `AdaptiveScheduler` | Adaptive scheduler | ❌ |
| `LLMCallerFactory` | Factory for creating LLM callers (multi-model) | ❌ |
| `LLMConfig` | Pydantic BaseModel – LLM configuration for schemas | ✅ |
| `AgentLLMConfig` | Pydantic BaseModel – LLM configuration for AgentProfile | ✅ |
| `AgentMemory` | Agent memory manager | ❌ |
| `SharedMemoryPool` | Shared memory pool | ❌ |
| `BudgetTracker` | Token/request budget tracker | ❌ |
| `MetricsTracker` | Performance metrics tracker | ❌ |
| `GraphVisualizer` | Graph visualization | ❌ |
| `BaseCallbackHandler` | Base callback handler | ❌ |
| `AsyncCallbackHandler` | Async callback handler | ❌ |
| `CallbackManager` | Callback handlers manager | ❌ |
| `AsyncCallbackManager` | Async callbacks manager | ❌ |
| `StdoutCallbackHandler` | Console event output | ❌ |
| `MetricsCallbackHandler` | Execution metrics aggregation | ❌ |
| `FileCallbackHandler` | Write events to JSON Lines file | ❌ |
| `EventBus` | Event bus for graph monitoring | ❌ |
| `EarlyStopCondition` | Early stopping condition | ❌ |
| `StepContext` | Pydantic BaseModel – Step context for hooks | ✅ |
| `TopologyAction` | Pydantic BaseModel – Topology modification action | ✅ |
Schemas (Pydantic BaseModel)
| Schema class | Description | Usage |
|---|---|---|
| `GraphSchema` | Pydantic – Full graph schema | Validation, serialization, migration |
| `BaseNodeSchema` | Pydantic – Base node schema | Parent class for nodes |
| `AgentNodeSchema` | Pydantic – Agent node schema | LLM config, tools, metrics, embeddings |
| `TaskNodeSchema` | Pydantic – Task node schema | Query, status, deadline |
| `BaseEdgeSchema` | Pydantic – Base edge schema | Weight, probability, cost |
| `WorkflowEdgeSchema` | Pydantic – Workflow edge | Conditions, priority, transforms |
| `CostMetrics` | Pydantic – Cost metrics | Tokens, latency, trust, reliability |
| `ValidationResult` | Pydantic – Validation result | Errors, warnings |
Visualization (Pydantic BaseModel)
| Class | Description | Usage |
|---|---|---|
| `VisualizationStyle` | Pydantic – Global visualization style | Configure colors, shapes, what to show |
| `NodeStyle` | Pydantic – Node style | Shape, fill_color, stroke_color, icon |
| `EdgeStyle` | Pydantic – Edge style | Line style, arrow, colors |
| `NodeShape` | Enum – Node shapes | RECTANGLE, ROUND, STADIUM, CIRCLE, DIAMOND, etc. |
| `MermaidDirection` | Enum – Graph direction | TOP_BOTTOM, LEFT_RIGHT, etc. |
GNN (Pydantic BaseModel)
| Class | Description | Usage |
|---|---|---|
| `FeatureConfig` | Pydantic – Feature configuration | Node/edge feature dimensions |
| `TrainingConfig` | Pydantic – Training configuration | Learning rate, epochs, optimizer |
Graph construction functions
| Function | Description |
|---|---|
| `build_property_graph()` | Main graph builder |
| `build_from_schema()` | Build from GraphSchema |
| `build_from_adjacency()` | Build from adjacency matrix |
| `GraphBuilder` | Fluent graph builder with multi-model support |
Multi-model functions
| Function | Description |
|---|---|
| `create_openai_caller()` | Create a legacy flat-string `(str) -> str` LLM caller |
| `create_openai_structured_caller()` | Create a sync structured caller `(list[dict]) -> str` – recommended (sketch below) |
| `create_openai_async_structured_caller()` | Create an async structured caller – required for `astream()` with `enable_parallel=True` |
| `LLMCallerFactory.create_openai_factory()` | Create a factory for automatic caller generation |
| `LLMConfig.merge_with()` | Merge LLM configurations (fallback) |
| `AgentProfile.with_llm_config()` | Set LLM configuration for an agent |
| `AgentProfile.has_custom_llm()` | Check whether an agent has a custom LLM config |
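As a rough sketch of what "structured" means here: a structured caller is any callable that accepts chat-style messages and returns a string. The hand-written callable below and the assumption that it can be passed as `llm_caller` are illustrations, not the documented factory signatures:
from openai import OpenAI
from execution import MACPRunner

client = OpenAI(api_key="...")

# Hypothetical hand-written structured caller: (list[dict]) -> str
def structured_caller(messages: list[dict]) -> str:
    response = client.chat.completions.create(model="gpt-4o-mini", messages=messages)
    return response.choices[0].message.content

runner = MACPRunner(llm_caller=structured_caller)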
Scheduling functions
| Function | Description |
|---|---|
| `build_execution_order()` | Topological execution order |
| `get_parallel_groups()` | Parallel execution groups |
| `extract_agent_adjacency()` | Extract the agent adjacency matrix |
| `get_incoming_agents()` | Agent predecessors |
| `get_outgoing_agents()` | Agent successors |
Configuration classes
| Class | Description |
|---|---|
| `RunnerConfig` | MACPRunner configuration |
| `LLMConfig` | LLM configuration for an agent (multi-model) |
| `AgentLLMConfig` | Immutable LLM configuration for AgentProfile |
| `RoutingPolicy` | Routing policies |
| `PruningConfig` | Agent pruning configuration |
| `MemoryConfig` | Memory system configuration |
| `TrainingConfig` | GNN training configuration |
| `ErrorPolicy` | Error-handling policies |
| `FrameworkSettings` | Global framework settings |
FAQ
Why Pydantic? What benefits does it provide?
gMAS Framework is built entirely on Pydantic 2.0+ to ensure type safety, automatic validation, and convenient serialization. Key benefits:
- Automatic type validation – errors are caught when objects are created, not later at runtime
- Declarative typing – IDE autocompletion, static checking (mypy, pyright)
- Automatic serialization – `.model_dump()`, `.model_dump_json()` work out of the box
- Default values – no need to write boilerplate
- Nested models – automatic validation of nested structures
- Migrations – safe schema upgrades between versions
- Immutability – `frozen=True` prevents accidental mutation
from core import AgentProfile
from pydantic import ValidationError
# ✅ Correct usage – Pydantic validates
agent = AgentProfile(
agent_id="test",
display_name="Test Agent",
tools=["tool1", "tool2"],
)
# ❌ Incorrect – Pydantic will raise ValidationError
try:
bad_agent = AgentProfile(
agent_id=123, # Must be str, not int
display_name="Test",
)
except ValidationError as e:
print(e.errors()) # Detailed error info
# Automatic serialization (Pydantic v2 API)
data = agent.model_dump() # β dict
json_str = agent.model_dump_json(indent=2) # β JSON string
# Automatic deserialization
loaded = AgentProfile.model_validate(data)
from_json = AgentProfile.model_validate_json(json_str)
Which Pydantic version is required? Is it compatible with Pydantic 1.x?
gMAS Framework requires Pydantic 2.0+ and is not compatible with Pydantic 1.x.
Key API differences:
- Pydantic 1.x: `.dict()`, `.parse_obj()`, `.json()`
- Pydantic 2.x: `.model_dump()`, `.model_validate()`, `.model_dump_json()`
If you have Pydantic 1.x installed:
pip install --upgrade "pydantic>=2.0"
Version check:
import pydantic
print(pydantic.VERSION) # Must be >= 2.0.0
How do I use different models for different agents?
from builder import GraphBuilder
from execution import MACPRunner, LLMCallerFactory
# Method 1: Via GraphBuilder (recommended)
builder = GraphBuilder()
builder.add_agent(
"analyst",
llm_backbone="gpt-4", # Strong model
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.0,
max_tokens=4000,
)
builder.add_agent(
"formatter",
llm_backbone="gpt-4o-mini", # Cheaper model
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
temperature=0.3,
max_tokens=1000,
)
builder.add_workflow_edge("analyst", "formatter")
graph = builder.build()
# Factory auto-creates callers
factory = LLMCallerFactory.create_openai_factory()
runner = MACPRunner(llm_factory=factory)
result = runner.run_round(graph)
How do I integrate with OpenAI?
import openai
# Method 1: Simple integration (one LLM for all)
def openai_caller(prompt: str) -> str:
response = openai.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
)
return response.choices[0].message.content
runner = MACPRunner(llm_caller=openai_caller)
# Method 2: Multi-model integration (recommended)
from execution import LLMCallerFactory
# Uses the openai SDK automatically
runner = MACPRunner(
llm_factory=LLMCallerFactory.create_openai_factory(
default_api_key="sk-...",
default_base_url="https://api.openai.com/v1",
)
)
How do I use local models (Ollama)?
import requests
def ollama_caller(prompt: str) -> str:
response = requests.post(
"http://localhost:11434/api/generate",
json={"model": "llama2", "prompt": prompt, "stream": False},
)
return response.json()["response"]
runner = MACPRunner(llm_caller=ollama_caller)
How do I add custom tools?
The simplest approach is to list tool names as plain strings; they are included in the agent prompt and the logic lives in your LLM caller:
agent = AgentProfile(
agent_id="code_executor",
display_name="Code Executor",
tools=["python_execute", "file_read", "file_write"],
)
For automatic execution, register the tools via the `@tool` decorator or a `BaseTool` subclass and reference them by name (see the Agent Tools section).
How do I visualize the graph? Which formats are supported?
gMAS Framework provides a powerful visualization system with Pydantic styles and support for multiple formats:
Supported formats:
- Mermaid – for GitHub/docs
- ASCII art – for terminals
- Graphviz DOT – for professional visualization
- Rich Console – colored terminal output
- PNG/SVG/PDF – image rendering (requires system Graphviz)
from core.visualization import (
GraphVisualizer,
VisualizationStyle,
NodeStyle,
NodeShape,
MermaidDirection,
# Convenience functions
to_mermaid,
to_ascii,
print_graph,
render_to_image,
)
# Quick visualization (convenience functions)
print(to_mermaid(graph, direction=MermaidDirection.LEFT_RIGHT))
print(to_ascii(graph, show_edges=True))
print_graph(graph, format="auto") # Auto-selects colored/ascii
# Advanced custom styles (Pydantic models)
style = VisualizationStyle(
direction=MermaidDirection.LEFT_RIGHT,
agent_style=NodeStyle(
shape=NodeShape.ROUND,
fill_color="#e3f2fd",
stroke_color="#1976d2",
icon="🤖",
),
show_weights=True,
show_tools=True,
)
viz = GraphVisualizer(graph, style)
viz.save_mermaid("graph.md", title="My Workflow")
viz.save_dot("graph.dot")
# Image rendering (requires: pip install graphviz + system graphviz)
try:
render_to_image(graph, "output.png", format="png", dpi=150, style=style)
render_to_image(graph, "output.svg", format="svg", style=style)
print("β
Images created")
except Exception as e:
print(f"β οΈ Install system Graphviz: {e}")
# Ubuntu: sudo apt install graphviz
# macOS: brew install graphviz
Installing Graphviz for image rendering:
# Python library
pip install graphviz
# System Graphviz
# Ubuntu/Debian:
sudo apt install graphviz
# macOS:
brew install graphviz
# Windows:
winget install graphviz
How do I save and load a graph?
import json
# Save
data = graph.to_dict()
with open("graph.json", "w") as f:
json.dump(data, f)
# Load
with open("graph.json", "r") as f:
data = json.load(f)
graph = RoleGraph.from_dict(data)
Saving via Pydantic schemas (recommended):
from core.schema import GraphSchema, AgentNodeSchema, BaseEdgeSchema
# Build a schema from the graph
schema = GraphSchema(
name="MyGraph",
nodes={agent.agent_id: AgentNodeSchema.from_profile(agent) for agent in graph.agents},
edges=[BaseEdgeSchema.from_edge(e) for e in graph.edges],
)
# Save (Pydantic auto-serialization)
schema_json = schema.model_dump_json(indent=2)
with open("graph_schema.json", "w") as f:
f.write(schema_json)
# Load (Pydantic auto-validation)
with open("graph_schema.json", "r") as f:
loaded_schema = GraphSchema.model_validate_json(f.read())
# Build a graph from the schema
from builder import build_from_schema
graph = build_from_schema(loaded_schema)
How do I handle agent errors?
from execution import RunnerConfig, ErrorPolicy, PruningConfig
config = RunnerConfig(
error_policy=ErrorPolicy(
on_error="fallback", # skip, retry, fallback, fail
max_retries=3,
),
pruning_config=PruningConfig(
enable_fallback=True,
max_fallback_attempts=2,
),
)
result = runner.run_round(graph)
if result.errors:
for error in result.errors:
print(f"Error in {error.agent_id}: {error.message}")
How do I track agent performance?
from core.metrics import MetricsTracker
tracker = MetricsTracker()
# Runner integration
runner = MACPRunner(llm_caller=my_llm, metrics_tracker=tracker)
result = runner.run_round(graph)
# Retrieve metrics
for agent_id in graph.node_ids:
metrics = tracker.get_node_metrics(agent_id)
print(f"{agent_id}:")
print(f" Reliability: {metrics.reliability:.2%}")
print(f" Avg latency: {metrics.avg_latency_ms:.0f}ms")
print(f" Quality: {metrics.avg_quality:.2f}")
# Save metrics
tracker.save("metrics.json")
How do I use dynamic topology?
# Modify the graph at runtime
graph.add_node(new_agent, connections_to=["existing_agent"])
graph.add_edge("agent1", "new_agent", weight=0.8)
# Remove inefficient agents
if metrics.get_node_metrics("slow_agent").avg_latency_ms > 5000:
graph.remove_node("slow_agent", policy=StateMigrationPolicy.DISCARD)
# Update weights based on performance
new_weights = compute_weights_from_metrics(tracker)
graph.update_communication(new_weights)
How do I integrate with LangChain?
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
llm = ChatOpenAI(model="gpt-4")
def langchain_caller(prompt: str) -> str:
messages = [HumanMessage(content=prompt)]
response = llm(messages)
return response.content
runner = MACPRunner(llm_caller=langchain_caller)
result = runner.run_round(graph)
How do I implement human-in-the-loop?
from execution import StreamEventType
def human_approval(agent_id: str, response: str) -> bool:
print(f"\n{agent_id} replied: {response}")
approval = input("Approve? (y/n): ")
return approval.lower() == 'y'
def stream_with_approval(graph):
for event in runner.stream(graph):
if event.event_type == StreamEventType.AGENT_OUTPUT:
if not human_approval(event.agent_id, event.content):
# Restart the agent with feedback
feedback = input("Your feedback: ")
# ... restart logic ...
yield event
How do I use a graph with multiple tasks?
# Option 1: sequential
queries = ["Task 1", "Task 2", "Task 3"]
for query in queries:
graph.query = query
result = runner.run_round(graph)
print(f"{query}: {result.final_answer}")
# Option 2: parallel (async)
import asyncio
import copy

async def process_queries(queries):
tasks = []
for query in queries:
graph_copy = copy.deepcopy(graph)
graph_copy.query = query
tasks.append(runner.arun_round(graph_copy))
results = await asyncio.gather(*tasks)
return results
How do I combine cloud and local models?
from builder import GraphBuilder
builder = GraphBuilder()
# Cloud model for public data
builder.add_agent(
"public_analyzer",
llm_backbone="gpt-4",
base_url="https://api.openai.com/v1",
api_key="$OPENAI_API_KEY",
)
# Local model (Ollama) for confidential data
builder.add_agent(
"private_analyzer",
llm_backbone="llama3:70b",
base_url="http://localhost:11434/v1",
api_key="not-needed", # Ollama does not require an API key
)
builder.add_workflow_edge("public_analyzer", "private_analyzer")
graph = builder.build()
factory = LLMCallerFactory.create_openai_factory()
runner = MACPRunner(llm_factory=factory)
How do I optimize LLM cost with multi-model routing?
# Strategy: cheap models for routine tasks, expensive for complex tasks
builder = GraphBuilder()
# Steps 1-3: simple operations β cheap model
for i in range(3):
builder.add_agent(
f"processor_{i}",
llm_backbone="gpt-4o-mini", # $0.15/$0.60 per 1M tokens
max_tokens=500,
)
# Step 4: complex analysis β expensive model
builder.add_agent(
"analyst",
llm_backbone="gpt-4", # $30/$60 per 1M tokens
max_tokens=2000,
)
# Step 5: final formatting β cheap model
builder.add_agent(
"formatter",
llm_backbone="gpt-4o-mini",
max_tokens=500,
)
# Savings: ~70-80% vs using gpt-4 for all steps
How do I use API keys safely?
# ❌ DO NOT do this (hardcode keys)
builder.add_agent("agent", api_key="sk-1234567890...")
# ✅ Correct: use environment variables
import os
# Method 1: load from a .env file
from dotenv import load_dotenv
load_dotenv()
builder.add_agent("agent", api_key="$OPENAI_API_KEY")
# Method 2: set the env var explicitly
os.environ["OPENAI_API_KEY"] = open("keys/openai.key").read().strip()
builder.add_agent("agent", api_key="$OPENAI_API_KEY")
# Method 3: use a factory with a default key
factory = LLMCallerFactory.create_openai_factory(
default_api_key=os.getenv("OPENAI_API_KEY"),
)
How do I configure logging?
from config import setup_logging
# Configure global logging
setup_logging(
level="DEBUG",
log_file="framework.log",
rotation="500 MB",
retention="10 days",
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
backtrace=True,
diagnose=True,
)
# Use in code
from config import logger
logger.info("Starting execution")
logger.debug(f"Graph has {graph.num_nodes} nodes")
logger.error("Failed to execute agent", exc_info=True)
How do I export a graph for analysis?
# 1. JSON serialization
import json
graph_data = graph.to_dict()
with open("graph.json", "w") as f:
json.dump(graph_data, f, indent=2)
# 2. PyTorch Geometric format
import torch

pyg_data = graph.to_pyg_data()
torch.save(pyg_data, "graph.pt")
# 3. NetworkX format (if needed)
import networkx as nx
G = nx.DiGraph()
for node_id in graph.node_ids:
G.add_node(node_id, **graph.get_agent_by_id(node_id).to_dict())
for i, j in zip(*graph.edge_index):
src = graph.node_ids[i]
tgt = graph.node_ids[j]
G.add_edge(src, tgt, weight=graph.A_com[i, j])
nx.write_gexf(G, "graph.gexf")
# 4. CSV export
import pandas as pd
# Nodes
nodes_df = pd.DataFrame([
{"id": agent.agent_id, "name": agent.display_name, "tools": ",".join(agent.tools)}
for agent in graph.agents
])
nodes_df.to_csv("nodes.csv", index=False)
# Edges
edges = []
for i in range(graph.num_nodes):
for j in range(graph.num_nodes):
if graph.A_com[i, j] > 0:
edges.append({
"source": graph.node_ids[i],
"target": graph.node_ids[j],
"weight": graph.A_com[i, j],
})
edges_df = pd.DataFrame(edges)
edges_df.to_csv("edges.csv", index=False)
How do I test agents?
import pytest
from unittest.mock import Mock

from core import AgentProfile
from builder import build_property_graph
from execution import MACPRunner, RunnerConfig, ErrorPolicy
def test_agent_execution():
# Mock the LLM
mock_llm = Mock(return_value="Mocked response")
# Build a graph
agents = [AgentProfile(agent_id="test", display_name="Test Agent")]
graph = build_property_graph(agents, [], query="Test query")
# Run
runner = MACPRunner(llm_caller=mock_llm)
result = runner.run_round(graph)
# Assertions
assert result.final_answer == "Mocked response"
assert len(result.execution_order) == 1
assert result.total_tokens >= 0
mock_llm.assert_called_once()
def test_error_handling():
# Mock the LLM with an error
mock_llm = Mock(side_effect=Exception("LLM error"))
    agent = AgentProfile(agent_id="test", display_name="Test Agent")
    graph = build_property_graph([agent], [], query="Test")
config = RunnerConfig(
max_retries=2,
        error_policy=ErrorPolicy(on_error="skip"),
)
runner = MACPRunner(llm_caller=mock_llm, config=config)
result = runner.run_round(graph)
assert len(result.errors) > 0
assert result.final_answer is None
def test_parallel_execution():
    mock_llm = Mock(return_value="Mocked response")
    agents = [
AgentProfile(agent_id=f"agent_{i}", display_name=f"Agent {i}")
for i in range(3)
]
edges = [("agent_0", "agent_1"), ("agent_0", "agent_2")]
graph = build_property_graph(agents, edges, query="Test")
config = RunnerConfig(enable_parallel=True, max_parallel_size=2)
runner = MACPRunner(llm_caller=mock_llm, config=config)
result = runner.run_round(graph)
assert len(result.execution_order) == 3
How do I scale to large graphs?
# 1. Use pruning to cut inefficient paths
config = RunnerConfig(
pruning_config=PruningConfig(
min_weight_threshold=0.2,
min_probability_threshold=0.1,
token_budget=5000,
),
)
# 2. Use parallel execution
config.enable_parallel = True
config.max_parallel_size = 10
# 3. Use beam search to cap paths
config.routing_policy = RoutingPolicy.BEAM_SEARCH
scheduler = AdaptiveScheduler(policy=RoutingPolicy.BEAM_SEARCH, beam_width=5)
# 4. Use subgraph filtering
from core.algorithms import GraphAlgorithms, SubgraphFilter
algo = GraphAlgorithms(graph)
subgraph = algo.filter_subgraph(SubgraphFilter(
max_hop_distance=3,
from_node="start",
min_edge_weight=0.3,
))
# 5. Use async for parallel requests
async def process_large_graph(graph):
results = await runner.arun_round(graph)
return results
License
Support
- GitHub Issues: github.com/yourusername/rustworkx-agent-framework/issues
- Documentation: github.com/yourusername/rustworkx-agent-framework#readme
Made with ❤️ for the multi-agent systems developer community