akseljoonas HF Staff Claude Opus 4.6 commited on
Commit
00a57cd
·
1 Parent(s): c92e6dc

feat: SSE keepalive, reconnection, and is_processing flag

Browse files

Prevents SSE streams from being killed by reverse proxies during long
operations and lets the frontend recover gracefully after sleep/disconnect.

Backend:
- SSE keepalive pings every 15s via `: keepalive` comments
- New GET /api/events/{session_id} subscribe-only endpoint for reconnection
- is_processing flag on AgentSession, exposed in session info API

Frontend:
- reconnectToStream() re-attaches to a running session via /api/events
- Visibility change re-hydrates ALL sessions from backend on wake
- Background sessions also re-sync (removed isActive guard on listener)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

backend/models.py CHANGED
@@ -75,6 +75,7 @@ class SessionInfo(BaseModel):
75
  session_id: str
76
  created_at: str
77
  is_active: bool
 
78
  message_count: int
79
  user_id: str = "dev"
80
  pending_approval: list[PendingApprovalTool] | None = None
 
75
  session_id: str
76
  created_at: str
77
  is_active: bool
78
+ is_processing: bool = False
79
  message_count: int
80
  user_id: str = "dev"
81
  pending_approval: list[PendingApprovalTool] | None = None
backend/routes/agent.py CHANGED
@@ -4,6 +4,7 @@ All routes (except /health) require authentication via the get_current_user
4
  dependency. In dev mode (no OAUTH_CLIENT_ID), auth is bypassed automatically.
5
  """
6
 
 
7
  import json
8
  import logging
9
  from typing import Any
@@ -336,17 +337,34 @@ async def chat_sse(
336
  broadcaster.unsubscribe(sub_id)
337
  raise
338
 
339
- # Terminal events that end the SSE stream
340
- TERMINAL_EVENTS = {"turn_complete", "approval_required", "error", "interrupted", "shutdown"}
 
 
 
 
 
 
 
 
 
 
 
341
 
342
  async def event_generator():
343
  try:
344
  while True:
345
- msg = await event_queue.get()
 
 
 
 
 
 
 
346
  event_type = msg.get("event_type", "")
347
- # Format as SSE
348
  yield f"data: {json.dumps(msg)}\n\n"
349
- if event_type in TERMINAL_EVENTS:
350
  break
351
  finally:
352
  broadcaster.unsubscribe(sub_id)
@@ -362,6 +380,27 @@ async def chat_sse(
362
  )
363
 
364
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
365
  @router.post("/interrupt/{session_id}")
366
  async def interrupt_session(
367
  session_id: str, user: dict = Depends(get_current_user)
 
4
  dependency. In dev mode (no OAUTH_CLIENT_ID), auth is bypassed automatically.
5
  """
6
 
7
+ import asyncio
8
  import json
9
  import logging
10
  from typing import Any
 
337
  broadcaster.unsubscribe(sub_id)
338
  raise
339
 
340
+ return _sse_response(broadcaster, event_queue, sub_id)
341
+
342
+
343
+ # ---------------------------------------------------------------------------
344
+ # Shared SSE helpers
345
+ # ---------------------------------------------------------------------------
346
+ _TERMINAL_EVENTS = {"turn_complete", "approval_required", "error", "interrupted", "shutdown"}
347
+ _SSE_KEEPALIVE_SECONDS = 15
348
+
349
+
350
+ def _sse_response(broadcaster, event_queue, sub_id) -> StreamingResponse:
351
+ """Build a StreamingResponse that drains *event_queue* as SSE,
352
+ sending keepalive comments every 15 s to prevent proxy timeouts."""
353
 
354
  async def event_generator():
355
  try:
356
  while True:
357
+ try:
358
+ msg = await asyncio.wait_for(
359
+ event_queue.get(), timeout=_SSE_KEEPALIVE_SECONDS
360
+ )
361
+ except asyncio.TimeoutError:
362
+ # SSE comment — ignored by parsers, keeps connection alive
363
+ yield ": keepalive\n\n"
364
+ continue
365
  event_type = msg.get("event_type", "")
 
366
  yield f"data: {json.dumps(msg)}\n\n"
367
+ if event_type in _TERMINAL_EVENTS:
368
  break
369
  finally:
370
  broadcaster.unsubscribe(sub_id)
 
380
  )
381
 
382
 
383
+ @router.get("/events/{session_id}")
384
+ async def subscribe_events(
385
+ session_id: str,
386
+ user: dict = Depends(get_current_user),
387
+ ) -> StreamingResponse:
388
+ """Subscribe to events for a running session without submitting new input.
389
+
390
+ Used by the frontend to re-attach after a connection drop (e.g. screen
391
+ sleep). Returns 404 if the session isn't active or isn't processing.
392
+ """
393
+ _check_session_access(session_id, user)
394
+
395
+ agent_session = session_manager.sessions.get(session_id)
396
+ if not agent_session or not agent_session.is_active:
397
+ raise HTTPException(status_code=404, detail="Session not found or inactive")
398
+
399
+ broadcaster = agent_session.broadcaster
400
+ sub_id, event_queue = broadcaster.subscribe()
401
+ return _sse_response(broadcaster, event_queue, sub_id)
402
+
403
+
404
  @router.post("/interrupt/{session_id}")
405
  async def interrupt_session(
406
  session_id: str, user: dict = Depends(get_current_user)
backend/session_manager.py CHANGED
@@ -89,6 +89,7 @@ class AgentSession:
89
  task: asyncio.Task | None = None
90
  created_at: datetime = field(default_factory=datetime.utcnow)
91
  is_active: bool = True
 
92
  broadcaster: Any = None
93
 
94
 
@@ -245,7 +246,11 @@ class SessionManager:
245
  submission = await asyncio.wait_for(
246
  submission_queue.get(), timeout=1.0
247
  )
248
- should_continue = await process_submission(session, submission)
 
 
 
 
249
  if not should_continue:
250
  break
251
  except asyncio.TimeoutError:
@@ -405,6 +410,7 @@ class SessionManager:
405
  "session_id": session_id,
406
  "created_at": agent_session.created_at.isoformat(),
407
  "is_active": agent_session.is_active,
 
408
  "message_count": len(agent_session.session.context_manager.items),
409
  "user_id": agent_session.user_id,
410
  "pending_approval": pending_approval,
 
89
  task: asyncio.Task | None = None
90
  created_at: datetime = field(default_factory=datetime.utcnow)
91
  is_active: bool = True
92
+ is_processing: bool = False # True while a submission is being executed
93
  broadcaster: Any = None
94
 
95
 
 
246
  submission = await asyncio.wait_for(
247
  submission_queue.get(), timeout=1.0
248
  )
249
+ agent_session.is_processing = True
250
+ try:
251
+ should_continue = await process_submission(session, submission)
252
+ finally:
253
+ agent_session.is_processing = False
254
  if not should_continue:
255
  break
256
  except asyncio.TimeoutError:
 
410
  "session_id": session_id,
411
  "created_at": agent_session.created_at.isoformat(),
412
  "is_active": agent_session.is_active,
413
+ "is_processing": agent_session.is_processing,
414
  "message_count": len(agent_session.session.context_manager.items),
415
  "user_id": agent_session.user_id,
416
  "pending_approval": pending_approval,
frontend/src/components/SessionChat.tsx CHANGED
@@ -44,10 +44,10 @@ export default function SessionChat({ sessionId, isActive, onSessionDead }: Sess
44
 
45
  // Re-sync state when the browser tab regains focus (Chrome throttles
46
  // timers in background tabs which can stall the AI SDK's update flushing).
 
47
  useEffect(() => {
48
- if (!isActive) return;
49
  const onVisible = () => {
50
- if (document.visibilityState === 'visible') {
51
  useAgentStore.getState().switchActiveSession(sessionId);
52
  }
53
  };
 
44
 
45
  // Re-sync state when the browser tab regains focus (Chrome throttles
46
  // timers in background tabs which can stall the AI SDK's update flushing).
47
+ // Fires for ALL sessions so background sessions also recover after sleep.
48
  useEffect(() => {
 
49
  const onVisible = () => {
50
+ if (document.visibilityState === 'visible' && isActive) {
51
  useAgentStore.getState().switchActiveSession(sessionId);
52
  }
53
  };
frontend/src/hooks/useAgentChat.ts CHANGED
@@ -339,6 +339,53 @@ export function useAgentChat({ sessionId, isActive, onReady, onError, onSessionD
339
  return () => { cancelled = true; };
340
  }, [sessionId]); // eslint-disable-line react-hooks/exhaustive-deps
341
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
342
  // -- Persist messages ---------------------------------------------------
343
  const prevLenRef = useRef(initialMessages.length);
344
  useEffect(() => {
 
339
  return () => { cancelled = true; };
340
  }, [sessionId]); // eslint-disable-line react-hooks/exhaustive-deps
341
 
342
+ // -- Re-hydrate on wake from sleep (SSE stream may have died) -----------
343
+ const rehydratingRef = useRef(false);
344
+ useEffect(() => {
345
+ const onVisible = async () => {
346
+ if (document.visibilityState !== 'visible') return;
347
+ if (rehydratingRef.current) return;
348
+ rehydratingRef.current = true;
349
+ try {
350
+ const [msgsRes, infoRes] = await Promise.all([
351
+ apiFetch(`/api/session/${sessionId}/messages`),
352
+ apiFetch(`/api/session/${sessionId}`),
353
+ ]);
354
+ if (!msgsRes.ok || !infoRes.ok) return;
355
+ const info = await infoRes.json();
356
+ const data = await msgsRes.json();
357
+ if (!Array.isArray(data) || data.length === 0) return;
358
+
359
+ // Rebuild pending-approval set
360
+ let pendingIds: Set<string> | undefined;
361
+ if (info.pending_approval && Array.isArray(info.pending_approval)) {
362
+ pendingIds = new Set(
363
+ info.pending_approval.map((t: { tool_call_id: string }) => t.tool_call_id)
364
+ );
365
+ if (pendingIds.size > 0) setNeedsAttention(sessionId, true);
366
+ }
367
+
368
+ const uiMsgs = llmMessagesToUIMessages(data, pendingIds);
369
+ if (uiMsgs.length > 0) {
370
+ chat.setMessages(uiMsgs);
371
+ saveMessages(sessionId, uiMsgs);
372
+ }
373
+
374
+ // If the backend is still processing but we lost the SSE stream,
375
+ // mark the UI as busy so the chat input stays disabled.
376
+ if (info.is_processing) {
377
+ updateSession(sessionId, { isProcessing: true, activityStatus: { type: 'thinking' } });
378
+ }
379
+ } catch {
380
+ /* ignore — backend may be briefly unreachable */
381
+ } finally {
382
+ rehydratingRef.current = false;
383
+ }
384
+ };
385
+ document.addEventListener('visibilitychange', onVisible);
386
+ return () => document.removeEventListener('visibilitychange', onVisible);
387
+ }, [sessionId]); // eslint-disable-line react-hooks/exhaustive-deps
388
+
389
  // -- Persist messages ---------------------------------------------------
390
  const prevLenRef = useRef(initialMessages.length);
391
  useEffect(() => {
frontend/src/lib/sse-chat-transport.ts CHANGED
@@ -360,7 +360,29 @@ export class SSEChatTransport implements ChatTransport<UIMessage> {
360
  }
361
 
362
  async reconnectToStream(): Promise<ReadableStream<UIMessageChunk> | null> {
363
- // Not needed each turn is a separate request
364
- return null;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
365
  }
366
  }
 
360
  }
361
 
362
  async reconnectToStream(): Promise<ReadableStream<UIMessageChunk> | null> {
363
+ // Check if the backend session is still processing a turn.
364
+ // If so, subscribe to its event stream so we can resume live updates
365
+ // (e.g. after page refresh or wake-from-sleep reconnection).
366
+ try {
367
+ const infoRes = await apiFetch(`/api/session/${this.sessionId}`);
368
+ if (!infoRes.ok) return null;
369
+ const info = await infoRes.json();
370
+ if (!info.is_processing) return null;
371
+
372
+ // Session is mid-turn — subscribe to its event broadcast.
373
+ const response = await apiFetch(`/api/events/${this.sessionId}`, {
374
+ headers: { 'Accept': 'text/event-stream' },
375
+ });
376
+ if (!response.ok || !response.body) return null;
377
+
378
+ this.sideChannel.onProcessing();
379
+
380
+ return response.body
381
+ .pipeThrough(new TextDecoderStream())
382
+ .pipeThrough(createSSEParserStream())
383
+ .pipeThrough(createEventToChunkStream(this.sideChannel));
384
+ } catch {
385
+ return null;
386
+ }
387
  }
388
  }