import json
import uuid
from datetime import datetime
from types import SimpleNamespace
from typing import Any

from agno.agent import Agent
from agno.utils.log import logger

from ..services.agent_registry import get_summary_model

# Short strings containing any of these words are treated as provider error
# messages (quota, auth, rate limiting, ...) returned in place of a summary.
# The length cap keeps genuine summaries that mention e.g. "error" from being
# rejected.
_ERROR_KEYWORDS = ("balance", "quota", "insufficient", "unauthorized", "rate limit", "error", "401", "429", "500")
_ERROR_MAX_LEN = 100

_REBUILD_INSTRUCTION = """Summarize the conversation history from the provided lines into a fresh narrative summary.
- **CRITICAL: Discard any previous context not present in the new lines.**
- Create a concise narrative summary of the complete conversation history."""

_MERGE_INSTRUCTION = """Integrate the new lines into the existing summary.
- **CRITICAL: You MUST PRESERVE all important details from the 'Current Summary'. Do NOT discard existing topics.**
- Merge new information naturally."""

# Filled with str.format(); literal JSON braces are therefore doubled.
_PROMPT_TEMPLATE = """
You are an expert conversation summarizer.

Current Summary:
{current_summary}

New Conversation Lines:
{conversation}

Task:
{task}
- Keep the summary concise but comprehensive (under 500 words).
- **Maintain a concise list of high-level topics (max 5 total). avoid granular details as topics.**
- Output valid JSON format matching the schema below exactly.

Expected JSON Structure:
{{
    "summary": "The narrative summary...",
    "topics": ["topic1", "topic2"],
    "last_active_date": "YYYY-MM-DD"
}}

Time: {now}
"""


def _looks_like_provider_error(text: str) -> bool:
    """Return True if *text* is short and contains a provider-error keyword."""
    lowered = text.lower()
    return len(text) < _ERROR_MAX_LEN and any(kw in lowered for kw in _ERROR_KEYWORDS)


def _coerce_summary(raw: Any) -> dict[str, Any] | None:
    """Turn a stored ``session_summary`` value (dict or JSON string) into a dict, else None."""
    if isinstance(raw, dict):
        return raw
    if isinstance(raw, str) and raw:
        try:
            parsed = json.loads(raw)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return None
        # Only a JSON object is usable; lists/strings would break `.get("summary")`.
        return parsed if isinstance(parsed, dict) else None
    return None


async def _fetch_latest_summary(
    adapter: Any,
    conversation_id: str,
    fallback: dict[str, Any] | None,
) -> dict[str, Any] | None:
    """Re-read the stored summary for *conversation_id*; return *fallback* if unavailable."""
    from ..models.db import DbFilter, DbQueryRequest
    from .db_service import execute_db_async

    try:
        request = DbQueryRequest(
            providerId=adapter.config.id,
            action="select",
            table="conversations",
            columns=["session_summary"],
            filters=[DbFilter(op="eq", column="id", value=conversation_id)],
            maybeSingle=True,
        )
        result = await execute_db_async(adapter, request)
    except Exception as db_exc:
        logger.warning(f"Failed to re-fetch latest summary, using passed old_summary: {db_exc}")
        return fallback

    row = result.data
    if not isinstance(row, dict):
        return fallback
    stored = _coerce_summary(row.get("session_summary"))
    return stored if stored is not None else fallback


def _format_conversation(messages: list[dict[str, Any]]) -> str:
    """Render messages with non-empty content as ``ROLE: content`` lines."""
    lines = []
    for msg in messages:
        content = msg.get("content", "")
        if content:
            role = msg.get("role") or "unknown"
            lines.append(f"{str(role).upper()}: {content}\n")
    return "".join(lines)


def _build_prompt(
    old_summary: dict[str, Any] | None,
    conversation_text: str,
    rebuild_from_scratch: bool,
) -> str:
    """Assemble the summarization prompt for either merge or rebuild mode."""
    if rebuild_from_scratch:
        current_summary_text = "None (Starting Fresh Rebuild)"
        task_instruction = _REBUILD_INSTRUCTION
    else:
        previous = old_summary.get("summary") if isinstance(old_summary, dict) else None
        current_summary_text = previous or "No summary yet."
        task_instruction = _MERGE_INSTRUCTION
    return _PROMPT_TEMPLATE.format(
        current_summary=current_summary_text,
        conversation=conversation_text,
        task=task_instruction,
        now=datetime.now().isoformat(),
    )


def _parse_summary_response(text: str) -> dict[str, Any] | None:
    """Parse model output into a summary dict, or return None if it must not be saved.

    Non-JSON text is accepted as a plain-text summary unless it looks like a
    provider error. JSON that is not an object with a non-empty string
    ``"summary"`` is rejected so that it cannot overwrite a good stored summary.
    """
    # Models often wrap JSON in markdown code fences.
    clean_text = text.replace("```json", "").replace("```", "").strip()
    try:
        summary_data = json.loads(clean_text)
    except json.JSONDecodeError:
        if _looks_like_provider_error(text):
            logger.error(f"Summary generation failed with API error message: {text}")
            return None  # CRITICAL: do not overwrite the DB with an error message.
        return {"summary": text, "topics": []}

    summary = summary_data.get("summary") if isinstance(summary_data, dict) else None
    if not isinstance(summary, str) or not summary.strip():
        logger.warning(f"Invalid summary JSON structure from model: {summary_data}")
        return None
    if _looks_like_provider_error(summary):
        logger.error(f"Summary content looks like a provider error: {summary}")
        return None
    return summary_data


async def _save_summary(adapter: Any, conversation_id: str, summary_data: dict[str, Any]) -> None:
    """Write *summary_data* into ``conversations.session_summary`` for the conversation."""
    from ..models.db import DbFilter, DbQueryRequest
    from .db_service import execute_db_async

    logger.debug(f"Updating summary using adapter {adapter.config.type} for {conversation_id}")
    request = DbQueryRequest(
        providerId=adapter.config.id,
        action="update",
        table="conversations",
        payload={"session_summary": summary_data},
        filters=[DbFilter(op="eq", column="id", value=conversation_id)],
    )
    result = await execute_db_async(adapter, request)
    if result.error:
        logger.error(f"Failed to update session summary DB: {result.error}")
    else:
        logger.info(f"Updated session summary for {conversation_id}")


async def update_session_summary(
    conversation_id: str,
    old_summary: dict[str, Any] | None,
    new_messages: list[dict[str, Any]],
    database_provider: str | None = None,
    memory_provider: str | None = None,
    memory_model: str | None = None,
    memory_api_key: str | None = None,
    memory_base_url: str | None = None,
    # New separate summary config
    summary_provider: str | None = None,
    summary_model: str | None = None,
    summary_api_key: str | None = None,
    summary_base_url: str | None = None,
    rebuild_from_scratch: bool = False,
) -> None:
    """
    Async background task to update a conversation's session summary.

    Re-reads the stored summary (unless rebuilding), asks the configured
    summary model to merge ``new_messages`` into it, validates the model
    output, and writes the result to ``conversations.session_summary``.
    ``Exception`` subclasses are logged rather than raised.

    Args:
        conversation_id: The conversation UUID.
        old_summary: The caller's copy of the session summary JSON (or None);
            replaced by the DB copy when that can be read.
        new_messages: Recent messages to incorporate (User + AI).
        database_provider: The database provider ID.
        memory_provider / memory_model / memory_api_key / memory_base_url:
            Optional overrides forwarded to ``get_summary_model``.
        summary_provider / summary_model / summary_api_key / summary_base_url:
            Optional summary-specific overrides forwarded to ``get_summary_model``.
        rebuild_from_scratch: If True, ignore any existing summary and
            summarize only ``new_messages``.
    """
    try:
        # 1. Validation
        logger.info(
            f"Triggering update_session_summary for {conversation_id}. "
            f"Messages count: {len(new_messages or [])}"
        )
        if not conversation_id or not new_messages:
            logger.warning("Missing conversation_id or new_messages for summary update")
            return

        # 2. Resolve the summary model. get_summary_model expects a request-like
        # object, so the overrides are wrapped in a namespace.
        request_overrides = SimpleNamespace(
            memory_provider=memory_provider,
            memory_model=memory_model,
            memory_api_key=memory_api_key,
            memory_base_url=memory_base_url,
            summary_provider=summary_provider,
            summary_model=summary_model,
            summary_api_key=summary_api_key,
            summary_base_url=summary_base_url,
        )
        model = get_summary_model(request_overrides)
        if not model:
            logger.warning(f"Skipping session summary update for {conversation_id}: No summary model configured")
            return

        from .db_service import get_db_adapter

        adapter = get_db_adapter(database_provider)
        if not adapter:
            # Nothing could be persisted, so don't spend a model call.
            logger.warning("No DB adapter available for summary update")
            return

        # 3. Re-fetch the latest summary: the caller's copy may be stale if
        # turns arrived quickly.
        # NOTE(review): read-modify-write is not atomic; two concurrent updates
        # can still lose one turn's contribution.
        if rebuild_from_scratch:
            old_summary = None
        else:
            old_summary = await _fetch_latest_summary(adapter, conversation_id, old_summary)

        # 4. Build the prompt
        conversation_text = _format_conversation(new_messages)
        if not conversation_text:
            logger.warning(f"Skipping session summary update for {conversation_id}: no message content")
            return
        prompt = _build_prompt(old_summary, conversation_text, rebuild_from_scratch)

        # 5. Generate summary (non-streaming)
        agent = Agent(
            model=model,
            description="You are a session summarizer.",
            instructions="Output JSON only.",
        )
        response = await agent.arun(prompt)
        raw_text = getattr(response, "content", None)
        if not isinstance(raw_text, str) or not raw_text.strip():
            logger.warning(f"Summary model returned no text content for {conversation_id}")
            return

        # 6. Parse/validate. None means "do not overwrite the stored summary".
        summary_data = _parse_summary_response(raw_text)
        if summary_data is None:
            return
        summary_data.setdefault("last_run_id", str(uuid.uuid4()))
        summary_data["updated_at"] = datetime.now().isoformat()

        # 7. Persist
        await _save_summary(adapter, conversation_id, summary_data)
    except Exception as e:
        logger.error(f"Failed to update session summary: {e}", exc_info=True)