"""Agent loop: the core processing engine.""" from __future__ import annotations import asyncio import json import os import re import sys from contextlib import AsyncExitStack from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, Any, Awaitable, Callable from loguru import logger from deeptutor.tutorbot.agent.context import ContextBuilder from deeptutor.tutorbot.agent.memory import MemoryConsolidator from deeptutor.tutorbot.agent.team import TeamManager from deeptutor.tutorbot.agent.team.tools import TeamTool from deeptutor.tutorbot.agent.subagent import SubagentManager from deeptutor.tutorbot.agent.tools.cron import CronTool from deeptutor.tutorbot.agent.tools.message import MessageTool from deeptutor.tutorbot.agent.tools.registry import ToolRegistry, build_base_tools from deeptutor.tutorbot.agent.tools.spawn import SpawnTool from deeptutor.tutorbot.bus.events import InboundMessage, OutboundMessage from deeptutor.tutorbot.bus.queue import MessageBus from deeptutor.tutorbot.providers.base import LLMProvider from deeptutor.tutorbot.session.manager import Session, SessionManager if TYPE_CHECKING: from deeptutor.tutorbot.config.schema import ChannelsConfig, ExecToolConfig, WebSearchConfig from deeptutor.tutorbot.cron.service import CronService class AgentLoop: """ The agent loop is the core processing engine. It: 1. Receives messages from the bus 2. Builds context with history, memory, skills 3. Calls the LLM 4. Executes tool calls 5. Sends responses back """ _TOOL_RESULT_MAX_CHARS = 16_000 def __init__( self, bus: MessageBus, provider: LLMProvider, workspace: Path, model: str | None = None, max_iterations: int = 40, context_window_tokens: int = 65_536, web_search_config: WebSearchConfig | None = None, web_proxy: str | None = None, exec_config: ExecToolConfig | None = None, team_max_workers: int = 5, team_worker_max_iterations: int = 25, cron_service: CronService | None = None, restrict_to_workspace: bool = False, session_manager: SessionManager | None = None, mcp_servers: dict | None = None, channels_config: ChannelsConfig | None = None, shared_memory_dir: Path | None = None, default_session_key: str | None = None, ): from deeptutor.tutorbot.config.schema import ExecToolConfig, WebSearchConfig self.bus = bus self.channels_config = channels_config self.provider = provider self.workspace = workspace self.model = model or provider.get_default_model() self.max_iterations = max_iterations self.context_window_tokens = context_window_tokens self.web_search_config = web_search_config or WebSearchConfig() self.web_proxy = web_proxy self.exec_config = exec_config or ExecToolConfig() self.cron_service = cron_service self.restrict_to_workspace = restrict_to_workspace self._shared_memory_dir = shared_memory_dir self._default_session_key = default_session_key self.context = ContextBuilder(workspace, shared_memory_dir=shared_memory_dir) self.sessions = session_manager or SessionManager(workspace) self.tools = ToolRegistry() self.subagents = SubagentManager( provider=provider, workspace=workspace, bus=bus, model=self.model, web_search_config=self.web_search_config, web_proxy=web_proxy, exec_config=self.exec_config, restrict_to_workspace=restrict_to_workspace, ) self.team = TeamManager( provider=provider, workspace=workspace, bus=bus, sessions=self.sessions, model=self.model, temperature=provider.generation.temperature, max_tokens=provider.generation.max_tokens, reasoning_effort=provider.generation.reasoning_effort, web_search_config=self.web_search_config, web_proxy=web_proxy, exec_config=self.exec_config, restrict_to_workspace=restrict_to_workspace, max_workers=team_max_workers, worker_max_iterations=team_worker_max_iterations, ) self._running = False self._mcp_servers = mcp_servers or {} self._mcp_stack: AsyncExitStack | None = None self._mcp_connected = False self._mcp_connecting = False self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks self._processing_lock = asyncio.Lock() self.memory_consolidator = MemoryConsolidator( workspace=workspace, provider=provider, model=self.model, sessions=self.sessions, context_window_tokens=context_window_tokens, build_messages=self.context.build_messages, get_tool_definitions=self.tools.get_definitions, shared_memory_dir=shared_memory_dir, ) self._register_default_tools() def _register_default_tools(self) -> None: """Register the default set of tools.""" self.tools = build_base_tools( workspace=self.workspace, exec_config=self.exec_config, web_search_config=self.web_search_config, web_proxy=self.web_proxy, restrict_to_workspace=self.restrict_to_workspace, ) self.tools.register(MessageTool(send_callback=self.bus.publish_outbound)) self.tools.register(SpawnTool(manager=self.subagents)) self.tools.register(TeamTool(manager=self.team)) if self.cron_service: self.tools.register(CronTool(self.cron_service)) from deeptutor.tutorbot.agent.tools.deeptutor_tools import ( BrainstormAdapterTool, CodeExecutionAdapterTool, PaperSearchAdapterTool, RAGAdapterTool, ReasonAdapterTool, ) for tool_cls in (BrainstormAdapterTool, RAGAdapterTool, CodeExecutionAdapterTool, ReasonAdapterTool, PaperSearchAdapterTool): self.tools.register(tool_cls()) async def _connect_mcp(self) -> None: """Connect to configured MCP servers (one-time, lazy).""" if self._mcp_connected or self._mcp_connecting or not self._mcp_servers: return self._mcp_connecting = True from deeptutor.tutorbot.agent.tools.mcp import connect_mcp_servers try: self._mcp_stack = AsyncExitStack() await self._mcp_stack.__aenter__() await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) self._mcp_connected = True except BaseException as e: logger.error("Failed to connect MCP servers (will retry next message): {}", e) if self._mcp_stack: try: await self._mcp_stack.aclose() except Exception: pass self._mcp_stack = None finally: self._mcp_connecting = False def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: """Update context for all tools that need routing info.""" for name in ("message", "spawn", "cron", "team"): if tool := self.tools.get(name): if hasattr(tool, "set_context"): tool.set_context(channel, chat_id, *([message_id] if name == "message" else [])) @staticmethod def _strip_think(text: str | None) -> str | None: """Remove blocks that some models embed in content.""" if not text: return None return re.sub(r"[\s\S]*?", "", text).strip() or None @staticmethod def _tool_hint(tool_calls: list) -> str: """Format tool calls as concise hint, e.g. 'web_search("query")'.""" def _fmt(tc): args = (tc.arguments[0] if isinstance(tc.arguments, list) else tc.arguments) or {} val = next(iter(args.values()), None) if isinstance(args, dict) else None if not isinstance(val, str): return tc.name return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")' return ", ".join(_fmt(tc) for tc in tool_calls) async def _run_agent_loop( self, initial_messages: list[dict], on_progress: Callable[..., Awaitable[None]] | None = None, ) -> tuple[str | None, list[str], list[dict]]: """Run the agent iteration loop.""" messages = initial_messages iteration = 0 final_content = None tools_used: list[str] = [] while iteration < self.max_iterations: iteration += 1 tool_defs = self.tools.get_definitions() response = await self.provider.chat_with_retry( messages=messages, tools=tool_defs, model=self.model, ) if response.has_tool_calls: if on_progress: thought = self._strip_think(response.content) if thought: await on_progress(thought) await on_progress(self._tool_hint(response.tool_calls), tool_hint=True) tool_call_dicts = [ tc.to_openai_tool_call() for tc in response.tool_calls ] messages = self.context.add_assistant_message( messages, response.content, tool_call_dicts, reasoning_content=response.reasoning_content, thinking_blocks=response.thinking_blocks, ) for tool_call in response.tool_calls: tools_used.append(tool_call.name) args_str = json.dumps(tool_call.arguments, ensure_ascii=False) logger.info("Tool call: {}({})", tool_call.name, args_str[:200]) result = await self.tools.execute(tool_call.name, tool_call.arguments) messages = self.context.add_tool_result( messages, tool_call.id, tool_call.name, result ) else: clean = self._strip_think(response.content) # Don't persist error responses to session history — they can # poison the context and cause permanent 400 loops (#1303). if response.finish_reason == "error": logger.error("LLM returned error: {}", (clean or "")[:200]) final_content = clean or "Sorry, I encountered an error calling the AI model." break messages = self.context.add_assistant_message( messages, clean, reasoning_content=response.reasoning_content, thinking_blocks=response.thinking_blocks, ) final_content = clean break if final_content is None and iteration >= self.max_iterations: logger.warning("Max iterations ({}) reached", self.max_iterations) final_content = ( f"I reached the maximum number of tool call iterations ({self.max_iterations}) " "without completing the task. You can try breaking the task into smaller steps." ) return final_content, tools_used, messages async def run(self) -> None: """Run the agent loop, dispatching messages as tasks to stay responsive to /stop.""" self._running = True await self._connect_mcp() logger.info("Agent loop started") while self._running: try: msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) except asyncio.TimeoutError: continue except Exception as e: logger.warning("Error consuming inbound message: {}, continuing...", e) continue cmd = msg.content.strip().lower() if cmd == "/stop": await self._handle_stop(msg) elif cmd == "/restart": await self._handle_restart(msg) else: task = asyncio.create_task(self._dispatch(msg)) self._active_tasks.setdefault(msg.session_key, []).append(task) task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None) async def _handle_stop(self, msg: InboundMessage) -> None: """Cancel all active tasks and subagents for the session.""" tasks = self._active_tasks.pop(msg.session_key, []) cancelled = sum(1 for t in tasks if not t.done() and t.cancel()) for t in tasks: try: await t except (asyncio.CancelledError, Exception): pass sub_cancelled = await self.subagents.cancel_by_session(msg.session_key) team_cancelled = await self.team.cancel_by_session(msg.session_key) if team_cancelled: session = self.sessions.get_or_create(msg.session_key) session.metadata.pop("nano_team_active", None) self.sessions.save(session) total = cancelled + sub_cancelled + team_cancelled content = f"Stopped {total} task(s)." if total else "No active task to stop." await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=content, )) async def _handle_restart(self, msg: InboundMessage) -> None: """Restart the process in-place via os.execv.""" await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="Restarting...", )) async def _do_restart(): await asyncio.sleep(1) # Use original sys.argv to preserve entry point (tutorbot runs in-process) os.execv(sys.executable, [sys.executable] + sys.argv) asyncio.create_task(_do_restart()) async def _dispatch(self, msg: InboundMessage) -> None: """Process a message under the global lock.""" async with self._processing_lock: try: response = await self._process_message(msg) if response is not None: await self.bus.publish_outbound(response) elif msg.channel == "cli": await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="", metadata=msg.metadata or {}, )) except asyncio.CancelledError: logger.info("Task cancelled for session {}", msg.session_key) raise except Exception: logger.exception("Error processing message for session {}", msg.session_key) await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="Sorry, I encountered an error.", )) async def close_mcp(self) -> None: """Close MCP connections.""" if self._mcp_stack: try: await self._mcp_stack.aclose() except (RuntimeError, BaseExceptionGroup): pass # MCP SDK cancel scope cleanup is noisy but harmless self._mcp_stack = None def stop(self) -> None: """Stop the agent loop.""" self._running = False logger.info("Agent loop stopping") async def _process_message( self, msg: InboundMessage, session_key: str | None = None, on_progress: Callable[[str], Awaitable[None]] | None = None, ) -> OutboundMessage | None: """Process a single inbound message and return the response.""" # System messages: parse origin from chat_id ("channel:chat_id") if msg.channel == "system": channel, chat_id = (msg.chat_id.split(":", 1) if ":" in msg.chat_id else ("cli", msg.chat_id)) logger.info("Processing system message from {}", msg.sender_id) key = f"{channel}:{chat_id}" session = self.sessions.get_or_create(key) await self.memory_consolidator.maybe_consolidate_by_tokens(session) self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) history = session.get_history(max_messages=0) messages = self.context.build_messages( history=history, current_message=msg.content, channel=channel, chat_id=chat_id, ) final_content, _, all_msgs = await self._run_agent_loop(messages) self._save_turn(session, all_msgs, 1 + len(history)) self.sessions.save(session) await self.memory_consolidator.maybe_consolidate_by_tokens(session) return OutboundMessage(channel=channel, chat_id=chat_id, content=final_content or "Background task completed.") preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) key = session_key or self._default_session_key or msg.session_key session = self.sessions.get_or_create(key) # Slash commands raw = msg.content.strip() cmd = raw.lower() if cmd == "/new": try: if not await self.memory_consolidator.archive_unconsolidated(session): return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="Memory archival failed, session not cleared. Please try again.", ) except Exception: logger.exception("/new archival failed for {}", session.key) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="Memory archival failed, session not cleared. Please try again.", ) session.clear() session.metadata.pop("nano_team_active", None) self.sessions.save(session) self.sessions.invalidate(session.key) return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content="New session started.") if cmd == "/help": lines = [ "🐈 TutorBot commands:", "/new — Start a new conversation", "/stop — Stop the current task", "/restart — Restart the bot", "/team — Start or instruct nano team mode", "/team status — Show nano team state", "/team log [n] — Show detailed collaboration logs (default 20)", "/team approve — Approve a pending task", "/team reject — Reject a pending task", "/team manual — Send change request", "/team stop — Stop nano team mode", "/btw — Async side task via single subagent", "/help — Show available commands", ] return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="\n".join(lines), ) current_message = msg.content if cmd.startswith("/btw"): arg = raw[4:].strip() if not arg: return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="Usage: /btw ", ) started = await self.subagents.spawn( task=arg, label="btw", origin_channel=msg.channel, origin_chat_id=msg.chat_id, session_key=key, ) return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=started) if cmd == "/team": return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=( "Usage:\n" "/team \n" "/team status\n" "/team log [n]\n" "/team approve \n" "/team reject \n" "/team manual \n" "/team stop" ), ) if cmd.startswith("/teams "): cmd = "/team " + raw[7:].strip().lower() raw = "/team " + raw[7:].strip() if cmd.startswith("/team "): instruction = raw[6:].strip() parts = instruction.split(maxsplit=2) lowered = (parts[0] if parts else "").lower() if lowered == "status": content = self.team.status_text(key) session.metadata["nano_team_active"] = bool(self.team.has_unfinished_run(key)) self.sessions.save(session) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=content, metadata={"team_text": True}, ) if lowered == "log": n = 20 if len(parts) > 1: try: n = max(1, min(200, int(parts[1]))) except (TypeError, ValueError): n = 20 return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=self.team.log_text(key, n=n), metadata={"team_text": True}, ) if lowered == "stop": if msg.channel == "cli": content = await self.team.stop_mode(key, with_snapshot=True) else: content = await self.team.stop_mode(key) session.metadata.pop("nano_team_active", None) self.sessions.save(session) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=content, metadata={"team_text": True}, ) if lowered == "approve": task_id = parts[1] if len(parts) > 1 else "" if not task_id: return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="Usage: /team approve ", ) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=self.team.approve_for_session(key, task_id), metadata={"team_text": True}, ) if lowered == "reject": task_id = parts[1] if len(parts) > 1 else "" reason = parts[2] if len(parts) > 2 else "" if not task_id or not reason.strip(): return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="Usage: /team reject ", ) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=self.team.reject_for_session(key, task_id, reason.strip()), metadata={"team_text": True}, ) if lowered == "manual": task_id = parts[1] if len(parts) > 1 else "" instruction_text = parts[2] if len(parts) > 2 else "" if not task_id or not instruction_text.strip(): return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content="Usage: /team manual ", ) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=self.team.request_changes_for_session(key, task_id, instruction_text.strip()), metadata={"team_text": True}, ) content = await self.team.start_or_route_goal(key, instruction) session.metadata["nano_team_active"] = self.team.is_active(key) self.sessions.save(session) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=content, metadata={"team_text": True}, ) if session.metadata.get("nano_team_active"): if not self.team.is_active(key): session.metadata.pop("nano_team_active", None) self.sessions.save(session) else: if msg.channel != "cli" and self.team.has_pending_approval(key): approval_reply = self.team.handle_approval_reply(key, raw) if approval_reply: return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=approval_reply, metadata={"team_text": True}, ) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=( "Team mode is active. Supported input:\n" "- /team \n" "- /btw " ), ) await self.memory_consolidator.maybe_consolidate_by_tokens(session) self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) if message_tool := self.tools.get("message"): if isinstance(message_tool, MessageTool): message_tool.start_turn() history = session.get_history(max_messages=0) initial_messages = self.context.build_messages( history=history, current_message=current_message, media=msg.media if msg.media else None, channel=msg.channel, chat_id=msg.chat_id, ) async def _bus_progress(content: str, *, tool_hint: bool = False) -> None: meta = dict(msg.metadata or {}) meta["_progress"] = True meta["_tool_hint"] = tool_hint await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta, )) final_content, _, all_msgs = await self._run_agent_loop( initial_messages, on_progress=on_progress or _bus_progress, ) if final_content is None: final_content = "I've completed processing but have no response to give." self._save_turn(session, all_msgs, 1 + len(history)) self.sessions.save(session) await self.memory_consolidator.maybe_consolidate_by_tokens(session) if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn: return None preview = final_content[:120] + "..." if len(final_content) > 120 else final_content logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=final_content, metadata=msg.metadata or {}, ) def _save_turn(self, session: Session, messages: list[dict], skip: int) -> None: """Save new-turn messages into session, truncating large tool results.""" for m in messages[skip:]: entry = dict(m) role, content = entry.get("role"), entry.get("content") if role == "assistant" and not content and not entry.get("tool_calls"): continue # skip empty assistant messages — they poison session context if role == "tool" and isinstance(content, str) and len(content) > self._TOOL_RESULT_MAX_CHARS: entry["content"] = content[:self._TOOL_RESULT_MAX_CHARS] + "\n... (truncated)" elif role == "user": if isinstance(content, str) and content.startswith(ContextBuilder._RUNTIME_CONTEXT_TAG): # Strip the runtime-context prefix, keep only the user text. parts = content.split("\n\n", 1) if len(parts) > 1 and parts[1].strip(): entry["content"] = parts[1] else: continue if isinstance(content, list): filtered = [] for c in content: if c.get("type") == "text" and isinstance(c.get("text"), str) and c["text"].startswith(ContextBuilder._RUNTIME_CONTEXT_TAG): continue # Strip runtime context from multimodal messages if (c.get("type") == "image_url" and c.get("image_url", {}).get("url", "").startswith("data:image/")): filtered.append({"type": "text", "text": "[image]"}) else: filtered.append(c) if not filtered: continue entry["content"] = filtered entry.setdefault("timestamp", datetime.now().isoformat()) session.messages.append(entry) session.updated_at = datetime.now() async def process_direct( self, content: str, session_key: str = "cli:direct", channel: str = "cli", chat_id: str = "direct", on_progress: Callable[[str], Awaitable[None]] | None = None, ) -> str: """Process a message directly (for CLI or cron usage).""" await self._connect_mcp() msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) return response.content if response else ""