Spaces:
Sleeping
Sleeping
File size: 6,166 Bytes
e8cb8ed a0c854c e8cb8ed 3e51aaf e8cb8ed a1ddd9f e8cb8ed a1ddd9f e8cb8ed a1ddd9f e8cb8ed a1ddd9f e8cb8ed a1ddd9f e8cb8ed a1ddd9f e8cb8ed a1ddd9f e8cb8ed 6e7384d e8cb8ed decf3cd e8cb8ed decf3cd e8cb8ed a0c854c e8cb8ed b1fde37 e8cb8ed decf3cd a0c854c decf3cd 9736803 decf3cd b1fde37 decf3cd a1ddd9f e8cb8ed a1ddd9f e8cb8ed 3e51aaf | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
from typing import Optional, Dict, Any, List, AsyncGenerator
from google.adk.sessions import DatabaseSessionService, Session
from google.genai import types
import uuid
from .utils import logger
from google.api_core.exceptions import AlreadyExists as AlreadyExistsError
class LazyDatabaseSessionService(DatabaseSessionService):
"""
A session service that defers database insertion until the first message is added.
This prevents empty sessions from cluttering the database on page loads.
"""
def __init__(self, db_url: str):
super().__init__(db_url=db_url)
# In-memory store for pending sessions: {session_id: {metadata}}
self._pending_sessions: Dict[str, Dict[str, Any]] = {}
async def create_session(self, session_id: str, user_id: str, app_name: str, **kwargs) -> Session:
"""
Overrides create_session to store metadata in memory instead of DB.
"""
# FIX: Handle None session_id (generate one if missing)
if not session_id:
session_id = str(uuid.uuid4())
logger.info(f"💤 Lazy Session Created (Pending): {session_id}")
# Store metadata for later
self._pending_sessions[session_id] = {
"user_id": user_id,
"app_name": app_name,
"kwargs": kwargs
}
# Return a temporary Session object (not persisted yet)
# FIX: Session model expects 'id', not 'session_id'. And no 'history'.
return Session(
id=session_id,
user_id=user_id,
app_name=app_name
)
async def get_session(self, session_id: str, **kwargs) -> Optional[Session]:
"""
Checks pending sessions first, then falls back to DB.
FIX: Added **kwargs to match base signature (which accepts app_name etc.)
"""
# 1. Check pending
if session_id in self._pending_sessions:
meta = self._pending_sessions[session_id]
# Return a fresh Session object from memory metadata
return Session(
id=session_id,
user_id=meta["user_id"],
app_name=meta["app_name"]
)
# 2. Check DB (Super)
# FIX: Pass session_id, app_name, and user_id as keyword arguments
return await super().get_session(
session_id=session_id,
app_name=kwargs.get("app_name"),
user_id=kwargs.get("user_id")
)
async def add_message(self, session_id: str, message: types.Content) -> None:
"""
On first message, persists the session to DB before adding the message.
Note: The Runner might call append_event directly, so we handle it there too.
"""
# 1. Check if this is a pending session
if session_id in self._pending_sessions:
logger.info(f"⏰ Waking up Lazy Session (add_message): {session_id}")
meta = self._pending_sessions.pop(session_id)
# Persist the session now!
try:
await super().create_session(
session_id=session_id,
user_id=meta["user_id"],
app_name=meta["app_name"],
**meta["kwargs"]
)
logger.info(f"💾 Session {session_id} persisted to DB.")
except AlreadyExistsError:
logger.info(f"🤝 Session {session_id} already exists in DB, skipping creation.")
# 2. Add the message (Super)
await super().add_message(session_id=session_id, message=message)
async def append_event(self, session: Session, event: Any) -> None:
"""
Overrides append_event to ensure session exists in DB before appending.
The Runner calls this method to add user messages/events.
"""
session_id = session.id
# 1. Check if this is a pending session
if session_id in self._pending_sessions:
logger.info(f"⏰ Waking up Lazy Session (append_event): {session_id}")
meta = self._pending_sessions.pop(session_id)
# Persist the session now!
try:
persisted_session = await super().create_session(
session_id=session_id,
user_id=meta["user_id"],
app_name=meta["app_name"],
**meta["kwargs"]
)
logger.info(f"💾 Session {session_id} persisted to DB.")
except AlreadyExistsError:
logger.info(f"🤝 Session {session_id} already exists in DB, fetching existing session.")
# If it already exists, fetch it to get correct state (timestamps)
persisted_session = await super().get_session(
session_id=session_id,
app_name=meta["app_name"],
user_id=meta["user_id"]
)
# FIX: Update the passed session object with the fresh timestamp from the DB
# This prevents "stale session" errors in append_event
if hasattr(persisted_session, 'last_update_time'):
session.last_update_time = persisted_session.last_update_time
# 2. Append the event (Super)
await super().append_event(session=session, event=event)
async def list_sessions(self, app_name: str = None, **kwargs) -> List[Session]:
"""
Overrides list_sessions to EXCLUDE pending sessions.
FIX: Updated signature to match base class (likely just app_name or kwargs).
The error said "takes 1 positional argument but 5 were given", which implies
it might be defined as `list_sessions(self, app_name: str = None)` or similar.
Safe bet is to accept kwargs and pass them through.
"""
# Only return sessions that are actually in the DB
# FIX: Pass app_name as keyword argument to avoid "takes 1 positional argument" error
return await super().list_sessions(app_name=app_name, **kwargs)
|