File size: 3,786 Bytes
0157ac7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""CLI event handling for a single queued node (transcript + session + errors)."""

from __future__ import annotations

from collections.abc import Awaitable, Callable
from typing import Any

from loguru import logger

from .cli_event_constants import TRANSCRIPT_EVENT_TYPES, get_status_for_event
from .platforms.base import SessionManagerInterface
from .safe_diagnostics import text_len_hint
from .session import SessionStore
from .transcript import TranscriptBuffer
from .trees.queue_manager import MessageState, MessageTree


async def handle_session_info_event(
    event_data: dict[str, Any],
    tree: MessageTree | None,
    node_id: str,
    captured_session_id: str | None,
    temp_session_id: str | None,
    *,
    cli_manager: SessionManagerInterface,
    session_store: SessionStore,
) -> tuple[str | None, str | None]:
    """Swap a temporary session id for the real one announced by the CLI.

    Only ``session_info`` events that carry a truthy ``session_id`` are acted
    on, and only while a temporary id is still pending. Any other input leaves
    both ids untouched.

    Args:
        event_data: Parsed CLI event; its ``type`` and ``session_id`` keys are read.
        tree: Message tree owning ``node_id``; skipped when falsy.
        node_id: Node whose state is updated with the real session id.
        captured_session_id: Real session id captured so far, if any.
        temp_session_id: Placeholder id still awaiting its real counterpart.
        cli_manager: Receives the temp -> real id registration.
        session_store: Used to save the tree after the node is updated.

    Returns:
        ``(captured_session_id, temp_session_id)`` unchanged when nothing was
        registered, otherwise ``(real_session_id, None)``.
    """
    is_session_info = event_data.get("type") == "session_info"
    announced_id = event_data.get("session_id") if is_session_info else None

    # Nothing to do unless the event announces an id AND a temp id is pending.
    if not (is_session_info and announced_id and temp_session_id):
        return captured_session_id, temp_session_id

    await cli_manager.register_real_session_id(temp_session_id, announced_id)

    if tree:
        # Record the real id on the node, then save the updated tree.
        await tree.update_state(
            node_id,
            MessageState.IN_PROGRESS,
            session_id=announced_id,
        )
        serialized_tree = tree.to_dict()
        session_store.save_tree(tree.root_id, serialized_tree)

    # The temp id has been consumed; callers should stop tracking it.
    return announced_id, None


async def process_parsed_cli_event(
    parsed: dict[str, Any],
    transcript: TranscriptBuffer,
    update_ui: Callable[..., Awaitable[None]],
    last_status: str | None,
    had_transcript_events: bool,
    tree: MessageTree | None,
    node_id: str,
    captured_session_id: str | None,
    *,
    session_store: SessionStore,
    format_status: Callable[..., str],
    propagate_error_to_children: Callable[[str, str, str], Awaitable[None]],
    log_messaging_error_details: bool = False,
) -> tuple[str | None, bool]:
    """Process a single parsed CLI event and return the updated tracking state.

    Events whose type is in ``TRANSCRIPT_EVENT_TYPES`` are first applied to
    ``transcript``. After that exactly one of these branches runs, in order:

    * ``get_status_for_event`` yields a status: it is pushed to the UI and
      becomes the new ``last_status``.
    * ``block_stop``: the UI is force-refreshed with the previous status.
    * ``complete``: a ``"Done."`` chunk is added if no transcript event was
      seen, the UI shows the completion status, and (when both ``tree`` and
      ``captured_session_id`` are truthy) the node is marked ``COMPLETED``
      and the tree is saved.
    * ``error``: the error is logged, the UI shows the error status, and
      (when ``tree`` is truthy) the error is propagated to child nodes.

    Args:
        parsed: Parsed CLI event; ``type`` and, for errors, ``message`` are read.
        transcript: Buffer receiving transcript events.
        update_ui: Async callback invoked with a status and optional ``force``.
        last_status: Most recent status shown before this event.
        had_transcript_events: Whether any transcript event was seen earlier.
        tree: Message tree owning ``node_id``; tree updates are skipped if falsy.
        node_id: Node this event belongs to.
        captured_session_id: Real session id captured so far, if any.
        session_store: Used to save the tree after a state change.
        format_status: Builds the status strings shown in the UI.
        propagate_error_to_children: Async callback given
            ``(node_id, error_message, "Parent task failed")`` on error.
        log_messaging_error_details: When true, the error message text itself
            is logged; otherwise only the value from ``text_len_hint`` is.

    Returns:
        ``(last_status, had_transcript_events)`` after handling the event.
    """
    ptype = parsed.get("type") or ""

    if ptype in TRANSCRIPT_EVENT_TYPES:
        transcript.apply(parsed)
        had_transcript_events = True

    status = get_status_for_event(ptype, parsed, format_status)
    if status is not None:
        await update_ui(status)
        last_status = status
    elif ptype == "block_stop":
        # NOTE(review): last_status may still be None here if no status was
        # shown yet -- confirm update_ui tolerates that.
        await update_ui(last_status, force=True)
    elif ptype == "complete":
        if not had_transcript_events:
            # Give the transcript some content when the run produced none.
            transcript.apply({"type": "text_chunk", "text": "Done."})
        logger.info("HANDLER: Task complete, updating UI")
        await update_ui(format_status("✅", "Complete"), force=True)
        if tree and captured_session_id:
            await tree.update_state(
                node_id,
                MessageState.COMPLETED,
                session_id=captured_session_id,
            )
            session_store.save_tree(tree.root_id, tree.to_dict())
    elif ptype == "error":
        # Normalise the message once so that logging and propagation both see
        # a real string: the propagate callback is declared to take ``str``,
        # but the event payload may carry None or a non-string value.
        raw_message = parsed.get("message", "Unknown error")
        if raw_message is None:
            error_msg = "Unknown error"
        elif isinstance(raw_message, str):
            error_msg = raw_message
        else:
            error_msg = str(raw_message)

        if log_messaging_error_details:
            logger.error("HANDLER: Error event received: {}", error_msg)
        else:
            # Keep the message text itself out of the log line.
            logger.error(
                "HANDLER: Error event received: message_chars={}",
                text_len_hint(error_msg),
            )
        logger.info("HANDLER: Updating UI with error status")
        await update_ui(format_status("❌", "Error"), force=True)
        if tree:
            await propagate_error_to_children(node_id, error_msg, "Parent task failed")

    return last_status, had_transcript_events