Mini-Agent / mini_agent /acp /__init__.py
AbdulElahGwaith's picture
Upload folder using huggingface_hub
dc893fb verified
"""ACP (Agent Client Protocol) bridge for Mini-Agent."""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from uuid import uuid4
from acp import (
PROTOCOL_VERSION,
AgentSideConnection,
CancelNotification,
InitializeRequest,
InitializeResponse,
NewSessionRequest,
NewSessionResponse,
PromptRequest,
PromptResponse,
session_notification,
start_tool_call,
stdio_streams,
text_block,
tool_content,
update_agent_message,
update_agent_thought,
update_tool_call,
)
from pydantic import field_validator
from acp.schema import AgentCapabilities, Implementation, McpCapabilities
from mini_agent.agent import Agent
from mini_agent.cli import add_workspace_tools, initialize_base_tools
from mini_agent.config import Config
from mini_agent.llm import LLMClient
from mini_agent.retry import RetryConfig as RetryConfigBase
from mini_agent.schema import Message
logger = logging.getLogger(__name__)
try:
class InitializeRequestPatch(InitializeRequest):
@field_validator("protocolVersion", mode="before")
@classmethod
def normalize_protocol_version(cls, value: Any) -> int:
if isinstance(value, str):
try:
return int(value.split(".")[0])
except Exception:
return 1
if isinstance(value, (int, float)):
return int(value)
return 1
InitializeRequest = InitializeRequestPatch
InitializeRequest.model_rebuild(force=True)
except Exception: # pragma: no cover - defensive
logger.debug("ACP schema patch skipped")
@dataclass
class SessionState:
agent: Agent
cancelled: bool = False
class MiniMaxACPAgent:
"""Minimal ACP adapter wrapping the existing Agent runtime."""
def __init__(
self,
conn: AgentSideConnection,
config: Config,
llm: LLMClient,
base_tools: list,
system_prompt: str,
):
self._conn = conn
self._config = config
self._llm = llm
self._base_tools = base_tools
self._system_prompt = system_prompt
self._sessions: dict[str, SessionState] = {}
async def initialize(self, params: InitializeRequest) -> InitializeResponse: # noqa: ARG002
return InitializeResponse(
protocolVersion=PROTOCOL_VERSION,
agentCapabilities=AgentCapabilities(loadSession=False),
agentInfo=Implementation(name="mini-agent", title="Mini-Agent", version="0.1.0"),
)
async def newSession(self, params: NewSessionRequest) -> NewSessionResponse:
session_id = f"sess-{len(self._sessions)}-{uuid4().hex[:8]}"
workspace = Path(params.cwd or self._config.agent.workspace_dir).expanduser()
if not workspace.is_absolute():
workspace = workspace.resolve()
tools = list(self._base_tools)
add_workspace_tools(tools, self._config, workspace)
agent = Agent(llm_client=self._llm, system_prompt=self._system_prompt, tools=tools, max_steps=self._config.agent.max_steps, workspace_dir=str(workspace))
self._sessions[session_id] = SessionState(agent=agent)
return NewSessionResponse(sessionId=session_id)
async def prompt(self, params: PromptRequest) -> PromptResponse:
state = self._sessions.get(params.sessionId)
if not state:
# Auto-create session if not found (compatibility with clients that skip newSession)
logger.warning(f"Session '{params.sessionId}' not found, auto-creating new session")
new_session = await self.newSession(NewSessionRequest(cwd=None))
state = self._sessions.get(new_session.sessionId)
if not state:
logger.error("Failed to auto-create session")
return PromptResponse(stopReason="refusal")
state.cancelled = False
user_text = "\n".join(block.get("text", "") if isinstance(block, dict) else getattr(block, "text", "") for block in params.prompt)
state.agent.messages.append(Message(role="user", content=user_text))
stop_reason = await self._run_turn(state, params.sessionId)
return PromptResponse(stopReason=stop_reason)
async def cancel(self, params: CancelNotification) -> None:
state = self._sessions.get(params.sessionId)
if state:
state.cancelled = True
async def _run_turn(self, state: SessionState, session_id: str) -> str:
agent = state.agent
for _ in range(agent.max_steps):
if state.cancelled:
return "cancelled"
tool_schemas = [tool.to_schema() for tool in agent.tools.values()]
try:
response = await agent.llm.generate(messages=agent.messages, tools=tool_schemas)
except Exception as exc:
logger.exception("LLM error")
await self._send(session_id, update_agent_message(text_block(f"Error: {exc}")))
return "refusal"
if response.thinking:
await self._send(session_id, update_agent_thought(text_block(response.thinking)))
if response.content:
await self._send(session_id, update_agent_message(text_block(response.content)))
agent.messages.append(Message(role="assistant", content=response.content, thinking=response.thinking, tool_calls=response.tool_calls))
if not response.tool_calls:
return "end_turn"
for call in response.tool_calls:
name, args = call.function.name, call.function.arguments
# Show tool name with key arguments for better visibility
args_preview = ", ".join(f"{k}={repr(v)[:50]}" for k, v in list(args.items())[:2]) if isinstance(args, dict) else ""
label = f"🔧 {name}({args_preview})" if args_preview else f"🔧 {name}()"
await self._send(session_id, start_tool_call(call.id, label, kind="execute", raw_input=args))
tool = agent.tools.get(name)
if not tool:
text, status = f"❌ Unknown tool: {name}", "failed"
else:
try:
result = await tool.execute(**args)
status = "completed" if result.success else "failed"
prefix = "✅" if result.success else "❌"
text = f"{prefix} {result.content if result.success else result.error or 'Tool execution failed'}"
except Exception as exc:
status, text = "failed", f"❌ Tool error: {exc}"
await self._send(session_id, update_tool_call(call.id, status=status, content=[tool_content(text_block(text))], raw_output=text))
agent.messages.append(Message(role="tool", content=text, tool_call_id=call.id, name=name))
return "max_turn_requests"
async def _send(self, session_id: str, update: Any) -> None:
await self._conn.sessionUpdate(session_notification(session_id, update))
async def run_acp_server(config: Config | None = None) -> None:
"""Run Mini-Agent as an ACP-compatible stdio server."""
config = config or Config.load()
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
base_tools, skill_loader = await initialize_base_tools(config)
prompt_path = Config.find_config_file(config.agent.system_prompt_path)
if prompt_path and prompt_path.exists():
system_prompt = prompt_path.read_text(encoding="utf-8")
else:
system_prompt = "You are a helpful AI assistant."
if skill_loader:
meta = skill_loader.get_skills_metadata_prompt()
if meta:
system_prompt = f"{system_prompt.rstrip()}\n\n{meta}"
rcfg = config.llm.retry
llm = LLMClient(api_key=config.llm.api_key, api_base=config.llm.api_base, model=config.llm.model, retry_config=RetryConfigBase(enabled=rcfg.enabled, max_retries=rcfg.max_retries, initial_delay=rcfg.initial_delay, max_delay=rcfg.max_delay, exponential_base=rcfg.exponential_base))
reader, writer = await stdio_streams()
AgentSideConnection(lambda conn: MiniMaxACPAgent(conn, config, llm, base_tools, system_prompt), writer, reader)
logger.info("Mini-Agent ACP server running")
await asyncio.Event().wait()
def main() -> None:
asyncio.run(run_acp_server())
__all__ = ["MiniMaxACPAgent", "run_acp_server", "main"]