|
|
""" |
|
|
Agentic Capabilities Module for MiniMind Max2 |
|
|
Function calling, tool use, and agent behaviors. |
|
|
""" |
|
|
|
|
|
from dataclasses import dataclass, field |
|
|
from typing import List, Optional, Dict, Any, Callable, Union |
|
|
import torch |
|
|
import torch.nn as nn |
|
|
import torch.nn.functional as F |
|
|
import json |
|
|
import re |
|
|
from enum import Enum |
|
|
|
|
|
|
|
|
class ToolType(Enum):
    """Closed set of tool categories.

    Each member's value is the lowercase string tag for that category.
    """

    # Default category used by ToolDefinition and ToolRegistry.register.
    FUNCTION = "function"
    API = "api"
    CODE_EXEC = "code_execution"
    RETRIEVAL = "retrieval"
    BROWSER = "browser"
|
|
|
|
|
|
|
|
@dataclass
class FunctionCallingConfig:
    """Settings bundle for the function-calling components of this module.

    All fields have defaults, so ``FunctionCallingConfig()`` is a valid
    configuration. Field order is part of the positional constructor.
    """

    # Delimiters placed around a serialized tool call
    # (read by ToolCallParser.extract_tool_calls / format_tool_call).
    tool_call_start: str = "<tool_call>"
    tool_call_end: str = "</tool_call>"

    # Delimiters placed around a tool result
    # (read by ToolCallParser.format_tool_result).
    tool_result_start: str = "<tool_result>"
    tool_result_end: str = "</tool_result>"

    # Behaviour switches. NOTE(review): none of these three is read by the
    # code visible in this module -- presumably consumed by callers; confirm.
    max_tool_calls: int = 5
    parallel_tool_calls: bool = True
    strict_json: bool = True

    # NOTE(review): not read by the visible code -- presumably a loss weight.
    function_calling_weight: float = 1.0
    # Width of the embedding table rows created by SchemaEncoder.
    schema_embedding_dim: int = 256
|
|
|
|
|
|
|
|
@dataclass
class ToolDefinition:
    """Describes one callable tool: its name, purpose and parameter specs.

    ``parameters`` maps each parameter name to a spec mapping; ``to_prompt``
    reads the optional ``"type"`` key of every spec via ``.get``.
    """

    name: str
    description: str
    parameters: Dict[str, Any]
    required: List[str] = field(default_factory=list)
    tool_type: ToolType = ToolType.FUNCTION

    def to_schema(self) -> Dict[str, Any]:
        """Return the tool as a nested ``{"type": "function", ...}`` schema dict.

        The returned structure references ``self.parameters`` and
        ``self.required`` directly (no copies are made).
        """
        parameter_block = {
            "type": "object",
            "properties": self.parameters,
            "required": self.required,
        }
        function_block = {
            "name": self.name,
            "description": self.description,
            "parameters": parameter_block,
        }
        return {"type": "function", "function": function_block}

    def to_prompt(self) -> str:
        """Render ``name(param: type, ...) - description`` on a single line.

        A parameter whose spec has no ``"type"`` key is shown as ``any``.
        """
        rendered = []
        for param_name, spec in self.parameters.items():
            rendered.append(f"{param_name}: {spec.get('type', 'any')}")
        signature = ", ".join(rendered)
        return f"{self.name}({signature}) - {self.description}"
|
|
|
|
|
|
|
|
class ToolRegistry:
    """Keeps tool definitions and their optional handlers, keyed by tool name."""

    def __init__(self):
        # Both mappings are keyed by tool name; a tool may exist in ``tools``
        # without an entry in ``handlers``.
        self.tools: Dict[str, ToolDefinition] = {}
        self.handlers: Dict[str, Callable] = {}

    def register(
        self,
        name: str,
        description: str,
        parameters: Dict[str, Any],
        required: Optional[List[str]] = None,
        handler: Optional[Callable] = None,
        tool_type: ToolType = ToolType.FUNCTION,
    ) -> None:
        """Add a tool under ``name``, replacing any definition already stored.

        Args:
            name: Key used for later lookup and execution.
            description: Free-text description stored on the definition.
            parameters: Mapping of parameter name to its spec.
            required: Names of mandatory parameters; a falsy value becomes ``[]``.
            handler: Callable invoked by ``execute``; only stored when truthy.
            tool_type: Category recorded on the definition.
        """
        definition = ToolDefinition(
            name=name,
            description=description,
            parameters=parameters,
            required=required or [],
            tool_type=tool_type,
        )
        self.tools[name] = definition
        if handler:
            self.handlers[name] = handler

    def get_tool(self, name: str) -> Optional[ToolDefinition]:
        """Return the definition stored under ``name``, or ``None`` if absent."""
        return self.tools.get(name)

    def execute(self, name: str, **kwargs) -> Any:
        """Call the handler registered for ``name`` with ``kwargs``.

        Raises:
            ValueError: If no handler is stored under ``name``.
        """
        if name in self.handlers:
            return self.handlers[name](**kwargs)
        raise ValueError(f"No handler registered for tool: {name}")

    def get_all_schemas(self) -> List[Dict[str, Any]]:
        """Return ``to_schema()`` of every tool, in registration order."""
        schemas = []
        for definition in self.tools.values():
            schemas.append(definition.to_schema())
        return schemas

    def get_tools_prompt(self) -> str:
        """Return an ``Available tools:`` header followed by one bullet per tool."""
        bullets = []
        for definition in self.tools.values():
            bullets.append(f"- {definition.to_prompt()}")
        listing = "\n".join(bullets)
        return f"Available tools:\n{listing}"
|
|
|
|
|
|
|
|
class ToolCallParser:
    """Parse tool calls out of model output and format calls/results.

    The delimiters are read from the supplied config object
    (``tool_call_start``/``tool_call_end`` and
    ``tool_result_start``/``tool_result_end``).
    """

    def __init__(self, config: FunctionCallingConfig):
        self.config = config

    def extract_tool_calls(self, text: str) -> List[Dict[str, Any]]:
        """Return every tool call found between the configured delimiters.

        Each delimited payload is first decoded as JSON. A JSON object is kept
        as one call; a JSON array contributes each of its object elements;
        any other JSON value (number, string, ...) is dropped so the result
        only ever contains dicts. Payloads that are not valid JSON are retried
        with the ``name(arg=value, ...)`` syntax and skipped if that fails too.

        Args:
            text: Raw model output.

        Returns:
            The parsed calls, in order of appearance.
        """
        pattern = (
            re.escape(self.config.tool_call_start)
            + r"(.*?)"
            + re.escape(self.config.tool_call_end)
        )

        calls: List[Dict[str, Any]] = []
        # DOTALL lets a payload span several lines.
        for raw in re.findall(pattern, text, re.DOTALL):
            payload = raw.strip()
            try:
                decoded = json.loads(payload)
            except json.JSONDecodeError:
                # Not JSON: fall back to the function-call syntax.
                parsed = self._parse_function_format(payload)
                if parsed:
                    calls.append(parsed)
                continue

            if isinstance(decoded, dict):
                calls.append(decoded)
            elif isinstance(decoded, list):
                calls.extend(item for item in decoded if isinstance(item, dict))
        return calls

    @staticmethod
    def _split_arguments(args_str: str) -> List[str]:
        """Split an argument list on commas outside quotes and brackets."""
        pieces: List[str] = []
        current: List[str] = []
        depth = 0          # nesting level of (), [] and {}
        quote = None       # quote character of the string being scanned
        escaped = False    # previous character inside the string was a backslash
        prev = None        # last non-whitespace character seen outside a string

        for ch in args_str:
            if quote is not None:
                current.append(ch)
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == quote:
                    quote = None
                    prev = ch
                continue

            if ch == "," and depth == 0:
                pieces.append("".join(current))
                current = []
                prev = None
                continue

            # A quote only opens a string where a value can start, so an
            # apostrophe inside a bare word (don't) is treated as plain text.
            if ch in "\"'" and (prev is None or prev in "=([{,:"):
                quote = ch
            elif ch in "([{":
                depth += 1
            elif ch in ")]}" and depth > 0:
                depth -= 1

            current.append(ch)
            if not ch.isspace():
                prev = ch

        pieces.append("".join(current))
        return pieces

    def _parse_function_format(self, text: str) -> Optional[Dict[str, Any]]:
        """Parse ``function(arg1=val1, arg2=val2)`` into a call dict.

        Values are decoded as JSON when possible; otherwise the raw text is
        kept with surrounding quote characters stripped. Commas inside quoted
        strings or brackets belong to the value. Segments without a
        ``key=value`` shape (e.g. positional arguments) are ignored.

        Args:
            text: Candidate call text.

        Returns:
            ``{"name": ..., "arguments": {...}}``, or ``None`` when ``text``
            does not start with ``identifier(...)``.
        """
        match = re.match(r"(\w+)\((.*)\)", text, re.DOTALL)
        if not match:
            return None

        name, args_str = match.group(1), match.group(2)

        args: Dict[str, Any] = {}
        for piece in self._split_arguments(args_str):
            pair = re.search(r"(\w+)\s*=\s*(.+)", piece, re.DOTALL)
            if not pair:
                continue
            key = pair.group(1)
            value = pair.group(2).strip()
            try:
                args[key] = json.loads(value)
            except (ValueError, RecursionError):
                # Not JSON (bare word, single-quoted string, ...): keep as text.
                args[key] = value.strip('"\'')

        return {"name": name, "arguments": args}

    def format_tool_call(self, name: str, arguments: Dict[str, Any]) -> str:
        """Serialize a call as JSON wrapped in the tool-call delimiters."""
        call = {"name": name, "arguments": arguments}
        return f"{self.config.tool_call_start}{json.dumps(call)}{self.config.tool_call_end}"

    def format_tool_result(self, result: Any) -> str:
        """Wrap a tool result in the tool-result delimiters.

        Dicts and lists are JSON-encoded; anything else goes through ``str``.
        """
        if isinstance(result, (dict, list)):
            result_str = json.dumps(result)
        else:
            result_str = str(result)
        return f"{self.config.tool_result_start}{result_str}{self.config.tool_result_end}"
|
|
|
|
|
|
|
|
class SchemaEncoder(nn.Module):
    """Maps integer schema IDs to vectors of width ``hidden_size``.

    IDs index a learned embedding table (1000 rows of
    ``config.schema_embedding_dim`` values) whose rows are then passed through
    a Linear -> GELU -> Linear projection.
    """

    def __init__(self, config: FunctionCallingConfig, hidden_size: int):
        super().__init__()
        self.config = config

        embed_dim = config.schema_embedding_dim

        # Layers are created in the same order as they appear in the
        # Sequential so parameter initialisation order is unchanged.
        input_proj = nn.Linear(embed_dim, hidden_size)
        output_proj = nn.Linear(hidden_size, hidden_size)
        self.encoder = nn.Sequential(input_proj, nn.GELU(), output_proj)

        # Lookup table: valid schema IDs are 0..999.
        self.schema_embeddings = nn.Embedding(1000, embed_dim)

    def forward(self, schema_ids: torch.Tensor) -> torch.Tensor:
        """Look up ``schema_ids`` and project them to ``hidden_size`` features."""
        return self.encoder(self.schema_embeddings(schema_ids))
|
|
|
|
|
|
|
|
class AgenticModule(nn.Module):
    """Agentic capabilities module for MiniMind Max2.

    Bundles the prediction heads used for tool use:

    * ``tool_call_head``  -- 2-way classifier per position (call / no call).
    * ``tool_selector``   -- classifier over ``max_tools`` tool slots.
    * ``arg_enhancer``    -- linear map applied to the hidden states.

    plus a ``SchemaEncoder``, a ``ToolCallParser`` and a ``ToolRegistry``.
    """

    def __init__(
        self,
        config: FunctionCallingConfig,
        hidden_size: int,
        vocab_size: int,
        max_tools: int = 100,
    ):
        """Build the heads.

        Args:
            config: Function-calling settings shared with the parser/encoder.
            hidden_size: Width of the incoming hidden states.
            vocab_size: Accepted for interface compatibility; not used here.
            max_tools: Number of output slots of ``tool_selector``
                (defaults to the previously hard-coded 100).
        """
        super().__init__()
        self.config = config
        self.hidden_size = hidden_size

        # Binary decision head: two logits per position.
        self.tool_call_head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.GELU(),
            nn.Linear(hidden_size // 2, 2),
        )

        # Tool choice head: one logit per tool slot.
        self.tool_selector = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.GELU(),
            nn.Linear(hidden_size // 2, max_tools),
        )

        self.arg_enhancer = nn.Linear(hidden_size, hidden_size)
        self.schema_encoder = SchemaEncoder(config, hidden_size)
        self.parser = ToolCallParser(config)
        self.registry = ToolRegistry()

    def should_call_tool(self, hidden_states: torch.Tensor) -> torch.Tensor:
        """Return softmax probabilities over the two tool-call classes."""
        return F.softmax(self.tool_call_head(hidden_states), dim=-1)

    def select_tool(
        self,
        hidden_states: torch.Tensor,
        available_tools: Optional[List[str]] = None,
    ) -> torch.Tensor:
        """Return softmax probabilities over the tool slots.

        Args:
            hidden_states: Input features; last dimension is ``hidden_size``.
            available_tools: When given, only the first
                ``len(available_tools)`` slots keep probability mass; all
                later slots are masked to zero.
                NOTE(review): assumes slot ``i`` corresponds to
                ``available_tools[i]`` -- confirm against the caller.
                NOTE: an empty list masks every slot, so the softmax
                returns NaN.
        """
        logits = self.tool_selector(hidden_states)

        if available_tools is not None:
            num_tools = len(available_tools)
            # -inf added to a logit gives that slot zero probability.
            mask = torch.full_like(logits, float("-inf"))
            mask[..., :num_tools] = 0
            logits = logits + mask

        return F.softmax(logits, dim=-1)

    def forward(
        self,
        hidden_states: torch.Tensor,
        tool_labels: Optional[torch.Tensor] = None,
        tool_ids: Optional[torch.Tensor] = None,
    ) -> Dict[str, torch.Tensor]:
        """Run all heads and, when targets are given, compute their losses.

        Args:
            hidden_states: Input features; last dimension is ``hidden_size``.
            tool_labels: Optional class indices (0/1) for the tool-call head,
                one per position; ``-100`` entries are ignored.
            tool_ids: Optional tool-slot indices for the selector head, one
                per position; ``-100`` entries are ignored.

        Returns:
            Dict with ``tool_call_probs``, ``tool_select_probs`` and
            ``enhanced_hidden_states``, plus ``tool_call_loss`` /
            ``tool_select_loss`` when the matching targets were supplied.
        """
        call_logits = self.tool_call_head(hidden_states)
        select_logits = self.tool_selector(hidden_states)

        outputs = {
            "tool_call_probs": F.softmax(call_logits, dim=-1),
            "tool_select_probs": F.softmax(select_logits, dim=-1),
            "enhanced_hidden_states": self.arg_enhancer(hidden_states),
        }

        # cross_entropy applies log-softmax itself, so it must receive the
        # raw logits; feeding it probabilities would apply softmax twice.
        # reshape (not view) also accepts non-contiguous tensors.
        if tool_labels is not None:
            outputs["tool_call_loss"] = F.cross_entropy(
                call_logits.reshape(-1, call_logits.shape[-1]),
                tool_labels.reshape(-1),
                ignore_index=-100,
            )

        if tool_ids is not None:
            outputs["tool_select_loss"] = F.cross_entropy(
                select_logits.reshape(-1, select_logits.shape[-1]),
                tool_ids.reshape(-1),
                ignore_index=-100,
            )

        return outputs

    def generate_tool_call(
        self,
        model: nn.Module,
        input_ids: torch.Tensor,
        tools: List[ToolDefinition],
        max_new_tokens: int = 100,
    ) -> List[Dict[str, Any]]:
        """Generate with ``model`` and parse tool calls from the output.

        NOTE: this is still a stub. ``model.generate`` is invoked, but its
        result is not decoded (no tokenizer is available here), so parsing
        runs on a fixed placeholder string and the result is an empty list.
        ``tools`` is accepted but not yet injected into the prompt.

        Args:
            model: Object exposing ``generate(input_ids, max_new_tokens=...)``.
            input_ids: Prompt token IDs forwarded to ``generate``.
            tools: Tool definitions (currently unused).
            max_new_tokens: Generation budget forwarded to ``generate``.

        Returns:
            The list of parsed tool calls.
        """
        model.generate(
            input_ids,
            max_new_tokens=max_new_tokens,
        )

        # TODO: decode the generated IDs once a tokenizer is wired in.
        output_text = "placeholder_output"
        return self.parser.extract_tool_calls(output_text)
|
|
|
|
|
|
|
|
class AgenticTrainer:
    """Trains the agentic heads on top of a backbone run without gradients.

    Only the parameters of ``agentic_module`` are handed to the optimizer;
    the backbone forward pass runs under ``torch.no_grad()``.
    """

    def __init__(
        self,
        model: nn.Module,
        agentic_module: AgenticModule,
        config: FunctionCallingConfig,
        learning_rate: float = 1e-5,
        device: str = "cuda",
    ):
        """Store the components and create the AdamW optimizer.

        Args:
            model: Backbone providing hidden states (see ``train_step``).
            agentic_module: Module whose parameters are optimized.
            config: Function-calling settings (stored, not read here).
            learning_rate: AdamW learning rate.
            device: Device the batch tensors are moved to. The modules
                themselves are not moved by this class.
        """
        self.model = model
        self.agentic = agentic_module
        self.config = config
        self.device = device

        self.optimizer = torch.optim.AdamW(
            agentic_module.parameters(),
            lr=learning_rate,
        )

    def train_step(self, batch: Dict[str, torch.Tensor]) -> Dict[str, float]:
        """Run one optimization step on ``batch``.

        Args:
            batch: Must contain ``input_ids`` and ``attention_mask``; may
                contain ``tool_labels`` and/or ``tool_ids`` as targets.

        Returns:
            ``loss``, ``tool_call_loss`` and ``tool_select_loss`` as floats;
            a loss that was not computed is reported as ``0.0``. When the
            batch has no targets at all, no optimizer update is performed.
        """
        self.agentic.train()

        input_ids = batch["input_ids"].to(self.device)
        attention_mask = batch["attention_mask"].to(self.device)
        tool_labels = batch.get("tool_labels")
        tool_ids = batch.get("tool_ids")

        if tool_labels is not None:
            tool_labels = tool_labels.to(self.device)
        if tool_ids is not None:
            tool_ids = tool_ids.to(self.device)

        # Backbone features are computed without gradients.
        with torch.no_grad():
            if hasattr(self.model, 'model'):
                # Wrapped backbone: expected to return a 3-tuple whose first
                # element holds the hidden states.
                hidden_states, _, _ = self.model.model(input_ids, attention_mask)
            else:
                # Bare backbone: fall back to its token embeddings.
                hidden_states = self.model.embed_tokens(input_ids)

        outputs = self.agentic(hidden_states, tool_labels, tool_ids)

        loss = torch.tensor(0.0, device=self.device)
        for loss_key in ("tool_call_loss", "tool_select_loss"):
            if loss_key in outputs:
                loss = loss + outputs[loss_key]

        self.optimizer.zero_grad()
        # Without any target the loss is a constant with no graph attached;
        # calling backward() on it would raise, so the update is skipped.
        if loss.requires_grad:
            loss.backward()
            self.optimizer.step()

        return {
            "loss": loss.item(),
            "tool_call_loss": outputs.get("tool_call_loss", torch.tensor(0.0)).item(),
            "tool_select_loss": outputs.get("tool_select_loss", torch.tensor(0.0)).item(),
        }
|
|
|
|
|
|
|
|
|
|
|
# Built-in tool definitions loaded by create_agentic_registry().
# Every parameter spec is a mapping with "type" and "description" keys
# (plus an optional "default").
DEFAULT_TOOLS = [
    # Web search.
    ToolDefinition(
        name="search",
        description="Search the web for information",
        parameters={"query": {"type": "string", "description": "Search query"}},
        required=["query"],
    ),
    # Arithmetic.
    ToolDefinition(
        name="calculate",
        description="Perform mathematical calculations",
        parameters={
            "expression": {"type": "string", "description": "Math expression to evaluate"},
        },
        required=["expression"],
    ),
    # Weather lookup.
    ToolDefinition(
        name="get_weather",
        description="Get current weather for a location",
        parameters={
            "location": {"type": "string", "description": "City name or coordinates"},
        },
        required=["location"],
    ),
    # Code execution: the only entry with a non-default tool_type.
    ToolDefinition(
        name="run_code",
        description="Execute Python code",
        parameters={
            "code": {"type": "string", "description": "Python code to execute"},
            "language": {"type": "string", "description": "Programming language", "default": "python"},
        },
        required=["code"],
        tool_type=ToolType.CODE_EXEC,
    ),
    # File access.
    ToolDefinition(
        name="read_file",
        description="Read contents of a file",
        parameters={"path": {"type": "string", "description": "File path"}},
        required=["path"],
    ),
    ToolDefinition(
        name="write_file",
        description="Write contents to a file",
        parameters={
            "path": {"type": "string", "description": "File path"},
            "content": {"type": "string", "description": "Content to write"},
        },
        required=["path", "content"],
    ),
]
|
|
|
|
|
|
|
|
def create_agentic_registry() -> ToolRegistry:
    """Build a fresh ToolRegistry pre-loaded with every entry of DEFAULT_TOOLS.

    The definitions are stored directly in ``registry.tools`` keyed by tool
    name; no handlers are attached, and the ToolDefinition objects themselves
    are shared with DEFAULT_TOOLS (not copied).
    """
    default_registry = ToolRegistry()
    default_registry.tools.update((tool.name, tool) for tool in DEFAULT_TOOLS)
    return default_registry
|
|
|