Spaces:
Running
Running
| """CLI event handling for a single queued node (transcript + session + errors).""" | |
| from __future__ import annotations | |
| from collections.abc import Awaitable, Callable | |
| from typing import Any | |
| from loguru import logger | |
| from .cli_event_constants import TRANSCRIPT_EVENT_TYPES, get_status_for_event | |
| from .platforms.base import SessionManagerInterface | |
| from .safe_diagnostics import text_len_hint | |
| from .session import SessionStore | |
| from .transcript import TranscriptBuffer | |
| from .trees.queue_manager import MessageState, MessageTree | |
async def handle_session_info_event(
    event_data: dict[str, Any],
    tree: MessageTree | None,
    node_id: str,
    captured_session_id: str | None,
    temp_session_id: str | None,
    *,
    cli_manager: SessionManagerInterface,
    session_store: SessionStore,
) -> tuple[str | None, str | None]:
    """Process a ``session_info`` event and report the resulting session ids.

    The inputs ``(captured_session_id, temp_session_id)`` are returned
    unchanged when the event is not of type ``session_info``, when it carries
    no truthy ``session_id``, or when ``temp_session_id`` is falsy.

    Otherwise the announced session id is registered with ``cli_manager``
    under ``temp_session_id``. If ``tree`` is truthy, the node ``node_id`` is
    moved to ``MessageState.IN_PROGRESS`` with that session id and the tree is
    saved through ``session_store``. The result is then
    ``(announced_session_id, None)``.
    """
    unchanged = (captured_session_id, temp_session_id)

    # Only session_info events are inspected for a session id at all.
    is_session_info = event_data.get("type") == "session_info"
    announced_id = event_data.get("session_id") if is_session_info else None

    # Both the announced id and the temporary id are needed to register.
    if not (announced_id and temp_session_id):
        return unchanged

    await cli_manager.register_real_session_id(temp_session_id, announced_id)

    # announced_id is known to be truthy here, so only the tree needs checking.
    if tree:
        await tree.update_state(
            node_id,
            MessageState.IN_PROGRESS,
            session_id=announced_id,
        )
        session_store.save_tree(tree.root_id, tree.to_dict())

    return announced_id, None
async def process_parsed_cli_event(
    parsed: dict[str, Any],
    transcript: TranscriptBuffer,
    update_ui: Callable[..., Awaitable[None]],
    last_status: str | None,
    had_transcript_events: bool,
    tree: MessageTree | None,
    node_id: str,
    captured_session_id: str | None,
    *,
    session_store: SessionStore,
    format_status: Callable[..., str],
    propagate_error_to_children: Callable[[str, str, str], Awaitable[None]],
    log_messaging_error_details: bool = False,
) -> tuple[str | None, bool]:
    """Process a single parsed CLI event.

    Args:
        parsed: The parsed event; its ``"type"`` key selects the handling.
        transcript: Buffer that receives events whose type is listed in
            ``TRANSCRIPT_EVENT_TYPES``.
        update_ui: Awaitable callback invoked with a status value and,
            for some events, ``force=True``.
        last_status: The most recent status pushed to the UI, if any.
        had_transcript_events: Whether any transcript event has been applied
            so far.
        tree: Message tree owning ``node_id``, or ``None``.
        node_id: Identifier of the node this event belongs to.
        captured_session_id: Session id recorded on the node upon completion.
        session_store: Store used to save the tree after a state change.
        format_status: Callable that builds a status string from an emoji and
            a label.
        propagate_error_to_children: Awaitable callback invoked with
            ``(node_id, error_message, "Parent task failed")`` on error events
            when a tree is present.
        log_messaging_error_details: When true, the error message itself is
            logged; otherwise only ``text_len_hint`` of it is logged.

    Returns:
        The updated ``(last_status, had_transcript_events)`` pair.
    """
    ptype = parsed.get("type") or ""

    if ptype in TRANSCRIPT_EVENT_TYPES:
        transcript.apply(parsed)
        had_transcript_events = True

    status = get_status_for_event(ptype, parsed, format_status)
    if status is not None:
        # An event-specific status takes precedence over the type-based
        # branches below.
        await update_ui(status)
        last_status = status
    elif ptype == "block_stop":
        # Re-send the last known status, forcing the UI update.
        await update_ui(last_status, force=True)
    elif ptype == "complete":
        if not had_transcript_events:
            # Nothing was written to the transcript; add a placeholder text.
            transcript.apply({"type": "text_chunk", "text": "Done."})
        logger.info("HANDLER: Task complete, updating UI")
        await update_ui(format_status("✅", "Complete"), force=True)
        if tree and captured_session_id:
            await tree.update_state(
                node_id,
                MessageState.COMPLETED,
                session_id=captured_session_id,
            )
            session_store.save_tree(tree.root_id, tree.to_dict())
    elif ptype == "error":
        # The payload may hold a non-string (or an explicit None) message.
        # Normalise it once so that both the logging below and the
        # str-typed propagate_error_to_children callback receive a string.
        raw_message = parsed.get("message", "Unknown error")
        if raw_message is None:
            error_msg = "Unknown error"
        elif isinstance(raw_message, str):
            error_msg = raw_message
        else:
            error_msg = str(raw_message)

        if log_messaging_error_details:
            logger.error("HANDLER: Error event received: {}", error_msg)
        else:
            # Log only the length hint rather than the message text.
            logger.error(
                "HANDLER: Error event received: message_chars={}",
                text_len_hint(error_msg),
            )
        logger.info("HANDLER: Updating UI with error status")
        await update_ui(format_status("❌", "Error"), force=True)
        if tree:
            await propagate_error_to_children(node_id, error_msg, "Parent task failed")

    return last_status, had_transcript_events