File size: 9,595 Bytes
4ef118d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
import json
import uuid
from datetime import datetime
from typing import Any

from agno.agent import Agent
from agno.utils.log import logger

from ..services.agent_registry import get_summary_model


# Substrings that, when found in a short model reply, indicate the reply is a
# provider/API failure message rather than a real summary.
_SUMMARY_ERROR_KEYWORDS = (
    "balance",
    "quota",
    "insufficient",
    "unauthorized",
    "rate limit",
    "error",
    "401",
    "429",
    "500",
)
# Replies at or above this length are treated as real summaries even if they
# happen to mention one of the keywords above.
_SUMMARY_ERROR_MAX_LEN = 100


def _looks_like_provider_error(text: str) -> bool:
    """Return True when *text* is short and contains a known error keyword."""
    lowered = text.lower()
    return len(lowered) < _SUMMARY_ERROR_MAX_LEN and any(
        keyword in lowered for keyword in _SUMMARY_ERROR_KEYWORDS
    )


def _format_conversation_lines(new_messages: list[dict[str, Any]]) -> str:
    """Render messages as ``ROLE: content`` lines, skipping empty content."""
    lines = []
    for msg in new_messages:
        content = msg.get("content", "")
        if not content:
            continue
        # A present-but-None role must not crash the whole update.
        role = msg.get("role") or "unknown"
        lines.append(f"{str(role).upper()}: {content}\n")
    return "".join(lines)


def _build_summary_prompt(
    current_summary_text: str,
    conversation_text: str,
    task_instruction: str,
) -> str:
    """Assemble the summarizer prompt from its three variable sections."""
    return f"""
You are an expert conversation summarizer.
Current Summary:
{current_summary_text}

New Conversation Lines:
{conversation_text}

Task:
{task_instruction}
- Keep the summary concise but comprehensive (under 500 words).
- **Maintain a concise list of high-level topics (max 5 total). avoid granular details as topics.**
- Output valid JSON format matching the schema below exactly.

Expected JSON Structure:
{{
    "summary": "The narrative summary...",
    "topics": ["topic1", "topic2"],
    "last_active_date": "YYYY-MM-DD"
}}
        
Time: {datetime.now().isoformat()}
"""


def _parse_summary_response(raw_text: Any) -> dict[str, Any] | None:
    """Turn the model reply into a summary dict, or None if it must not be stored.

    Returns None (after logging) when the reply is empty, is not text, looks
    like a provider error message, or is JSON without a ``"summary"`` key.
    A non-JSON reply that does not look like an error is wrapped as
    ``{"summary": <reply>, "topics": []}``.
    """
    if not isinstance(raw_text, str) or not raw_text.strip():
        logger.warning("Summary model returned empty or non-text content; keeping existing summary")
        return None

    # Clean potential markdown code fences around the JSON payload.
    clean_text = raw_text.replace("```json", "").replace("```", "").strip()
    try:
        parsed = json.loads(clean_text)
    except json.JSONDecodeError:
        if _looks_like_provider_error(raw_text):
            logger.error(f"Summary generation failed with API error message: {raw_text}")
            return None
        # Model ignored the JSON instruction but the text is usable as a summary.
        return {"summary": raw_text, "topics": []}

    if not isinstance(parsed, dict) or "summary" not in parsed:
        # Persisting this would wipe the stored summary, so skip the update.
        logger.warning(f"Invalid summary JSON structure from model: {parsed}")
        return None

    if _looks_like_provider_error(str(parsed.get("summary", ""))):
        logger.error(f"Summary content looks like a provider error: {parsed['summary']}")
        return None

    return parsed


async def update_session_summary(
    conversation_id: str,
    old_summary: dict[str, Any] | None,
    new_messages: list[dict[str, Any]],
    database_provider: str | None = None,
    memory_provider: str | None = None,
    memory_model: str | None = None,
    memory_api_key: str | None = None,
    memory_base_url: str | None = None,
    # New separate summary config
    summary_provider: str | None = None,
    summary_model: str | None = None,
    summary_api_key: str | None = None,
    summary_base_url: str | None = None,
    rebuild_from_scratch: bool = False,
) -> None:
    """
    Async background task to update a conversation's session summary.

    Resolves a summary model, re-reads the stored summary (unless rebuilding),
    asks the model to merge the new messages into it, and writes the result to
    the ``session_summary`` column of the ``conversations`` table. All failures
    are logged and swallowed; nothing is raised to the caller.

    Args:
        conversation_id: The conversation UUID
        old_summary: The current session summary JSON (or None); used only if
            the latest summary cannot be re-read from the database
        new_messages: List of recent messages to incorporate (User + AI)
        database_provider: The database provider ID
        memory_provider: Optional provider override
        memory_model: Optional model override
        memory_api_key: Optional API key override
        memory_base_url: Optional base URL override
        summary_provider: Optional summary-specific provider override
        summary_model: Optional summary-specific model override
        summary_api_key: Optional summary-specific API key override
        summary_base_url: Optional summary-specific base URL override
        rebuild_from_scratch: When True, ignore any existing summary and
            summarize only the provided messages
    """
    try:
        # 1. Validation (count computed defensively so None does not raise)
        message_count = len(new_messages) if new_messages else 0
        logger.info(f"Triggering update_session_summary for {conversation_id}. Messages count: {message_count}")
        if not conversation_id or not new_messages:
            logger.warning("Missing conversation_id or new_messages for summary update")
            return

        # Function-scope imports, as in the original implementation.
        from types import SimpleNamespace

        from ..models.db import DbFilter, DbQueryRequest
        from .db_service import execute_db_async, get_db_adapter

        # 2. Get Lite Model
        # get_summary_model expects a request-like object, so wrap the
        # overrides in a simple namespace.
        model_request = SimpleNamespace(
            memory_provider=memory_provider,
            memory_model=memory_model,
            memory_api_key=memory_api_key,
            memory_base_url=memory_base_url,
            summary_provider=summary_provider,
            summary_model=summary_model,
            summary_api_key=summary_api_key,
            summary_base_url=summary_base_url,
        )
        model = get_summary_model(model_request)
        if not model:
            logger.warning(f"Skipping session summary update for {conversation_id}: No summary model configured")
            return

        # Without an adapter the result cannot be stored, so skip the model call.
        adapter = get_db_adapter(database_provider)
        if not adapter:
            logger.warning("No DB adapter available for summary update")
            return

        # 2a. Re-fetch Latest Summary from DB to avoid race conditions
        # The old_summary passed by the caller might be stale if requests were fast.
        if rebuild_from_scratch:
            old_summary = None
        else:
            try:
                latest_req = DbQueryRequest(
                    providerId=adapter.config.id,
                    action="select",
                    table="conversations",
                    columns=["session_summary"],
                    filters=[DbFilter(op="eq", column="id", value=conversation_id)],
                    maybeSingle=True,
                )
                latest_res = await execute_db_async(adapter, latest_req)
                row = latest_res.data
                stored = row.get("session_summary") if isinstance(row, dict) else None
                if isinstance(stored, str) and stored:
                    try:
                        decoded = json.loads(stored)
                    except ValueError as parse_exc:
                        logger.warning(f"Stored session summary is not valid JSON, using passed old_summary: {parse_exc}")
                    else:
                        # Only a JSON object is a usable summary.
                        if isinstance(decoded, dict):
                            old_summary = decoded
                elif isinstance(stored, dict) and stored:
                    old_summary = stored
            except Exception as db_exc:
                logger.warning(f"Failed to re-fetch latest summary, using passed old_summary: {db_exc}")

        # 3. Prepare Prompt
        if rebuild_from_scratch:
            current_summary_text = "None (Starting Fresh Rebuild)"
            task_instruction = """Summarize the conversation history from the provided lines into a fresh narrative summary.
- **CRITICAL: Discard any previous context not present in the new lines.**
- Create a concise narrative summary of the complete conversation history."""
        else:
            if isinstance(old_summary, dict) and old_summary:
                current_summary_text = old_summary.get("summary", "")
            else:
                current_summary_text = "No summary yet."
            task_instruction = """Integrate the new lines into the existing summary.
- **CRITICAL: You MUST PRESERVE all important details from the 'Current Summary'. Do NOT discard existing topics.**
- Merge new information naturally."""

        conversation_text = _format_conversation_lines(new_messages)
        if not conversation_text:
            # Nothing textual to summarize; do not risk overwriting the stored summary.
            logger.warning(f"Skipping session summary update for {conversation_id}: no message content")
            return

        prompt = _build_summary_prompt(current_summary_text, conversation_text, task_instruction)

        # 4. Generate Summary (Non-streaming)
        agent = Agent(
            model=model,
            description="You are a session summarizer.",
            instructions="Output JSON only.",
        )
        response = await agent.arun(prompt)

        # 5. Parse Response (None means "do not overwrite the DB")
        summary_data = _parse_summary_response(response.content)
        if summary_data is None:
            return

        # Ensure last_run_id is set
        if "last_run_id" not in summary_data:
            summary_data["last_run_id"] = str(uuid.uuid4())
        summary_data["updated_at"] = datetime.now().isoformat()

        # 6. Update Database (session_summary column of the conversation row)
        logger.debug(f"Updating summary using adapter {adapter.config.type} for {conversation_id}")
        req = DbQueryRequest(
            providerId=adapter.config.id,
            action="update",
            table="conversations",
            payload={"session_summary": summary_data},
            filters=[DbFilter(op="eq", column="id", value=conversation_id)],
        )
        result = await execute_db_async(adapter, req)

        if result.error:
            logger.error(f"Failed to update session summary DB: {result.error}")
        else:
            logger.info(f"Updated session summary for {conversation_id}")

    except Exception as e:
        # Background task boundary: log and swallow so the caller is never affected.
        logger.error(f"Failed to update session summary: {e}")