Spaces:
Runtime error
Runtime error
| """ | |
| Core Dialectic Agent implementation. | |
| This agent uses tools to gather context from the memory system | |
| and synthesize responses to queries about a peer. | |
| """ | |
| import logging | |
| import time | |
| import uuid | |
| from collections.abc import AsyncIterator, Callable | |
| from typing import Any, cast | |
| from src import crud | |
| from src.config import ConfiguredModelSettings, ReasoningLevel, settings | |
| from src.dependencies import tracked_db | |
| from src.dialectic import prompts | |
| from src.embedding_client import embedding_client | |
| from src.llm import ( | |
| HonchoLLMCallResponse, | |
| StreamingResponseWithMetadata, | |
| honcho_llm_call, | |
| ) | |
| from src.telemetry import prometheus_metrics | |
| from src.telemetry.events import DialecticCompletedEvent, emit | |
| from src.telemetry.logging import ( | |
| accumulate_metric, | |
| log_performance_metrics, | |
| log_token_usage_metrics, | |
| ) | |
| from src.telemetry.prometheus.metrics import DialecticComponents, TokenTypes | |
| from src.utils.agent_tools import ( | |
| DIALECTIC_TOOLS, | |
| DIALECTIC_TOOLS_MINIMAL, | |
| create_tool_executor, | |
| search_memory, | |
| ) | |
| from src.utils.formatting import format_new_turn_with_timestamp | |
| logger = logging.getLogger(__name__) | |
| def _get_dialectic_level_model_config( | |
| reasoning_level: ReasoningLevel, | |
| ) -> ConfiguredModelSettings: | |
| return settings.DIALECTIC.LEVELS[reasoning_level].MODEL_CONFIG | |
| class DialecticAgent: | |
| """ | |
| An agentic dialectic that iteratively gathers context to answer queries. | |
| Unlike the standard dialectic which pre-gathers all context before a single | |
| LLM call, this agent uses tools to strategically gather only the context | |
| needed to answer the specific query. | |
| """ | |
| def __init__( | |
| self, | |
| workspace_name: str, | |
| session_name: str | None, | |
| observer: str, | |
| observed: str, | |
| observer_peer_card: list[str] | None = None, | |
| observed_peer_card: list[str] | None = None, | |
| metric_key: str | None = None, | |
| reasoning_level: ReasoningLevel = "low", | |
| ): | |
| """ | |
| Initialize the dialectic agent. | |
| Args: | |
| workspace_name: Workspace identifier | |
| session_name: Session identifier (may be None for global queries) | |
| observer: The peer making the query | |
| observed: The peer being queried about | |
| observer_peer_card: Biographical information about the observer | |
| observed_peer_card: Biographical information about the observed peer | |
| metric_key: Optional key for logging metrics (if provided, agent won't log separately) | |
| reasoning_level: Level of reasoning to apply | |
| """ | |
| self.workspace_name: str = workspace_name | |
| self.session_name: str | None = session_name | |
| self.observer: str = observer | |
| self.observed: str = observed | |
| self.observer_peer_card: list[str] | None = observer_peer_card | |
| self.observed_peer_card: list[str] | None = observed_peer_card | |
| self.metric_key: str | None = metric_key | |
| self.reasoning_level: ReasoningLevel = reasoning_level | |
| # Initialize conversation history with system prompt | |
| self.messages: list[dict[str, str]] = [ | |
| { | |
| "role": "system", | |
| "content": prompts.agent_system_prompt( | |
| observer, observed, observer_peer_card, observed_peer_card | |
| ), | |
| } | |
| ] | |
| self._session_history_initialized: bool = False | |
| self._prefetched_conclusion_count: int = 0 | |
| self._run_id: str = str(uuid.uuid4())[ | |
| :8 | |
| ] # Always generate for event correlation | |
| async def _initialize_session_history(self) -> None: | |
| """Fetch and inject session history into the system prompt if configured.""" | |
| if self._session_history_initialized: | |
| return | |
| self._session_history_initialized = True | |
| max_tokens = settings.DIALECTIC.SESSION_HISTORY_MAX_TOKENS | |
| if max_tokens == 0 or not self.session_name: | |
| return | |
| # Fetch recent messages up to the token limit | |
| stmt = await crud.get_messages( | |
| workspace_name=self.workspace_name, | |
| session_name=self.session_name, | |
| token_limit=max_tokens, | |
| reverse=False, # chronological order | |
| ) | |
| async with tracked_db("dialectic.session_history") as db: | |
| result = await db.execute(stmt) | |
| messages = result.scalars().all() | |
| if not messages: | |
| return | |
| # Format messages for injection (must access ORM attrs before session closes) | |
| formatted_messages: list[str] = [] | |
| for msg in messages: | |
| formatted = format_new_turn_with_timestamp( | |
| msg.content, msg.created_at, msg.peer_name | |
| ) | |
| formatted_messages.append(formatted) | |
| session_history_section = ( | |
| "\n\n## SESSION HISTORY\n\n" | |
| "The following is the recent conversation history from this session. " | |
| "Use this as immediate context when answering the query.\n\n" | |
| "<session_history>\n" | |
| f"{chr(10).join(formatted_messages)}\n" | |
| "</session_history>" | |
| ) | |
| # Append session history to the system prompt | |
| self.messages[0]["content"] += session_history_section | |
| async def _prefetch_relevant_observations(self, query: str) -> str | None: | |
| """ | |
| Prefetch semantically relevant observations for the query. | |
| This provides immediate context to the agent without requiring | |
| tool calls, improving response quality and speed. | |
| Performs two separate searches to prevent retrieval dilution: | |
| - Explicit observations (produced by deriver) | |
| - Higher-level observations (produced in dreaming/background/chat) | |
| The number of observations fetched depends on reasoning level: | |
| - minimal: 10 of each type (reduced context for cost savings) | |
| - all others: 25 of each type | |
| Args: | |
| query: The user's query | |
| Returns: | |
| Formatted observations string or None if no observations found | |
| """ | |
| # Use reduced prefetch for minimal reasoning to save tokens | |
| prefetch_limit = 10 if self.reasoning_level == "minimal" else 25 | |
| try: | |
| # Pre-compute embedding once for both searches (no DB needed) | |
| query_embedding = await embedding_client.embed(query) | |
| # search_memory manages its own short-lived DB sessions so no | |
| # connection is held during external vector-store calls. | |
| explicit_repr = await search_memory( | |
| workspace_name=self.workspace_name, | |
| observer=self.observer, | |
| observed=self.observed, | |
| query=query, | |
| limit=prefetch_limit, | |
| levels=["explicit"], | |
| embedding=query_embedding, | |
| ) | |
| derived_repr = await search_memory( | |
| workspace_name=self.workspace_name, | |
| observer=self.observer, | |
| observed=self.observed, | |
| query=query, | |
| limit=prefetch_limit, | |
| levels=["deductive", "inductive", "contradiction"], | |
| embedding=query_embedding, | |
| ) | |
| if explicit_repr.is_empty() and derived_repr.is_empty(): | |
| return None | |
| # Count prefetched conclusions for telemetry | |
| explicit_count = len(explicit_repr.explicit) + len(explicit_repr.deductive) | |
| derived_count = len(derived_repr.explicit) + len(derived_repr.deductive) | |
| self._prefetched_conclusion_count = explicit_count + derived_count | |
| # Format as two separate sections | |
| parts: list[str] = [] | |
| if not explicit_repr.is_empty(): | |
| parts.append(explicit_repr.format_as_markdown(include_ids=False)) | |
| if not derived_repr.is_empty(): | |
| # Include IDs for derived so agent can use get_reasoning_chain | |
| parts.append(derived_repr.format_as_markdown(include_ids=True)) | |
| return "\n".join(parts) | |
| except Exception as e: | |
| logger.warning(f"Failed to prefetch observations: {e}") | |
| return None | |
| async def _prepare_query( | |
| self, query: str | |
| ) -> tuple[Callable[[str, dict[str, Any]], Any], str, str | None, float]: | |
| """ | |
| Prepare common state for answering a query. | |
| Handles session history initialization, metrics setup, observation prefetching, | |
| user message construction, and tool executor creation. | |
| Args: | |
| query: The question to answer about the peer | |
| Returns: | |
| A tuple of (tool_executor, task_name, run_id, start_time) | |
| """ | |
| await self._initialize_session_history() | |
| run_id: str | None = None | |
| if self.metric_key: | |
| task_name = self.metric_key | |
| else: | |
| run_id = str(uuid.uuid4())[:8] | |
| task_name = f"dialectic_chat_{run_id}" | |
| start_time = time.perf_counter() | |
| accumulate_metric( | |
| task_name, | |
| "context", | |
| ( | |
| f"workspace: {self.workspace_name}\n" | |
| f"session: {self.session_name or '(global)'}\n" | |
| f"observer: {self.observer}\n" | |
| f"observed: {self.observed}\n" | |
| f"reasoning_level: {self.reasoning_level}" | |
| ), | |
| "blob", | |
| ) | |
| accumulate_metric(task_name, "query", query, "blob") | |
| prefetched_observations = await self._prefetch_relevant_observations(query) | |
| if prefetched_observations: | |
| user_content = ( | |
| f"Query: {query}\n\n" | |
| f"## Relevant Observations (prefetched)\n" | |
| f"The following observations were found to be semantically relevant to your query. " | |
| f"Use these as primary context. You may still use tools to find additional information if needed.\n\n" | |
| f"{prefetched_observations}" | |
| ) | |
| accumulate_metric( | |
| task_name, "prefetched_observations", prefetched_observations, "blob" | |
| ) | |
| else: | |
| user_content = f"Query: {query}" | |
| self.messages.append({"role": "user", "content": user_content}) | |
| tool_executor: Callable[ | |
| [str, dict[str, Any]], Any | |
| ] = await create_tool_executor( | |
| workspace_name=self.workspace_name, | |
| session_name=self.session_name, | |
| observer=self.observer, | |
| observed=self.observed, | |
| history_token_limit=settings.DIALECTIC.HISTORY_TOKEN_LIMIT, | |
| run_id=self._run_id, | |
| agent_type="dialectic", | |
| parent_category="dialectic", | |
| ) | |
| return tool_executor, task_name, run_id, start_time | |
| def _log_response_metrics( | |
| self, | |
| task_name: str, | |
| run_id: str | None, | |
| start_time: float, | |
| response_content: str, | |
| input_tokens: int, | |
| output_tokens: int, | |
| cache_read_input_tokens: int | None, | |
| cache_creation_input_tokens: int | None, | |
| tool_calls_count: int, | |
| thinking_content: str | None, | |
| iterations: int, | |
| ) -> None: | |
| """ | |
| Log metrics common to both streaming and non-streaming responses. | |
| Args: | |
| task_name: Metrics task identifier | |
| run_id: Run identifier (None if using caller-provided metric_key) | |
| start_time: Start time from time.perf_counter() | |
| response_content: The full response text | |
| input_tokens: Input token count (actual from API) | |
| output_tokens: Output token count (actual from API) | |
| cache_read_input_tokens: Cache read tokens (if any) | |
| cache_creation_input_tokens: Cache creation tokens (if any) | |
| tool_calls_count: Number of tool calls made | |
| thinking_content: Thinking trace content (if any) | |
| iterations: Number of iterations in the tool execution loop | |
| """ | |
| accumulate_metric(task_name, "tool_calls", tool_calls_count, "count") | |
| if thinking_content: | |
| accumulate_metric(task_name, "thinking", thinking_content, "blob") | |
| log_token_usage_metrics( | |
| task_name, | |
| input_tokens, | |
| output_tokens, | |
| cache_read_input_tokens or 0, | |
| cache_creation_input_tokens or 0, | |
| ) | |
| accumulate_metric(task_name, "response", response_content, "blob") | |
| elapsed_ms = (time.perf_counter() - start_time) * 1000 | |
| accumulate_metric(task_name, "total_duration", elapsed_ms, "ms") | |
| if not self.metric_key and run_id is not None: | |
| log_performance_metrics("dialectic_chat", run_id) | |
| # Prometheus metrics | |
| if settings.METRICS.ENABLED: | |
| prometheus_metrics.record_dialectic_tokens( | |
| count=input_tokens, | |
| token_type=TokenTypes.INPUT.value, | |
| component=DialecticComponents.TOTAL.value, | |
| reasoning_level=self.reasoning_level, | |
| ) | |
| prometheus_metrics.record_dialectic_tokens( | |
| count=output_tokens, | |
| token_type=TokenTypes.OUTPUT.value, | |
| component=DialecticComponents.TOTAL.value, | |
| reasoning_level=self.reasoning_level, | |
| ) | |
| # Emit telemetry event | |
| emit( | |
| DialecticCompletedEvent( | |
| run_id=self._run_id, | |
| workspace_name=self.workspace_name, | |
| peer_name=self.observed, | |
| session_name=self.session_name, | |
| reasoning_level=self.reasoning_level, | |
| total_iterations=iterations, | |
| prefetched_conclusion_count=self._prefetched_conclusion_count, | |
| tool_calls_count=tool_calls_count, | |
| total_duration_ms=elapsed_ms, | |
| input_tokens=input_tokens, | |
| output_tokens=output_tokens, | |
| cache_read_tokens=cache_read_input_tokens or 0, | |
| cache_creation_tokens=cache_creation_input_tokens or 0, | |
| ) | |
| ) | |
| async def answer(self, query: str) -> str: | |
| """ | |
| Answer a query about the peer using agentic tool calling. | |
| The agent will: | |
| 1. Receive the query | |
| 2. Use tools to gather relevant context | |
| 3. Synthesize a response grounded in the gathered context | |
| Args: | |
| query: The question to answer about the peer | |
| Returns: | |
| The synthesized answer string | |
| """ | |
| tool_executor, task_name, run_id, start_time = await self._prepare_query(query) | |
| # Get level-specific settings | |
| level_settings = settings.DIALECTIC.LEVELS[self.reasoning_level] | |
| # Use minimal tools for minimal reasoning to reduce cost | |
| tools = ( | |
| DIALECTIC_TOOLS_MINIMAL | |
| if self.reasoning_level == "minimal" | |
| else DIALECTIC_TOOLS | |
| ) | |
| # Use level-specific max_output_tokens if set, otherwise global default | |
| max_tokens = ( | |
| level_settings.MAX_OUTPUT_TOKENS | |
| if level_settings.MAX_OUTPUT_TOKENS is not None | |
| else settings.DIALECTIC.MAX_OUTPUT_TOKENS | |
| ) | |
| response: HonchoLLMCallResponse[str] = await honcho_llm_call( | |
| model_config=_get_dialectic_level_model_config(self.reasoning_level), | |
| prompt="", # Ignored since we pass messages | |
| max_tokens=max_tokens, | |
| tools=tools, | |
| tool_choice=level_settings.TOOL_CHOICE, | |
| tool_executor=tool_executor, | |
| max_tool_iterations=level_settings.MAX_TOOL_ITERATIONS, | |
| messages=self.messages, | |
| track_name="Dialectic Agent", | |
| max_input_tokens=settings.DIALECTIC.MAX_INPUT_TOKENS, | |
| trace_name="dialectic_chat", | |
| ) | |
| self._log_response_metrics( | |
| task_name=task_name, | |
| run_id=run_id, | |
| start_time=start_time, | |
| response_content=response.content, | |
| input_tokens=response.input_tokens, | |
| output_tokens=response.output_tokens, | |
| cache_read_input_tokens=response.cache_read_input_tokens, | |
| cache_creation_input_tokens=response.cache_creation_input_tokens, | |
| tool_calls_count=len(response.tool_calls_made), | |
| thinking_content=response.thinking_content, | |
| iterations=response.iterations, | |
| ) | |
| return response.content | |
| async def answer_stream(self, query: str) -> AsyncIterator[str]: | |
| """ | |
| Answer a query about the peer using agentic tool calling, streaming the response. | |
| The agent will: | |
| 1. Receive the query | |
| 2. Use tools to gather relevant context (non-streaming) | |
| 3. Stream the synthesized response | |
| Args: | |
| query: The question to answer about the peer | |
| Yields: | |
| Chunks of the response text as they are generated | |
| """ | |
| tool_executor, task_name, run_id, start_time = await self._prepare_query(query) | |
| # Get level-specific settings | |
| level_settings = settings.DIALECTIC.LEVELS[self.reasoning_level] | |
| # Use minimal tools for minimal reasoning to reduce cost | |
| tools = ( | |
| DIALECTIC_TOOLS_MINIMAL | |
| if self.reasoning_level == "minimal" | |
| else DIALECTIC_TOOLS | |
| ) | |
| # Use level-specific max_output_tokens if set, otherwise global default | |
| max_tokens = ( | |
| level_settings.MAX_OUTPUT_TOKENS | |
| if level_settings.MAX_OUTPUT_TOKENS is not None | |
| else settings.DIALECTIC.MAX_OUTPUT_TOKENS | |
| ) | |
| response = cast( | |
| StreamingResponseWithMetadata, | |
| await honcho_llm_call( | |
| model_config=_get_dialectic_level_model_config(self.reasoning_level), | |
| prompt="", # Ignored since we pass messages | |
| max_tokens=max_tokens, | |
| stream=True, | |
| stream_final_only=True, | |
| tools=tools, | |
| tool_choice=level_settings.TOOL_CHOICE, | |
| tool_executor=tool_executor, | |
| max_tool_iterations=level_settings.MAX_TOOL_ITERATIONS, | |
| messages=self.messages, | |
| track_name="Dialectic Agent Stream", | |
| max_input_tokens=settings.DIALECTIC.MAX_INPUT_TOKENS, | |
| trace_name="dialectic_chat", | |
| ), | |
| ) | |
| accumulated_content: list[str] = [] | |
| async for chunk in response: | |
| if chunk.content: | |
| accumulated_content.append(chunk.content) | |
| yield chunk.content | |
| self._log_response_metrics( | |
| task_name=task_name, | |
| run_id=run_id, | |
| start_time=start_time, | |
| response_content="".join(accumulated_content), | |
| input_tokens=response.input_tokens, | |
| output_tokens=response.output_tokens, | |
| cache_read_input_tokens=response.cache_read_input_tokens, | |
| cache_creation_input_tokens=response.cache_creation_input_tokens, | |
| tool_calls_count=len(response.tool_calls_made), | |
| thinking_content=response.thinking_content, | |
| iterations=response.iterations, | |
| ) | |