Spaces:
Running
Running
File size: 9,831 Bytes
0157ac7 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 | """Command handlers for messaging platform commands (/stop, /stats, /clear).
Extracted from ClaudeMessageHandler to keep handler.py focused on
core message processing logic.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from loguru import logger
if TYPE_CHECKING:
from messaging.handler import ClaudeMessageHandler
from messaging.models import IncomingMessage
async def handle_stop_command(
handler: ClaudeMessageHandler, incoming: IncomingMessage
) -> None:
"""Handle /stop command from messaging platform."""
# Reply-scoped stop: reply "/stop" to stop only that task.
if incoming.is_reply() and incoming.reply_to_message_id:
reply_id = incoming.reply_to_message_id
tree = handler.tree_queue.get_tree_for_node(reply_id)
node_id = handler.tree_queue.resolve_parent_node_id(reply_id) if tree else None
if not node_id:
msg_id = await handler.platform.queue_send_message(
incoming.chat_id,
handler.format_status(
"⏹", "Stopped.", "Nothing to stop for that message."
),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
handler.record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"
)
return
count = await handler.stop_task(node_id)
noun = "request" if count == 1 else "requests"
msg_id = await handler.platform.queue_send_message(
incoming.chat_id,
handler.format_status("⏹", "Stopped.", f"Cancelled {count} {noun}."),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
handler.record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"
)
return
# Global stop: legacy behavior (stop everything)
count = await handler.stop_all_tasks()
msg_id = await handler.platform.queue_send_message(
incoming.chat_id,
handler.format_status(
"⏹", "Stopped.", f"Cancelled {count} pending or active requests."
),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
handler.record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"
)
async def handle_stats_command(
handler: ClaudeMessageHandler, incoming: IncomingMessage
) -> None:
"""Handle /stats command."""
stats = handler.cli_manager.get_stats()
tree_count = handler.tree_queue.get_tree_count()
ctx = handler.get_render_ctx()
msg_id = await handler.platform.queue_send_message(
incoming.chat_id,
"📊 "
+ ctx.bold("Stats")
+ "\n"
+ ctx.escape_text(f"• Active CLI: {stats['active_sessions']}")
+ "\n"
+ ctx.escape_text(f"• Message Trees: {tree_count}"),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
handler.record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"
)
async def _delete_message_ids(
handler: ClaudeMessageHandler, chat_id: str, msg_ids: set[str]
) -> None:
"""Best-effort delete messages by ID. Sorts numeric IDs descending."""
if not msg_ids:
return
def _as_int(s: str) -> int | None:
try:
return int(str(s))
except Exception:
return None
numeric: list[tuple[int, str]] = []
non_numeric: list[str] = []
for mid in msg_ids:
n = _as_int(mid)
if n is None:
non_numeric.append(mid)
else:
numeric.append((n, mid))
numeric.sort(reverse=True)
ordered = [mid for _, mid in numeric] + non_numeric
try:
CHUNK = 100
for i in range(0, len(ordered), CHUNK):
chunk = ordered[i : i + CHUNK]
await handler.platform.queue_delete_messages(
chat_id, chunk, fire_and_forget=False
)
except Exception as e:
logger.debug(f"Batch delete failed: {type(e).__name__}: {e}")
async def _handle_clear_branch(
handler: ClaudeMessageHandler,
incoming: IncomingMessage,
branch_root_id: str,
) -> None:
"""
Clear a branch (replied-to node + all descendants).
Order: cancel tasks, delete messages, remove branch, update session store.
"""
tree = handler.tree_queue.get_tree_for_node(branch_root_id)
if not tree:
return
# 1) Cancel branch tasks (no stop_all)
cancelled = await handler.tree_queue.cancel_branch(branch_root_id)
handler.update_cancelled_nodes_ui(cancelled)
# 2) Collect message IDs from branch nodes only
msg_ids: set[str] = set()
branch_ids = tree.get_descendants(branch_root_id)
for nid in branch_ids:
node = tree.get_node(nid)
if node:
if node.incoming.message_id:
msg_ids.add(str(node.incoming.message_id))
if node.status_message_id:
msg_ids.add(str(node.status_message_id))
if incoming.message_id:
msg_ids.add(str(incoming.message_id))
# 3) Delete messages (best-effort)
await _delete_message_ids(handler, incoming.chat_id, msg_ids)
# 4) Remove branch from tree
removed, root_id, removed_entire_tree = await handler.tree_queue.remove_branch(
branch_root_id
)
# 5) Update session store
try:
handler.session_store.remove_node_mappings([n.node_id for n in removed])
if removed_entire_tree:
handler.session_store.remove_tree(root_id)
else:
updated_tree = handler.tree_queue.get_tree(root_id)
if updated_tree:
handler.session_store.save_tree(root_id, updated_tree.to_dict())
except Exception as e:
logger.warning(f"Failed to update session store after branch clear: {e}")
async def handle_clear_command(
handler: ClaudeMessageHandler, incoming: IncomingMessage
) -> None:
"""
Handle /clear command.
Reply-scoped: reply to a message to clear that branch (node + descendants).
Standalone: global clear (stop all, delete all chat messages, reset store).
"""
from messaging.trees import TreeQueueManager
if incoming.is_reply() and incoming.reply_to_message_id:
reply_id = incoming.reply_to_message_id
tree = handler.tree_queue.get_tree_for_node(reply_id)
branch_root_id = (
handler.tree_queue.resolve_parent_node_id(reply_id) if tree else None
)
if not branch_root_id:
cancel_fn = getattr(handler.platform, "cancel_pending_voice", None)
if cancel_fn is not None:
cancelled = await cancel_fn(incoming.chat_id, reply_id)
if cancelled is not None:
voice_msg_id, status_msg_id = cancelled
msg_ids_to_del: set[str] = {voice_msg_id, status_msg_id}
if incoming.message_id is not None:
msg_ids_to_del.add(str(incoming.message_id))
await _delete_message_ids(handler, incoming.chat_id, msg_ids_to_del)
msg_id = await handler.platform.queue_send_message(
incoming.chat_id,
handler.format_status("🗑", "Cleared.", "Voice note cancelled."),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
handler.record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"
)
return
msg_id = await handler.platform.queue_send_message(
incoming.chat_id,
handler.format_status(
"🗑", "Cleared.", "Nothing to clear for that message."
),
fire_and_forget=False,
message_thread_id=incoming.message_thread_id,
)
handler.record_outgoing_message(
incoming.platform, incoming.chat_id, msg_id, "command"
)
return
await _handle_clear_branch(handler, incoming, branch_root_id)
return
# Global clear
# 1) Stop tasks first (ensures no more work is running).
await handler.stop_all_tasks()
# 2) Clear chat: best-effort delete messages we can identify.
msg_ids: set[str] = set()
# Add any recorded message IDs for this chat (commands, command replies, etc).
try:
for mid in handler.session_store.get_message_ids_for_chat(
incoming.platform, incoming.chat_id
):
if mid is not None:
msg_ids.add(str(mid))
except Exception as e:
logger.debug(f"Failed to read message log for /clear: {e}")
try:
msg_ids.update(
handler.tree_queue.get_message_ids_for_chat(
incoming.platform, incoming.chat_id
)
)
except Exception as e:
logger.warning(f"Failed to gather messages for /clear: {e}")
# Also delete the command message itself.
if incoming.message_id is not None:
msg_ids.add(str(incoming.message_id))
await _delete_message_ids(handler, incoming.chat_id, msg_ids)
# 3) Clear persistent state and reset in-memory queue/tree state.
try:
handler.session_store.clear_all()
except Exception as e:
logger.warning(f"Failed to clear session store: {e}")
handler.replace_tree_queue(
TreeQueueManager(
queue_update_callback=handler.update_queue_positions,
node_started_callback=handler.mark_node_processing,
)
)
|