""" Building the agent graph from profiles. Supports: - Topology validation (cycles, duplicates, directedness) - Custom node and edge attributes - Extensible schemas with versioning - Export to PyG format with arbitrary features """ import itertools from collections.abc import Callable, Sequence from typing import TYPE_CHECKING, Any import rustworkx as rx import torch from core.schema import ( SCHEMA_VERSION, AgentNodeSchema, BaseEdgeSchema, BaseNodeSchema, CostMetrics, EdgeType, GraphSchema, LLMConfig, NodeType, SchemaValidator, TaskNodeSchema, ValidationResult, WorkflowEdgeSchema, ) __all__ = [ "BuilderConfig", "GraphBuilder", "build_from_adjacency", "build_from_schema", "build_property_graph", "default_edges", "default_sequence", ] _TASK_NODE_ID = "__task__" if TYPE_CHECKING: from core.graph import RoleGraph class BuilderConfig: """ Graph builder settings. Controls schema validation, cycle/duplicate tolerance, and feature widths, as well as default weight behaviour and connection of the virtual task node. """ def __init__( self, validate: bool = True, check_cycles: bool = True, check_duplicates: bool = True, allow_self_loops: bool = False, node_feature_names: list[str] | None = None, edge_feature_names: list[str] | None = None, default_edge_dim: int | None = None, weight_fn: Callable[[str, str, dict], float] | None = None, default_weight: float = 1.0, include_task_node: bool = True, task_edge_weight: float = 1.0, ): """ Create the builder configuration. Args: validate: Whether to run schema validation before building. check_cycles: Whether to check for cycles during validation. check_duplicates: Whether to check for duplicate nodes and edges. allow_self_loops: Whether to allow self-loops in the graph. node_feature_names: Node feature names included in the schema. edge_feature_names: Edge feature names included in the schema. default_edge_dim: Default dimensionality of edge feature vectors. weight_fn: Custom function for computing edge weights. default_weight: Edge weight when weight_fn is not set. include_task_node: Whether to add a virtual task node. task_edge_weight: Weight of edges between the task and agents. """ self.validate = validate self.check_cycles = check_cycles self.check_duplicates = check_duplicates self.allow_self_loops = allow_self_loops self.node_feature_names = node_feature_names or [] self.edge_feature_names = edge_feature_names or [] self.default_edge_dim = default_edge_dim self.weight_fn = weight_fn self.default_weight = default_weight self.include_task_node = include_task_node self.task_edge_weight = task_edge_weight class GraphBuilder: """ Convenient interface for step-by-step construction of `GraphSchema`. Supports conditional routing via add_conditional_edge and add_conditional_edges methods. Supports explicit start/end nodes for execution optimisation. Example: builder = GraphBuilder() builder.add_agent("solver", description="Solves problems") builder.add_agent("reviewer", description="Reviews solutions") builder.add_agent("finalizer", description="Finalizes answer") # Unconditional edges builder.add_workflow_edge("solver", "reviewer") # Conditional edge: transition to finalizer only if reviewer succeeds builder.add_conditional_edge( "reviewer", "finalizer", condition=lambda ctx: "approved" in ctx.get_last_response().lower() ) # Set explicit execution boundaries builder.set_start_node("solver") builder.set_end_node("finalizer") graph = builder.build() """ def __init__(self, config: BuilderConfig | None = None): """Initialise the builder with the given configuration.""" self.config = config or BuilderConfig() self._schema = GraphSchema( schema_version=SCHEMA_VERSION, node_feature_names=self.config.node_feature_names, edge_feature_names=self.config.edge_feature_names, ) self._validator = SchemaValidator( check_cycles=self.config.check_cycles, check_duplicates=self.config.check_duplicates, ) # Callable conditions are stored separately (not serialised in the schema) self._edge_conditions: dict[tuple[str, str], Callable] = {} # Explicit graph execution boundaries self._start_node: str | None = None self._end_node: str | None = None # Agent profiles (for passing tool objects) self._agent_profiles: dict[str, Any] = {} def add_agent( self, agent_id: str, display_name: str | None = None, persona: str = "", description: str = "", embedding: list[float] | None = None, trust_score: float = 1.0, llm_config: LLMConfig | None = None, tools: list[str] | None = None, input_schema: Any | None = None, output_schema: Any | None = None, llm_backbone: str | None = None, base_url: str | None = None, api_key: str | None = None, max_tokens: int | None = None, temperature: float | None = None, timeout: float | None = None, top_p: float | None = None, stop_sequences: list[str] | None = None, **metadata, ) -> "GraphBuilder": """ Add an agent node to the schema with optional LLM configuration and validation. Args: agent_id: Unique agent identifier. display_name: Display name (defaults to the id). persona: Brief persona/role. description: Text description of capabilities. embedding: Agent feature vector (if already computed). trust_score: Base trust score for the agent. llm_config: Ready-made LLMConfig object (overrides individual params). tools: List of agent tools. input_schema: Pydantic model or JSON Schema for validating incoming data. output_schema: Pydantic model or JSON Schema for validating agent responses. llm_backbone: Model name (e.g. "gpt-4", "claude-3-opus"). base_url: API endpoint URL (e.g. "https://api.openai.com/v1"). api_key: API key or reference to an environment variable ("$OPENAI_API_KEY"). max_tokens: Maximum number of tokens in the response. temperature: Generation temperature (0.0-2.0). timeout: Request timeout in seconds. top_p: Top-p (nucleus) sampling parameter. stop_sequences: Sequences to stop generation. **metadata: Arbitrary additional fields. Example: # Agent with OpenAI GPT-4 builder.add_agent( "solver", persona="Expert problem solver", llm_backbone="gpt-4", base_url="https://api.openai.com/v1", api_key="$OPENAI_API_KEY", temperature=0.7, max_tokens=2000 ) # Agent with a local Ollama model builder.add_agent( "analyzer", persona="Data analyzer", llm_backbone="llama3:70b", base_url="http://localhost:11434/v1", temperature=0.0 ) # Agent with a ready-made LLMConfig config = LLMConfig(model_name="claude-3-opus", ...) builder.add_agent("reviewer", llm_config=config) # Agent with input/output validation from pydantic import BaseModel class SolverInput(BaseModel): question: str context: str | None = None class SolverOutput(BaseModel): answer: str confidence: float builder.add_agent( "solver", persona="Math solver", input_schema=SolverInput, output_schema=SolverOutput ) """ # If llm_config provided, use it as fallback effective_llm_backbone = llm_backbone effective_base_url = base_url effective_api_key = api_key effective_max_tokens = max_tokens effective_temperature = temperature effective_timeout = timeout effective_top_p = top_p effective_stop_sequences = stop_sequences if llm_config: effective_llm_backbone = llm_backbone or llm_config.model_name effective_base_url = base_url or llm_config.base_url effective_api_key = api_key or llm_config.api_key effective_max_tokens = max_tokens if max_tokens is not None else llm_config.max_tokens effective_temperature = temperature if temperature is not None else llm_config.temperature effective_timeout = timeout if timeout is not None else llm_config.timeout effective_top_p = top_p if top_p is not None else llm_config.top_p effective_stop_sequences = stop_sequences or llm_config.stop_sequences node = AgentNodeSchema( id=agent_id, display_name=display_name or agent_id, persona=persona, description=description, embedding=embedding, trust_score=trust_score, llm_backbone=effective_llm_backbone, base_url=effective_base_url, api_key=effective_api_key, max_tokens=effective_max_tokens, temperature=effective_temperature, timeout=effective_timeout, top_p=effective_top_p, stop_sequences=effective_stop_sequences, tools=tools or [], input_schema=input_schema, output_schema=output_schema, metadata=metadata, ) self._schema.add_node(node) return self def add_agent_profile( self, profile: Any, # AgentProfile trust_score: float = 1.0, **metadata, ) -> "GraphBuilder": """ Add an agent from an existing AgentProfile object. Args: profile: AgentProfile object. trust_score: Base trust score for the agent. **metadata: Additional fields. Example: from core.agent import AgentProfile from tools import CodeInterpreterTool agent = AgentProfile( agent_id="coder", display_name="Coder Agent", persona="a Python programmer", tools=[CodeInterpreterTool()], ) builder = GraphBuilder() builder.add_agent_profile(agent) """ # Get tool names (strings or names from objects) tool_names = [] if hasattr(profile, "get_tool_names"): tool_names = profile.get_tool_names() elif hasattr(profile, "tools"): tool_names = profile.tools # Get LLM config if present llm_config = None if hasattr(profile, "llm_config") and profile.llm_config: llm_config = profile.llm_config node = AgentNodeSchema( id=profile.agent_id, display_name=profile.display_name, persona=getattr(profile, "persona", ""), description=getattr(profile, "description", ""), embedding=None, # Will be computed later if needed trust_score=trust_score, llm_backbone=getattr(profile, "llm_backbone", None), base_url=llm_config.base_url if llm_config else None, api_key=llm_config.api_key if llm_config else None, max_tokens=llm_config.max_tokens if llm_config else None, temperature=llm_config.temperature if llm_config else None, timeout=llm_config.timeout if llm_config else None, top_p=llm_config.top_p if llm_config else None, stop_sequences=llm_config.stop_sequences if llm_config else None, tools=tool_names, input_schema=getattr(profile, "input_schema", None), output_schema=getattr(profile, "output_schema", None), metadata=metadata, ) self._schema.add_node(node) # Save the original profile for passing tool objects to the runner self._agent_profiles[profile.agent_id] = profile return self def add_task( self, task_id: str = _TASK_NODE_ID, query: str = "", description: str = "", embedding: list[float] | None = None, **metadata, ) -> "GraphBuilder": """ Add a virtual task node. Args: task_id: Task node identifier. query: Task query/formulation. description: Additional context description. embedding: Task feature vector. **metadata: Additional arbitrary fields. """ node = TaskNodeSchema( id=task_id, query=query, description=description, embedding=embedding, metadata=metadata, ) self._schema.add_node(node) return self def add_node( self, node_id: str, node_type: NodeType = NodeType.CUSTOM, **kwargs, ) -> "GraphBuilder": """Add an arbitrary node of the specified type.""" node = BaseNodeSchema( id=node_id, type=node_type, **kwargs, ) self._schema.add_node(node) return self def add_edge( self, source: str, target: str, edge_type: EdgeType = EdgeType.WORKFLOW, weight: float | None = None, probability: float = 1.0, cost: CostMetrics | None = None, attr: list[float] | None = None, **metadata, ) -> "GraphBuilder": """ Add a basic edge between nodes. Args: source: Edge source. target: Edge target. edge_type: Edge type (workflow/task/...). weight: Edge weight; if None, computed by weight_fn or default. probability: Transmission/activation probability. cost: Transition cost metrics. attr: Edge feature vector. **metadata: Additional metadata written to the schema. """ if weight is None: if self.config.weight_fn: weight = self.config.weight_fn(source, target, metadata) else: weight = self.config.default_weight if source == target and not self.config.allow_self_loops: msg = f"Self-loops not allowed: {source} -> {target}" raise ValueError(msg) edge = BaseEdgeSchema( source=source, target=target, type=edge_type, weight=weight, probability=probability, cost=cost or CostMetrics(), attr=attr, metadata=metadata, ) self._schema.add_edge(edge) return self def add_workflow_edge( self, source: str, target: str, weight: float | None = None, condition: str | None = None, priority: int = 0, **metadata, ) -> "GraphBuilder": """ Add a workflow (business logic) edge. Args: source: Source node. target: Target node. weight: Weight; if None — computed as for a regular edge. condition: Activation condition (e.g. a route filter). priority: Execution priority within the workflow. **metadata: Additional schema attributes. """ if weight is None: if self.config.weight_fn: weight = self.config.weight_fn(source, target, metadata) else: weight = self.config.default_weight edge = WorkflowEdgeSchema( source=source, target=target, weight=weight, condition=condition, priority=priority, metadata=metadata, ) self._schema.add_edge(edge) return self def add_conditional_edge( self, source: str, target: str, condition: Callable | str, weight: float | None = None, priority: int = 0, **metadata, ) -> "GraphBuilder": """ Add a conditional edge. The condition is evaluated at runtime during execution plan building. If the condition is not met, the edge is ignored. Args: source: Source node. target: Target node. condition: Activation condition: - Callable[[ConditionContext], bool] — check function - str — name of a registered condition or expression weight: Edge weight (if None — default_weight). priority: Edge priority. **metadata: Additional attributes. Returns: self for chaining. Example: # Callable condition builder.add_conditional_edge( "solver", "reviewer", condition=lambda ctx: ctx.source_succeeded() ) # String condition (built-in) builder.add_conditional_edge( "reviewer", "finalizer", condition="source_success" ) # String condition (content check) builder.add_conditional_edge( "classifier", "math_agent", condition="contains:math" ) """ if weight is None: if self.config.weight_fn: weight = self.config.weight_fn(source, target, metadata) else: weight = self.config.default_weight # String condition saved in the schema condition_str = condition if isinstance(condition, str) else None edge = WorkflowEdgeSchema( source=source, target=target, weight=weight, condition=condition_str, priority=priority, is_conditional=True, metadata=metadata, ) self._schema.add_edge(edge) # Callable condition saved separately if callable(condition): self._edge_conditions[(source, target)] = condition return self def add_conditional_edges( self, source: str, path_map: dict[str, Callable | str | None], default: str | None = None, weight: float | None = None, ) -> "GraphBuilder": """ Add multiple conditional edges from one source (router pattern). Analogous to LangGraph add_conditional_edges — selecting the next node based on conditions. Args: source: Source node. path_map: Dict {target: condition}. condition=None means unconditional transition. default: Default node if no condition is met. weight: Weight for all edges. Returns: self for chaining. Example: # Router: classifier routes to different agents builder.add_conditional_edges( "classifier", { "math_agent": lambda ctx: "math" in ctx.get_last_response(), "code_agent": lambda ctx: "code" in ctx.get_last_response(), "general_agent": None, # fallback }, default="general_agent" ) """ for target, condition in path_map.items(): if condition is None: # Unconditional edge self.add_workflow_edge(source, target, weight=weight) else: self.add_conditional_edge(source, target, condition, weight=weight) # If there is a default and it has not been added yet if default and default not in path_map: self.add_workflow_edge(source, default, weight=weight) return self @property def edge_conditions(self) -> dict[tuple[str, str], Callable]: """Get all callable conditions for edges.""" return self._edge_conditions.copy() def set_start_node(self, node_id: str) -> "GraphBuilder": """ Set the start node for execution. The start node is the graph entry point. Nodes unreachable from the start node will be excluded from execution. Args: node_id: Node ID (must be added before calling build()). Returns: self for chaining. Example: builder.add_agent("input_agent", ...) builder.set_start_node("input_agent") """ self._start_node = node_id return self def set_end_node(self, node_id: str) -> "GraphBuilder": """ Set the end node for execution. The end node is the graph exit point. Nodes from which the end node is unreachable will be excluded from execution. Args: node_id: Node ID (must be added before calling build()). Returns: self for chaining. Example: builder.add_agent("output_agent", ...) builder.set_end_node("output_agent") """ self._end_node = node_id return self def set_execution_bounds( self, start_node: str | None, end_node: str | None, ) -> "GraphBuilder": """ Set execution boundaries (start and end nodes). Args: start_node: Start node ID (None for auto-detection). end_node: End node ID (None for auto-detection). Returns: self for chaining. Example: builder.set_execution_bounds("input", "output") """ self._start_node = start_node self._end_node = end_node return self @property def start_node(self) -> str | None: """Get the start node ID.""" return self._start_node @property def end_node(self) -> str | None: """Get the end node ID.""" return self._end_node def connect_task_to_agents( self, task_id: str = _TASK_NODE_ID, agent_ids: list[str] | None = None, bidirectional: bool = True, ) -> "GraphBuilder": """ Connect the task node to agents via context/update edges. Args: task_id: Task node identifier. agent_ids: List of agents; if None, all schema agents are used. bidirectional: Whether to add reverse edges from agents to the task. """ if agent_ids is None: agent_ids = [node_id for node_id, node in self._schema.nodes.items() if node.type == NodeType.AGENT] for agent_id in agent_ids: self.add_edge( task_id, agent_id, edge_type=EdgeType.TASK_CONTEXT, weight=self.config.task_edge_weight, ) if bidirectional: self.add_edge( agent_id, task_id, edge_type=EdgeType.TASK_UPDATE, weight=self.config.task_edge_weight, ) return self def from_edges( self, edges: Sequence[tuple[str, str]], weight: float | None = None, ) -> "GraphBuilder": """Add a set of workflow edges from a list of (source, target) pairs.""" for source, target in edges: self.add_workflow_edge(source, target, weight=weight) return self def validate(self) -> ValidationResult: """Validate the schema for correctness and return the validation result.""" return self._validator.validate(self._schema) def build(self) -> "RoleGraph": """Build a `RoleGraph` from the current schema, optionally validating it.""" if self.config.validate: result = self.validate() if not result.valid: msg = f"Schema validation failed: {result.errors}" raise ValueError(msg) return build_from_schema( self._schema, edge_conditions=self._edge_conditions if self._edge_conditions else None, start_node=self._start_node, end_node=self._end_node, ) @property def schema(self) -> GraphSchema: """Current graph schema (for reading/extending outside the builder).""" return self._schema def build_from_schema( schema: GraphSchema, edge_conditions: dict[tuple[str, str], Callable] | None = None, start_node: str | None = None, end_node: str | None = None, ) -> "RoleGraph": """ Construct a `RoleGraph` from a ready-made `GraphSchema`. Creates an internal `rx.PyDiGraph`, transfers node/edge data, restores adjacency and probability matrices, and builds a `RoleGraph` with the corresponding agent and task objects. LLM configuration from AgentNodeSchema is automatically transferred to AgentProfile.llm_config to support multi-model usage. Args: schema: Graph schema. edge_conditions: Callable conditions for edges. start_node: Start node ID for execution. end_node: End node ID for execution. """ from core.agent import AgentLLMConfig, AgentProfile, TaskNode from core.graph import RoleGraph graph = rx.PyDiGraph() idx_map = {} agents = [] task_node = None task_node_id = None for node_id, node_schema in schema.nodes.items(): idx_map[node_id] = graph.add_node( { "id": node_id, "type": node_schema.type.value, "schema": node_schema.model_dump(), } ) embedding = None if node_schema.embedding: embedding = torch.tensor(node_schema.embedding, dtype=torch.float32) if node_schema.type == NodeType.TASK: task_schema = node_schema task_node = TaskNode( agent_id=node_id, query=getattr(task_schema, "query", ""), description=getattr(task_schema, "description", ""), embedding=embedding, ) task_node_id = node_id agents.append(task_node) else: agent_schema = node_schema # Build LLM config from schema if any LLM params are set llm_config = None if isinstance(agent_schema, AgentNodeSchema) and agent_schema.has_llm_config(): llm_config = AgentLLMConfig( model_name=agent_schema.llm_backbone, base_url=agent_schema.base_url, api_key=agent_schema.api_key, max_tokens=agent_schema.max_tokens, temperature=agent_schema.temperature, timeout=agent_schema.timeout, top_p=agent_schema.top_p, stop_sequences=agent_schema.stop_sequences, ) agent = AgentProfile( agent_id=node_id, display_name=node_schema.display_name or node_id, persona=getattr(agent_schema, "persona", ""), description=getattr(agent_schema, "description", ""), llm_backbone=getattr(agent_schema, "llm_backbone", None), llm_config=llm_config, tools=getattr(agent_schema, "tools", []), embedding=embedding, input_schema=getattr(agent_schema, "input_schema", None), output_schema=getattr(agent_schema, "output_schema", None), ) agents.append(agent) for edge_schema in schema.edges: if edge_schema.source in idx_map and edge_schema.target in idx_map: edge_data = { "type": edge_schema.type.value, "weight": edge_schema.weight, "probability": edge_schema.probability, "attr": edge_schema.to_attr_tensor(), "schema": edge_schema.model_dump(exclude={"attr"}), } graph.add_edge( idx_map[edge_schema.source], idx_map[edge_schema.target], edge_data, ) n = graph.num_nodes() a_com = torch.zeros((n, n), dtype=torch.float32) p_matrix = torch.zeros((n, n), dtype=torch.float32) for edge_idx in graph.edge_indices(): s, t = graph.get_edge_endpoints_by_index(edge_idx) d = graph.get_edge_data_by_index(edge_idx) a_com[s, t] = d.get("weight", 1.0) p_matrix[s, t] = d.get("probability", 1.0) connections = {a.agent_id: [] for a in agents} edge_condition_names: dict[tuple[str, str], str] = {} for edge_schema in schema.edges: if edge_schema.source in connections and edge_schema.target not in connections[edge_schema.source]: connections[edge_schema.source].append(edge_schema.target) # Extract string conditions from WorkflowEdgeSchema if hasattr(edge_schema, "condition") and edge_schema.condition and isinstance(edge_schema.condition, str): edge_condition_names[(edge_schema.source, edge_schema.target)] = edge_schema.condition return RoleGraph( agents=agents, node_ids=[a.agent_id for a in agents], role_connections=connections, task_node=task_node_id, query=getattr(task_node, "query", "") if task_node else "", graph=graph, A_com=a_com, p_matrix=p_matrix, edge_conditions=edge_conditions or {}, edge_condition_names=edge_condition_names, start_node=start_node, end_node=end_node, ) def default_sequence(roles: Sequence[str], anchor: str) -> list[str]: """Build the role order starting from the anchor (if present).""" ordered = [anchor] if anchor in roles else [] ordered.extend(role for role in roles if role != anchor) return ordered def default_edges(sequence: Sequence[str]) -> list[tuple[str, str]]: """Build a chain of edges from a sequence of nodes (s -> t in order).""" return list(itertools.pairwise(sequence)) def build_property_graph( agents: Sequence[Any], workflow_edges: Sequence[tuple[str, str]], *, query: str = "", answer: str = "", anchor: str | None = None, encoder: Any | None = None, include_task_node: bool = True, config: BuilderConfig | None = None, node_features: dict[str, dict[str, Any]] | None = None, edge_features: dict[tuple[str, str], dict[str, Any]] | None = None, ) -> "RoleGraph": """ Build a `RoleGraph` with the given workflow topology. Args: agents: List of agents (`AgentProfile`). workflow_edges: Workflow edges [(source, target), ...]. query: Task text placed in the task node. answer: Known answer (added to the final graph). anchor: Anchor agent ID — will be first in order and may boost edge weights. encoder: `NodeEncoder` for automatic embedding computation. include_task_node: Whether to add a virtual task node and connect it to agents. config: Builder configuration (if not provided — a new one is created). node_features: Additional node features `{node_id: {feature: value}}`. edge_features: Additional edge features `{(src, tgt): {feature: value}}`. Returns: Ready-made `RoleGraph` with populated adjacency and probability matrices. """ config = config or BuilderConfig(include_task_node=include_task_node) builder = GraphBuilder(config) if encoder is not None: from core.agent import TaskNode texts = [a.to_text() for a in agents] if include_task_node: task_tmpl = TaskNode(query=query) texts.append(task_tmpl.to_text()) embs = encoder.encode(texts) emb_list = [embs[i].cpu().tolist() for i in range(len(agents))] task_emb = embs[-1].cpu().tolist() if include_task_node else None else: emb_list = [None] * len(agents) task_emb = None if anchor is None and agents: anchor = agents[0].agent_id for i, agent in enumerate(agents): extra = (node_features or {}).get(agent.agent_id, {}) trust = 1.0 + (0.1 if agent.agent_id == anchor else 0) builder.add_agent( agent_id=agent.agent_id, display_name=agent.display_name, persona=getattr(agent, "persona", ""), description=getattr(agent, "description", ""), embedding=emb_list[i], trust_score=min(trust, 1.0), llm_backbone=getattr(agent, "llm_backbone", None), tools=getattr(agent, "tools", []), **extra, ) if include_task_node: task_extra = (node_features or {}).get(_TASK_NODE_ID, {}) builder.add_task( task_id=_TASK_NODE_ID, query=query, embedding=task_emb, **task_extra, ) pos = {a.agent_id: i for i, a in enumerate(agents)} for s, t in workflow_edges: if s not in pos or t not in pos: continue w = 1.0 if anchor and s == anchor: w += 0.1 if pos.get(s, 0) <= pos.get(t, 0): w += 0.05 extra = (edge_features or {}).get((s, t), {}) builder.add_workflow_edge( source=s, target=t, weight=round(w, 3), **extra, ) if include_task_node: builder.connect_task_to_agents(_TASK_NODE_ID, bidirectional=True) rg = builder.build() if answer: object.__setattr__(rg, "answer", answer) return rg def build_from_adjacency( agents: Sequence[Any], adjacency: torch.Tensor, *, query: str = "", answer: str = "", threshold: float = 0.5, config: BuilderConfig | None = None, ) -> "RoleGraph": """ Build a `RoleGraph` from an adjacency matrix. Args: agents: List of agents (`AgentProfile`). adjacency: Square weight/connectivity matrix (torch.Tensor). query: Task text placed in the task node. answer: Known answer (added to the final graph). threshold: Threshold above which a connection is considered to exist. config: Builder configuration (task node is disabled). """ n = len(agents) if tuple(adjacency.shape) != (n, n): msg = f"Adjacency shape {tuple(adjacency.shape)} doesn't match {n} agents" raise ValueError(msg) edges = [ (agents[i].agent_id, agents[j].agent_id) for i in range(n) for j in range(n) if i != j and adjacency[i, j] > threshold ] config = config or BuilderConfig(include_task_node=False) config.include_task_node = False return build_property_graph( agents, edges, query=query, answer=answer, config=config, )