# Source: Qurio / backend-python/src/services/summary_service.py
# Commit 4ef118d (veeiiinnnnn): Add backend-python and Dockerfile
import json
import uuid
from datetime import datetime
from typing import Any
from agno.agent import Agent
from agno.utils.log import logger
from ..services.agent_registry import get_summary_model
# Substrings that, when found in a *short* model output, indicate that the
# provider returned an error message instead of a real summary.
_SUMMARY_ERROR_KEYWORDS = (
    "balance",
    "quota",
    "insufficient",
    "unauthorized",
    "rate limit",
    "error",
    "401",
    "429",
    "500",
)

# Only outputs shorter than this are checked against the error keywords, so a
# genuine summary that merely mentions e.g. "error" is not rejected.
_SUMMARY_ERROR_MAX_LEN = 100

_REBUILD_TASK_INSTRUCTION = """Summarize the conversation history from the provided lines into a fresh narrative summary.
- **CRITICAL: Discard any previous context not present in the new lines.**
- Create a concise narrative summary of the complete conversation history."""

_MERGE_TASK_INSTRUCTION = """Integrate the new lines into the existing summary.
- **CRITICAL: You MUST PRESERVE all important details from the 'Current Summary'. Do NOT discard existing topics.**
- Merge new information naturally."""

# Rendered with str.format(); the doubled braces produce literal "{" / "}".
_SUMMARY_PROMPT_TEMPLATE = """
You are an expert conversation summarizer.
Current Summary:
{current_summary}
New Conversation Lines:
{conversation}
Task:
{task}
- Keep the summary concise but comprehensive (under 500 words).
- **Maintain a concise list of high-level topics (max 5 total). avoid granular details as topics.**
- Output valid JSON format matching the schema below exactly.
Expected JSON Structure:
{{
"summary": "The narrative summary...",
"topics": ["topic1", "topic2"],
"last_active_date": "YYYY-MM-DD"
}}
Time: {timestamp}
"""


def _looks_like_provider_error(text: str) -> bool:
    """Return True when *text* is short and contains a provider-error keyword."""
    if len(text) >= _SUMMARY_ERROR_MAX_LEN:
        return False
    lowered = text.lower()
    return any(keyword in lowered for keyword in _SUMMARY_ERROR_KEYWORDS)


def _coerce_stored_summary(raw: Any) -> dict[str, Any] | None:
    """Turn a stored ``session_summary`` value (dict or JSON string) into a dict.

    Returns None when the value cannot be interpreted as a JSON object.
    """
    if isinstance(raw, dict):
        return raw
    if isinstance(raw, str):
        try:
            decoded = json.loads(raw)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return None
        # A JSON list/string/number is not a usable summary object.
        return decoded if isinstance(decoded, dict) else None
    return None


def _render_conversation(messages: list[dict[str, Any]]) -> str:
    """Render messages as ``ROLE: content`` lines, skipping empty content."""
    lines = []
    for msg in messages:
        content = msg.get("content", "")
        if not content:
            continue
        # A present-but-null role falls back to "unknown" instead of crashing.
        role = str(msg.get("role") or "unknown").upper()
        lines.append(f"{role}: {content}\n")
    return "".join(lines)


def _build_summary_prompt(
    old_summary: dict[str, Any] | None,
    new_messages: list[dict[str, Any]],
    rebuild_from_scratch: bool,
) -> str:
    """Build the summarizer prompt for either a merge or a fresh rebuild."""
    if rebuild_from_scratch:
        current_summary_text = "None (Starting Fresh Rebuild)"
        task_instruction = _REBUILD_TASK_INSTRUCTION
    else:
        if isinstance(old_summary, dict) and old_summary:
            current_summary_text = old_summary.get("summary", "")
        else:
            current_summary_text = "No summary yet."
        task_instruction = _MERGE_TASK_INSTRUCTION

    return _SUMMARY_PROMPT_TEMPLATE.format(
        current_summary=current_summary_text,
        conversation=_render_conversation(new_messages),
        task=task_instruction,
        timestamp=datetime.now().isoformat(),
    )


def _parse_summary_response(raw_text: str) -> dict[str, Any] | None:
    """Parse the model output into a summary dict.

    Returns None when the output must not be persisted: it looks like a
    provider error message, or it is valid JSON that is not an object with a
    "summary" key. Non-JSON text that does not look like an error is wrapped
    into a minimal summary dict.
    """
    # Models often wrap JSON in markdown code fences; strip them first.
    cleaned = raw_text.replace("```json", "").replace("```", "").strip()
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        if _looks_like_provider_error(raw_text):
            logger.error(f"Summary generation failed with API error message: {raw_text}")
            return None
        # Plain-text answer that looks like a real summary: wrap it.
        return {
            "summary": raw_text,
            "topics": [],
            "last_run_id": str(uuid.uuid4()),
        }

    if not isinstance(parsed, dict) or "summary" not in parsed:
        # Treat as a failed run rather than overwriting the stored summary
        # with a payload that has no "summary" field.
        logger.warning(f"Invalid summary JSON structure from model: {parsed}")
        return None

    if _looks_like_provider_error(str(parsed.get("summary", ""))):
        logger.error(f"Summary content looks like a provider error: {parsed['summary']}")
        return None

    return parsed


async def update_session_summary(
    conversation_id: str,
    old_summary: dict[str, Any] | None,
    new_messages: list[dict[str, Any]],
    database_provider: str | None = None,
    memory_provider: str | None = None,
    memory_model: str | None = None,
    memory_api_key: str | None = None,
    memory_base_url: str | None = None,
    # New separate summary config
    summary_provider: str | None = None,
    summary_model: str | None = None,
    summary_api_key: str | None = None,
    summary_base_url: str | None = None,
    rebuild_from_scratch: bool = False,
) -> None:
    """
    Async background task to update a conversation's session summary.

    Asks the summary model to merge ``new_messages`` into the current summary
    (or to rebuild it from ``new_messages`` alone) and writes the result to the
    ``session_summary`` column of the ``conversations`` table. All failures are
    logged and swallowed; nothing is raised to the caller.

    Args:
        conversation_id: The conversation UUID
        old_summary: The current session summary JSON (or None). Used only as
            a fallback; the latest stored value is re-read from the database
            when possible.
        new_messages: List of recent messages to incorporate (User + AI)
        database_provider: The database provider ID, passed to get_db_adapter
        memory_provider: Optional provider override, forwarded to get_summary_model
        memory_model: Optional model override, forwarded to get_summary_model
        memory_api_key: Optional API key override, forwarded to get_summary_model
        memory_base_url: Optional base URL override, forwarded to get_summary_model
        summary_provider: Optional summary-specific provider, forwarded to get_summary_model
        summary_model: Optional summary-specific model, forwarded to get_summary_model
        summary_api_key: Optional summary-specific API key, forwarded to get_summary_model
        summary_base_url: Optional summary-specific base URL, forwarded to get_summary_model
        rebuild_from_scratch: When True, ignore any existing summary and
            summarize ``new_messages`` as the complete history.
    """
    try:
        # 1. Validation (the count is computed defensively so a None list
        # reaches the warning below instead of raising in len()).
        message_count = len(new_messages) if new_messages else 0
        logger.info(f"Triggering update_session_summary for {conversation_id}. Messages count: {message_count}")
        if not conversation_id or not new_messages:
            logger.warning("Missing conversation_id or new_messages for summary update")
            return

        # 2. Resolve the summary model. get_summary_model expects a
        # request-like object, so the overrides are wrapped in a namespace.
        from types import SimpleNamespace

        model_settings = SimpleNamespace(
            memory_provider=memory_provider,
            memory_model=memory_model,
            memory_api_key=memory_api_key,
            memory_base_url=memory_base_url,
            summary_provider=summary_provider,
            summary_model=summary_model,
            summary_api_key=summary_api_key,
            summary_base_url=summary_base_url,
        )
        model = get_summary_model(model_settings)
        if not model:
            logger.warning(f"Skipping session summary update for {conversation_id}: No summary model configured")
            return

        from ..models.db import DbFilter, DbQueryRequest
        from .db_service import execute_db_async, get_db_adapter

        adapter = get_db_adapter(database_provider)  # Use request-selected provider when available
        if not adapter:
            # Without an adapter the result could not be stored, so skip the
            # model call entirely.
            logger.warning("No DB adapter available for summary update")
            return

        # 2a. Re-fetch the latest summary from the DB to avoid race conditions:
        # the old_summary passed by the caller may be stale if requests were fast.
        if rebuild_from_scratch:
            old_summary = None
        else:
            try:
                latest_req = DbQueryRequest(
                    providerId=adapter.config.id,
                    action="select",
                    table="conversations",
                    columns=["session_summary"],
                    filters=[DbFilter(op="eq", column="id", value=conversation_id)],
                    maybeSingle=True,
                )
                latest_res = await execute_db_async(adapter, latest_req)
                row = latest_res.data
                raw_summary = row.get("session_summary") if isinstance(row, dict) else None
                if raw_summary:
                    stored_summary = _coerce_stored_summary(raw_summary)
                    if stored_summary is not None:
                        old_summary = stored_summary
                    else:
                        logger.warning(
                            f"Stored session summary for {conversation_id} is not a JSON object, "
                            "using passed old_summary"
                        )
            except Exception as db_exc:
                # Best effort: fall back to the summary supplied by the caller.
                logger.warning(f"Failed to re-fetch latest summary, using passed old_summary: {db_exc}")

        # 3. Prepare Prompt
        prompt = _build_summary_prompt(old_summary, new_messages, rebuild_from_scratch)

        # 4. Generate Summary (Non-streaming)
        agent = Agent(
            model=model,
            description="You are a session summarizer.",
            instructions="Output JSON only.",
        )
        response = await agent.arun(prompt)
        content = response.content
        if content is None:
            raw_text = ""
        elif isinstance(content, str):
            raw_text = content
        else:
            raw_text = str(content)
        if not raw_text.strip():
            # Never replace a stored summary with an empty one.
            logger.warning(f"Skipping session summary update for {conversation_id}: model returned empty content")
            return

        # 5. Parse Response (None means "do not overwrite the stored summary")
        summary_data = _parse_summary_response(raw_text)
        if summary_data is None:
            return

        # Ensure last_run_id is set; an id supplied in the payload is kept.
        summary_data.setdefault("last_run_id", str(uuid.uuid4()))
        summary_data["updated_at"] = datetime.now().isoformat()

        # 6. Update Database (JSONB session_summary column)
        logger.debug(f"Updating summary using adapter {adapter.config.type} for {conversation_id}")
        update_req = DbQueryRequest(
            providerId=adapter.config.id,
            action="update",
            table="conversations",
            payload={"session_summary": summary_data},
            filters=[DbFilter(op="eq", column="id", value=conversation_id)],
        )
        result = await execute_db_async(adapter, update_req)
        if result.error:
            logger.error(f"Failed to update session summary DB: {result.error}")
        else:
            logger.info(f"Updated session summary for {conversation_id}")
    except Exception as e:
        # Top-level boundary of a background task: log and swallow.
        logger.error(f"Failed to update session summary: {e}")