Spaces:
Runtime error
Runtime error
| import asyncio | |
| import logging | |
| import time | |
| from enum import Enum | |
| from functools import cache | |
| from inspect import cleandoc as c | |
| from typing import TypedDict | |
| from sqlalchemy import update | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from src import schemas | |
| from src.cache.client import cache as cache_client | |
| from src.config import ConfiguredModelSettings, settings | |
| from src.crud.session import session_cache_key | |
| from src.dependencies import tracked_db | |
| from src.exceptions import ResourceNotFoundException | |
| from src.llm import HonchoLLMCallResponse, honcho_llm_call | |
| from src.models import Message | |
| from src.telemetry import prometheus_metrics | |
| from src.telemetry.events import AgentToolSummaryCreatedEvent, emit | |
| from src.telemetry.logging import accumulate_metric, conditional_observe | |
| from src.telemetry.prometheus.metrics import ( | |
| DeriverComponents, | |
| DeriverTaskTypes, | |
| TokenTypes, | |
| ) | |
| from src.utils.formatting import utc_now_iso | |
| from src.utils.tokens import estimate_tokens, track_deriver_input_tokens | |
| from .. import crud, models | |
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# TypedDict definitions for summary data
class Summary(TypedDict):
    """
    A summary object. Stored in session metadata and used in a session's get_context.

    Attributes:
        content: The summary text.
        message_id: The primary key ID of the message that this summary covers up to.
        summary_type: The type of summary, stored as the ``SummaryType`` value string.
        created_at: The timestamp of when the summary was created (ISO format string).
        token_count: The number of tokens in the summary text.
        message_public_id: The public ID of the message that this summary covers
            up to. Readers in this module fetch it with
            ``.get("message_public_id", "")``, so presumably some stored
            summaries lack this key -- prefer ``.get`` over indexing when reading.
    """

    content: str
    message_id: int
    summary_type: str
    created_at: str
    token_count: int
    message_public_id: str
def to_schema_summary(s: Summary) -> schemas.Summary:
    """
    Convert a stored summary dict into a ``schemas.Summary`` model.

    Args:
        s: The summary dict to convert.

    Returns:
        A ``schemas.Summary`` carrying the same field values. When the dict has
        no ``message_public_id`` key, an empty string is used for that field.
    """
    # Only the public ID is looked up leniently; every other key is required.
    public_id = s.get("message_public_id", "")
    content = s["content"]
    message_id = s["message_id"]
    summary_type = s["summary_type"]
    created_at = s["created_at"]
    token_count = s["token_count"]
    return schemas.Summary(
        content=content,
        message_id=message_id,
        summary_type=summary_type,
        created_at=created_at,
        token_count=token_count,
        message_public_id=public_id,
    )
# Export the public functions and types.
# This list controls which names `from <this module> import *` brings in.
__all__ = [
    "get_summary",
    "get_both_summaries",
    "get_summarized_history",
    "get_session_context",
    "get_session_context_formatted",
    "SummaryType",
    "Summary",
    "to_schema_summary",
]
def _get_summary_model_config() -> ConfiguredModelSettings:
    """Return ``settings.SUMMARY.MODEL_CONFIG``, looked up at call time."""
    summary_settings = settings.SUMMARY
    return summary_settings.MODEL_CONFIG
# Configuration constants for summaries
# The two message-count intervals are read from settings.SUMMARY once, at import time.
MESSAGES_PER_SHORT_SUMMARY = settings.SUMMARY.MESSAGES_PER_SHORT_SUMMARY
MESSAGES_PER_LONG_SUMMARY = settings.SUMMARY.MESSAGES_PER_LONG_SUMMARY
# Key inside a session's internal_metadata under which the summaries dict is stored.
SUMMARIES_KEY = "summaries"
# The types of summary to store in the session metadata
class SummaryType(Enum):
    """
    The kinds of summary kept for a session.

    Each member's value is the string recorded in a summary's ``summary_type``
    field and used as its key inside the session's stored summaries dict.
    """

    SHORT = "honcho_chat_summary_short"
    LONG = "honcho_chat_summary_long"
def short_summary_prompt(
    formatted_messages: str,
    output_words: int,
    previous_summary_text: str,
) -> str:
    """
    Generate the short summary prompt.

    Args:
        formatted_messages: The conversation text to place inside ``<conversation>``.
        output_words: The word limit stated at the end of the prompt.
        previous_summary_text: The text to place inside ``<previous_summary>``.

    Returns:
        The prompt text, with the template's source indentation removed and the
        three values inserted verbatim.
    """
    # Dedent the static template *before* substituting values. cleandoc()
    # derives the margin from every line it is given and expands tabs, so
    # running it over already-interpolated text lets multi-line message
    # content (whose continuation lines start at column 0) cancel the dedent
    # and rewrites tabs inside user content.
    template = c("""
        You are a system that summarizes parts of a conversation to create a concise and accurate summary. Focus on capturing:
        1. Key facts and information shared (**Capture as many explicit facts as possible**)
        2. User preferences, opinions, and questions
        3. Important context and requests
        4. Core topics discussed
        If there is a previous summary, ALWAYS make your new summary inclusive of both it and the new messages, therefore capturing the ENTIRE conversation. Prioritize key facts across the entire conversation.
        Provide a concise, factual summary that captures the essence of the conversation. Your summary should be detailed enough to serve as context for future messages, but brief enough to be helpful. Prefer a thorough chronological narrative over a list of bullet points.
        Return only the summary without any explanation or meta-commentary.
        <previous_summary>
        {previous_summary_text}
        </previous_summary>
        <conversation>
        {formatted_messages}
        </conversation>
        Hard limit: {output_words} words maximum. If needed, drop lower-priority detail to stay within the limit.
    """)
    # str.format() does not re-scan substituted values, so braces in the
    # conversation text are inserted as-is.
    return template.format(
        previous_summary_text=previous_summary_text,
        formatted_messages=formatted_messages,
        output_words=output_words,
    )
def long_summary_prompt(
    formatted_messages: str,
    output_words: int,
    previous_summary_text: str,
) -> str:
    """
    Generate the long summary prompt.

    Args:
        formatted_messages: The conversation text to place inside ``<conversation>``.
        output_words: The word limit stated at the end of the prompt.
        previous_summary_text: The text to place inside ``<previous_summary>``.

    Returns:
        The prompt text, with the template's source indentation removed and the
        three values inserted verbatim.
    """
    # Dedent the static template *before* substituting values. cleandoc()
    # derives the margin from every line it is given and expands tabs, so
    # running it over already-interpolated text lets multi-line message
    # content (whose continuation lines start at column 0) cancel the dedent
    # and rewrites tabs inside user content.
    template = c("""
        You are a system that creates thorough, comprehensive summaries of conversations. Focus on capturing:
        1. Key facts and information shared (**Capture as many explicit facts as possible**)
        2. User preferences, opinions, and questions
        3. Important context and requests
        4. Core topics discussed in detail
        5. User's apparent emotional state and personality traits
        6. Important themes and patterns across the conversation
        If there is a previous summary, ALWAYS make your new summary inclusive of both it and the new messages, therefore capturing the ENTIRE conversation. Prioritize key facts across the entire conversation.
        Provide a thorough and detailed summary that captures the essence of the conversation. Your summary should serve as a comprehensive record of the important information in this conversation. Prefer an exhaustive chronological narrative over a list of bullet points.
        Return only the summary without any explanation or meta-commentary.
        <previous_summary>
        {previous_summary_text}
        </previous_summary>
        <conversation>
        {formatted_messages}
        </conversation>
        Hard limit: {output_words} words maximum. If needed, drop lower-priority detail to stay within the limit.
    """)
    # str.format() does not re-scan substituted values, so braces in the
    # conversation text are inserted as-is.
    return template.format(
        previous_summary_text=previous_summary_text,
        formatted_messages=formatted_messages,
        output_words=output_words,
    )
@cache
def _short_summary_prompt_base_tokens() -> int:
    """Estimate tokens for the empty short summary prompt; memoized once it succeeds."""
    return estimate_tokens(
        short_summary_prompt(
            formatted_messages="",
            output_words=0,
            previous_summary_text="",
        )
    )


def estimate_short_summary_prompt_tokens() -> int:
    """
    Estimate tokens for the short summary prompt (without messages/previous_summary).

    The prompt is rendered with empty inputs, so the estimate is the same on
    every call and is computed only once. A failed estimation is not cached
    (``functools.cache`` stores return values, not raised exceptions), so the
    next call tries again.

    Returns:
        The estimated token count, or 200 if estimation raises.
    """
    try:
        return _short_summary_prompt_base_tokens()
    except Exception:
        # Return a rough estimate if estimation fails
        return 200
@cache
def _long_summary_prompt_base_tokens() -> int:
    """Estimate tokens for the empty long summary prompt; memoized once it succeeds."""
    return estimate_tokens(
        long_summary_prompt(
            formatted_messages="",
            output_words=0,
            previous_summary_text="",
        )
    )


def estimate_long_summary_prompt_tokens() -> int:
    """
    Estimate tokens for the long summary prompt (without messages/previous_summary).

    The prompt is rendered with empty inputs, so the estimate is the same on
    every call and is computed only once. A failed estimation is not cached
    (``functools.cache`` stores return values, not raised exceptions), so the
    next call tries again.

    Returns:
        The estimated token count, or 200 if estimation raises.
    """
    try:
        return _long_summary_prompt_base_tokens()
    except Exception:
        # Return a rough estimate if estimation fails
        return 200
async def create_short_summary(
    formatted_messages: str,
    input_tokens: int,
    previous_summary: str | None = None,
) -> HonchoLLMCallResponse[str]:
    """
    Ask the LLM for a short summary of the given messages.

    Args:
        formatted_messages: The conversation text to summarize.
        input_tokens: How many tokens the message list plus previous summary take up.
        previous_summary: The previous summary text, if any.

    Returns:
        The response from ``honcho_llm_call``.
    """
    short_cap = settings.SUMMARY.MAX_TOKENS_SHORT
    # A short summary should be smaller than what it summarizes, so the
    # requested size follows the input size until the configured cap is hit.
    # The request is phrased as a word count (LLMs *seem* to respond better to
    # that, but this should be workshopped); tokens are converted to words
    # with the rough 4:3 ratio, i.e. multiplied by 0.75.
    token_budget = min(input_tokens, short_cap)
    output_words = int(token_budget * 0.75)

    previous_summary_text = (
        previous_summary
        or "There is no previous summary -- the messages are the beginning of the conversation."
    )

    return await honcho_llm_call(
        model_config=_get_summary_model_config(),
        prompt=short_summary_prompt(
            formatted_messages, output_words, previous_summary_text
        ),
        max_tokens=short_cap,
    )
async def create_long_summary(
    formatted_messages: str,
    previous_summary: str | None = None,
) -> HonchoLLMCallResponse[str]:
    """
    Ask the LLM for a long summary of the given messages.

    Args:
        formatted_messages: The conversation text to summarize.
        previous_summary: The previous summary text, if any.

    Returns:
        The response from ``honcho_llm_call``.
    """
    long_cap = settings.SUMMARY.MAX_TOKENS_LONG
    # The request is phrased as a word count (LLMs *seem* to respond better to
    # that, but this should be workshopped); tokens are converted to words
    # with the rough 4:3 ratio, i.e. multiplied by 0.75.
    output_words = int(long_cap * 0.75)

    previous_summary_text = (
        previous_summary
        or "There is no previous summary -- the messages are the beginning of the conversation."
    )

    return await honcho_llm_call(
        model_config=_get_summary_model_config(),
        prompt=long_summary_prompt(
            formatted_messages, output_words, previous_summary_text
        ),
        max_tokens=long_cap,
    )
async def summarize_if_needed(
    workspace_name: str,
    session_name: str,
    message_id: int,
    message_seq_in_session: int,
    message_public_id: str,
    configuration: schemas.ResolvedConfiguration,
) -> None:
    """
    Create short/long summaries if thresholds met.

    This function checks for both short and long summary needs independently,
    without assuming any relationship between their thresholds.

    When both are due they run concurrently; a failure in one does not stop the
    other, and each failure is logged rather than raised. When only one is due,
    its exception propagates to the caller.

    Args:
        workspace_name: The workspace name
        session_name: The session name
        message_id: The message ID
        message_seq_in_session: The sequence number of the message in the session
        message_public_id: The public ID of the message
        configuration: The resolved configuration for the message
    """
    # Only an explicit False disables summaries; any other value proceeds.
    if configuration.summary.enabled is False:
        return

    should_create_long: bool = (
        message_seq_in_session % configuration.summary.messages_per_long_summary == 0
    )
    should_create_short: bool = (
        message_seq_in_session % configuration.summary.messages_per_short_summary == 0
    )
    if not (should_create_long or should_create_short):
        return

    async def create_and_record(summary_type: SummaryType, metric_name: str) -> None:
        """Create and save one summary, then record which message it reached."""
        await _create_and_save_summary(
            workspace_name,
            session_name,
            message_id=message_id,
            message_seq_in_session=message_seq_in_session,
            message_public_id=message_public_id,
            summary_type=summary_type,
            configuration=configuration,
        )
        accumulate_metric(
            f"summary_{workspace_name}_{message_id}",
            metric_name,
            message_seq_in_session,
            "count",
        )

    if should_create_long and should_create_short:
        # If both summaries need to be created, run them in parallel
        summary_types = (SummaryType.LONG, SummaryType.SHORT)
        results = await asyncio.gather(
            create_and_record(SummaryType.LONG, "long_summary_up_to_message"),
            create_and_record(SummaryType.SHORT, "short_summary_up_to_message"),
            return_exceptions=True,
        )
        # return_exceptions=True hands failures back as results; surface them
        # in the log so they are not silently discarded.
        for summary_type, result in zip(summary_types, results):
            if isinstance(result, BaseException):
                logger.error(
                    "Failed to create %s summary for message %s",
                    summary_type.name,
                    message_id,
                    exc_info=result,
                )
    elif should_create_long:
        # If only one summary needs to be created, run individually
        await create_and_record(SummaryType.LONG, "long_summary_up_to_message")
    else:
        await create_and_record(SummaryType.SHORT, "short_summary_up_to_message")
async def _create_and_save_summary(
    workspace_name: str,
    session_name: str,
    *,
    message_id: int,
    message_seq_in_session: int,
    message_public_id: str,
    summary_type: SummaryType,
    configuration: schemas.ResolvedConfiguration,
) -> None:
    """
    Create a new summary and save it to the database.

    1. Get the latest summary of this type; stop if it already covers ``message_id``
    2. Get the most recent window of messages, ending at ``message_seq_in_session``
       and at most one configured summary interval long
    3. Generate a new summary using the messages and the previous summary
    4. Save the new summary to the database

    Token metrics and the telemetry event are recorded only when the summary
    came from the LLM rather than from the fallback text. The summary itself
    is saved in both cases.

    Args:
        workspace_name: The workspace name
        session_name: The session name
        message_id: The ID of the message that triggered summarization
        message_seq_in_session: The sequence number of that message in the session
        message_public_id: The public ID of that message
        summary_type: Which kind of summary to create
        configuration: The resolved configuration supplying the summary intervals
    """
    logger.debug("Creating new %s summary", summary_type.name)
    summary_start = time.perf_counter()
    metric_key = f"summary_{workspace_name}_{message_id}"

    async with tracked_db("summary.fetch_data") as db:
        latest_summary = await get_summary(
            db, workspace_name, session_name, summary_type
        )
        # Skip if latest summary already covers message.
        if latest_summary and latest_summary["message_id"] >= message_id:
            return

        previous_summary_text = latest_summary["content"] if latest_summary else None

        # Calculate the sequence range for messages to summarize
        # We want to get the last N messages where N is the configured summary interval
        messages_per_summary = (
            configuration.summary.messages_per_long_summary
            if summary_type == SummaryType.LONG
            else configuration.summary.messages_per_short_summary
        )
        start_seq = max(message_seq_in_session - messages_per_summary + 1, 1)
        messages: list[Message] = await crud.get_messages_by_seq_range(
            db,
            workspace_name,
            session_name,
            start_seq=start_seq,
            end_seq=message_seq_in_session,
        )
        if not messages:
            logger.warning("No messages to summarize for message %s", message_id)
            return

        # Extract values before closing session
        formatted_messages = _format_messages(messages)
        last_message_id = messages[-1].id
        last_message_content_preview = messages[-1].content[:30]
        message_count = len(messages)
        messages_tokens = sum(message.token_count for message in messages)
        previous_summary_tokens = latest_summary["token_count"] if latest_summary else 0
        input_tokens = messages_tokens + previous_summary_tokens

    (
        new_summary,
        is_fallback,
        llm_input_tokens,
        llm_output_tokens,
    ) = await _create_summary(
        formatted_messages=formatted_messages,
        previous_summary_text=previous_summary_text,
        summary_type=summary_type,
        input_tokens=input_tokens,
        message_public_id=message_public_id,
        last_message_id=last_message_id,
        last_message_content_preview=last_message_content_preview,
        message_count=message_count,
    )

    if not is_fallback:
        # Get base prompt tokens based on summary type
        if summary_type == SummaryType.SHORT:
            prompt_tokens = estimate_short_summary_prompt_tokens()
        else:
            prompt_tokens = estimate_long_summary_prompt_tokens()
        track_deriver_input_tokens(
            task_type=DeriverTaskTypes.SUMMARY,
            components={
                DeriverComponents.PROMPT: prompt_tokens,
                DeriverComponents.MESSAGES: messages_tokens,
                DeriverComponents.PREVIOUS_SUMMARY: previous_summary_tokens,
            },
        )
        # Track output tokens
        if settings.METRICS.ENABLED:
            prometheus_metrics.record_deriver_tokens(
                count=new_summary["token_count"],
                task_type=DeriverTaskTypes.SUMMARY.value,
                token_type=TokenTypes.OUTPUT.value,
                component=DeriverComponents.OUTPUT_TOTAL.value,
            )

    # Save summary to database with new transaction
    async with tracked_db("summary.save") as db:
        await _save_summary(
            db,
            new_summary,
            workspace_name,
            session_name,
        )

    accumulate_metric(
        metric_key,
        f"{summary_type.name}_summary_text",
        new_summary["content"],
        "blob",
    )
    accumulate_metric(
        metric_key,
        f"{summary_type.name}_summary_size",
        new_summary["token_count"],
        "tokens",
    )
    summary_duration = (time.perf_counter() - summary_start) * 1000
    accumulate_metric(
        metric_key,
        f"{summary_type.name}_summary_creation",
        summary_duration,
        "ms",
    )

    # Emit telemetry event (only for non-fallback summaries)
    # Note: Using AgentToolSummaryCreatedEvent with dummy run_id/iteration since
    # this is called from the deriver, not from an agentic loop
    if not is_fallback:
        emit(
            AgentToolSummaryCreatedEvent(
                run_id="deriver",  # Placeholder - not from an agentic run
                iteration=0,  # Placeholder - not from an agentic loop
                parent_category="deriver",
                agent_type="summarizer",
                workspace_name=workspace_name,
                session_name=session_name,
                message_id=message_public_id,
                # Use the count captured while the fetch session was open.
                message_count=message_count,
                message_seq_in_session=message_seq_in_session,
                summary_type="short" if summary_type == SummaryType.SHORT else "long",
                input_tokens=llm_input_tokens,
                output_tokens=llm_output_tokens,
            )
        )
async def _create_summary(
    formatted_messages: str,
    previous_summary_text: str | None,
    summary_type: SummaryType,
    input_tokens: int,
    message_public_id: str,
    last_message_id: int,
    last_message_content_preview: str,
    message_count: int,
) -> tuple[Summary, bool, int, int]:
    """
    Generate a summary of the provided messages using an LLM.

    If the LLM call raises, or returns only whitespace, a basic fallback
    summary is built from the message count and the content preview instead.

    Args:
        formatted_messages: Pre-formatted message string
        previous_summary_text: Optional previous summary to provide context
        summary_type: Type of summary to create (``SummaryType.SHORT`` or ``SummaryType.LONG``)
        input_tokens: Token count for input
        message_public_id: Public ID of the last message
        last_message_id: ID of the last message
        last_message_content_preview: Preview of last message content for fallback
        message_count: Number of messages for fallback

    Returns:
        A tuple of (Summary, is_fallback, llm_input_tokens, llm_output_tokens)
        where is_fallback indicates if the summary was generated using a
        fallback instead of an LLM call, and the token counts are from the LLM call
        (0 if fallback was used)
    """

    def fallback_text() -> str:
        """Build the basic summary text used when the LLM result is unusable."""
        if message_count > 0:
            return f"Conversation with {message_count} messages about {last_message_content_preview}..."
        return ""

    is_fallback = False
    llm_input_tokens = 0
    llm_output_tokens = 0
    try:
        if summary_type == SummaryType.SHORT:
            response = await create_short_summary(
                formatted_messages, input_tokens, previous_summary_text
            )
        else:
            response = await create_long_summary(
                formatted_messages, previous_summary_text
            )
        summary_text = response.content
        summary_tokens = response.output_tokens

        # Detect potential issues with the summary
        if summary_text.strip():
            llm_input_tokens = response.input_tokens
            llm_output_tokens = response.output_tokens
        else:
            logger.error(
                "Generated summary is empty (finish_reasons=%s). Falling back to basic summary.",
                response.finish_reasons,
            )
            is_fallback = True
            summary_text = fallback_text()
            summary_tokens = estimate_tokens(summary_text) if summary_text else 0
    except Exception:
        logger.exception("Error generating summary!")
        # Fallback to a basic summary in case of error
        summary_text = fallback_text()
        # NOTE(review): this path records 0 tokens while the empty-response
        # path above estimates them -- confirm which is intended.
        summary_tokens = 0
        is_fallback = True
        # The failure may have happened after the response was received, so
        # reset the LLM counts to keep the "0 if fallback" contract.
        llm_input_tokens = 0
        llm_output_tokens = 0

    return (
        Summary(
            content=summary_text,
            message_id=last_message_id,
            summary_type=summary_type.value,
            created_at=utc_now_iso(),
            token_count=summary_tokens,
            message_public_id=message_public_id,
        ),
        is_fallback,
        llm_input_tokens,
        llm_output_tokens,
    )
async def _save_summary(
    db: AsyncSession,
    summary: Summary,
    workspace_name: str,
    session_name: str,
) -> None:
    """
    Save a summary as metadata on a session.

    The summary is stored under its ``summary_type`` inside the session's
    summaries dict, replacing any earlier summary of the same type. The change
    is committed and the session's cache entry is then deleted. If the session
    does not exist, a warning is logged and nothing is written.

    Args:
        db: Database session
        summary: The summary to save
        workspace_name: Workspace name
        session_name: Session name
    """
    try:
        session = await crud.get_session(db, session_name, workspace_name)
    except ResourceNotFoundException:
        # If session doesn't exist, we can't save the summary
        logger.warning(
            "Cannot save summary: session %s not found in workspace %s",
            session_name,
            workspace_name,
        )
        return

    # Get the label value from the enum
    label_value = summary["summary_type"]

    # Build a new dict rather than mutating the loaded session's metadata in place.
    merged_summaries = {
        **session.internal_metadata.get(SUMMARIES_KEY, {}),
        label_value: summary,
    }

    # Use SQLAlchemy update() with PostgreSQL's || operator to merge the
    # summaries key into the existing metadata.
    # NOTE(review): || replaces the whole "summaries" value with the dict built
    # above, so two saves that both start from the same stored state (e.g. the
    # short and long summaries created concurrently) can drop each other's
    # entry -- confirm whether that can happen in practice.
    stmt = (
        update(models.Session)
        .where(models.Session.workspace_name == workspace_name)
        .where(models.Session.name == session_name)
        .values(
            internal_metadata=models.Session.internal_metadata.op("||")(
                {SUMMARIES_KEY: merged_summaries}
            )
        )
    )
    await db.execute(stmt)
    await db.commit()

    # Drop the cache entry for this session now that its metadata has changed.
    cache_key = session_cache_key(workspace_name, session_name)
    await cache_client.delete(cache_key)
async def get_summarized_history(
    db: AsyncSession,
    workspace_name: str,
    session_name: str,
    cutoff: int | None = None,
    summary_type: SummaryType = SummaryType.SHORT,
) -> str:
    """
    Build a history string from the latest summary of the requested type plus
    the messages that follow it.

    Note: history is exclusive of the cutoff message.

    Args:
        db: Database session
        workspace_name: The workspace name
        session_name: The session name
        cutoff: (Optional) message ID to cutoff at
        summary_type: Type of summary to use (``SummaryType.SHORT`` or ``SummaryType.LONG``)

    Returns:
        The summary and recent messages wrapped in ``<summary>`` and
        ``<recent_messages>`` tags, or just the formatted messages when no
        summary exists.
    """
    latest = await get_summary(db, workspace_name, session_name, summary_type)

    if not latest:
        # No summary available, return just the messages
        all_messages = await crud.get_messages_id_range(
            db, workspace_name, session_name, end_id=cutoff
        )
        return _format_messages(all_messages)

    # Fetch messages starting from the summary's message ID
    recent_messages = await crud.get_messages_id_range(
        db,
        workspace_name,
        session_name,
        start_id=latest["message_id"],
        end_id=cutoff,
    )
    messages_text = _format_messages(recent_messages)
    summary_content = latest["content"]

    # Combine summary with recent messages
    return (
        f"\n<summary>\n{summary_content}\n</summary>\n"
        f"<recent_messages>\n{messages_text}\n</recent_messages>\n"
    )
async def get_summary(
    db: AsyncSession,
    workspace_name: str,
    session_name: str,
    summary_type: SummaryType = SummaryType.SHORT,
) -> Summary | None:
    """
    Look up the stored summary of one type for a session.

    Args:
        db: Database session
        workspace_name: The workspace name
        session_name: The session name
        summary_type: Type of summary to retrieve (``SummaryType.SHORT`` or ``SummaryType.LONG``)

    Returns:
        The summary data dictionary, or None if the session does not exist or
        has no summary of that type
    """
    try:
        session = await crud.get_session(db, session_name, workspace_name)
    except ResourceNotFoundException:
        # A missing session has no summary to retrieve
        return None

    stored: dict[str, Summary] = session.internal_metadata.get(SUMMARIES_KEY, {})
    if not stored:
        return None
    return stored.get(summary_type.value)
async def get_both_summaries(
    db: AsyncSession,
    workspace_name: str,
    session_name: str,
) -> tuple[Summary | None, Summary | None]:
    """
    Get both short and long summaries for a given session.

    Args:
        db: Database session
        workspace_name: The workspace name
        session_name: The session name

    Returns:
        A tuple of (short summary, long summary). Each element is None when that
        summary does not exist; both are None when the session does not exist.
    """
    try:
        session = await crud.get_session(db, session_name, workspace_name)
    except ResourceNotFoundException:
        # If session doesn't exist, there's no summary to retrieve
        return None, None

    # Treat a falsy stored value (e.g. None) as "no summaries", matching get_summary.
    summaries: dict[str, Summary] = (
        session.internal_metadata.get(SUMMARIES_KEY, {}) or {}
    )
    return (
        summaries.get(SummaryType.SHORT.value),
        summaries.get(SummaryType.LONG.value),
    )
async def get_session_context(
    db: AsyncSession,
    workspace_name: str,
    session_name: str,
    token_limit: int,
    *,
    cutoff: int | None = None,
    include_summary: bool = True,
) -> tuple[schemas.Summary | None, list[models.Message]]:
    """
    Get session context similar to the API endpoint but for internal use.

    When a summary is included, it must fit within 40% of ``token_limit``. The
    messages then get whatever remains of ``token_limit`` after the chosen
    summary's token count, and are fetched starting from the summary's message
    ID. The long summary is preferred when it fits and is larger than the short
    one; otherwise the short summary is used if it fits and is non-empty.

    Args:
        db: Database session
        workspace_name: The workspace name
        session_name: The session name
        token_limit: Maximum tokens for the context
        cutoff: Optional message ID to stop at (exclusive)
        include_summary: Whether to include summary if available

    Returns:
        Tuple of (summary, messages) where summary is a Summary pydantic model (or None)
        and messages is the list of message objects. Returns ``(None, [])`` when
        ``token_limit`` is not positive.
    """
    if token_limit <= 0:
        return None, []

    summary = None
    messages_tokens = token_limit
    messages_start_id = 0

    if include_summary:
        # Allocate 40% of tokens to summary, 60% to messages
        summary_tokens_limit = int(token_limit * 0.4)
        latest_short_summary, latest_long_summary = await get_both_summaries(
            db, workspace_name, session_name
        )
        long_len = latest_long_summary["token_count"] if latest_long_summary else 0
        short_len = latest_short_summary["token_count"] if latest_short_summary else 0

        # Return the longest summary that fits within the token limit
        chosen: Summary | None = None
        if (
            latest_long_summary
            and long_len <= summary_tokens_limit
            and long_len > short_len
        ):
            chosen = latest_long_summary
        elif (
            latest_short_summary and short_len <= summary_tokens_limit and short_len > 0
        ):
            chosen = latest_short_summary

        if chosen is not None:
            # Reuse the module's converter instead of rebuilding the model inline.
            summary = to_schema_summary(chosen)
            messages_tokens = token_limit - chosen["token_count"]
            messages_start_id = chosen["message_id"]
        else:
            logger.debug(
                "No summary available for get_context call with token limit %s, returning empty string. Normal if brand-new session. long_summary_len: %s, short_summary_len: %s",
                token_limit,
                long_len,
                short_len,
            )

    # Get recent messages after summary
    messages = await crud.get_messages_id_range(
        db,
        workspace_name,
        session_name,
        start_id=messages_start_id,
        end_id=cutoff,
        token_limit=messages_tokens,
    )
    return summary, messages
async def get_session_context_formatted(
    db: AsyncSession,
    workspace_name: str,
    session_name: str,
    token_limit: int,
    *,
    cutoff: int | None = None,
    include_summary: bool = True,
) -> str:
    """
    Render a session's context as a single string for internal use (e.g., deriver).

    Delegates to get_session_context and formats its result. The summary is
    wrapped in ``<summary>`` tags. Messages are wrapped in ``<recent_messages>``
    tags only when a summary is also present; without one they are returned
    bare. Returns an empty string when ``token_limit`` is not positive or there
    is nothing to show.
    """
    if token_limit <= 0:
        return ""

    summary, messages = await get_session_context(
        db,
        workspace_name,
        session_name,
        token_limit,
        cutoff=cutoff,
        include_summary=include_summary,
    )

    # Format the messages
    messages_text = _format_messages(messages)
    summary_content = summary.content if summary else ""

    if not summary_content:
        # No summary: hand back the bare messages (or nothing at all)
        return messages_text or ""

    summary_block = f"<summary>\n{summary_content}\n</summary>"
    if not messages_text:
        return summary_block
    return f"{summary_block}\n<recent_messages>\n{messages_text}\n</recent_messages>"
def _format_messages(messages: list[models.Message]) -> str:
    """
    Render messages as text, one ``peer_name: content`` line per message,
    joined with newlines. An empty list yields an empty string.
    """
    rendered = (f"{msg.peer_name}: {msg.content}" for msg in messages)
    # Joining zero lines already produces "", so no separate empty check is needed.
    return "\n".join(rendered)