Duibonduil committed
Commit 6eb9183 · verified · 1 Parent(s): efc2c71

Upload 6 files

aworld/core/context/processor/base_compressor.py ADDED
@@ -0,0 +1,24 @@
+from abc import ABC, abstractmethod
+from typing import Any, Dict
+
+from aworld.config.conf import ModelConfig
+from aworld.core.context.processor import CompressionResult
+
+
+class BaseCompressor(ABC):
+    """Base compressor class"""
+
+    def __init__(self, config: Dict[str, Any] = None, llm_config: ModelConfig = None):
+        self.config = config or {}
+        self.llm_config = llm_config
+
+    @abstractmethod
+    def compress(self, content: str, metadata: Dict[str, Any] = None) -> CompressionResult:
+        """Compress content"""
+        pass
+
+    def _calculate_compression_ratio(self, original: str, compressed: str) -> float:
+        """Calculate compression ratio"""
+        if len(original) == 0:
+            return 1.0
+        return len(compressed) / len(original)
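
For orientation, a minimal sketch of a concrete subclass (hypothetical, not part of this commit): it fills in the abstract compress() and reuses _calculate_compression_ratio(). The CompressionResult fields mirror how the compressors later in this commit construct it.

    from typing import Any, Dict

    from aworld.core.context.processor import CompressionResult, CompressionType
    from aworld.core.context.processor.base_compressor import BaseCompressor


    class NoOpCompressor(BaseCompressor):
        """Hypothetical subclass that returns content unchanged."""

        def compress(self, content: str, metadata: Dict[str, Any] = None) -> CompressionResult:
            return CompressionResult(
                original_content=content,
                compressed_content=content,
                # Identical input and output, so the ratio is 1.0
                compression_ratio=self._calculate_compression_ratio(content, content),
                metadata=metadata or {},
                compression_type=CompressionType.LLM_BASED,  # placeholder value for the sketch
            )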
aworld/core/context/processor/chunk_utils.py ADDED
@@ -0,0 +1,433 @@
+import logging
+import time
+from typing import List, Dict, Any, Optional, Union
+
+from aworld.core.context.processor import MessageChunk, ChunkResult, MessageType
+
+logger = logging.getLogger(__name__)
+
+
+class ChunkUtils:
+
+    def __init__(self,
+                 enable_chunking: bool = False,
+                 preserve_order: bool = True,
+                 merge_consecutive: bool = True,
+                 max_chunk_size: Optional[int] = None,
+                 split_by_tool_name: bool = False):
+
+        # Chunker configuration
+        self.enable_chunking = enable_chunking
+        self.preserve_order = preserve_order
+        self.merge_consecutive = merge_consecutive
+        self.max_chunk_size = max_chunk_size
+        self.split_by_tool_name = split_by_tool_name
+
+        # Statistics
+        self.stats = {
+            # Chunking statistics
+            "chunking": {
+                "total_processed": 0,
+                "total_chunks_created": 0,
+                "processing_time": 0.0
+            }
+        }
+
+    def _process_chunking(self, messages: List[Dict[str, Any]], **kwargs) -> List[Dict[str, Any]]:
+        """Process chunking logic"""
+        # First split into chunks
+        chunk_result = self.split_messages(messages, kwargs.get('metadata', {}))
+
+        # Then merge back to a message list
+        merged_messages = self.merge_chunks(chunk_result.chunks,
+                                            kwargs.get('preserve_type_order', True))
+
+        return merged_messages
+
+    def classify_message(self, message: Dict[str, Any]) -> MessageType:
+        """
+        Classify a single message
+
+        Args:
+            message: OpenAI format message
+
+        Returns:
+            Message type
+        """
+        role = message.get("role", "")
+
+        if role in ["system", "user", "assistant"]:
+            return MessageType.TEXT
+        elif role == "tool":
+            return MessageType.TOOL
+        else:
+            logger.warning(f"Unknown message role: {role}")
+            return MessageType.UNKNOWN
+
+    def split_messages(self,
+                       messages: List[Dict[str, Any]],
+                       metadata: Dict[str, Any] = None) -> ChunkResult:
+        """
+        Split a message list into chunks by type, merging messages of the same type into strings
+
+        Args:
+            messages: OpenAI format message list
+            metadata: Metadata
+
+        Returns:
+            Chunking result
+        """
+        start_time = time.time()
+
+        if not messages:
+            return ChunkResult(
+                chunks=[],
+                total_messages=0,
+                processing_time=0.0,
+                metadata=metadata or {}
+            )
+
+        chunks = []
+        current_chunk_type = None
+        current_chunk_messages = []
+
+        for i, message in enumerate(messages):
+            msg_type = self.classify_message(message)
+
+            # A new type starts a chunk, as does every message when not merging consecutive ones
+            if (current_chunk_type != msg_type or
+                    not self.merge_consecutive):
+
+                # Save the current chunk (if it has content)
+                if current_chunk_messages:
+                    chunk_metadata = (metadata or {}).copy()
+                    chunk_metadata.update({
+                        "chunk_index": len(chunks),
+                        "start_message_index": i - len(current_chunk_messages),
+                        "end_message_index": i - 1,
+                        "message_count": len(current_chunk_messages),
+                        "original_messages": current_chunk_messages.copy()
+                    })
+
+                    # Merge messages into strings based on message type
+                    if current_chunk_type == MessageType.TEXT:
+                        merged_content = self._messages_to_string(current_chunk_messages)
+                        merged_message = {
+                            "role": "merged_text",
+                            "content": merged_content,
+                            "original_count": len(current_chunk_messages)
+                        }
+                        chunk_messages = [merged_message]
+                    elif current_chunk_type == MessageType.TOOL:
+                        merged_content = self._tool_messages_to_string(current_chunk_messages)
+                        merged_message = {
+                            "role": "merged_tool",
+                            "content": merged_content,
+                            "original_count": len(current_chunk_messages)
+                        }
+                        chunk_messages = [merged_message]
+                    else:
+                        # Unknown types are kept as is
+                        chunk_messages = current_chunk_messages.copy()
+
+                    chunks.append(MessageChunk(
+                        message_type=current_chunk_type,
+                        messages=chunk_messages,
+                        metadata=chunk_metadata
+                    ))
+
+                # Start a new chunk
+                current_chunk_type = msg_type
+                current_chunk_messages = [message]
+            else:
+                # Add to the current chunk
+                current_chunk_messages.append(message)
+
+        # Process the last chunk
+        if current_chunk_messages:
+            chunk_metadata = (metadata or {}).copy()
+            chunk_metadata.update({
+                "chunk_index": len(chunks),
+                "start_message_index": len(messages) - len(current_chunk_messages),
+                "end_message_index": len(messages) - 1,
+                "message_count": len(current_chunk_messages),
+                "original_messages": current_chunk_messages.copy()
+            })
+
+            # Merge messages into strings based on message type
+            if current_chunk_type == MessageType.TEXT:
+                merged_content = self._messages_to_string(current_chunk_messages)
+                merged_message = {
+                    "role": "merged_text",
+                    "content": merged_content,
+                    "original_count": len(current_chunk_messages)
+                }
+                chunk_messages = [merged_message]
+            elif current_chunk_type == MessageType.TOOL:
+                merged_content = self._tool_messages_to_string(current_chunk_messages)
+                merged_message = {
+                    "role": "merged_tool",
+                    "content": merged_content,
+                    "original_count": len(current_chunk_messages)
+                }
+                chunk_messages = [merged_message]
+            else:
+                chunk_messages = current_chunk_messages.copy()
+
+            chunks.append(MessageChunk(
+                message_type=current_chunk_type,
+                messages=chunk_messages,
+                metadata=chunk_metadata
+            ))
+
+        processing_time = time.time() - start_time
+
+        # Update statistics
+        self.stats["chunking"]["total_processed"] += len(messages)
+        self.stats["chunking"]["total_chunks_created"] += len(chunks)
+        self.stats["chunking"]["processing_time"] += processing_time
+
+        # Build result metadata
+        result_metadata = (metadata or {}).copy()
+        result_metadata.update({
+            "chunk_count": len(chunks),
+            "text_chunks": sum(1 for chunk in chunks if chunk.message_type == MessageType.TEXT),
+            "tool_chunks": sum(1 for chunk in chunks if chunk.message_type == MessageType.TOOL),
+            "unknown_chunks": sum(1 for chunk in chunks if chunk.message_type == MessageType.UNKNOWN),
+            "preserve_order": self.preserve_order,
+            "merge_consecutive": self.merge_consecutive,
+            "processing_time": processing_time,
+            "string_merge_applied": True
+        })
+
+        logger.debug(f"Message splitting completed: {len(messages)} messages -> {len(chunks)} chunks (string merge applied)")
+
+        return ChunkResult(
+            chunks=chunks,
+            total_messages=len(messages),
+            processing_time=processing_time,
+            metadata=result_metadata
+        )
+
+    def merge_chunks(self,
+                     chunks: List[MessageChunk],
+                     preserve_type_order: bool = True) -> List[Dict[str, Any]]:
+        """
+        Merge processed chunks back into a message list, splitting string-format messages back into multiple messages
+
+        Args:
+            chunks: Message chunk list
+            preserve_type_order: Whether to preserve type order
+
+        Returns:
+            Merged message list
+        """
+        if not chunks:
+            return []
+
+        if preserve_type_order and self.preserve_order:
+            # Merge in original order
+            sorted_chunks = sorted(chunks, key=lambda x: x.metadata.get("chunk_index", 0))
+        else:
+            # Merge by type groups (text first, then tools)
+            text_chunks = [chunk for chunk in chunks if chunk.message_type == MessageType.TEXT]
+            tool_chunks = [chunk for chunk in chunks if chunk.message_type == MessageType.TOOL]
+            unknown_chunks = [chunk for chunk in chunks if chunk.message_type == MessageType.UNKNOWN]
+            sorted_chunks = text_chunks + tool_chunks + unknown_chunks
+
+        merged_messages = []
+        for chunk in sorted_chunks:
+            chunk_messages = []
+
+            for message in chunk.messages:
+                # Check whether it is a merged message that needs splitting
+                if message.get("role") == "merged_text":
+                    # A merged TEXT type message that needs splitting
+                    merged_content = message.get("content", "")
+                    original_messages = chunk.metadata.get("original_messages", [])
+
+                    split_messages = self._string_to_messages(merged_content, original_messages)
+                    chunk_messages.extend(split_messages)
+
+                elif message.get("role") == "merged_tool":
+                    # A merged TOOL type message that needs splitting
+                    merged_content = message.get("content", "")
+                    original_messages = chunk.metadata.get("original_messages", [])
+
+                    if original_messages:
+                        split_messages = self._string_to_tool_messages(merged_content, original_messages)
+                    else:
+                        split_messages = self._string_to_tool_messages(merged_content, "")
+                    chunk_messages.extend(split_messages)
+
+                else:
+                    # Regular messages are added directly
+                    chunk_messages.append(message)
+
+            merged_messages.extend(chunk_messages)
+
+        return merged_messages
+
+    # Message conversion methods
+    @staticmethod
+    def _messages_to_string(messages: List[Dict[str, str]]) -> str:
+        """Convert OpenAI message format to a string"""
+        content_parts = []
+        for msg in messages:
+            role = msg.get('role', 'user')
+            content = msg.get('content', '')
+            content_parts.append(f"[{role.upper()}]: {content}")
+        return "\n".join(content_parts)
+
+    @staticmethod
+    def _string_to_messages(content: str, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
+        """Convert a string back to OpenAI message format"""
+        # Restore all tool_calls
+        tool_calls = []
+        if messages:
+            for msg in messages:
+                if msg.get("role") == "assistant" and msg.get("tool_calls") is not None:
+                    tool_calls += msg["tool_calls"]
+
+        result_messages = []
+        lines = content.split('\n')
+        current_role = 'user'
+        current_content = []
+
+        for line in lines:
+            line = line.strip()
+            if line.startswith('[') and ']:' in line:
+                # Save the previous message
+                if current_content:
+                    result_messages.append({
+                        'role': current_role,
+                        'content': '\n'.join(current_content).strip()
+                    })
+                    current_content = []
+
+                # Parse the new role
+                role_end = line.find(']:')
+                role = line[1:role_end].lower()
+                if role in ['system', 'user', 'assistant']:
+                    current_role = role
+                    content_part = line[role_end + 2:].strip()
+                    if content_part:
+                        current_content.append(content_part)
+                else:
+                    current_content.append(line)
+            else:
+                current_content.append(line)
+
+        # Save the last message
+        if current_content:
+            result_messages.append({
+                'role': current_role,
+                'content': '\n'.join(current_content).strip(),
+            })
+
+        final_messages = result_messages if result_messages else [{'role': 'user', 'content': content}]
+
+        # Add tool_calls results
+        if tool_calls:
+            tool_call_chunk = {
+                'role': 'assistant',
+                'content': None,
+                'tool_calls': tool_calls
+            }
+            final_messages.append(tool_call_chunk)
+
+        return final_messages
+
+    def _tool_messages_to_string(self, messages: List[Dict[str, str]]) -> str:
+        """Convert tool message format to a string"""
+        content_parts = []
+        for msg in messages:
+            role = msg.get('role', 'tool')
+            content = msg.get('content', '')
+            tool_call_id = msg.get('tool_call_id', '')
+            name = msg.get('name', '')
+
+            if role == 'tool':
+                header = f"[TOOL:{name}:{tool_call_id}]"
+            else:
+                header = f"[{role.upper()}]"
+
+            content_parts.append(f"{header}: {content}")
+        return "\n".join(content_parts)
+
+    def _string_to_tool_messages(self, content: str, original_prompt: Union[str, List[Dict[str, str]]]) -> List[Dict[str, str]]:
+        """Convert a string back to tool message format"""
+        messages = []
+        lines = content.split('\n')
+        current_role = 'tool'
+        current_content = []
+        current_tool_call_id = ''
+        current_name = ''
+
+        for line in lines:
+            line = line.strip()
+            if line.startswith('[') and ']:' in line:
+                # Save the previous message
+                if current_content:
+                    msg = {
+                        'role': current_role,
+                        'content': '\n'.join(current_content).strip()
+                    }
+                    if current_role == 'tool':
+                        if current_tool_call_id:
+                            msg['tool_call_id'] = current_tool_call_id
+                        if current_name:
+                            msg['name'] = current_name
+                    messages.append(msg)
+                    current_content = []
+
+                # Parse the new role and tool information
+                role_end = line.find(']:')
+                role_part = line[1:role_end]
+                content_part = line[role_end + 2:].strip()
+
+                if role_part.startswith('TOOL:'):
+                    # Parse tool message format: [TOOL:name:tool_call_id]
+                    current_role = 'tool'
+                    tool_parts = role_part.split(':')
+                    if len(tool_parts) >= 2:
+                        current_name = tool_parts[1]
+                    if len(tool_parts) >= 3:
+                        current_tool_call_id = tool_parts[2]
+                else:
+                    current_role = role_part.lower()
+                    current_tool_call_id = ''
+                    current_name = ''
+
+                if content_part:
+                    current_content.append(content_part)
+            else:
+                current_content.append(line)
+
+        # Save the last message
+        if current_content:
+            msg = {
+                'role': current_role,
+                'content': '\n'.join(current_content).strip()
+            }
+            if current_role == 'tool':
+                if current_tool_call_id:
+                    msg['tool_call_id'] = current_tool_call_id
+                if current_name:
+                    msg['name'] = current_name
+            messages.append(msg)
+
+        # If no messages were parsed, return the original format
+        if not messages and isinstance(original_prompt, list):
+            return original_prompt
+        elif not messages:
+            return [{'role': 'tool', 'content': content}]
+
+        return messages
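
A sketch of the round trip this class implements (assuming the MessageType/MessageChunk/ChunkResult types exported by aworld.core.context.processor): consecutive text messages collapse into one "merged_text" string, tool messages into one "merged_tool" string, and merge_chunks() parses the tagged strings back into individual messages.

    from aworld.core.context.processor.chunk_utils import ChunkUtils

    utils = ChunkUtils(enable_chunking=True, preserve_order=True, merge_consecutive=True)

    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Search for AWorld."},
        {"role": "tool", "name": "search", "tool_call_id": "call_1", "content": "result text"},
    ]

    # Two chunks: one merged_text ("[SYSTEM]: ...\n[USER]: ...") and one
    # merged_tool ("[TOOL:search:call_1]: ...").
    result = utils.split_messages(messages)
    assert result.chunks[0].messages[0]["role"] == "merged_text"

    # The "[ROLE]: ..." and "[TOOL:name:id]: ..." headers are parsed back into
    # individual OpenAI-format messages.
    restored = utils.merge_chunks(result.chunks)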
aworld/core/context/processor/llm_compressor.py ADDED
@@ -0,0 +1,113 @@
+import logging
+import re
+import traceback
+from typing import Any, Dict, List
+
+from aworld.config.conf import ModelConfig
+from aworld.core.context.processor import CompressionResult, CompressionType
+from aworld.core.context.processor.base_compressor import BaseCompressor
+from aworld.models.llm import get_llm_model
+from aworld.config import ConfigDict
+
+logger = logging.getLogger(__name__)
+
+
+class LLMCompressor(BaseCompressor):
+    """LLM-based prompt compressor"""
+
+    def __init__(self, config: Dict[str, Any] = None, llm_config: ModelConfig = None):
+        super().__init__(config, llm_config)
+        self.compression_prompt = self.config.get("compression_prompt", self._default_compression_prompt())
+        # Lazy client creation to avoid circular dependencies
+        self._llm_client = self._create_llm_client(llm_config)
+
+    @staticmethod
+    def _remove_think_blocks(content: str) -> str:
+        """Remove <think>...</think> blocks from content"""
+        # Use a regex to remove all <think>...</think> blocks (case insensitive, multiline)
+        pattern = r'<think>.*?</think>'
+        cleaned_content = re.sub(pattern, '', content, flags=re.IGNORECASE | re.DOTALL)
+        return cleaned_content
+
+    def _create_llm_client(self, llm_config: ModelConfig):
+        if llm_config is None:
+            return None
+        config = ConfigDict(llm_config.model_dump())
+        return get_llm_model(config)
+
+    def _default_compression_prompt(self) -> str:
+        """Default compression prompt"""
+        return """## Task
+You are a text compression expert. Please intelligently compress the following text, retaining core information and key content while removing redundancy and unimportant parts.
+
+## Compression Requirements
+1. Keep the position and count of [SYSTEM], [USER], [ASSISTANT], and [TOOL] tags unchanged in the output
+2. Maintain the main meaning and logical structure of the original text, retain key information and important details, use more concise expressions
+3. Remove repetitive, redundant statements, ensure the compressed text remains coherent and readable
+
+## Original Text:
+{content}
+
+Please output the compressed text:"""
+
+    def compress(self, content: str, metadata: Dict[str, Any] = None) -> CompressionResult:
+        """Compress content using the LLM"""
+        # metadata is accepted for interface compatibility with BaseCompressor
+        original_content = content
+
+        # Get the LLM client
+        llm_client = self._llm_client
+        if llm_client is None:
+            logger.warning("LLM client unavailable, returning original content")
+            return CompressionResult(
+                original_content=original_content,
+                compressed_content=content,
+                compression_ratio=1.0,
+                metadata={"error": "LLM client unavailable"},
+                compression_type=CompressionType.LLM_BASED
+            )
+
+        try:
+            # Build the prompt
+            prompt = self.compression_prompt.format(content=content)
+            messages = [{"role": "user", "content": prompt}]
+
+            # Call the LLM
+            response = llm_client.completion(
+                messages=messages,
+                temperature=0.3
+            )
+
+            # Remove <think>...</think> blocks first, then strip whitespace
+            compressed_content = self._remove_think_blocks(response.content).strip()
+            compression_ratio = self._calculate_compression_ratio(original_content, compressed_content)
+
+            return CompressionResult(
+                original_content=original_content,
+                compressed_content=compressed_content,
+                compression_ratio=compression_ratio,
+                metadata={
+                    "prompt_tokens": getattr(response.usage, 'prompt_tokens', 0),
+                    "completion_tokens": getattr(response.usage, 'completion_tokens', 0),
+                },
+                compression_type=CompressionType.LLM_BASED
+            )
+
+        except Exception as e:
+            logger.error(f"LLM compression failed: {traceback.format_exc()}")
+            return CompressionResult(
+                original_content=original_content,
+                compressed_content=content,
+                compression_ratio=1.0,
+                metadata={"error": str(e)},
+                compression_type=CompressionType.LLM_BASED
+            )
+
+    def compress_batch(self, contents: List[str]) -> List[CompressionResult]:
+        """Compress multiple contents in batch"""
+        results = []
+        for content in contents:
+            result = self.compress(content)
+            results.append(result)
+        return results
+
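One behavior worth noting from the code above: when no llm_config is supplied, the client is None and compress() degrades to a pass-through, so the fallback path can be exercised without any model (a sketch):

    from aworld.core.context.processor.llm_compressor import LLMCompressor

    compressor = LLMCompressor(config={}, llm_config=None)
    result = compressor.compress("A long, repetitive transcript ...")

    # No client: the original content comes back with ratio 1.0 and an error marker.
    assert result.compressed_content == result.original_content
    assert result.compression_ratio == 1.0
    assert result.metadata.get("error") == "LLM client unavailable"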
aworld/core/context/processor/llmlingua_compressor.py ADDED
@@ -0,0 +1,295 @@
+import json
+import logging
+import re
+from typing import Any, Dict, List, Pattern, Tuple
+
+from aworld.config.conf import ModelConfig
+from aworld.core.context.processor import CompressionResult, CompressionType
+from aworld.core.context.processor.base_compressor import BaseCompressor
+
+logger = logging.getLogger(__name__)
+
+DEFAULT_LLM_LINGUA_INSTRUCTION = (
+    "Given these conversation messages, please compress them while preserving key information"
+)
+
+
+class LLMLinguaCompressor(BaseCompressor):
+    """
+    Compress messages using the LLMLingua project.
+
+    https://github.com/microsoft/LLMLingua
+    """
+
+    # Patterns to match ref tags at the beginning or end of the string,
+    # allowing for malformed tags
+    _pattern_beginning: Pattern = re.compile(r"\A(?:<#)?(?:ref)?(\d+)(?:#>?)?")
+    _pattern_ending: Pattern = re.compile(r"(?:<#)?(?:ref)?(\d+)(?:#>?)?\Z")
+
+    def __init__(self, config: Dict[str, Any] = None, llm_config: ModelConfig = None):
+        super().__init__(config, llm_config)
+
+        # LLMLingua-specific configuration
+        self.model_name = self.config.get("model_name", "NousResearch/Llama-2-7b-hf")
+        self.device_map = self.config.get("device_map", "cuda")
+        self.target_token = self.config.get("target_token", 300)
+        self.rank_method = self.config.get("rank_method", "longllmlingua")
+        self.model_configuration = self.config.get("model_configuration", {})
+        self.open_api_config = self.config.get("open_api_config", {})
+        self.instruction = self.config.get("instruction", DEFAULT_LLM_LINGUA_INSTRUCTION)
+        self.additional_compress_kwargs = self.config.get("additional_compress_kwargs", {
+            "condition_compare": True,
+            "condition_in_question": "after",
+            "context_budget": "+100",
+            "reorder_context": "sort",
+            "dynamic_context_compression_ratio": 0.4,
+        })
+
+        self.lingua = None
+        self._initialize_lingua()
+
+    def _initialize_lingua(self):
+        """Initialize the LLMLingua PromptCompressor"""
+        try:
+            from llmlingua import PromptCompressor
+
+            self.lingua = PromptCompressor(
+                model_name=self.model_name,
+                device_map=self.device_map,
+                model_config=self.model_configuration,
+                open_api_config=self.open_api_config,
+            )
+            logger.info(f"LLMLingua compressor initialized with model: {self.model_name}")
+
+        except ImportError:
+            logger.error(
+                "Could not import the llmlingua python package. "
+                "Please install it with `pip install llmlingua`."
+            )
+            self.lingua = None
+        except Exception as e:
+            logger.error(f"Failed to initialize LLMLingua compressor: {e}")
+            self.lingua = None
+
+    @staticmethod
+    def _format_messages(messages: List[Dict[str, Any]]) -> List[str]:
+        """
+        Format messages with special ref tags for tracking after compression
+        """
+        formatted_messages = []
+        for i, message in enumerate(messages):
+            role = message.get("role", "unknown")
+            content = message.get("content", "").replace("\n\n", "\n")
+
+            # Format as [ROLE] content with ref tags
+            message_string = f"\n\n<#ref{i}#> [{role.upper()}] {content} <#ref{i}#>\n\n"
+            formatted_messages.append(message_string)
+        return formatted_messages
+
+    def extract_ref_id_tuples_and_clean(self, contents: List[str]) -> List[Tuple[str, int]]:
+        """
+        Extract reference IDs from the contents and clean up the ref tags.
+
+        Args:
+            contents: A list of contents to be processed.
+
+        Returns:
+            List of tuples containing (cleaned_string, ref_id)
+        """
+        ref_id_tuples = []
+        for content in contents:
+            clean_string = content.strip()
+            if not clean_string:
+                continue
+
+            # Search for ref tags at the beginning and the end of the string
+            ref_id = None
+            for pattern in [self._pattern_beginning, self._pattern_ending]:
+                match = pattern.search(clean_string)
+                if match:
+                    ref_id = match.group(1)
+                    clean_string = pattern.sub("", clean_string).strip()
+
+            # Convert the ref ID to int, or use -1 if not found
+            ref_id_to_use = int(ref_id) if ref_id and ref_id.isdigit() else -1
+            ref_id_tuples.append((clean_string, ref_id_to_use))
+
+        return ref_id_tuples
+
+    def _parse_compressed_content_to_messages(self, compressed_content: str, original_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        """
+        Parse compressed content back into message format
+        """
+        # Split by double newlines and filter out empty strings
+        compressed_parts = [part.strip() for part in compressed_content.split("\n\n") if part.strip()]
+
+        extracted_metadata = self.extract_ref_id_tuples_and_clean(compressed_parts)
+
+        compressed_messages = []
+        for content, index in extracted_metadata:
+            if not content:
+                continue
+
+            # Parse the role from content if present
+            role_match = re.match(r'\[(\w+)\]\s*(.*)', content)
+            if role_match:
+                role = role_match.group(1).lower()
+                message_content = role_match.group(2).strip()
+            else:
+                # Fall back to the original message role if available
+                role = "assistant"  # Default role
+                message_content = content
+                if index != -1 and index < len(original_messages):
+                    role = original_messages[index].get("role", "assistant")
+
+            compressed_messages.append({
+                "role": role,
+                "content": message_content
+            })
+
+        return compressed_messages
+
+    def compress(self, content: str, metadata: Dict[str, Any] = None) -> CompressionResult:
+        """
+        Compress content using LLMLingua
+
+        Note: This method expects content to be a JSON string representation of messages,
+        otherwise it is treated as a single message.
+        """
+        original_content = content
+
+        if self.lingua is None:
+            logger.warning("LLMLingua compressor unavailable, returning original content")
+            return CompressionResult(
+                original_content=original_content,
+                compressed_content=content,
+                compression_ratio=1.0,
+                metadata={"error": "LLMLingua compressor unavailable"},
+                compression_type=CompressionType.LLMLINGUA
+            )
+
+        try:
+            # Try to parse as a messages list first
+            try:
+                messages = json.loads(content)
+                if isinstance(messages, list) and all(isinstance(msg, dict) for msg in messages):
+                    return self.compress_messages(messages)
+            except (json.JSONDecodeError, TypeError):
+                pass
+
+            # Treat as plain text
+            formatted_content = [f"\n\n<#ref0#> {content} <#ref0#>\n\n"]
+
+            compressed_prompt = self.lingua.compress_prompt(
+                context=formatted_content,
+                instruction=self.instruction,
+                question="",  # No specific question for plain text
+                target_token=self.target_token,
+                rank_method=self.rank_method,
+                concate_question=False,
+                add_instruction=False,
+                **self.additional_compress_kwargs,
+            )
+
+            compressed_content = compressed_prompt["compressed_prompt"]
+            compression_ratio = self._calculate_compression_ratio(original_content, compressed_content)
+
+            return CompressionResult(
+                original_content=original_content,
+                compressed_content=compressed_content,
+                compression_ratio=compression_ratio,
+                metadata={
+                    "origin_tokens": compressed_prompt.get("origin_tokens", 0),
+                    "compressed_tokens": compressed_prompt.get("compressed_tokens", 0),
+                    "ratio": compressed_prompt.get("ratio", "unknown"),
+                },
+                compression_type=CompressionType.LLMLINGUA
+            )
+
+        except Exception as e:
+            logger.error(f"LLMLingua compression failed: {e}")
+            return CompressionResult(
+                original_content=original_content,
+                compressed_content=content,
+                compression_ratio=1.0,
+                metadata={"error": str(e)},
+                compression_type=CompressionType.LLMLINGUA
+            )
+
+    def compress_messages(self, messages: List[Dict[str, Any]]) -> CompressionResult:
+        """
+        Compress a list of messages using LLMLingua
+        """
+        if not messages:
+            return CompressionResult(
+                original_content="[]",
+                compressed_content="[]",
+                compression_ratio=1.0,
+                metadata={},
+                compression_type=CompressionType.LLMLINGUA
+            )
+
+        original_content = str(messages)
+
+        if self.lingua is None:
+            logger.warning("LLMLingua compressor unavailable, returning original messages")
+            return CompressionResult(
+                original_content=original_content,
+                compressed_content=original_content,
+                compression_ratio=1.0,
+                metadata={"error": "LLMLingua compressor unavailable"},
+                compression_type=CompressionType.LLMLINGUA
+            )
+
+        try:
+            formatted_messages = self._format_messages(messages)
+
+            compressed_prompt = self.lingua.compress_prompt(
+                context=formatted_messages,
+                instruction=self.instruction,
+                question="",  # No specific question for conversation compression
+                target_token=self.target_token,
+                rank_method=self.rank_method,
+                concate_question=False,
+                add_instruction=False,
+                **self.additional_compress_kwargs,
+            )
+
+            # Parse compressed content back into messages
+            compressed_messages = self._parse_compressed_content_to_messages(
+                compressed_prompt["compressed_prompt"], messages
+            )
+
+            compressed_content = str(compressed_messages)
+            compression_ratio = self._calculate_compression_ratio(original_content, compressed_content)
+
+            return CompressionResult(
+                original_content=original_content,
+                compressed_content=compressed_content,
+                compression_ratio=compression_ratio,
+                metadata={
+                    "origin_tokens": compressed_prompt.get("origin_tokens", 0),
+                    "compressed_tokens": compressed_prompt.get("compressed_tokens", 0),
+                    "ratio": compressed_prompt.get("ratio", "unknown"),
+                    "compressed_messages": compressed_messages,
+                },
+                compression_type=CompressionType.LLMLINGUA
+            )
+
+        except Exception as e:
+            logger.error(f"LLMLingua message compression failed: {e}")
+            return CompressionResult(
+                original_content=original_content,
+                compressed_content=original_content,
+                compression_ratio=1.0,
+                metadata={"error": str(e)},
+                compression_type=CompressionType.LLMLINGUA
+            )
+
+    def compress_batch(self, contents: List[str]) -> List[CompressionResult]:
+        """Compress multiple contents in batch"""
+        results = []
+        for content in contents:
+            result = self.compress(content)
+            results.append(result)
+        return results
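
The ref-tag bookkeeping can be exercised without the llmlingua package installed, since construction merely logs the import failure and leaves self.lingua as None (a sketch):

    from aworld.core.context.processor.llmlingua_compressor import LLMLinguaCompressor

    compressor = LLMLinguaCompressor(config={"device_map": "cpu"})

    tagged = ["<#ref0#> [USER] hello <#ref0#>", "<#ref1#> [ASSISTANT] hi <#ref1#>"]
    # The beginning/ending patterns strip the tags and recover the indices:
    # [("[USER] hello", 0), ("[ASSISTANT] hi", 1)]
    pairs = compressor.extract_ref_id_tuples_and_clean(tagged)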
aworld/core/context/processor/prompt_processor.py ADDED
@@ -0,0 +1,455 @@
+# coding: utf-8
+# Copyright (c) 2025 inclusionAI.
+
+import time
+import traceback
+from typing import Dict, Any, List
+
+from aworld.core.context.base import Context, AgentContext
+from aworld.core.context.processor import CompressionDecision, ContextProcessingResult, MessagesProcessingResult
+from aworld.core.context.processor.llm_compressor import LLMCompressor, CompressionType
+from aworld.core.context.processor.llmlingua_compressor import LLMLinguaCompressor
+from aworld.core.context.processor.truncate_compressor import TruncateCompressor
+from aworld.core.context.processor.chunk_utils import ChunkUtils, MessageChunk, MessageType
+from aworld.logs.util import Color, color_log, logger
+from aworld.models.utils import num_tokens_from_messages, truncate_tokens_from_messages
+
+
+class PromptProcessor:
+    """Agent context processor, processes context according to the context_rule configuration"""
+
+    def __init__(self, agent_context: AgentContext):
+        self.context_rule = agent_context.context_rule
+        self.agent_context = agent_context
+        self.compress_pipeline = None
+        self.llmlingua_compressor = None
+        self.truncate_compressor = None
+        self.chunk_pipeline = None
+        self._init_pipelines()
+
+    def _init_pipelines(self):
+        """Initialize processing pipelines"""
+        # Initialize the truncate compressor
+        self.truncate_compressor = TruncateCompressor(
+            config={},
+            llm_config=self.agent_context.model_config
+        )
+
+        if self.context_rule and self.context_rule.llm_compression_config and self.context_rule.llm_compression_config.enabled:
+            # Initialize the message splitting and compression pipeline
+            self.chunk_pipeline = ChunkUtils(
+                enable_chunking=True,
+                preserve_order=True,
+                merge_consecutive=True,
+            )
+
+            # Initialize the compression pipeline based on the compress_type configuration
+            compress_type = self.context_rule.llm_compression_config.compress_type
+
+            if compress_type == 'llmlingua':
+                # Initialize the LLMLingua compressor
+                self.llmlingua_compressor = LLMLinguaCompressor(
+                    config=getattr(self.context_rule.llm_compression_config, 'llmlingua_config', {}),
+                    llm_config=self.context_rule.llm_compression_config.compress_model,
+                )
+            else:
+                # Default to LLM-based compression
+                self.compress_pipeline = LLMCompressor(
+                    config=getattr(self.context_rule.llm_compression_config, 'llm_config', {}),
+                    llm_config=self.context_rule.llm_compression_config.compress_model,
+                )
+
+    def _get_compression_type(self) -> CompressionType:
+        """Get the compression type based on configuration"""
+        if (not self.context_rule or
+                not self.context_rule.llm_compression_config or
+                not self.context_rule.llm_compression_config.enabled):
+            return CompressionType.LLM_BASED
+
+        compress_type = self.context_rule.llm_compression_config.compress_type
+        if compress_type == 'llmlingua':
+            return CompressionType.LLMLINGUA
+        else:
+            return CompressionType.LLM_BASED
+
+    def get_max_tokens(self):
+        return self.agent_context.context_usage.total_context_length * self.context_rule.optimization_config.max_token_budget_ratio
+
+    def is_out_of_context(self, messages: List[Dict[str, Any]],
+                          is_last_message_in_memory: bool) -> bool:
+        return self._count_tokens_from_messages(messages) > self.get_max_tokens()
+        # Alternative: estimate from historical message length to decide whether the
+        # threshold is reached; this is a rough statistic
+        # current_usage = self.agent_context.context_usage
+        # real_used = current_usage.used_context_length
+        # if not is_last_message_in_memory:
+        #     real_used += self._count_tokens_from_message(messages[-1])
+        # return real_used > self.get_max_tokens()
+
+    def _count_tokens_from_messages(self, messages: List[Dict[str, Any]]) -> int:
+        """Calculate the token count for messages using the utils.py helper"""
+        return num_tokens_from_messages(messages, model=self.agent_context.model_config.model_type)
+
+    def _count_tokens_from_message(self, msg: Dict[str, Any]) -> int:
+        """Calculate the token count for a single message using the utils.py helper"""
+        # Convert the single message to list format for num_tokens_from_messages
+        return num_tokens_from_messages([msg], model=self.agent_context.model_config.model_type)
+
+    def _count_chunk_tokens(self, chunk: MessageChunk) -> int:
+        """Calculate the token count for a chunk"""
+        return num_tokens_from_messages(chunk.messages, model=self.agent_context.model_config.model_type)
+
+    def _count_content_tokens(self, content: str) -> int:
+        """Calculate the token count for a content string"""
+        return num_tokens_from_messages(content, model=self.agent_context.model_config.model_type)
+
+    def _truncate_tokens_from_messages(self, content: str, max_tokens: int, keep_both_sides: bool = False) -> str:
+        """Truncate content to max_tokens using the utils.py helper"""
+        return truncate_tokens_from_messages(content, max_tokens, keep_both_sides, model=self.agent_context.model_config.model_type)
+
+    def decide_compression_strategy(self, chunk: MessageChunk) -> CompressionDecision:
+        """
+        Decide the compression strategy based on chunk token length
+
+        Args:
+            chunk: Message chunk to analyze
+
+        Returns:
+            CompressionDecision with the compression strategy
+        """
+        compression_type = self._get_compression_type()
+
+        if (not self.context_rule or
+                not self.context_rule.llm_compression_config or
+                not self.context_rule.llm_compression_config.enabled):
+            return CompressionDecision(
+                should_compress=False,
+                compression_type=compression_type,
+                reason="Compression disabled in config",
+                token_count=0
+            )
+
+        token_count = self._count_chunk_tokens(chunk)
+        trigger_compress_length = self.context_rule.llm_compression_config.trigger_compress_token_length
+
+        # No compression needed
+        if token_count < trigger_compress_length:
+            return CompressionDecision(
+                should_compress=False,
+                compression_type=compression_type,
+                reason=f"Token count {token_count} below threshold {trigger_compress_length}",
+                token_count=token_count
+            )
+
+        # Use the configured compression for content above the threshold
+        else:
+            return CompressionDecision(
+                should_compress=True,
+                compression_type=compression_type,
+                reason=f"Token count {token_count} exceeds threshold {trigger_compress_length}",
+                token_count=token_count
+            )
+
+    def decide_content_compression_strategy(self, content: str) -> CompressionDecision:
+        compression_type = self._get_compression_type()
+
+        if (not self.context_rule or
+                not self.context_rule.llm_compression_config or
+                not self.context_rule.llm_compression_config.enabled):
+            return CompressionDecision(
+                should_compress=False,
+                compression_type=compression_type,
+                reason="Compression disabled in config",
+                token_count=0
+            )
+
+        token_count = self._count_content_tokens(content)
+        trigger_compress_length = self.context_rule.llm_compression_config.trigger_compress_token_length
+
+        # No compression needed
+        if token_count < trigger_compress_length:
+            return CompressionDecision(
+                should_compress=False,
+                compression_type=compression_type,
+                reason=f"Token count {token_count} below threshold {trigger_compress_length}",
+                token_count=token_count
+            )
+
+        # Use the configured compression for content above the threshold
+        else:
+            return CompressionDecision(
+                should_compress=True,
+                compression_type=compression_type,
+                reason=f"Token count {token_count} exceeds threshold {trigger_compress_length}",
+                token_count=token_count
+            )
+
+    def should_compress_conversation(self, messages: List[Dict[str, Any]]) -> bool:
+        """Determine whether conversation compression is needed (legacy method for compatibility)"""
+        if (not self.context_rule or
+                not self.context_rule.llm_compression_config or
+                not self.context_rule.llm_compression_config.enabled):
+            return False
+
+        # Create a temporary chunk for the decision
+        temp_chunk = MessageChunk(
+            message_type=MessageType.TEXT,
+            messages=messages,
+            metadata={}
+        )
+
+        decision = self.decide_compression_strategy(temp_chunk)
+        return decision.should_compress
+
+    def should_compress_tool_result(self, result: str) -> bool:
+        """Determine whether tool result compression is needed (legacy method for compatibility)"""
+        if (not self.context_rule or
+                not self.context_rule.llm_compression_config or
+                not self.context_rule.llm_compression_config.enabled):
+            return False
+
+        decision = self.decide_content_compression_strategy(result)
+        return decision.should_compress
+
+    def process_message_chunks(self,
+                               chunks: List[MessageChunk],
+                               base_metadata: Dict[str, Any] = None) -> List[MessageChunk]:
+        processed_chunks = []
+
+        for chunk in chunks:
+            try:
+                if chunk.message_type == MessageType.TEXT:
+                    # Process text message chunks
+                    processed_chunk = self._process_text_chunk(chunk, base_metadata)
+                elif chunk.message_type == MessageType.TOOL:
+                    # Process tool message chunks
+                    processed_chunk = self._process_tool_chunk(chunk, base_metadata)
+                else:
+                    # Unknown type, keep as is
+                    processed_chunk = chunk
+                    logger.warning(f"Unknown message chunk type: {chunk.message_type}")
+
+                processed_chunks.append(processed_chunk)
+
+            except Exception:
+                logger.error(f"Processing message chunk failed: {traceback.format_exc()}")
+                # Keep the original chunk on failure
+                processed_chunks.append(chunk)
+
+        return processed_chunks
+
+    def _process_text_chunk(self,
+                            chunk: MessageChunk,
+                            base_metadata: Dict[str, Any] = None) -> MessageChunk:
+        decision = self.decide_compression_strategy(chunk)
+
+        if not decision.should_compress:
+            logger.debug(f"Skipping text chunk compression: {decision.reason}")
+            return chunk
+
+        try:
+            processed_messages = []
+
+            for message in chunk.messages:
+                content = message.get("content", "")
+                if not content or not isinstance(content, str):
+                    processed_messages.append(message)
+                    continue
+
+                logger.info(f'Processing text chunk with LLM compression '
+                            f'(tokens: {decision.token_count}, reason: {decision.reason})')
+
+                # Use LLM compression
+                compression_result = self.compress_pipeline.compress(content)
+
+                # Create the processed message
+                processed_message = message.copy()
+                processed_message["content"] = compression_result.compressed_content
+                processed_messages.append(processed_message)
+
+            # Update chunk metadata
+            updated_metadata = chunk.metadata.copy()
+            updated_metadata.update({
+                "processed": True,
+                "compression_applied": True,
+                "compression_type": "llm_based",
+                "compression_reason": decision.reason,
+                "original_token_count": decision.token_count,
+                "processing_method": "llm_compression",
+                "original_message_count": len(chunk.messages),
+                "processed_message_count": len(processed_messages)
+            })
+
+            return MessageChunk(
+                message_type=chunk.message_type,
+                messages=processed_messages,
+                metadata=updated_metadata
+            )
+
+        except Exception:
+            logger.warning(f"Text chunk compression failed: {traceback.format_exc()}")
+            return chunk
+
+    def _process_tool_chunk(self,
+                            chunk: MessageChunk,
+                            base_metadata: Dict[str, Any] = None) -> MessageChunk:
+        """Process tool message chunks with LLM compression"""
+        try:
+            processed_messages = []
+
+            for message in chunk.messages:
+                content = message.get("content", "")
+
+                # Decide the compression strategy for this content
+                decision = self.decide_content_compression_strategy(content)
+
+                if decision.should_compress:
+                    logger.info(f'Processing tool chunk with LLM compression '
+                                f'(tokens: {decision.token_count}, reason: {decision.reason})')
+
+                    # Use LLM compression
+                    compression_result = self.compress_pipeline.compress(
+                        content,
+                        metadata={
+                            "tool_name": message.get("name", "unknown_tool"),
+                            "message_role": message.get("role", "tool"),
+                            "content_token_count": decision.token_count,
+                            "compression_reason": decision.reason
+                        }
+                    )
+
+                    # Create the processed message
+                    processed_message = message.copy()
+                    processed_message["content"] = compression_result.compressed_content
+                    processed_messages.append(processed_message)
+                else:
+                    # Messages that don't need compression are kept as is
+                    logger.debug(f"Skipping tool content compression: {decision.reason}")
+                    processed_messages.append(message)
+
+            # Update chunk metadata with compression info
+            updated_metadata = chunk.metadata.copy()
+            updated_metadata.update({
+                "processed": True,
+                "tool_compression_applied": True,
+                "processing_method": "llm_compression",
+                "original_message_count": len(chunk.messages),
+                "processed_message_count": len(processed_messages)
+            })
+
+            return MessageChunk(
+                message_type=chunk.message_type,
+                messages=processed_messages,
+                metadata=updated_metadata
+            )
+
+        except Exception:
+            logger.warning(f"Tool chunk compression failed: {traceback.format_exc()}")
+            return chunk
+
+    def truncate_messages(self, messages: List[Dict[str, Any]]) -> MessagesProcessingResult:
+        """Truncate messages using the TruncateCompressor"""
+        max_tokens = self.get_max_tokens()
+        optimization_enabled = self.context_rule.optimization_config.enabled if self.context_rule else True
+
+        return self.truncate_compressor.truncate_messages(
+            messages=messages,
+            max_tokens=max_tokens,
+            optimization_enabled=optimization_enabled
+        )
+
+    def compress_messages(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+        if (not self.context_rule or
+                not self.context_rule.llm_compression_config or
+                not self.context_rule.llm_compression_config.enabled):
+            return messages
+
+        compression_type = self._get_compression_type()
+
+        if compression_type == CompressionType.LLMLINGUA and self.llmlingua_compressor:
+            # Use LLMLingua compression directly on the messages
+            logger.info("Using LLMLingua compression for messages")
+
+            try:
+                compression_result = self.llmlingua_compressor.compress_messages(messages)
+
+                # Extract the compressed messages from the metadata
+                compressed_messages = compression_result.metadata.get("compressed_messages", messages)
+
+                logger.info(f"LLMLingua compression completed. "
+                            f"Original: {len(messages)} messages, "
+                            f"Compressed: {len(compressed_messages)} messages, "
+                            f"Compression ratio: {compression_result.compression_ratio:.2f}")
+
+                return compressed_messages
+
+            except Exception as e:
+                logger.error(f"LLMLingua compression failed: {e}")
+                return messages
+
+        elif compression_type == CompressionType.LLM_BASED and self.compress_pipeline:
+            # Use the chunk-based LLM compression
+            logger.info("Using LLM-based compression for messages")
+
+            # 1. Split the messages into typed chunks
+            final_chunk_result = self.chunk_pipeline.split_messages(messages)
+
+            # 2. Process each chunk
+            processed_chunks = self.process_message_chunks(final_chunk_result.chunks)
+
+            # 3. Re-merge the messages
+            return self.chunk_pipeline.merge_chunks(processed_chunks)
+
+        else:
+            # No appropriate compressor available
+            logger.warning(f"No compressor available for type {compression_type}, returning original messages")
+            return messages
+
+    def process_messages(self, messages: List[Dict[str, Any]], context: Context) -> ContextProcessingResult:
+        """Process the complete context, returning processing results and statistics"""
+        start_time = time.time()
+        if not self.context_rule or not self.context_rule.optimization_config.enabled:
+            return ContextProcessingResult(
+                processed_messages=messages,
+                processed_tool_results=None,
+                statistics={
+                    "total_processing_time": 0,
+                    "original_message_count": len(messages),
+                },
+            )
+
+        # 1. Content compression
+        compressed_messages = self.compress_messages(messages)
+
+        # 2. Content length limit
+        truncated_result = self.truncate_messages(compressed_messages)
+        truncated_messages = truncated_result.processed_messages
+
+        total_time = time.time() - start_time
+
+        color_log(f"\nContext processing statistics: "
+                  f"\nOriginal message count={truncated_result.original_messages_len}"
+                  f"\nProcessed message count={truncated_result.processing_messaged_len}"
+                  f"\nMax context length max_context_len={self.get_max_tokens()} = {self.agent_context.context_usage.total_context_length} * {self.context_rule.optimization_config.max_token_budget_ratio}"
+                  f"\nOriginal token count={truncated_result.original_token_len}"
+                  f"\nProcessed token count={truncated_result.processing_token_len}"
+                  f"\nTruncation processing time={truncated_result.processing_time:.3f}s"
+                  f"\nTotal processing time={total_time:.3f}s"
+                  f"\nMethod used={truncated_result.method_used}"
+                  f"\norigin_messages={messages}"
+                  f"\ntruncated_messages={truncated_messages}",
+                  color=Color.pink)
+
+        return ContextProcessingResult(
+            processed_messages=truncated_messages,
+            processed_tool_results=None,
+            statistics={
+                "total_processing_time": total_time,
+                "original_message_count": len(messages),
+                "truncated_message_count": len(truncated_messages),
+            },
+        )
+
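Putting the pieces together, process_messages() is the entry point: compression first, then truncation against the token budget. A sketch of the intended call pattern (agent_context, messages, and context are assumed to be built elsewhere by the framework):

    from aworld.core.context.processor.prompt_processor import PromptProcessor

    processor = PromptProcessor(agent_context)

    # 1. compress_messages() picks LLMLingua or chunked LLM compression when
    #    llm_compression_config.enabled is set.
    # 2. truncate_messages() then enforces the budget:
    #    total_context_length * max_token_budget_ratio.
    result = processor.process_messages(messages, context)
    print(result.statistics["total_processing_time"])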
aworld/core/context/processor/truncate_compressor.py ADDED
@@ -0,0 +1,355 @@
1
+ # coding: utf-8
2
+ # Copyright (c) 2025 inclusionAI.
3
+
4
+ import time
5
+ import logging
6
+ from typing import Any, Dict, List
7
+
8
+ from aworld.config.conf import ModelConfig
9
+ from aworld.core.context.processor import CompressionResult, CompressionType, MessagesProcessingResult
10
+ from aworld.core.context.processor.base_compressor import BaseCompressor
11
+ from aworld.logs.util import Color, color_log
12
+ from aworld.models.utils import num_tokens_from_messages
13
+ from aworld.utils import import_package
14
+
15
+ logger = logging.getLogger(__name__)
16
+
17
+
18
+ class TruncateCompressor(BaseCompressor):
19
+ """
20
+ Truncate messages compressor for content length management
21
+ """
22
+
23
+ def __init__(self, config: Dict[str, Any] = None, llm_config: ModelConfig = None):
24
+ super().__init__(config, llm_config)
25
+ self.model_type = llm_config.model_type if llm_config else "gpt-3.5-turbo"
26
+ self._init_tokenizer()
27
+
28
+ def _init_tokenizer(self):
29
+ """Initialize tokenizer for text truncation"""
30
+ try:
31
+ import_package("tiktoken")
32
+ import tiktoken
33
+
34
+ if self.model_type.lower() == "qwen":
35
+ from aworld.models.qwen_tokenizer import qwen_tokenizer
36
+ self.tokenizer = qwen_tokenizer
37
+ elif self.model_type.lower() == "openai":
38
+ from aworld.models.openai_tokenizer import openai_tokenizer
39
+ self.tokenizer = openai_tokenizer
40
+ else:
41
+ try:
42
+ self.encoding = tiktoken.encoding_for_model(self.model_type)
43
+ self.tokenizer = None # Use tiktoken directly
44
+ except KeyError:
45
+ logger.warning(f"{self.model_type} model not found. Using cl100k_base encoding.")
46
+ self.encoding = tiktoken.get_encoding("cl100k_base")
47
+ self.tokenizer = None
48
+ except ImportError:
49
+ logger.error("tiktoken not available, text truncation may not work properly")
50
+ self.tokenizer = None
51
+ self.encoding = None
52
+
53
+ def _count_tokens_from_messages(self, messages: List[Dict[str, Any]]) -> int:
54
+ """Calculate token count for messages using utils.py method"""
55
+ return num_tokens_from_messages(messages, model=self.model_type)
56
+
57
+ def _count_tokens_from_message(self, msg: Dict[str, Any]) -> int:
58
+ """Calculate token count for single message using utils.py method"""
59
+ # Convert single message to list format for num_tokens_from_messages
60
+ return num_tokens_from_messages([msg], model=self.model_type)
61
+
62
+ def _truncate_text(self, text: str, max_tokens: int, keep_both_sides: bool = False) -> str:
63
+ """Truncate text content using appropriate tokenizer"""
64
+ if not text:
65
+ return text
66
+
67
+ # Ensure max_tokens is an integer
68
+ max_tokens = int(max_tokens)
69
+ if max_tokens <= 0:
70
+ return ""
71
+
72
+ try:
73
+ if self.tokenizer:
74
+ # Use custom tokenizer (qwen/openai)
75
+ return self.tokenizer.truncate(text, max_tokens, keep_both_sides=keep_both_sides)
76
+ elif self.encoding:
77
+ # Use tiktoken encoding directly
78
+ tokens = self.encoding.encode(text)
79
+ if len(tokens) <= max_tokens:
80
+ return text
81
+
82
+ if keep_both_sides:
83
+ ellipsis = "..."
84
+ ellipsis_tokens = self.encoding.encode(ellipsis)
85
+ ellipsis_len = len(ellipsis_tokens)
86
+ available = max_tokens - ellipsis_len
87
+ if available <= 0:
88
+ # Not enough space for ellipsis
89
+ truncated_tokens = tokens[:max_tokens]
90
+ else:
91
+ left_len = int(available // 2)
92
+ right_len = int(available - left_len)
93
+ truncated_tokens = tokens[:left_len] + ellipsis_tokens + tokens[-right_len:]
94
+ else:
95
+ truncated_tokens = tokens[:max_tokens]
96
+
97
+ return self.encoding.decode(truncated_tokens)
98
+ else:
99
+ # Fallback: simple character truncation
100
+ logger.warning("No tokenizer available, using character-based truncation")
101
+ target_len = max_tokens * 4 # Rough estimate: 1 token = 4 chars
102
+ target_len = int(target_len)
103
+
104
+ if len(text) <= target_len:
105
+ return text
106
+
107
+ if keep_both_sides:
108
+ ellipsis = "..."
109
+ available = target_len - len(ellipsis)
110
+ if available <= 0:
111
+ return text[:target_len]
112
+ left_len = int(available // 2)
113
+ right_len = int(available - left_len)
114
+ return text[:left_len] + ellipsis + text[-right_len:]
115
+ else:
116
+ return text[:target_len]
117
+ except Exception as e:
118
+ logger.error(f"Text truncation failed: {e}")
119
+ return text
120
+
121
+     def _truncate_message(self, msg: Dict[str, Any], max_tokens: int, keep_both_sides: bool = False):
+         """Truncate single message content"""
+         # Ensure max_tokens is an integer
+         max_tokens = int(max_tokens)
+
+         content = msg.get("content", "")
+         if isinstance(content, str):
+             truncated_content = self._truncate_text(content, max_tokens, keep_both_sides)
+         else:
+             # Handle complex content formats
+             if isinstance(content, list):
+                 text_parts = []
+                 for item in content:
+                     if isinstance(item, dict) and item.get("text"):
+                         text_parts.append(item["text"])
+                     elif isinstance(item, str):
+                         text_parts.append(item)
+                 if not text_parts:
+                     return None
+                 text = '\n'.join(text_parts)
+             else:
+                 text = str(content)
+             truncated_content = self._truncate_text(text, max_tokens, keep_both_sides)
+
+         new_msg = msg.copy()
+         new_msg["content"] = truncated_content
+         return new_msg
+
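+     # Illustrative input for _truncate_message (the message is hypothetical):
+     # list-form content is flattened to its text parts, joined with newlines,
+     # then truncated as one string:
+     #
+     #     msg = {"role": "user", "content": [{"type": "text", "text": "part one"}, "part two"]}
+     #     self._truncate_message(msg, 100)  # -> copy of msg whose content starts "part one\npart two"
+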
+     def is_out_of_context(self, messages: List[Dict[str, Any]], max_tokens: int) -> bool:
+         """Check if messages exceed token limit"""
+         max_tokens = int(max_tokens)
+         return self._count_tokens_from_messages(messages) > max_tokens
+
+     def truncate_messages(self, messages: List[Dict[str, Any]], max_tokens: int,
+                           optimization_enabled: bool = True) -> MessagesProcessingResult:
+         """Truncate messages, following the _truncate_input_messages_roughly logic"""
+         start_time = time.time()
+         original_messages_len = len(messages)
+         original_token_len = self._count_tokens_from_messages(messages)
+
+         # Ensure max_tokens is an integer
+         max_tokens = int(max_tokens)
+
+         if not optimization_enabled:
+             processing_time = time.time() - start_time
+             return MessagesProcessingResult(
+                 original_token_len=original_token_len,
+                 processing_token_len=original_token_len,
+                 original_messages_len=original_messages_len,
+                 processing_messaged_len=original_messages_len,
+                 processing_time=processing_time,
+                 method_used="no_optimization",
+                 processed_messages=messages
+             )
+
+         if not self.is_out_of_context(messages=messages, max_tokens=max_tokens):
+             processing_time = time.time() - start_time
+             return MessagesProcessingResult(
+                 original_token_len=original_token_len,
+                 processing_token_len=original_token_len,
+                 original_messages_len=original_messages_len,
+                 processing_messaged_len=original_messages_len,
+                 processing_time=processing_time,
+                 method_used="within_context_limit",
+                 processed_messages=messages
+             )
+
+         # Group messages by conversation turns; this also validates that the
+         # first non-system message is a user message
+         turns = []
+         for m in messages:
+             if m.get("role") == "system":
+                 continue
+             elif m.get("role") == "user":
+                 turns.append([m])
+             else:
+                 if turns:
+                     turns[-1].append(m)
+                 else:
+                     raise Exception('The input messages (excluding the system message) must start with a user message.')
+
+         # Reserve budget for the system message, if present
+         if messages and messages[0].get("role") == "system":
+             sys_msg = messages[0]
+             available_token = max_tokens - self._count_tokens_from_message(sys_msg)
+         else:
+             sys_msg = None
+             available_token = max_tokens
+
+         # Process messages from back to front, keeping the latest conversations
+         token_cnt = 0
+         new_messages = []
+         user_message_count = 0
+         for i in range(len(messages) - 1, -1, -1):
+             if messages[i].get("role") == "system":
+                 continue
+
+             cur_token_cnt = self._count_tokens_from_message(messages[i])
+             if cur_token_cnt <= available_token:
+                 if messages[i].get("role") == "user":
+                     user_message_count += 1
+                 new_messages = [messages[i]] + new_messages
+                 available_token -= cur_token_cnt
+             else:
+                 # Try to truncate the message instead of dropping it
+                 if messages[i].get("role") == "user":
+                     # Truncate a user message (not the last one)
+                     logger.debug(f"to truncate message {messages[i]}")
+                     _msg = self._truncate_message(messages[i], max_tokens=int(available_token))
+                     logger.debug(f"truncated message {messages[i]}, {_msg}")
+                     if _msg:
+                         new_messages = [_msg] + new_messages
+                     break
+                 elif messages[i].get("role") in ("function", "assistant", "system"):
+                     # Truncate a function/assistant message, keeping both ends
+                     logger.debug(f"to truncate message {messages[i]}")
+                     _msg = self._truncate_message(messages[i], max_tokens=int(available_token), keep_both_sides=True)
+                     logger.debug(f"truncated message {messages[i]}, {_msg}")
+                     if _msg:
+                         new_messages = [_msg] + new_messages
+                     # Edge case: if the last message is a very long tool message,
+                     # the result could be only system + tool messages with no user
+                     # message, which would make the LLM call fail
+                     elif user_message_count == 0:
+                         continue
+                     else:
+                         break
+                 else:
+                     # Cannot truncate; record the token count and exit
+                     token_cnt = (max_tokens - available_token) + cur_token_cnt
+                     break
+
+         # Re-add the system message
+         if sys_msg is not None:
+             new_messages = [sys_msg] + new_messages
+
+         # Calculate processed statistics
+         processing_time = time.time() - start_time
+         processing_token_len = self._count_tokens_from_messages(new_messages)
+         processing_messaged_len = len(new_messages)
+
+         return MessagesProcessingResult(
+             original_token_len=original_token_len,
+             processing_token_len=processing_token_len,
+             original_messages_len=original_messages_len,
+             processing_messaged_len=processing_messaged_len,
+             processing_time=processing_time,
+             method_used="truncate_messages",
+             processed_messages=new_messages
+         )
+
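+     # Illustrative call (the budget is hypothetical): the newest turns that fit
+     # are kept, the first message that overflows may itself be truncated, and
+     # the system message is re-attached at the front:
+     #
+     #     result = self.truncate_messages(messages, max_tokens=4000)
+     #     result.method_used          # "no_optimization", "within_context_limit" or "truncate_messages"
+     #     result.processed_messages   # the pruned message list
+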
+     def compress(self, content: str) -> CompressionResult:
+         """
+         Compress content by truncating it (for compatibility with the BaseCompressor interface)
+         """
+         # This is simple truncation, not actual compression; the method exists
+         # for consistency with the other compressors
+         original_content = content
+
+         # Use a reasonable default max_tokens for single-content truncation
+         max_tokens = self.config.get("max_tokens", 2000)
+
+         try:
+             truncated_content = self._truncate_text(content, max_tokens, False)
+             compression_ratio = len(truncated_content) / len(original_content) if original_content else 1.0
+
+             return CompressionResult(
+                 original_content=original_content,
+                 compressed_content=truncated_content,
+                 compression_ratio=compression_ratio,
+                 metadata={"method": "truncation", "max_tokens": max_tokens},
+                 compression_type=CompressionType.LLM_BASED  # Default type
+             )
+
+         except Exception as e:
+             logger.error(f"Truncation failed: {e}")
+             return CompressionResult(
+                 original_content=original_content,
+                 compressed_content=content,
+                 compression_ratio=1.0,
+                 metadata={"error": str(e)},
+                 compression_type=CompressionType.LLM_BASED
+             )
+
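+     # Illustrative usage (the config value is hypothetical): note that the
+     # compression_ratio here is character-based, len(compressed) / len(original):
+     #
+     #     result = self.compress("...very long tool output...")
+     #     result.metadata  # {"method": "truncation", "max_tokens": 2000}
+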
+     def compress_messages(self, messages: List[Dict[str, Any]]) -> CompressionResult:
+         """
+         Compress messages by truncating them (for compatibility with the BaseCompressor interface)
+         """
+         if not messages:
+             return CompressionResult(
+                 original_content="[]",
+                 compressed_content="[]",
+                 compression_ratio=1.0,
+                 metadata={},
+                 compression_type=CompressionType.LLM_BASED
+             )
+
+         original_content = str(messages)
+         max_tokens = self.config.get("max_tokens", 4000)
+
+         try:
+             result = self.truncate_messages(messages, max_tokens, optimization_enabled=True)
+
+             compressed_content = str(result.processed_messages)
+             compression_ratio = result.processing_token_len / result.original_token_len if result.original_token_len > 0 else 1.0
+
+             return CompressionResult(
+                 original_content=original_content,
+                 compressed_content=compressed_content,
+                 compression_ratio=compression_ratio,
+                 metadata={
+                     "method": "truncation",
+                     "max_tokens": max_tokens,
+                     "truncated_messages": result.processed_messages,
+                     "original_message_count": result.original_messages_len,
+                     "processed_message_count": result.processing_messaged_len,
+                     "method_used": result.method_used
+                 },
+                 compression_type=CompressionType.LLM_BASED
+             )
+
+         except Exception as e:
+             logger.error(f"Message truncation failed: {e}")
+             return CompressionResult(
+                 original_content=original_content,
+                 compressed_content=original_content,
+                 compression_ratio=1.0,
+                 metadata={"error": str(e)},
+                 compression_type=CompressionType.LLM_BASED
+             )
+
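+     # Note: unlike compress(), the ratio reported here is token-based
+     # (processing_token_len / original_token_len), and the pruned message list
+     # is also exposed via metadata["truncated_messages"].
+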
+     def compress_batch(self, contents: List[str]) -> List[CompressionResult]:
+         """Compress multiple contents in batch"""
+         results = []
+         for content in contents:
+             result = self.compress(content)
+             results.append(result)
+         return results
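+
+ # A minimal end-to-end sketch. The class name below is a stand-in (the real
+ # name is defined above this excerpt), and the config values are hypothetical:
+ #
+ #     compressor = TruncationCompressor(config={"max_tokens": 4000})  # hypothetical name
+ #     msgs = [{"role": "system", "content": "You are helpful."},
+ #             {"role": "user", "content": "hi " * 10000}]
+ #     result = compressor.compress_messages(msgs)
+ #     print(result.compression_ratio, result.metadata["method_used"])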