File size: 5,876 Bytes
a2dd2c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import os
import uuid
from typing import TYPE_CHECKING
from helpers import guids

if TYPE_CHECKING:
    from agent import AgentContext

from helpers.print_style import PrintStyle

QUEUE_KEY = "message_queue"
QUEUE_SEQ_KEY = "message_queue_seq"
UPLOAD_FOLDER = "/a0/usr/uploads"


def get_queue(context: "AgentContext") -> list:
    """Get current queue from context.data."""
    return context.get_data(QUEUE_KEY) or []


def _get_next_seq(context: "AgentContext") -> int:
    """Get next sequence number."""
    seq = context.get_data(QUEUE_SEQ_KEY) or 0
    seq += 1
    context.set_data(QUEUE_SEQ_KEY, seq)
    return seq


def _sync_output(context: "AgentContext"):
    """Sync queue to output_data for frontend polling."""
    queue = get_queue(context)
    # Truncate text for frontend display
    truncated = []
    for item in queue:
        truncated.append({
            "id": item["id"],
            "seq": item.get("seq", 0),
            "text": item["text"][:100] + "..." if len(item["text"]) > 100 else item["text"],
            "attachments": [a.split("/")[-1] for a in item.get("attachments", [])],
            "attachment_count": len(item.get("attachments", [])),
        })
    context.set_output_data(QUEUE_KEY, truncated)


def add(
    context: "AgentContext",
    text: str,
    attachments: list[str] | None = None,
    item_id: str | None = None,
) -> dict:
    """Add message to queue. Attachments should be filenames, will be converted to full paths."""
    queue = get_queue(context)
    
    # Convert filenames to full paths
    full_paths = []
    for att in (attachments or []):
        if att.startswith("/"):
            full_paths.append(att)
        else:
            full_paths.append(f"{UPLOAD_FOLDER}/{att}")
    
    item = {
        "id": item_id or guids.generate_id(),
        "seq": _get_next_seq(context),
        "text": text,
        "attachments": full_paths,
    }
    queue.append(item)
    context.set_data(QUEUE_KEY, queue)
    _sync_output(context)
    return item


def remove(context: "AgentContext", item_id: str | None = None) -> int:
    """Remove item(s). If item_id is None, clears all. Returns remaining count."""
    if not item_id:
        context.set_data(QUEUE_KEY, [])
        context.set_output_data(QUEUE_KEY, [])
        return 0
    queue = [i for i in get_queue(context) if i["id"] != item_id]
    context.set_data(QUEUE_KEY, queue)
    _sync_output(context)
    return len(queue)


def pop_first(context: "AgentContext") -> dict | None:
    """Remove and return first item."""
    queue = get_queue(context)
    if not queue:
        return None
    item = queue.pop(0)
    context.set_data(QUEUE_KEY, queue)
    _sync_output(context)
    return item


def pop_item(context: "AgentContext", item_id: str) -> dict | None:
    """Remove and return specific item."""
    queue = get_queue(context)
    for i, item in enumerate(queue):
        if item["id"] == item_id:
            queue.pop(i)
            context.set_data(QUEUE_KEY, queue)
            _sync_output(context)
            return item
    return None


def has_queue(context: "AgentContext") -> bool:
    """Check if queue has items."""
    return len(get_queue(context)) > 0


def log_user_message(
    context: "AgentContext",
    message: str,
    attachment_paths: list[str],
    message_id: str | None = None,
    source: str = "",
):
    """Log user message to console and UI. Used by message API and queue processing."""
    # Prepare attachment filenames for logging
    attachment_filenames = (
        [os.path.basename(path) for path in attachment_paths]
        if attachment_paths
        else []
    )
    
    # Print to console
    label = f"User message{source}:"
    PrintStyle(
        background_color="#6C3483", font_color="white", bold=True, padding=True
    ).print(label)
    PrintStyle(font_color="white", padding=False).print(f"> {message}")
    if attachment_filenames:
        PrintStyle(font_color="white", padding=False).print("Attachments:")
        for filename in attachment_filenames:
            PrintStyle(font_color="white", padding=False).print(f"- {filename}")
    
    # Log to UI
    context.log.log(
        type="user",
        heading="",
        content=message,
        kvps={"attachments": attachment_filenames},
        id=message_id,
    )


def send_message(context: "AgentContext", item: dict, source: str = " (from queue)"):
    """Send a single queued message (log + communicate)."""
    from agent import UserMessage  # Import here to avoid circular import
    
    message = item.get("text", "")
    attachments = item.get("attachments", [])
    msg_id = str(uuid.uuid4())
    log_user_message(context, message, attachments, message_id=msg_id, source=source)
    context.communicate(UserMessage(message, attachments, id=msg_id))


def send_next(context: "AgentContext") -> bool:
    """Send next queued message. Returns True if sent, False if queue empty."""
    if not has_queue(context):
        return False
    item = pop_first(context)
    if item:
        send_message(context, item)
        return True
    return False


def send_all_aggregated(context: "AgentContext") -> int:
    """Aggregate and send all queued messages as one. Returns count of items sent."""
    from agent import UserMessage  # Import here to avoid circular import
    
    if not has_queue(context):
        return 0
    
    items = []
    while has_queue(context):
        items.append(pop_first(context))
    
    # Combine texts with separator
    text = "\n\n---\n\n".join(i["text"] for i in items if i["text"])
    attachments = [a for i in items for a in i.get("attachments", [])]
    
    msg_id = str(uuid.uuid4())
    log_user_message(context, text, attachments, message_id=msg_id, source=" (queued batch)")
    context.communicate(UserMessage(text, attachments, id=msg_id))
    return len(items)