#!/usr/bin/env python
"""
Pure Trace Analysis Functions
This module provides pure functions for analyzing trace content characteristics
without any database dependencies. These functions can be used independently
or by the backend services for trace management.
"""
import re
import logging
from typing import Dict, Any, Tuple
logger = logging.getLogger(__name__)
def analyze_trace_characteristics(
    trace_content: str,
    optimize_content: bool = True
) -> Dict[str, Any]:
    """
    Analyze trace content and recommend sliding-window processing parameters.

    Inspects content density (code keywords, structured-data punctuation,
    identifier styles), line shape, and conversation/interaction structure,
    then derives a window size and overlap tuned for large-context processing.

    Args:
        trace_content: The full content of the trace.
        optimize_content: Whether to apply content optimization for cost
            savings before analysis.

    Returns:
        Dictionary with trace characteristics and recommended parameters
        (densities, turn counts, trace type, window/overlap sizes,
        estimated window count, complexity, optimization stats).
    """
    logger.info("Analyzing trace characteristics...")

    # Apply content optimization if requested; all metrics below are computed
    # on the optimized text so recommendations match what will be processed.
    optimization_stats = None
    if optimize_content:
        logger.info("Applying content optimization for cost savings...")
        trace_content, optimization_stats = preprocess_content_for_cost_optimization(trace_content, aggressive=True)

    # Basic content metrics.
    total_length = len(trace_content)
    lines = trace_content.split('\n')
    line_count = len(lines)
    avg_line_length = sum(len(line) for line in lines) / max(1, line_count)

    # Code density: programming keywords per 100 characters.
    code_pattern = r'function|class|def|return|import|export|const|var|let|async|await|if|else|for|while'
    code_matches = len(re.findall(code_pattern, trace_content))
    code_density = code_matches / max(1, total_length / 100)

    # Structured-data density (JSON/XML punctuation and tags) per 100 chars.
    # NOTE: the original pattern carried a third alternative r'<\/[a-z]+>'
    # strictly subsumed by r'<\/?[a-z]+>' (tried first at every position),
    # so dropping it cannot change any match; counts are identical.
    struct_pattern = r'[{}\[\],":]+|<\/?[a-z]+>'
    struct_matches = len(re.findall(struct_pattern, trace_content))
    struct_density = struct_matches / max(1, total_length / 100)

    # Entity density: camelCase / PascalCase / snake_case identifiers per 100 chars.
    entity_pattern = r'\b[A-Z][a-z]+[A-Z][a-zA-Z]*\b|\b[a-z]+_[a-z_]+\b|\b[a-z]+[A-Z][a-zA-Z]*\b'
    entity_matches = len(re.findall(entity_pattern, trace_content))
    entity_density = entity_matches / max(1, total_length / 100)

    # Measure complete interaction blocks so windows never split one in half.
    interaction_sizes = []
    conversation_turn_count = 0
    # Sentinel: set by the window heuristics below, resolved at the end if
    # still None (replaces the fragile "'trace_type' not in locals()" check).
    trace_type = None

    # Structured JSON-style "input"/"output" blocks.
    input_output_pattern = r'"input":.*?"output":'
    io_blocks = re.findall(input_output_pattern, trace_content, re.DOTALL)
    if io_blocks:
        for block in io_blocks:
            interaction_sizes.append(len(block))
        avg_interaction_size = sum(interaction_sizes) / len(interaction_sizes) if interaction_sizes else 0
        logger.info(f"Found {len(io_blocks)} input/output blocks with average size of {avg_interaction_size:.0f} characters")

    # Fall back to plain conversation turns (user:/assistant:/system: markers).
    if not interaction_sizes:
        conversation_pattern = r'(user:|assistant:|system:).*?(user:|assistant:|system:|\Z)'
        # BUG FIX: re.findall() on a pattern with capture groups returns the
        # group tuples, so the old code measured len("user:") instead of the
        # whole turn.  finditer() exposes the full match via group(0).
        conversation_blocks = [m.group(0) for m in re.finditer(conversation_pattern, trace_content, re.DOTALL)]
        if conversation_blocks:
            for full_text in conversation_blocks:
                interaction_sizes.append(len(full_text))
            avg_interaction_size = sum(interaction_sizes) / len(interaction_sizes) if interaction_sizes else 0
            conversation_turn_count = len(conversation_blocks)
            logger.info(f"Found {len(conversation_blocks)} conversation blocks with average size of {avg_interaction_size:.0f} characters")

    # Turn counts for conversation analysis.
    human_turns = len(re.findall(r'\buser:', trace_content, re.IGNORECASE))
    ai_turns = len(re.findall(r'\bassistant:', trace_content, re.IGNORECASE))
    system_messages = len(re.findall(r'\bsystem:', trace_content, re.IGNORECASE))

    # Base parameters (ultra-aggressive optimization for 1M token limit).
    base_window = 350000       # ~87K tokens worth of characters
    base_overlap_ratio = 0.05  # 5% overlap (ultra-minimal)

    # Choose window size / overlap from the dominant content characteristic.
    if interaction_sizes:
        # At least 2x the largest interaction so no window splits one.
        max_interaction_size = max(interaction_sizes)
        window_size = max(base_window, int(max_interaction_size * 2))
        logger.info(f"Adjusting window size based on max interaction size ({max_interaction_size} chars): {window_size}")
        overlap_ratio = 0.08  # still very minimal, but avoids splitting turns
    elif struct_density > 0.5:
        # High structured-data density (JSON, XML) - use maximum windows.
        window_size = 400000
        overlap_ratio = 0.03
        trace_type = "structured_data"
    elif code_density > 0.5:
        # High code density - maximum windows to capture entire modules.
        window_size = 400000
        overlap_ratio = 0.05
        trace_type = "code"
    elif entity_density > 0.5:
        # High entity density - very large windows with minimal overlap.
        window_size = int(base_window * 1.14)  # ~400K
        overlap_ratio = 0.05
        trace_type = "entity_rich"
    elif avg_line_length > 150:
        # Very long lines (likely logs) - maximum window, minimal overlap.
        window_size = 400000
        overlap_ratio = 0.03
        trace_type = "log"
    elif avg_line_length < 50:
        # Short lines (conversation logs) - still very large windows.
        window_size = int(base_window * 1.14)  # ~400K
        overlap_ratio = 0.08
        trace_type = "conversation"
    else:
        # Default case.
        window_size = base_window
        overlap_ratio = base_overlap_ratio
        trace_type = "general"

    # Ultra-aggressive scaling for large documents.
    if total_length > 800000:
        window_size = 400000
        overlap_ratio = 0.03
        logger.info(f"Ultra-large document detected ({total_length:,} chars): using maximum window size with minimal overlap")
    elif total_length > 400000:
        window_size = min(400000, int(total_length * 0.6))  # 60% of total or max
        overlap_ratio = 0.04
        logger.info(f"Large document detected ({total_length:,} chars): using near-maximum window size")
    elif total_length > 200000:
        window_size = min(400000, int(total_length * 0.8))  # 80% of total or max
        overlap_ratio = 0.05

    # Overlap in characters, then hard limits: 100K-400K chars per window
    # (25K-100K tokens), 2K-20K chars of overlap.
    overlap_size = int(window_size * overlap_ratio)
    window_size = max(100000, min(400000, window_size))
    overlap_size = max(2000, min(20000, overlap_size))

    # Estimate number of windows needed.
    estimated_windows = max(1, int((total_length - overlap_size) / (window_size - overlap_size) + 1))

    # Resolve trace type if the window heuristics above did not set one.
    if trace_type is None:
        if human_turns > 0 and ai_turns > 0:
            trace_type = "conversation"
        elif code_density > 0.2:
            trace_type = "code"
        elif struct_density > 0.2:
            trace_type = "structured_data"
        else:
            trace_type = "general"

    # Return comprehensive trace analysis.
    return {
        "total_length": total_length,
        "line_count": line_count,
        "avg_line_length": round(avg_line_length, 1),
        "code_density": round(code_density, 2),
        "struct_density": round(struct_density, 2),
        "entity_density": round(entity_density, 2),
        "interaction_count": len(interaction_sizes) if interaction_sizes else conversation_turn_count,
        "avg_interaction_size": round(sum(interaction_sizes) / len(interaction_sizes), 1) if interaction_sizes else 0,
        "human_turns": human_turns,
        "ai_turns": ai_turns,
        "system_messages": system_messages,
        "trace_type": trace_type,
        "recommended_window_size": window_size,
        "recommended_overlap_size": overlap_size,
        "estimated_windows": estimated_windows,
        "processing_complexity": "high" if estimated_windows > 10 else "medium" if estimated_windows > 3 else "low",
        "optimization_stats": optimization_stats
    }
def display_trace_summary(analysis: Dict[str, Any]) -> None:
    """
    Print a formatted, human-readable summary of trace characteristics.

    Args:
        analysis: Dictionary produced by ``analyze_trace_characteristics``;
            read-only here.

    Returns:
        None; all output goes to stdout.
    """
    print("\n" + "=" * 80)
    # Fixed: these headers were f-strings with no placeholders (ruff F541).
    print("TRACE ANALYSIS SUMMARY")
    print("=" * 80)

    # Basic statistics
    print("\nBASIC STATISTICS:")
    print(f" Total length: {analysis['total_length']:,} characters")
    print(f" Line count: {analysis['line_count']:,} lines")
    print(f" Average line length: {analysis['avg_line_length']:.1f} characters")

    # Content type
    print("\nCONTENT CHARACTERISTICS:")
    print(f" Detected trace type: {analysis['trace_type'].replace('_', ' ').title()}")
    if analysis['trace_type'] == 'conversation':
        print(f" Conversation turns: {analysis['interaction_count']} ")
        print(f" - Human messages: {analysis['human_turns']}")
        print(f" - AI messages: {analysis['ai_turns']}")
        print(f" - System messages: {analysis['system_messages']}")
    if analysis['avg_interaction_size'] > 0:
        print(f" Average message size: {analysis['avg_interaction_size']:.1f} characters")
    print(" Content density metrics:")
    print(f" - Code density: {analysis['code_density']:.2f}")
    print(f" - Structured data density: {analysis['struct_density']:.2f}")
    print(f" - Entity density: {analysis['entity_density']:.2f}")

    # Processing recommendations
    print("\nPROCESSING RECOMMENDATIONS:")
    print(f" Recommended window size: {analysis['recommended_window_size']:,} characters")
    print(f" Recommended overlap size: {analysis['recommended_overlap_size']:,} characters")
    print(f" Estimated number of windows: {analysis['estimated_windows']}")
    print(f" Processing complexity: {analysis['processing_complexity'].title()}")

    # Rough wall-clock estimate: baseline seconds per window scaled by complexity.
    base_time_per_window = 15  # seconds per window baseline
    if analysis['processing_complexity'] == 'high':
        time_factor = 1.5
    elif analysis['processing_complexity'] == 'medium':
        time_factor = 1.0
    else:
        time_factor = 0.8
    estimated_time = base_time_per_window * analysis['estimated_windows'] * time_factor
    # divmod on the truncated total is equivalent to the old
    # int(t/60) / int(t%60) pair for non-negative estimates.
    minutes, seconds = divmod(int(estimated_time), 60)
    print(f" Estimated processing time: ~{minutes}m {seconds}s")
    print("\n" + "=" * 80)
def preprocess_content_for_cost_optimization(content: str, aggressive: bool = True) -> Tuple[str, Dict[str, Any]]:
    """
    Shrink content to cut API costs while preserving its semantic payload.

    Whitespace is always normalized; when ``aggressive`` is true, debug-log
    noise, sub-second timestamps, deep indentation, empty JSON containers,
    and repeated separator runs are also stripped.

    Args:
        content: Original content.
        aggressive: Whether to apply aggressive optimization.

    Returns:
        Tuple of (optimized_content, optimization_stats).
    """
    before = len(content)
    text = content

    # Always-safe whitespace normalization, applied in order.
    basic_rules = (
        (r'\n\s*\n\s*\n+', '\n\n'),  # 3+ newlines -> 2
        (r'[ \t]+', ' '),            # runs of spaces/tabs -> one space
        (r' +\n', '\n'),             # drop trailing spaces
    )
    # Aggressive rules trade a little semantic richness for big savings.
    aggressive_rules = (
        # Debug/verbose logging lines.
        (r'DEBUG:.*?\n', ''),
        (r'TRACE:.*?\n', ''),
        (r'\[DEBUG\].*?\n', ''),
        # Sub-second timestamps (dates are kept for context).
        (r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3,6}', ''),
        (r'\d{2}:\d{2}:\d{2}\.\d{3,6}', ''),
        # Deep indentation (NOTE: effectively a no-op here, since the basic
        # pass above already collapsed space runs; kept for parity).
        (r'\n {8,}', '\n '),
        # Empty JSON objects/arrays that add no semantic value.
        (r'{\s*}', '{}'),
        (r'\[\s*\]', '[]'),
        # Repeated separator runs (e.g. long "---" / "===" dividers).
        (r'-{5,}', '-----'),
        (r'={5,}', '====='),
        # Blank lines inside brace blocks.
        (r'(\{[^}]*)\n\n+([^}]*\})', r'\1\n\2'),
    )

    for pattern, replacement in basic_rules:
        text = re.sub(pattern, replacement, text)
    if aggressive:
        for pattern, replacement in aggressive_rules:
            text = re.sub(pattern, replacement, text)

    # Final cleanup.
    text = text.strip()

    after = len(text)
    saved = before - after
    saved_pct = (saved / before) * 100 if before > 0 else 0
    stats = {
        "original_length": before,
        "optimized_length": after,
        "characters_removed": saved,
        "reduction_percentage": round(saved_pct, 2),
        "aggressive_mode": aggressive
    }
    logger.info(f"Content optimization complete: {saved:,} characters removed ({saved_pct:.1f}% reduction)")
    return text, stats