Spaces:
Sleeping
Sleeping
| """Agent loop: the core processing engine.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| import re | |
| import sys | |
| from contextlib import AsyncExitStack | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import TYPE_CHECKING, Any, Awaitable, Callable | |
| from loguru import logger | |
| from deeptutor.tutorbot.agent.context import ContextBuilder | |
| from deeptutor.tutorbot.agent.memory import MemoryConsolidator | |
| from deeptutor.tutorbot.agent.team import TeamManager | |
| from deeptutor.tutorbot.agent.team.tools import TeamTool | |
| from deeptutor.tutorbot.agent.subagent import SubagentManager | |
| from deeptutor.tutorbot.agent.tools.cron import CronTool | |
| from deeptutor.tutorbot.agent.tools.message import MessageTool | |
| from deeptutor.tutorbot.agent.tools.registry import ToolRegistry, build_base_tools | |
| from deeptutor.tutorbot.agent.tools.spawn import SpawnTool | |
| from deeptutor.tutorbot.bus.events import InboundMessage, OutboundMessage | |
| from deeptutor.tutorbot.bus.queue import MessageBus | |
| from deeptutor.tutorbot.providers.base import LLMProvider | |
| from deeptutor.tutorbot.session.manager import Session, SessionManager | |
| if TYPE_CHECKING: | |
| from deeptutor.tutorbot.config.schema import ChannelsConfig, ExecToolConfig, WebSearchConfig | |
| from deeptutor.tutorbot.cron.service import CronService | |
| class AgentLoop: | |
| """ | |
| The agent loop is the core processing engine. | |
| It: | |
| 1. Receives messages from the bus | |
| 2. Builds context with history, memory, skills | |
| 3. Calls the LLM | |
| 4. Executes tool calls | |
| 5. Sends responses back | |
| """ | |
| _TOOL_RESULT_MAX_CHARS = 16_000 | |
| def __init__( | |
| self, | |
| bus: MessageBus, | |
| provider: LLMProvider, | |
| workspace: Path, | |
| model: str | None = None, | |
| max_iterations: int = 40, | |
| context_window_tokens: int = 65_536, | |
| web_search_config: WebSearchConfig | None = None, | |
| web_proxy: str | None = None, | |
| exec_config: ExecToolConfig | None = None, | |
| team_max_workers: int = 5, | |
| team_worker_max_iterations: int = 25, | |
| cron_service: CronService | None = None, | |
| restrict_to_workspace: bool = False, | |
| session_manager: SessionManager | None = None, | |
| mcp_servers: dict | None = None, | |
| channels_config: ChannelsConfig | None = None, | |
| shared_memory_dir: Path | None = None, | |
| default_session_key: str | None = None, | |
| ): | |
| from deeptutor.tutorbot.config.schema import ExecToolConfig, WebSearchConfig | |
| self.bus = bus | |
| self.channels_config = channels_config | |
| self.provider = provider | |
| self.workspace = workspace | |
| self.model = model or provider.get_default_model() | |
| self.max_iterations = max_iterations | |
| self.context_window_tokens = context_window_tokens | |
| self.web_search_config = web_search_config or WebSearchConfig() | |
| self.web_proxy = web_proxy | |
| self.exec_config = exec_config or ExecToolConfig() | |
| self.cron_service = cron_service | |
| self.restrict_to_workspace = restrict_to_workspace | |
| self._shared_memory_dir = shared_memory_dir | |
| self._default_session_key = default_session_key | |
| self.context = ContextBuilder(workspace, shared_memory_dir=shared_memory_dir) | |
| self.sessions = session_manager or SessionManager(workspace) | |
| self.tools = ToolRegistry() | |
| self.subagents = SubagentManager( | |
| provider=provider, | |
| workspace=workspace, | |
| bus=bus, | |
| model=self.model, | |
| web_search_config=self.web_search_config, | |
| web_proxy=web_proxy, | |
| exec_config=self.exec_config, | |
| restrict_to_workspace=restrict_to_workspace, | |
| ) | |
| self.team = TeamManager( | |
| provider=provider, | |
| workspace=workspace, | |
| bus=bus, | |
| sessions=self.sessions, | |
| model=self.model, | |
| temperature=provider.generation.temperature, | |
| max_tokens=provider.generation.max_tokens, | |
| reasoning_effort=provider.generation.reasoning_effort, | |
| web_search_config=self.web_search_config, | |
| web_proxy=web_proxy, | |
| exec_config=self.exec_config, | |
| restrict_to_workspace=restrict_to_workspace, | |
| max_workers=team_max_workers, | |
| worker_max_iterations=team_worker_max_iterations, | |
| ) | |
| self._running = False | |
| self._mcp_servers = mcp_servers or {} | |
| self._mcp_stack: AsyncExitStack | None = None | |
| self._mcp_connected = False | |
| self._mcp_connecting = False | |
| self._active_tasks: dict[str, list[asyncio.Task]] = {} # session_key -> tasks | |
| self._processing_lock = asyncio.Lock() | |
| self.memory_consolidator = MemoryConsolidator( | |
| workspace=workspace, | |
| provider=provider, | |
| model=self.model, | |
| sessions=self.sessions, | |
| context_window_tokens=context_window_tokens, | |
| build_messages=self.context.build_messages, | |
| get_tool_definitions=self.tools.get_definitions, | |
| shared_memory_dir=shared_memory_dir, | |
| ) | |
| self._register_default_tools() | |
| def _register_default_tools(self) -> None: | |
| """Register the default set of tools.""" | |
| self.tools = build_base_tools( | |
| workspace=self.workspace, | |
| exec_config=self.exec_config, | |
| web_search_config=self.web_search_config, | |
| web_proxy=self.web_proxy, | |
| restrict_to_workspace=self.restrict_to_workspace, | |
| ) | |
| self.tools.register(MessageTool(send_callback=self.bus.publish_outbound)) | |
| self.tools.register(SpawnTool(manager=self.subagents)) | |
| self.tools.register(TeamTool(manager=self.team)) | |
| if self.cron_service: | |
| self.tools.register(CronTool(self.cron_service)) | |
| from deeptutor.tutorbot.agent.tools.deeptutor_tools import ( | |
| BrainstormAdapterTool, | |
| CodeExecutionAdapterTool, | |
| PaperSearchAdapterTool, | |
| RAGAdapterTool, | |
| ReasonAdapterTool, | |
| ) | |
| for tool_cls in (BrainstormAdapterTool, RAGAdapterTool, | |
| CodeExecutionAdapterTool, ReasonAdapterTool, | |
| PaperSearchAdapterTool): | |
| self.tools.register(tool_cls()) | |
| async def _connect_mcp(self) -> None: | |
| """Connect to configured MCP servers (one-time, lazy).""" | |
| if self._mcp_connected or self._mcp_connecting or not self._mcp_servers: | |
| return | |
| self._mcp_connecting = True | |
| from deeptutor.tutorbot.agent.tools.mcp import connect_mcp_servers | |
| try: | |
| self._mcp_stack = AsyncExitStack() | |
| await self._mcp_stack.__aenter__() | |
| await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack) | |
| self._mcp_connected = True | |
| except BaseException as e: | |
| logger.error("Failed to connect MCP servers (will retry next message): {}", e) | |
| if self._mcp_stack: | |
| try: | |
| await self._mcp_stack.aclose() | |
| except Exception: | |
| pass | |
| self._mcp_stack = None | |
| finally: | |
| self._mcp_connecting = False | |
| def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: | |
| """Update context for all tools that need routing info.""" | |
| for name in ("message", "spawn", "cron", "team"): | |
| if tool := self.tools.get(name): | |
| if hasattr(tool, "set_context"): | |
| tool.set_context(channel, chat_id, *([message_id] if name == "message" else [])) | |
| def _strip_think(text: str | None) -> str | None: | |
| """Remove <think>…</think> blocks that some models embed in content.""" | |
| if not text: | |
| return None | |
| return re.sub(r"<think>[\s\S]*?</think>", "", text).strip() or None | |
| def _tool_hint(tool_calls: list) -> str: | |
| """Format tool calls as concise hint, e.g. 'web_search("query")'.""" | |
| def _fmt(tc): | |
| args = (tc.arguments[0] if isinstance(tc.arguments, list) else tc.arguments) or {} | |
| val = next(iter(args.values()), None) if isinstance(args, dict) else None | |
| if not isinstance(val, str): | |
| return tc.name | |
| return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")' | |
| return ", ".join(_fmt(tc) for tc in tool_calls) | |
| async def _run_agent_loop( | |
| self, | |
| initial_messages: list[dict], | |
| on_progress: Callable[..., Awaitable[None]] | None = None, | |
| ) -> tuple[str | None, list[str], list[dict]]: | |
| """Run the agent iteration loop.""" | |
| messages = initial_messages | |
| iteration = 0 | |
| final_content = None | |
| tools_used: list[str] = [] | |
| while iteration < self.max_iterations: | |
| iteration += 1 | |
| tool_defs = self.tools.get_definitions() | |
| response = await self.provider.chat_with_retry( | |
| messages=messages, | |
| tools=tool_defs, | |
| model=self.model, | |
| ) | |
| if response.has_tool_calls: | |
| if on_progress: | |
| thought = self._strip_think(response.content) | |
| if thought: | |
| await on_progress(thought) | |
| await on_progress(self._tool_hint(response.tool_calls), tool_hint=True) | |
| tool_call_dicts = [ | |
| tc.to_openai_tool_call() | |
| for tc in response.tool_calls | |
| ] | |
| messages = self.context.add_assistant_message( | |
| messages, response.content, tool_call_dicts, | |
| reasoning_content=response.reasoning_content, | |
| thinking_blocks=response.thinking_blocks, | |
| ) | |
| for tool_call in response.tool_calls: | |
| tools_used.append(tool_call.name) | |
| args_str = json.dumps(tool_call.arguments, ensure_ascii=False) | |
| logger.info("Tool call: {}({})", tool_call.name, args_str[:200]) | |
| result = await self.tools.execute(tool_call.name, tool_call.arguments) | |
| messages = self.context.add_tool_result( | |
| messages, tool_call.id, tool_call.name, result | |
| ) | |
| else: | |
| clean = self._strip_think(response.content) | |
| # Don't persist error responses to session history — they can | |
| # poison the context and cause permanent 400 loops (#1303). | |
| if response.finish_reason == "error": | |
| logger.error("LLM returned error: {}", (clean or "")[:200]) | |
| final_content = clean or "Sorry, I encountered an error calling the AI model." | |
| break | |
| messages = self.context.add_assistant_message( | |
| messages, clean, reasoning_content=response.reasoning_content, | |
| thinking_blocks=response.thinking_blocks, | |
| ) | |
| final_content = clean | |
| break | |
| if final_content is None and iteration >= self.max_iterations: | |
| logger.warning("Max iterations ({}) reached", self.max_iterations) | |
| final_content = ( | |
| f"I reached the maximum number of tool call iterations ({self.max_iterations}) " | |
| "without completing the task. You can try breaking the task into smaller steps." | |
| ) | |
| return final_content, tools_used, messages | |
| async def run(self) -> None: | |
| """Run the agent loop, dispatching messages as tasks to stay responsive to /stop.""" | |
| self._running = True | |
| await self._connect_mcp() | |
| logger.info("Agent loop started") | |
| while self._running: | |
| try: | |
| msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) | |
| except asyncio.TimeoutError: | |
| continue | |
| except Exception as e: | |
| logger.warning("Error consuming inbound message: {}, continuing...", e) | |
| continue | |
| cmd = msg.content.strip().lower() | |
| if cmd == "/stop": | |
| await self._handle_stop(msg) | |
| elif cmd == "/restart": | |
| await self._handle_restart(msg) | |
| else: | |
| task = asyncio.create_task(self._dispatch(msg)) | |
| self._active_tasks.setdefault(msg.session_key, []).append(task) | |
| task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None) | |
| async def _handle_stop(self, msg: InboundMessage) -> None: | |
| """Cancel all active tasks and subagents for the session.""" | |
| tasks = self._active_tasks.pop(msg.session_key, []) | |
| cancelled = sum(1 for t in tasks if not t.done() and t.cancel()) | |
| for t in tasks: | |
| try: | |
| await t | |
| except (asyncio.CancelledError, Exception): | |
| pass | |
| sub_cancelled = await self.subagents.cancel_by_session(msg.session_key) | |
| team_cancelled = await self.team.cancel_by_session(msg.session_key) | |
| if team_cancelled: | |
| session = self.sessions.get_or_create(msg.session_key) | |
| session.metadata.pop("nano_team_active", None) | |
| self.sessions.save(session) | |
| total = cancelled + sub_cancelled + team_cancelled | |
| content = f"Stopped {total} task(s)." if total else "No active task to stop." | |
| await self.bus.publish_outbound(OutboundMessage( | |
| channel=msg.channel, chat_id=msg.chat_id, content=content, | |
| )) | |
| async def _handle_restart(self, msg: InboundMessage) -> None: | |
| """Restart the process in-place via os.execv.""" | |
| await self.bus.publish_outbound(OutboundMessage( | |
| channel=msg.channel, chat_id=msg.chat_id, content="Restarting...", | |
| )) | |
| async def _do_restart(): | |
| await asyncio.sleep(1) | |
| # Use original sys.argv to preserve entry point (tutorbot runs in-process) | |
| os.execv(sys.executable, [sys.executable] + sys.argv) | |
| asyncio.create_task(_do_restart()) | |
| async def _dispatch(self, msg: InboundMessage) -> None: | |
| """Process a message under the global lock.""" | |
| async with self._processing_lock: | |
| try: | |
| response = await self._process_message(msg) | |
| if response is not None: | |
| await self.bus.publish_outbound(response) | |
| elif msg.channel == "cli": | |
| await self.bus.publish_outbound(OutboundMessage( | |
| channel=msg.channel, chat_id=msg.chat_id, | |
| content="", metadata=msg.metadata or {}, | |
| )) | |
| except asyncio.CancelledError: | |
| logger.info("Task cancelled for session {}", msg.session_key) | |
| raise | |
| except Exception: | |
| logger.exception("Error processing message for session {}", msg.session_key) | |
| await self.bus.publish_outbound(OutboundMessage( | |
| channel=msg.channel, chat_id=msg.chat_id, | |
| content="Sorry, I encountered an error.", | |
| )) | |
| async def close_mcp(self) -> None: | |
| """Close MCP connections.""" | |
| if self._mcp_stack: | |
| try: | |
| await self._mcp_stack.aclose() | |
| except (RuntimeError, BaseExceptionGroup): | |
| pass # MCP SDK cancel scope cleanup is noisy but harmless | |
| self._mcp_stack = None | |
| def stop(self) -> None: | |
| """Stop the agent loop.""" | |
| self._running = False | |
| logger.info("Agent loop stopping") | |
| async def _process_message( | |
| self, | |
| msg: InboundMessage, | |
| session_key: str | None = None, | |
| on_progress: Callable[[str], Awaitable[None]] | None = None, | |
| ) -> OutboundMessage | None: | |
| """Process a single inbound message and return the response.""" | |
| # System messages: parse origin from chat_id ("channel:chat_id") | |
| if msg.channel == "system": | |
| channel, chat_id = (msg.chat_id.split(":", 1) if ":" in msg.chat_id | |
| else ("cli", msg.chat_id)) | |
| logger.info("Processing system message from {}", msg.sender_id) | |
| key = f"{channel}:{chat_id}" | |
| session = self.sessions.get_or_create(key) | |
| await self.memory_consolidator.maybe_consolidate_by_tokens(session) | |
| self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) | |
| history = session.get_history(max_messages=0) | |
| messages = self.context.build_messages( | |
| history=history, | |
| current_message=msg.content, channel=channel, chat_id=chat_id, | |
| ) | |
| final_content, _, all_msgs = await self._run_agent_loop(messages) | |
| self._save_turn(session, all_msgs, 1 + len(history)) | |
| self.sessions.save(session) | |
| await self.memory_consolidator.maybe_consolidate_by_tokens(session) | |
| return OutboundMessage(channel=channel, chat_id=chat_id, | |
| content=final_content or "Background task completed.") | |
| preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content | |
| logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview) | |
| key = session_key or self._default_session_key or msg.session_key | |
| session = self.sessions.get_or_create(key) | |
| # Slash commands | |
| raw = msg.content.strip() | |
| cmd = raw.lower() | |
| if cmd == "/new": | |
| try: | |
| if not await self.memory_consolidator.archive_unconsolidated(session): | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content="Memory archival failed, session not cleared. Please try again.", | |
| ) | |
| except Exception: | |
| logger.exception("/new archival failed for {}", session.key) | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content="Memory archival failed, session not cleared. Please try again.", | |
| ) | |
| session.clear() | |
| session.metadata.pop("nano_team_active", None) | |
| self.sessions.save(session) | |
| self.sessions.invalidate(session.key) | |
| return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, | |
| content="New session started.") | |
| if cmd == "/help": | |
| lines = [ | |
| "🐈 TutorBot commands:", | |
| "/new — Start a new conversation", | |
| "/stop — Stop the current task", | |
| "/restart — Restart the bot", | |
| "/team <goal> — Start or instruct nano team mode", | |
| "/team status — Show nano team state", | |
| "/team log [n] — Show detailed collaboration logs (default 20)", | |
| "/team approve <task_id> — Approve a pending task", | |
| "/team reject <task_id> <reason> — Reject a pending task", | |
| "/team manual <task_id> <instruction> — Send change request", | |
| "/team stop — Stop nano team mode", | |
| "/btw <instruction> — Async side task via single subagent", | |
| "/help — Show available commands", | |
| ] | |
| return OutboundMessage( | |
| channel=msg.channel, chat_id=msg.chat_id, content="\n".join(lines), | |
| ) | |
| current_message = msg.content | |
| if cmd.startswith("/btw"): | |
| arg = raw[4:].strip() | |
| if not arg: | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content="Usage: /btw <instruction>", | |
| ) | |
| started = await self.subagents.spawn( | |
| task=arg, | |
| label="btw", | |
| origin_channel=msg.channel, | |
| origin_chat_id=msg.chat_id, | |
| session_key=key, | |
| ) | |
| return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id, content=started) | |
| if cmd == "/team": | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content=( | |
| "Usage:\n" | |
| "/team <goal>\n" | |
| "/team status\n" | |
| "/team log [n]\n" | |
| "/team approve <task_id>\n" | |
| "/team reject <task_id> <reason>\n" | |
| "/team manual <task_id> <instruction>\n" | |
| "/team stop" | |
| ), | |
| ) | |
| if cmd.startswith("/teams "): | |
| cmd = "/team " + raw[7:].strip().lower() | |
| raw = "/team " + raw[7:].strip() | |
| if cmd.startswith("/team "): | |
| instruction = raw[6:].strip() | |
| parts = instruction.split(maxsplit=2) | |
| lowered = (parts[0] if parts else "").lower() | |
| if lowered == "status": | |
| content = self.team.status_text(key) | |
| session.metadata["nano_team_active"] = bool(self.team.has_unfinished_run(key)) | |
| self.sessions.save(session) | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content=content, | |
| metadata={"team_text": True}, | |
| ) | |
| if lowered == "log": | |
| n = 20 | |
| if len(parts) > 1: | |
| try: | |
| n = max(1, min(200, int(parts[1]))) | |
| except (TypeError, ValueError): | |
| n = 20 | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content=self.team.log_text(key, n=n), | |
| metadata={"team_text": True}, | |
| ) | |
| if lowered == "stop": | |
| if msg.channel == "cli": | |
| content = await self.team.stop_mode(key, with_snapshot=True) | |
| else: | |
| content = await self.team.stop_mode(key) | |
| session.metadata.pop("nano_team_active", None) | |
| self.sessions.save(session) | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content=content, | |
| metadata={"team_text": True}, | |
| ) | |
| if lowered == "approve": | |
| task_id = parts[1] if len(parts) > 1 else "" | |
| if not task_id: | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content="Usage: /team approve <task_id>", | |
| ) | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content=self.team.approve_for_session(key, task_id), | |
| metadata={"team_text": True}, | |
| ) | |
| if lowered == "reject": | |
| task_id = parts[1] if len(parts) > 1 else "" | |
| reason = parts[2] if len(parts) > 2 else "" | |
| if not task_id or not reason.strip(): | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content="Usage: /team reject <task_id> <reason>", | |
| ) | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content=self.team.reject_for_session(key, task_id, reason.strip()), | |
| metadata={"team_text": True}, | |
| ) | |
| if lowered == "manual": | |
| task_id = parts[1] if len(parts) > 1 else "" | |
| instruction_text = parts[2] if len(parts) > 2 else "" | |
| if not task_id or not instruction_text.strip(): | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content="Usage: /team manual <task_id> <instruction>", | |
| ) | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content=self.team.request_changes_for_session(key, task_id, instruction_text.strip()), | |
| metadata={"team_text": True}, | |
| ) | |
| content = await self.team.start_or_route_goal(key, instruction) | |
| session.metadata["nano_team_active"] = self.team.is_active(key) | |
| self.sessions.save(session) | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content=content, | |
| metadata={"team_text": True}, | |
| ) | |
| if session.metadata.get("nano_team_active"): | |
| if not self.team.is_active(key): | |
| session.metadata.pop("nano_team_active", None) | |
| self.sessions.save(session) | |
| else: | |
| if msg.channel != "cli" and self.team.has_pending_approval(key): | |
| approval_reply = self.team.handle_approval_reply(key, raw) | |
| if approval_reply: | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content=approval_reply, | |
| metadata={"team_text": True}, | |
| ) | |
| return OutboundMessage( | |
| channel=msg.channel, | |
| chat_id=msg.chat_id, | |
| content=( | |
| "Team mode is active. Supported input:\n" | |
| "- /team <instruction|status|log|approve|reject|manual|stop>\n" | |
| "- /btw <instruction>" | |
| ), | |
| ) | |
| await self.memory_consolidator.maybe_consolidate_by_tokens(session) | |
| self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id")) | |
| if message_tool := self.tools.get("message"): | |
| if isinstance(message_tool, MessageTool): | |
| message_tool.start_turn() | |
| history = session.get_history(max_messages=0) | |
| initial_messages = self.context.build_messages( | |
| history=history, | |
| current_message=current_message, | |
| media=msg.media if msg.media else None, | |
| channel=msg.channel, chat_id=msg.chat_id, | |
| ) | |
| async def _bus_progress(content: str, *, tool_hint: bool = False) -> None: | |
| meta = dict(msg.metadata or {}) | |
| meta["_progress"] = True | |
| meta["_tool_hint"] = tool_hint | |
| await self.bus.publish_outbound(OutboundMessage( | |
| channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta, | |
| )) | |
| final_content, _, all_msgs = await self._run_agent_loop( | |
| initial_messages, on_progress=on_progress or _bus_progress, | |
| ) | |
| if final_content is None: | |
| final_content = "I've completed processing but have no response to give." | |
| self._save_turn(session, all_msgs, 1 + len(history)) | |
| self.sessions.save(session) | |
| await self.memory_consolidator.maybe_consolidate_by_tokens(session) | |
| if (mt := self.tools.get("message")) and isinstance(mt, MessageTool) and mt._sent_in_turn: | |
| return None | |
| preview = final_content[:120] + "..." if len(final_content) > 120 else final_content | |
| logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) | |
| return OutboundMessage( | |
| channel=msg.channel, chat_id=msg.chat_id, content=final_content, | |
| metadata=msg.metadata or {}, | |
| ) | |
| def _save_turn(self, session: Session, messages: list[dict], skip: int) -> None: | |
| """Save new-turn messages into session, truncating large tool results.""" | |
| for m in messages[skip:]: | |
| entry = dict(m) | |
| role, content = entry.get("role"), entry.get("content") | |
| if role == "assistant" and not content and not entry.get("tool_calls"): | |
| continue # skip empty assistant messages — they poison session context | |
| if role == "tool" and isinstance(content, str) and len(content) > self._TOOL_RESULT_MAX_CHARS: | |
| entry["content"] = content[:self._TOOL_RESULT_MAX_CHARS] + "\n... (truncated)" | |
| elif role == "user": | |
| if isinstance(content, str) and content.startswith(ContextBuilder._RUNTIME_CONTEXT_TAG): | |
| # Strip the runtime-context prefix, keep only the user text. | |
| parts = content.split("\n\n", 1) | |
| if len(parts) > 1 and parts[1].strip(): | |
| entry["content"] = parts[1] | |
| else: | |
| continue | |
| if isinstance(content, list): | |
| filtered = [] | |
| for c in content: | |
| if c.get("type") == "text" and isinstance(c.get("text"), str) and c["text"].startswith(ContextBuilder._RUNTIME_CONTEXT_TAG): | |
| continue # Strip runtime context from multimodal messages | |
| if (c.get("type") == "image_url" | |
| and c.get("image_url", {}).get("url", "").startswith("data:image/")): | |
| filtered.append({"type": "text", "text": "[image]"}) | |
| else: | |
| filtered.append(c) | |
| if not filtered: | |
| continue | |
| entry["content"] = filtered | |
| entry.setdefault("timestamp", datetime.now().isoformat()) | |
| session.messages.append(entry) | |
| session.updated_at = datetime.now() | |
| async def process_direct( | |
| self, | |
| content: str, | |
| session_key: str = "cli:direct", | |
| channel: str = "cli", | |
| chat_id: str = "direct", | |
| on_progress: Callable[[str], Awaitable[None]] | None = None, | |
| ) -> str: | |
| """Process a message directly (for CLI or cron usage).""" | |
| await self._connect_mcp() | |
| msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) | |
| response = await self._process_message(msg, session_key=session_key, on_progress=on_progress) | |
| return response.content if response else "" | |