"""Core compaction logic for the compaction plugin."""
import os
from datetime import datetime
import models as models_module
from agent import Agent
from helpers import tokens
from helpers.history import History, output_text
from helpers.persist_chat import (
export_json_chat,
get_chat_folder_path,
save_tmp_chat,
remove_msg_files,
)
from helpers.state_monitor_integration import mark_dirty_all
MIN_COMPACTION_TOKENS = 1000
from plugins._model_config.helpers.model_config import (
get_chat_model_config,
get_utility_model_config,
get_preset_by_name,
build_model_config,
build_chat_model,
build_utility_model,
)
def _save_pre_compaction_backup(context, full_text: str) -> dict[str, str]:
    """Write the un-compacted chat to disk as both JSON and plain text.

    The files are placed in a ``backups`` directory under the chat folder of
    ``context.id`` and are named with a second-resolution local timestamp.

    Args:
        context: Chat context; its ``id`` selects the chat folder and the
            whole object is passed to ``export_json_chat``.
        full_text: Plain-text rendering of the conversation to store.

    Returns:
        Mapping with the keys ``"json"`` and ``"txt"`` pointing at the two
        files that were written.
    """
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    target_dir = os.path.join(get_chat_folder_path(context.id), "backups")
    os.makedirs(target_dir, exist_ok=True)

    written = {
        "json": os.path.join(target_dir, f"pre-compact-{stamp}.json"),
        "txt": os.path.join(target_dir, f"pre-compact-{stamp}.txt"),
    }
    # The JSON export is produced before any file is opened, so a failing
    # export leaves no partial backup files behind.
    payloads = {"json": export_json_chat(context), "txt": full_text}

    for kind, destination in written.items():
        with open(destination, "w", encoding="utf-8") as handle:
            handle.write(payloads[kind])

    return written
def _build_model(use_chat_model: bool, preset_name: str | None, agent):
    """Pick the configuration and LLM instance used for compaction.

    A named preset wins when it exists and its selected section ("chat" or
    "utility") carries a provider or a name. In every other case the model
    currently configured for ``agent`` is used.

    Args:
        use_chat_model: True selects the chat model, False the utility model.
        preset_name: Optional preset to look up via ``get_preset_by_name``.
        agent: Agent whose configured model is the fallback.

    Returns:
        Tuple ``(config, model)``.
    """
    preset = get_preset_by_name(preset_name) if preset_name else None
    if preset:
        section = preset.get("chat" if use_chat_model else "utility", {})
        if section.get("provider") or section.get("name"):
            # Both sections are built as ModelType.CHAT, including "utility".
            model_cfg = build_model_config(section, models_module.ModelType.CHAT)
            llm = models_module.get_chat_model(
                model_cfg.provider,
                model_cfg.name,
                model_config=model_cfg,
                **model_cfg.build_kwargs(),
            )
            return section, llm

    # No usable preset: fall back to what the agent is configured with.
    if not use_chat_model:
        return get_utility_model_config(agent), build_utility_model(agent)
    return get_chat_model_config(agent), build_chat_model(agent)
async def run_compaction(
    context,
    use_chat_model: bool = True,
    preset_name: str | None = None,
) -> None:
    """Compact the chat history into a single summarized message.

    Steps:
        1. Extract the full conversation text.
        2. Estimate the token count and derive the input budget (70% of the
           resolved model's context length).
        3. Create a progress log item.
        4. Summarize in one pass, or in chunks when the text exceeds the budget.
        5. Save a pre-compaction backup (JSON + text).
        6. Replace the history with one AI message holding the summary.
        7. Reset the log and add a response item.
        8. Persist the chat and remove message files.
        9. Mark the progress bar inactive and flag state as dirty.

    Progress is streamed to the frontend through the log system.

    Args:
        context: Chat context to compact; ``context.agent0`` owns the history.
        use_chat_model: True to summarize with the chat model, False to use
            the utility model.
        preset_name: Optional model preset overriding the agent's model.

    Raises:
        ValueError: If there is no conversation text or the summary is empty.
        Exception: Any other failure is logged as an error item and re-raised.
            Failures before step 6 leave the history untouched; a failure
            after step 6 (for example while persisting) happens once the
            in-memory history has already been replaced.
    """
    default_ctx_length = 128000
    agent = context.agent0
    try:
        # Step 1: Extract full conversation text
        history_output = agent.history.output()
        full_text = output_text(history_output, ai_label="assistant", human_label="user")
        if not full_text.strip():
            raise ValueError("No conversation content to compact")

        # Step 2: Estimate tokens, resolve model, and compute context budget
        token_count = tokens.approximate_tokens(full_text)
        resolved_cfg, model = _build_model(use_chat_model, preset_name, agent)
        # A missing, empty, zero, negative or non-numeric ctx_length falls back
        # to the default instead of crashing or producing a zero budget.
        raw_ctx_length = resolved_cfg.get("ctx_length") if resolved_cfg else None
        try:
            ctx_length = int(raw_ctx_length) if raw_ctx_length else default_ctx_length
        except (TypeError, ValueError):
            ctx_length = default_ctx_length
        if ctx_length <= 0:
            ctx_length = default_ctx_length
        max_input_tokens = int(ctx_length * 0.7)

        # Step 3: Create progress log item (count user-visible messages only)
        visible_types = {"user", "response"}
        visible_count = sum(1 for item in context.log.logs if item.type in visible_types)
        log_item = context.log.log(
            type="info",
            heading="Compacting chat history...",
            content=f"Analyzing {visible_count} messages (~{token_count} tokens)...",
        )

        # Step 4: Handle large histories by chunking if necessary
        if token_count > max_input_tokens:
            summary = await _compact_large_history(
                agent, full_text, token_count, max_input_tokens, log_item, model
            )
        else:
            summary = await _compact_single_pass(
                agent, full_text, log_item, model
            )
        if not summary or not summary.strip():
            raise ValueError("Compaction produced empty summary")

        # Step 5: Save pre-compaction backup before destroying history
        backup_paths = _save_pre_compaction_backup(context, full_text)

        # Step 6: Replace history with compacted version
        backup_note = (
            f"\n\n---\n"
            f"*Pre-compaction backup of the full original conversation:*\n"
            f"- `{backup_paths['txt']}`"
        )
        compacted_content = f"## Context compacted\n\n{summary}{backup_note}"
        agent.history = History(agent=agent)
        agent.history.add_message(ai=True, content=compacted_content)

        # Clear subordinate chain
        agent.data.pop(Agent.DATA_NAME_SUBORDINATE, None)
        context.streaming_agent = None

        # Step 7: Reset log and create response
        context.log.reset()
        context.log.log(
            type="response",
            heading="Context compacted",
            content=compacted_content,
            update_progress="none",
        )

        # Step 8: Persist and notify
        save_tmp_chat(context)
        remove_msg_files(context.id)

        # Step 9: Force progress bar to inactive state LAST
        # This must happen after all log operations and persist
        context.log.set_progress("Waiting for input", 0, False)
        mark_dirty_all(reason="plugins.compaction.compact_chat")
    except Exception as e:
        # Report the failure to the UI, then let the caller see the exception.
        context.log.log(
            type="error",
            heading="Compaction Failed",
            content=str(e),
        )
        mark_dirty_all(reason="plugins.compaction.compact_chat_error")
        raise
async def _compact_single_pass(agent, full_text: str, log_item, model) -> str:
    """Summarize the whole conversation with one call to ``model``.

    Args:
        agent: Provides ``read_prompt`` for the compaction prompt templates.
        full_text: Conversation text inserted into the user prompt.
        log_item: Log entry that receives each streamed chunk.
        model: Object exposing the awaitable ``unified_call``.

    Returns:
        The first element of the pair returned by ``model.unified_call``.
    """

    async def forward_chunk(chunk: str, total: str):
        # Only non-empty chunks are pushed to the log item.
        if not chunk:
            return
        log_item.stream(content=chunk)

    response = await model.unified_call(
        system_message=agent.read_prompt("compact.sys.md"),
        user_message=agent.read_prompt("compact.msg.md", conversation=full_text),
        response_callback=forward_chunk,
    )
    summary, _ = response
    return summary
def _split_text_by_budget(
    full_text: str, token_count: int, max_input_tokens: int
) -> list[str]:
    """Cut ``full_text`` into pieces sized to fit ``max_input_tokens``.

    The number of pieces is derived from ``token_count / max_input_tokens``
    (never fewer than two) and converted into a per-piece character limit.
    Lines are packed greedily up to that limit; a single line longer than the
    limit is cut by characters. Whitespace-only pieces are dropped.
    """
    parts_needed = 2
    if token_count > 0 and max_input_tokens > 0:
        # Ceiling division without importing math.
        parts_needed = max(2, -(-token_count // max_input_tokens))
    char_budget = max(1, -(-len(full_text) // parts_needed))

    pieces: list[str] = []
    pending: list[str] = []
    pending_len = 0
    for line in full_text.split("\n"):
        segments = [
            line[start:start + char_budget]
            for start in range(0, len(line), char_budget)
        ] or [""]
        for segment in segments:
            # +1 accounts for the newline that joins it to the previous entry.
            needed = len(segment) + (1 if pending else 0)
            if pending and pending_len + needed > char_budget:
                pieces.append("\n".join(pending))
                pending, pending_len = [], 0
                needed = len(segment)
            pending.append(segment)
            pending_len += needed
    if pending:
        pieces.append("\n".join(pending))

    usable = [piece for piece in pieces if piece.strip()]
    return usable or [full_text]


async def _compact_large_history(
    agent, full_text: str, token_count: int, max_input_tokens: int, log_item, model
) -> str:
    """Summarize an over-budget history chunk by chunk, then merge the parts.

    Each chunk is summarized with its own model call; the partial summaries
    are then joined and summarized once more, with that final call streamed
    to ``log_item``.

    Args:
        agent: Provides ``read_prompt`` for the compaction prompt templates.
        full_text: Complete conversation text.
        token_count: Estimated token count of ``full_text``.
        max_input_tokens: Token budget a single model call should stay within;
            it determines how many chunks the text is split into.
        log_item: Log entry updated with progress and the streamed result.
        model: Object exposing the awaitable ``unified_call``.

    Returns:
        The final merged summary.
    """
    log_item.update(
        content=f"History is large (~{token_count} tokens). Splitting into chunks...",
    )
    chunks = _split_text_by_budget(full_text, token_count, max_input_tokens)

    # The system prompt does not depend on the chunk, so read it once.
    system_prompt = agent.read_prompt("compact.sys.md")

    summaries = []
    for i, chunk in enumerate(chunks, 1):
        log_item.update(content=f"Summarizing part {i}/{len(chunks)}...")
        user_prompt = agent.read_prompt("compact.msg.md", conversation=chunk)
        chunk_summary, _ = await model.unified_call(
            system_message=system_prompt,
            user_message=user_prompt,
        )
        summaries.append(chunk_summary)

    combined = "\n\n---\n\n".join(summaries)
    log_item.update(content="Creating final summary from parts...")
    final_user = agent.read_prompt(
        "compact.msg.md",
        conversation=f"This is a multi-part conversation. Here are summaries of each part:\n\n{combined}",
    )

    async def stream_cb(chunk: str, total: str):
        if chunk:
            log_item.stream(content=chunk)

    final_summary, _ = await model.unified_call(
        system_message=system_prompt,
        user_message=final_user,
        response_callback=stream_cb,
    )
    return final_summary
async def get_compaction_stats(context) -> dict:
    """Collect the figures shown in the compaction confirmation modal.

    Args:
        context: Chat context; its log and ``agent0`` history are inspected.

    Returns:
        dict with ``message_count``, ``token_count``, ``model_name`` (same
        value as ``chat_model_name``), ``chat_model_name`` and
        ``utility_model_name``.
    """
    agent = context.agent0

    # Only 'user' and 'response' log items are counted as messages; every
    # other log type is left out of the count.
    counted_types = {"user", "response"}
    message_count = len(
        [entry for entry in context.log.logs if entry.type in counted_types]
    )

    # Token estimate over the rendered history; empty text counts as zero.
    rendered = output_text(
        agent.history.output(), ai_label="assistant", human_label="user"
    )
    token_count = 0
    if rendered:
        token_count = tokens.approximate_tokens(rendered)

    # Model names for both roles, with "Default" when nothing is configured.
    chat_cfg = get_chat_model_config(agent)
    utility_cfg = get_utility_model_config(agent)
    names = [
        cfg.get("name", "Default") if cfg else "Default"
        for cfg in (chat_cfg, utility_cfg)
    ]
    chat_model_name, utility_model_name = names

    return {
        "message_count": message_count,
        "token_count": token_count,
        "model_name": chat_model_name,
        "chat_model_name": chat_model_name,
        "utility_model_name": utility_model_name,
    }
|