akseljoonas HF Staff commited on
Commit
ea2014d
·
1 Parent(s): 93c72b5

fix: buffer websocket events during disconnects and replay on reconnect

Browse files
agent/tools/papers_tool.py CHANGED
@@ -249,7 +249,7 @@ def _format_read_paper_toc(parsed: dict[str, Any], arxiv_id: str) -> str:
249
  lines.append(f"{prefix}- **{s['title']}**: {preview}")
250
 
251
  lines.append(
252
- '\n**Tip:** Call read_paper with section parameter (e.g. section="4" or section="Experiments") to read a specific section.'
253
  )
254
  return "\n".join(lines)
255
 
 
249
  lines.append(f"{prefix}- **{s['title']}**: {preview}")
250
 
251
  lines.append(
252
+ '\nCall read_paper with section parameter (e.g. section="4" or section="Experiments") to read a specific section.'
253
  )
254
  return "\n".join(lines)
255
 
backend/routes/agent.py CHANGED
@@ -386,20 +386,22 @@ async def websocket_endpoint(websocket: WebSocket, session_id: str) -> None:
386
  await websocket.close(code=4003, reason="Access denied")
387
  return
388
 
389
- await ws_manager.connect(websocket, session_id)
390
-
391
- # Send "ready" immediately on WebSocket connection so the frontend
392
- # knows the session is alive. The original ready event from _run_session
393
- # fires before the WS is connected and is always lost.
394
- try:
395
- await websocket.send_json(
396
- {
397
- "event_type": "ready",
398
- "data": {"message": "Agent initialized"},
399
- }
400
- )
401
- except Exception as e:
402
- logger.error(f"Failed to send ready event for session {session_id}: {e}")
 
 
403
 
404
  try:
405
  while True:
 
386
  await websocket.close(code=4003, reason="Access denied")
387
  return
388
 
389
+ had_buffered = await ws_manager.connect(websocket, session_id)
390
+
391
+ # Send "ready" on fresh connections so the frontend knows the session
392
+ # is alive. Skip it when buffered events were flushed — those already
393
+ # contain the correct state and a ready would incorrectly reset
394
+ # isProcessing on the frontend.
395
+ if not had_buffered:
396
+ try:
397
+ await websocket.send_json(
398
+ {
399
+ "event_type": "ready",
400
+ "data": {"message": "Agent initialized"},
401
+ }
402
+ )
403
+ except Exception as e:
404
+ logger.error(f"Failed to send ready event for session {session_id}: {e}")
405
 
406
  try:
407
  while True:
backend/session_manager.py CHANGED
@@ -320,6 +320,8 @@ class SessionManager:
320
  if not agent_session:
321
  return False
322
 
 
 
323
  # Clean up sandbox Space before cancelling the task
324
  await self._cleanup_sandbox(agent_session.session)
325
 
 
320
  if not agent_session:
321
  return False
322
 
323
+ ws_manager.clear_buffer(session_id)
324
+
325
  # Clean up sandbox Space before cancelling the task
326
  await self._cleanup_sandbox(agent_session.session)
327
 
backend/websocket.py CHANGED
@@ -1,12 +1,16 @@
1
  """WebSocket connection manager for real-time communication."""
2
 
3
  import logging
 
4
  from typing import Any
5
 
6
  from fastapi import WebSocket
7
 
8
  logger = logging.getLogger(__name__)
9
 
 
 
 
10
 
11
  class ConnectionManager:
12
  """Manages WebSocket connections for multiple sessions."""
@@ -14,13 +18,36 @@ class ConnectionManager:
14
  def __init__(self) -> None:
15
  # session_id -> WebSocket
16
  self.active_connections: dict[str, WebSocket] = {}
 
 
 
 
 
17
 
18
- async def connect(self, websocket: WebSocket, session_id: str) -> None:
19
- """Accept a WebSocket connection and register it."""
 
20
  logger.info(f"Attempting to accept WebSocket for session {session_id}")
21
  await websocket.accept()
22
  self.active_connections[session_id] = websocket
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  logger.info(f"WebSocket connected and registered for session {session_id}")
 
24
 
25
  def disconnect(self, session_id: str) -> None:
26
  """Remove a WebSocket connection."""
@@ -28,23 +55,39 @@ class ConnectionManager:
28
  del self.active_connections[session_id]
29
  logger.info(f"WebSocket disconnected for session {session_id}")
30
 
 
 
 
 
31
  async def send_event(
32
  self, session_id: str, event_type: str, data: dict[str, Any] | None = None
33
  ) -> None:
34
- """Send an event to a specific session's WebSocket."""
35
- if session_id not in self.active_connections:
36
- logger.warning(f"No active connection for session {session_id}")
37
- return
38
 
39
- message = {"event_type": event_type}
 
 
 
40
  if data is not None:
41
  message["data"] = data
42
 
 
 
 
 
 
 
 
43
  try:
44
  await self.active_connections[session_id].send_json(message)
45
  except Exception as e:
46
  logger.error(f"Error sending to session {session_id}: {e}")
47
  self.disconnect(session_id)
 
 
 
 
 
48
 
49
  async def broadcast(
50
  self, event_type: str, data: dict[str, Any] | None = None
 
1
  """WebSocket connection manager for real-time communication."""
2
 
3
  import logging
4
+ from collections import deque
5
  from typing import Any
6
 
7
  from fastapi import WebSocket
8
 
9
  logger = logging.getLogger(__name__)
10
 
11
+ # Max events buffered per session while WS is disconnected.
12
+ MAX_EVENT_BUFFER = 500
13
+
14
 
15
  class ConnectionManager:
16
  """Manages WebSocket connections for multiple sessions."""
 
18
  def __init__(self) -> None:
19
  # session_id -> WebSocket
20
  self.active_connections: dict[str, WebSocket] = {}
21
+ # session_id -> events buffered while WS was disconnected
22
+ self._event_buffers: dict[str, deque[dict[str, Any]]] = {}
23
+
24
+ async def connect(self, websocket: WebSocket, session_id: str) -> bool:
25
+ """Accept a WebSocket connection, register it, and flush buffered events.
26
 
27
+ Returns True if buffered events were flushed (i.e. this is a reconnect
28
+ that had missed events).
29
+ """
30
  logger.info(f"Attempting to accept WebSocket for session {session_id}")
31
  await websocket.accept()
32
  self.active_connections[session_id] = websocket
33
+
34
+ # Flush events that were buffered while the WS was disconnected
35
+ buffered = self._event_buffers.pop(session_id, None)
36
+ if buffered:
37
+ logger.info(
38
+ f"Flushing {len(buffered)} buffered events for session {session_id}"
39
+ )
40
+ for message in buffered:
41
+ try:
42
+ await websocket.send_json(message)
43
+ except Exception:
44
+ logger.error(
45
+ f"Error flushing buffered event for session {session_id}"
46
+ )
47
+ break
48
+
49
  logger.info(f"WebSocket connected and registered for session {session_id}")
50
+ return bool(buffered)
51
 
52
  def disconnect(self, session_id: str) -> None:
53
  """Remove a WebSocket connection."""
 
55
  del self.active_connections[session_id]
56
  logger.info(f"WebSocket disconnected for session {session_id}")
57
 
58
+ def clear_buffer(self, session_id: str) -> None:
59
+ """Clear the event buffer for a session (e.g. on session delete)."""
60
+ self._event_buffers.pop(session_id, None)
61
+
62
  async def send_event(
63
  self, session_id: str, event_type: str, data: dict[str, Any] | None = None
64
  ) -> None:
65
+ """Send an event to a specific session's WebSocket.
 
 
 
66
 
67
+ If no WebSocket is connected, the event is buffered so it can be
68
+ replayed when the client reconnects.
69
+ """
70
+ message: dict[str, Any] = {"event_type": event_type}
71
  if data is not None:
72
  message["data"] = data
73
 
74
+ if session_id not in self.active_connections:
75
+ buf = self._event_buffers.setdefault(
76
+ session_id, deque(maxlen=MAX_EVENT_BUFFER)
77
+ )
78
+ buf.append(message)
79
+ return
80
+
81
  try:
82
  await self.active_connections[session_id].send_json(message)
83
  except Exception as e:
84
  logger.error(f"Error sending to session {session_id}: {e}")
85
  self.disconnect(session_id)
86
+ # Buffer the event that failed to send
87
+ buf = self._event_buffers.setdefault(
88
+ session_id, deque(maxlen=MAX_EVENT_BUFFER)
89
+ )
90
+ buf.append(message)
91
 
92
  async def broadcast(
93
  self, event_type: str, data: dict[str, Any] | None = None
frontend/src/lib/ws-chat-transport.ts CHANGED
@@ -402,7 +402,13 @@ export class WebSocketChatTransport implements ChatTransport<UIMessage> {
402
  switch (event.event_type) {
403
  // -- Side-channel only events ----------------------------------------
404
  case 'ready':
405
- this.sideChannel.onReady();
 
 
 
 
 
 
406
  break;
407
 
408
  case 'shutdown':
 
402
  switch (event.event_type) {
403
  // -- Side-channel only events ----------------------------------------
404
  case 'ready':
405
+ if (this.streamController) {
406
+ // Reconnect during an active turn — don't reset processing state,
407
+ // just signal that the connection is back.
408
+ this.sideChannel.onConnectionChange(true);
409
+ } else {
410
+ this.sideChannel.onReady();
411
+ }
412
  break;
413
 
414
  case 'shutdown':