File size: 5,694 Bytes
033ca06 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | """Memory update queue with debounce mechanism."""
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
from src.config.memory_config import get_memory_config
@dataclass
class ConversationContext:
"""Context for a conversation to be processed for memory update."""
thread_id: str
messages: list[Any]
timestamp: datetime = field(default_factory=datetime.utcnow)
class MemoryUpdateQueue:
"""Queue for memory updates with debounce mechanism.
This queue collects conversation contexts and processes them after
a configurable debounce period. Multiple conversations received within
the debounce window are batched together.
"""
def __init__(self):
"""Initialize the memory update queue."""
self._queue: list[ConversationContext] = []
self._lock = threading.Lock()
self._timer: threading.Timer | None = None
self._processing = False
def add(self, thread_id: str, messages: list[Any]) -> None:
"""Add a conversation to the update queue.
Args:
thread_id: The thread ID.
messages: The conversation messages.
"""
config = get_memory_config()
if not config.enabled:
return
context = ConversationContext(
thread_id=thread_id,
messages=messages,
)
with self._lock:
# Check if this thread already has a pending update
# If so, replace it with the newer one
self._queue = [c for c in self._queue if c.thread_id != thread_id]
self._queue.append(context)
# Reset or start the debounce timer
self._reset_timer()
print(f"Memory update queued for thread {thread_id}, queue size: {len(self._queue)}")
def _reset_timer(self) -> None:
"""Reset the debounce timer."""
config = get_memory_config()
# Cancel existing timer if any
if self._timer is not None:
self._timer.cancel()
# Start new timer
self._timer = threading.Timer(
config.debounce_seconds,
self._process_queue,
)
self._timer.daemon = True
self._timer.start()
print(f"Memory update timer set for {config.debounce_seconds}s")
def _process_queue(self) -> None:
"""Process all queued conversation contexts."""
# Import here to avoid circular dependency
from src.agents.memory.updater import MemoryUpdater
with self._lock:
if self._processing:
# Already processing, reschedule
self._reset_timer()
return
if not self._queue:
return
self._processing = True
contexts_to_process = self._queue.copy()
self._queue.clear()
self._timer = None
print(f"Processing {len(contexts_to_process)} queued memory updates")
try:
updater = MemoryUpdater()
for context in contexts_to_process:
try:
print(f"Updating memory for thread {context.thread_id}")
success = updater.update_memory(
messages=context.messages,
thread_id=context.thread_id,
)
if success:
print(f"Memory updated successfully for thread {context.thread_id}")
else:
print(f"Memory update skipped/failed for thread {context.thread_id}")
except Exception as e:
print(f"Error updating memory for thread {context.thread_id}: {e}")
# Small delay between updates to avoid rate limiting
if len(contexts_to_process) > 1:
time.sleep(0.5)
finally:
with self._lock:
self._processing = False
def flush(self) -> None:
"""Force immediate processing of the queue.
This is useful for testing or graceful shutdown.
"""
with self._lock:
if self._timer is not None:
self._timer.cancel()
self._timer = None
self._process_queue()
def clear(self) -> None:
"""Clear the queue without processing.
This is useful for testing.
"""
with self._lock:
if self._timer is not None:
self._timer.cancel()
self._timer = None
self._queue.clear()
self._processing = False
@property
def pending_count(self) -> int:
"""Get the number of pending updates."""
with self._lock:
return len(self._queue)
@property
def is_processing(self) -> bool:
"""Check if the queue is currently being processed."""
with self._lock:
return self._processing
# Global singleton instance
_memory_queue: MemoryUpdateQueue | None = None
_queue_lock = threading.Lock()
def get_memory_queue() -> MemoryUpdateQueue:
"""Get the global memory update queue singleton.
Returns:
The memory update queue instance.
"""
global _memory_queue
with _queue_lock:
if _memory_queue is None:
_memory_queue = MemoryUpdateQueue()
return _memory_queue
def reset_memory_queue() -> None:
"""Reset the global memory queue.
This is useful for testing.
"""
global _memory_queue
with _queue_lock:
if _memory_queue is not None:
_memory_queue.clear()
_memory_queue = None
|