File size: 6,166 Bytes
e8cb8ed
 
 
 
 
 
a0c854c
e8cb8ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3e51aaf
 
 
 
e8cb8ed
 
 
 
 
 
 
 
 
 
a1ddd9f
e8cb8ed
a1ddd9f
e8cb8ed
a1ddd9f
e8cb8ed
 
a1ddd9f
e8cb8ed
 
a1ddd9f
e8cb8ed
 
 
 
 
 
a1ddd9f
e8cb8ed
a1ddd9f
e8cb8ed
 
 
6e7384d
 
 
 
 
 
e8cb8ed
 
 
 
decf3cd
e8cb8ed
 
 
decf3cd
e8cb8ed
 
 
a0c854c
 
 
 
 
 
 
 
 
 
e8cb8ed
 
b1fde37
e8cb8ed
decf3cd
 
 
 
 
 
 
 
 
 
 
 
 
a0c854c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
decf3cd
9736803
 
 
 
 
decf3cd
b1fde37
decf3cd
a1ddd9f
e8cb8ed
 
a1ddd9f
 
 
 
e8cb8ed
 
3e51aaf
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141

from typing import Optional, Dict, Any, List, AsyncGenerator
from google.adk.sessions import DatabaseSessionService, Session
from google.genai import types
import uuid
from .utils import logger
from google.api_core.exceptions import AlreadyExists as AlreadyExistsError

class LazyDatabaseSessionService(DatabaseSessionService):
    """
    A session service that defers database insertion until the first message is added.
    This prevents empty sessions from cluttering the database on page loads.

    ``create_session`` only records the session metadata in memory. The session
    is written through the parent class the first time ``add_message`` or
    ``append_event`` is called for it.
    """

    def __init__(self, db_url: str):
        """
        Args:
            db_url: Database URL forwarded unchanged to the parent constructor.
        """
        super().__init__(db_url=db_url)
        # In-memory store for pending sessions: {session_id: {metadata}}
        # NOTE(review): entries are removed only when a session is persisted, so
        # sessions that never receive a message stay here for the lifetime of the
        # process. Consider a TTL/size bound if page loads are frequent.
        self._pending_sessions: Dict[str, Dict[str, Any]] = {}

    @staticmethod
    def _build_pending_session(session_id: str, meta: Dict[str, Any]) -> Session:
        """Build an in-memory Session from stored pending metadata."""
        # Carry over the initial state the caller supplied at creation time so
        # the first turn sees the same state that will later be persisted.
        initial_state = meta["kwargs"].get("state") or {}
        return Session(
            id=session_id,
            user_id=meta["user_id"],
            app_name=meta["app_name"],
            # Copy so mutating the returned session cannot alter the stored metadata.
            state=dict(initial_state),
        )

    async def _persist_pending(
        self, session_id: str, source: str, fetch_existing: bool = False
    ) -> Optional[Session]:
        """
        Persist a pending session through the parent class, if it is pending.

        Args:
            session_id: Id of the session to wake up.
            source: Name of the calling method, used only in the log message.
            fetch_existing: When the session already exists in the DB, fetch and
                return it instead of just skipping creation.

        Returns:
            The session returned by the parent (created or fetched), or None when
            the session was not pending or creation was skipped.

        Raises:
            Any exception raised by the parent's create_session other than
            AlreadyExistsError; the pending metadata is restored before re-raising.
        """
        if session_id not in self._pending_sessions:
            return None

        logger.info(f"⏰ Waking up Lazy Session ({source}): {session_id}")
        # Pop before awaiting so a concurrent call does not try to create it too.
        meta = self._pending_sessions.pop(session_id)

        try:
            persisted = await super().create_session(
                session_id=session_id,
                user_id=meta["user_id"],
                app_name=meta["app_name"],
                **meta["kwargs"]
            )
        except AlreadyExistsError:
            if not fetch_existing:
                logger.info(f"🤝 Session {session_id} already exists in DB, skipping creation.")
                return None
            logger.info(f"🤝 Session {session_id} already exists in DB, fetching existing session.")
            # If it already exists, fetch it to get correct state (timestamps)
            return await super().get_session(
                session_id=session_id,
                app_name=meta["app_name"],
                user_id=meta["user_id"]
            )
        except Exception:
            # Creation failed for another reason (e.g. a DB outage): put the
            # metadata back so the session is not silently lost and a later
            # call can retry the persistence.
            self._pending_sessions.setdefault(session_id, meta)
            raise

        logger.info(f"💾 Session {session_id} persisted to DB.")
        return persisted

    async def create_session(self, session_id: Optional[str], user_id: str, app_name: str, **kwargs) -> Session:
        """
        Overrides create_session to store metadata in memory instead of DB.

        Args:
            session_id: Desired session id; a UUID4 string is generated when falsy.
            user_id: Owner of the session.
            app_name: Application the session belongs to.
            **kwargs: Extra arguments (e.g. ``state``) kept and forwarded to the
                parent's create_session when the session is eventually persisted.

        Returns:
            A temporary Session object that is not persisted yet.
        """
        # Handle None/empty session_id (generate one if missing)
        if not session_id:
            session_id = str(uuid.uuid4())

        logger.info(f"💤 Lazy Session Created (Pending): {session_id}")

        # Store metadata for later
        meta = {
            "user_id": user_id,
            "app_name": app_name,
            "kwargs": kwargs
        }
        self._pending_sessions[session_id] = meta

        # Session model expects 'id', not 'session_id'.
        return self._build_pending_session(session_id, meta)

    async def get_session(self, session_id: str, **kwargs) -> Optional[Session]:
        """
        Checks pending sessions first, then falls back to DB.

        Args:
            session_id: Id of the session to look up.
            **kwargs: ``app_name``, ``user_id`` and optionally ``config``, which
                are forwarded to the parent for the DB lookup.

        Returns:
            A fresh in-memory Session for a pending id, otherwise whatever the
            parent's get_session returns.
        """
        # 1. Check pending
        meta = self._pending_sessions.get(session_id)
        if meta is not None:
            # Return a fresh Session object from memory metadata
            return self._build_pending_session(session_id, meta)

        # 2. Check DB (Super) - everything is passed as keyword arguments
        forwarded: Dict[str, Any] = {
            "app_name": kwargs.get("app_name"),
            "user_id": kwargs.get("user_id"),
        }
        # Forward the caller's config instead of silently dropping it; only
        # passed when supplied so the parent's own default applies otherwise.
        if kwargs.get("config") is not None:
            forwarded["config"] = kwargs["config"]
        return await super().get_session(session_id=session_id, **forwarded)

    async def add_message(self, session_id: str, message: types.Content) -> None:
        """
        On first message, persists the session to DB before adding the message.
        Note: The Runner might call append_event directly, so we handle it there too.

        Args:
            session_id: Id of the session receiving the message.
            message: Content to add.
        """
        # 1. Persist the session now if it is still pending
        await self._persist_pending(session_id, "add_message")

        # 2. Add the message (Super)
        # NOTE(review): relies on the parent class providing add_message - confirm
        # it exists in the installed ADK version.
        await super().add_message(session_id=session_id, message=message)

    async def append_event(self, session: Session, event: Any) -> Any:
        """
        Overrides append_event to ensure session exists in DB before appending.
        The Runner calls this method to add user messages/events.

        Args:
            session: Session the event belongs to; its ``last_update_time`` is
                refreshed in place when the session has just been persisted.
            event: Event to append.

        Returns:
            Whatever the parent's append_event returns.
        """
        # 1. Persist the session now if it is still pending
        persisted_session = await self._persist_pending(
            session.id, "append_event", fetch_existing=True
        )

        # Update the passed session object with the fresh timestamp from the DB.
        # This prevents "stale session" errors in append_event.
        if hasattr(persisted_session, 'last_update_time'):
            session.last_update_time = persisted_session.last_update_time

        # 2. Append the event (Super) and hand its result back to the caller
        return await super().append_event(session=session, event=event)

    async def list_sessions(self, app_name: Optional[str] = None, **kwargs) -> List[Session]:
        """
        Overrides list_sessions to EXCLUDE pending sessions.

        Only sessions known to the parent (i.e. already in the DB) are listed;
        ``self._pending_sessions`` is never consulted.

        Args:
            app_name: Application to list sessions for.
            **kwargs: Additional keyword arguments passed through to the parent.

        Returns:
            The parent's list_sessions result, unchanged.
            NOTE(review): the annotation assumes a list of Session objects -
            confirm against the installed ADK version's return type.
        """
        # Arguments are passed by keyword; positional passing previously failed
        # with "takes 1 positional argument".
        return await super().list_sessions(app_name=app_name, **kwargs)