# Source: claude-code-proxy/messaging/node_event_pipeline.py
# Commit 0157ac7 (Yash030): Deploy claude-code-nvidia proxy to Hugging Face Spaces
"""CLI event handling for a single queued node (transcript + session + errors)."""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from typing import Any
from loguru import logger
from .cli_event_constants import TRANSCRIPT_EVENT_TYPES, get_status_for_event
from .platforms.base import SessionManagerInterface
from .safe_diagnostics import text_len_hint
from .session import SessionStore
from .transcript import TranscriptBuffer
from .trees.queue_manager import MessageState, MessageTree
async def handle_session_info_event(
    event_data: dict[str, Any],
    tree: MessageTree | None,
    node_id: str,
    captured_session_id: str | None,
    temp_session_id: str | None,
    *,
    cli_manager: SessionManagerInterface,
    session_store: SessionStore,
) -> tuple[str | None, str | None]:
    """Swap a temporary session id for the real one announced by the CLI.

    Only events whose ``type`` is ``"session_info"`` are acted on, and only
    when both the event's ``session_id`` and ``temp_session_id`` are truthy.
    In every other case the incoming pair is handed back untouched.

    When acted on, the real id is registered with ``cli_manager`` and, if a
    tree is present, the node is marked ``IN_PROGRESS`` with that id and the
    tree is saved through ``session_store``.

    Returns:
        ``(captured_session_id, temp_session_id)`` after handling: either the
        unchanged inputs, or ``(real_session_id, None)`` once registered.
    """
    if event_data.get("type") == "session_info":
        real_session_id = event_data.get("session_id")
    else:
        real_session_id = None

    # Nothing to do unless there is a real id AND a temp id to replace.
    if not (real_session_id and temp_session_id):
        return captured_session_id, temp_session_id

    await cli_manager.register_real_session_id(temp_session_id, real_session_id)

    # real_session_id is already known to be truthy here, so only the tree
    # needs checking before recording the id on the node.
    if tree:
        await tree.update_state(
            node_id,
            MessageState.IN_PROGRESS,
            session_id=real_session_id,
        )
        root_id = tree.root_id
        snapshot = tree.to_dict()
        session_store.save_tree(root_id, snapshot)

    return real_session_id, None
async def process_parsed_cli_event(
    parsed: dict[str, Any],
    transcript: TranscriptBuffer,
    update_ui: Callable[..., Awaitable[None]],
    last_status: str | None,
    had_transcript_events: bool,
    tree: MessageTree | None,
    node_id: str,
    captured_session_id: str | None,
    *,
    session_store: SessionStore,
    format_status: Callable[..., str],
    propagate_error_to_children: Callable[[str, str, str], Awaitable[None]],
    log_messaging_error_details: bool = False,
) -> tuple[str | None, bool]:
    """Process a single parsed CLI event.

    Steps, in order:

    1. If the event type is in ``TRANSCRIPT_EVENT_TYPES`` the event is applied
       to ``transcript`` and ``had_transcript_events`` becomes ``True``.
    2. ``get_status_for_event`` is consulted. A non-``None`` status is pushed
       to the UI and remembered as ``last_status``; this takes precedence over
       the type-specific handling below.
    3. Otherwise the event type selects the handling:
       ``"block_stop"`` forces a UI refresh with the last status;
       ``"complete"`` adds a ``"Done."`` text chunk when no transcript events
       were seen, forces a "Complete" status and, when both a tree and a
       captured session id exist, marks the node ``COMPLETED`` and saves the
       tree;
       ``"error"`` logs the error, forces an "Error" status and, when a tree
       exists, propagates the error to child nodes.

    Args:
        parsed: The parsed event; ``type`` and (for errors) ``message`` are read.
        transcript: Buffer receiving transcript events.
        update_ui: Awaitable UI updater, called with a status and optional
            ``force=True``.
        last_status: Most recent status shown, may be ``None``.
        had_transcript_events: Whether any transcript event was seen so far.
        tree: Tree owning the node, or ``None``.
        node_id: Id of the node this event belongs to.
        captured_session_id: Session id recorded for the node, if any.
        session_store: Store used to save the tree after a state change.
        format_status: Builds a status string from an emoji and a label.
        propagate_error_to_children: Called with
            ``(node_id, error_message, "Parent task failed")`` on error.
        log_messaging_error_details: When true the error text itself is
            logged; otherwise only a length hint is logged.

    Returns:
        The updated ``(last_status, had_transcript_events)`` pair.
    """
    ptype = parsed.get("type") or ""

    if ptype in TRANSCRIPT_EVENT_TYPES:
        transcript.apply(parsed)
        had_transcript_events = True

    status = get_status_for_event(ptype, parsed, format_status)
    if status is not None:
        await update_ui(status)
        last_status = status
    elif ptype == "block_stop":
        await update_ui(last_status, force=True)
    elif ptype == "complete":
        if not had_transcript_events:
            # Guarantee the transcript is not empty for a silent completion.
            transcript.apply({"type": "text_chunk", "text": "Done."})
        logger.info("HANDLER: Task complete, updating UI")
        await update_ui(format_status("✅", "Complete"), force=True)
        if tree and captured_session_id:
            await tree.update_state(
                node_id,
                MessageState.COMPLETED,
                session_id=captured_session_id,
            )
            session_store.save_tree(tree.root_id, tree.to_dict())
    elif ptype == "error":
        # Normalise once so logging and propagation see the same string.
        # A present-but-None or non-string "message" must not reach
        # propagate_error_to_children, whose signature requires str.
        raw_message = parsed.get("message")
        if raw_message is None:
            error_msg = "Unknown error"
        elif isinstance(raw_message, str):
            error_msg = raw_message
        else:
            error_msg = str(raw_message)

        if log_messaging_error_details:
            logger.error("HANDLER: Error event received: {}", error_msg)
        else:
            # Redacted mode: log only a size hint, never the message text.
            logger.error(
                "HANDLER: Error event received: message_chars={}",
                text_len_hint(error_msg),
            )
        logger.info("HANDLER: Updating UI with error status")
        await update_ui(format_status("❌", "Error"), force=True)
        if tree:
            await propagate_error_to_children(node_id, error_msg, "Parent task failed")

    return last_status, had_transcript_events