#!/usr/bin/env python
"""
Pure Trace Analysis Functions

This module provides pure functions for analyzing trace content characteristics
without any database dependencies. These functions can be used independently or
by the backend services for trace management.
"""

import re
import logging
from typing import Dict, Any, Tuple

logger = logging.getLogger(__name__)


def _per_100_chars(pattern: str, content: str, total_length: int) -> float:
    """Count regex matches in *content*, normalized per 100 characters."""
    return len(re.findall(pattern, content)) / max(1, total_length / 100)


def analyze_trace_characteristics(
    trace_content: str,
    optimize_content: bool = True
) -> Dict[str, Any]:
    """
    Analyze the trace content to determine its characteristics and optimal
    parameters for sliding window processing.

    Args:
        trace_content: The full content of the trace
        optimize_content: Whether to apply content optimization for cost savings

    Returns:
        Dictionary with trace characteristics and recommended parameters:
        length/line metrics, content-density scores, conversation-turn counts,
        detected trace type, recommended window/overlap sizes, the estimated
        window count, a processing-complexity label, and (when optimization
        ran) the optimization statistics.
    """
    logger.info("Analyzing trace characteristics...")

    # Apply content optimization if requested
    optimization_stats = None
    if optimize_content:
        logger.info("Applying content optimization for cost savings...")
        trace_content, optimization_stats = preprocess_content_for_cost_optimization(
            trace_content, aggressive=True
        )

    # Calculate basic content metrics (on optimized content)
    total_length = len(trace_content)
    lines = trace_content.split('\n')
    line_count = len(lines)
    avg_line_length = sum(len(line) for line in lines) / max(1, line_count)

    # Code density: common programming keywords, normalized per 100 chars
    code_density = _per_100_chars(
        r'function|class|def|return|import|export|const|var|let|async|await|if|else|for|while',
        trace_content, total_length,
    )

    # Structured-data density (JSON punctuation, XML tags) per 100 chars.
    # NOTE: the original pattern's third alternative '<\/[a-z]+>' was already
    # covered by '<\/?[a-z]+>' and has been dropped; the match set is unchanged.
    struct_density = _per_100_chars(
        r'[{}\[\],":]+|</?[a-z]+>', trace_content, total_length
    )

    # Entity density (camelCase, PascalCase, snake_case identifiers)
    entity_density = _per_100_chars(
        r'\b[A-Z][a-z]+[A-Z][a-zA-Z]*\b|\b[a-z]+_[a-z_]+\b|\b[a-z]+[A-Z][a-zA-Z]*\b',
        trace_content, total_length,
    )

    # Identify complete interaction blocks so window sizing can avoid
    # splitting an interaction across windows.
    interaction_sizes = []
    conversation_turn_count = 0

    # Try to identify input/output blocks in JSON format
    input_output_pattern = r'"input":.*?"output":'
    io_blocks = re.findall(input_output_pattern, trace_content, re.DOTALL)
    if io_blocks:
        interaction_sizes = [len(block) for block in io_blocks]
        avg_interaction_size = sum(interaction_sizes) / len(interaction_sizes)
        logger.info(f"Found {len(io_blocks)} input/output blocks with average size of {avg_interaction_size:.0f} characters")

    # If we couldn't find structured blocks, try to find conversation turns
    if not interaction_sizes:
        conversation_pattern = r'(user:|assistant:|system:).*?(user:|assistant:|system:|\Z)'
        # BUG FIX: re.findall returns only the capture groups when a pattern
        # contains groups, so the original measured each "interaction" as the
        # length of the role marker (e.g. len("user:") == 5) instead of the
        # whole turn, defeating the interaction-based window sizing below.
        # finditer + group(0) measures the full matched turn.
        conversation_blocks = list(re.finditer(conversation_pattern, trace_content, re.DOTALL))
        if conversation_blocks:
            interaction_sizes = [len(match.group(0)) for match in conversation_blocks]
            avg_interaction_size = sum(interaction_sizes) / len(interaction_sizes)
            conversation_turn_count = len(conversation_blocks)
            logger.info(f"Found {len(conversation_blocks)} conversation blocks with average size of {avg_interaction_size:.0f} characters")

    # Count patterns for conversation analysis
    human_turns = len(re.findall(r'\buser:', trace_content, re.IGNORECASE))
    ai_turns = len(re.findall(r'\bassistant:', trace_content, re.IGNORECASE))
    system_messages = len(re.findall(r'\bsystem:', trace_content, re.IGNORECASE))

    # Base parameters (ultra-aggressive optimization for 1M token limit)
    base_window = 350000        # ~87K tokens
    base_overlap_ratio = 0.05   # 5% overlap (ultra-minimal)

    # Initialized explicitly instead of the original fragile
    # "'trace_type' not in locals()" check further down.
    trace_type = None

    # Adjust window size based on content characteristics
    if interaction_sizes:
        # Use at least 2x the max interaction size so a window always
        # contains a complete interaction.
        max_interaction_size = max(interaction_sizes)
        window_size = max(base_window, int(max_interaction_size * 2))
        logger.info(f"Adjusting window size based on max interaction size ({max_interaction_size} chars): {window_size}")
        overlap_ratio = 0.08  # 8% overlap to avoid splitting interactions
    elif struct_density > 0.5:
        # High structured-data density (JSON, XML) - use maximum windows
        window_size = 400000
        overlap_ratio = 0.03
        trace_type = "structured_data"
    elif code_density > 0.5:
        # High code density - maximum windows to capture entire modules
        window_size = 400000
        overlap_ratio = 0.05
        trace_type = "code"
    elif entity_density > 0.5:
        # High entity density - very large windows, minimal overlap
        window_size = int(base_window * 1.14)  # ~400K
        overlap_ratio = 0.05
        trace_type = "entity_rich"
    elif avg_line_length > 150:
        # Very long lines (likely logs) - maximum window, minimal overlap
        window_size = 400000
        overlap_ratio = 0.03
        trace_type = "log"
    elif avg_line_length < 50:
        # Short lines (conversation logs) - still very large windows
        window_size = int(base_window * 1.14)  # ~400K
        overlap_ratio = 0.08
        trace_type = "conversation"
    else:
        # Default case - use base parameters
        window_size = base_window
        overlap_ratio = base_overlap_ratio
        trace_type = "general"

    # Ultra-aggressive scaling for large documents
    if total_length > 800000:
        window_size = 400000
        overlap_ratio = 0.03
        logger.info(f"Ultra-large document detected ({total_length:,} chars): using maximum window size with minimal overlap")
    elif total_length > 400000:
        window_size = min(400000, int(total_length * 0.6))  # 60% of total or max
        overlap_ratio = 0.04
        logger.info(f"Large document detected ({total_length:,} chars): using near-maximum window size")
    elif total_length > 200000:
        window_size = min(400000, int(total_length * 0.8))  # 80% of total or max
        overlap_ratio = 0.05

    # Calculate overlap size from ratio (before clamping, as in the original)
    overlap_size = int(window_size * overlap_ratio)

    # Ultra-aggressive constraints - push to absolute limits
    window_size = max(100000, min(400000, window_size))  # 100K-400K chars
    overlap_size = max(2000, min(20000, overlap_size))   # 2K-20K chars

    # Estimate number of windows needed
    estimated_windows = max(1, int((total_length - overlap_size) / (window_size - overlap_size) + 1))

    # Determine trace type if it wasn't set above
    if trace_type is None:
        if human_turns > 0 and ai_turns > 0:
            trace_type = "conversation"
        elif code_density > 0.2:
            trace_type = "code"
        elif struct_density > 0.2:
            trace_type = "structured_data"
        else:
            trace_type = "general"

    # Return comprehensive trace analysis
    return {
        "total_length": total_length,
        "line_count": line_count,
        "avg_line_length": round(avg_line_length, 1),
        "code_density": round(code_density, 2),
        "struct_density": round(struct_density, 2),
        "entity_density": round(entity_density, 2),
        "interaction_count": len(interaction_sizes) if interaction_sizes else conversation_turn_count,
        "avg_interaction_size": round(sum(interaction_sizes) / len(interaction_sizes), 1) if interaction_sizes else 0,
        "human_turns": human_turns,
        "ai_turns": ai_turns,
        "system_messages": system_messages,
        "trace_type": trace_type,
        "recommended_window_size": window_size,
        "recommended_overlap_size": overlap_size,
        "estimated_windows": estimated_windows,
        "processing_complexity": "high" if estimated_windows > 10 else "medium" if estimated_windows > 3 else "low",
        "optimization_stats": optimization_stats,
    }


def display_trace_summary(analysis: Dict[str, Any]) -> None:
    """
    Display a formatted summary of trace characteristics to the user.

    Args:
        analysis: Dictionary containing trace analysis data, as produced by
            analyze_trace_characteristics()
    """
    print("\n" + "=" * 80)
    print("TRACE ANALYSIS SUMMARY")
    print("=" * 80)

    # Basic statistics
    print("\nBASIC STATISTICS:")
    print(f"  Total length: {analysis['total_length']:,} characters")
    print(f"  Line count: {analysis['line_count']:,} lines")
    print(f"  Average line length: {analysis['avg_line_length']:.1f} characters")

    # Content type
    print("\nCONTENT CHARACTERISTICS:")
    print(f"  Detected trace type: {analysis['trace_type'].replace('_', ' ').title()}")
    if analysis['trace_type'] == 'conversation':
        print(f"  Conversation turns: {analysis['interaction_count']} ")
        print(f"    - Human messages: {analysis['human_turns']}")
        print(f"    - AI messages: {analysis['ai_turns']}")
        print(f"    - System messages: {analysis['system_messages']}")
    if analysis['avg_interaction_size'] > 0:
        print(f"  Average message size: {analysis['avg_interaction_size']:.1f} characters")
    print("  Content density metrics:")
    print(f"    - Code density: {analysis['code_density']:.2f}")
    print(f"    - Structured data density: {analysis['struct_density']:.2f}")
    print(f"    - Entity density: {analysis['entity_density']:.2f}")

    # Processing recommendations
    print("\nPROCESSING RECOMMENDATIONS:")
    print(f"  Recommended window size: {analysis['recommended_window_size']:,} characters")
    print(f"  Recommended overlap size: {analysis['recommended_overlap_size']:,} characters")
    print(f"  Estimated number of windows: {analysis['estimated_windows']}")
    print(f"  Processing complexity: {analysis['processing_complexity'].title()}")

    # Processing time estimate
    base_time_per_window = 15  # seconds per window baseline
    if analysis['processing_complexity'] == 'high':
        time_factor = 1.5
    elif analysis['processing_complexity'] == 'medium':
        time_factor = 1.0
    else:
        time_factor = 0.8
    estimated_time = base_time_per_window * analysis['estimated_windows'] * time_factor
    minutes = int(estimated_time / 60)
    seconds = int(estimated_time % 60)
    print(f"  Estimated processing time: ~{minutes}m {seconds}s")
    print("\n" + "=" * 80)


def preprocess_content_for_cost_optimization(content: str, aggressive: bool = True) -> Tuple[str, Dict[str, Any]]:
    """
    Preprocess content to reduce character count while preserving semantic
    meaning.  This can significantly reduce API costs by removing redundant
    data.

    Args:
        content: Original content
        aggressive: Whether to apply aggressive optimization (strips debug
            logging, timestamps, deep indentation, and repeated separators
            in addition to basic whitespace normalization)

    Returns:
        Tuple of (optimized_content, optimization_stats)
    """
    original_length = len(content)

    # Start with basic optimizations
    optimized = content

    # Remove excessive whitespace and normalize line endings
    optimized = re.sub(r'\n\s*\n\s*\n+', '\n\n', optimized)  # 3+ newlines -> 2
    optimized = re.sub(r'[ \t]+', ' ', optimized)            # runs of spaces/tabs -> one space
    optimized = re.sub(r' +\n', '\n', optimized)             # remove trailing spaces

    if aggressive:
        # More aggressive optimizations that may slightly reduce semantic
        # richness but significantly reduce costs

        # Remove debug/verbose logging patterns
        optimized = re.sub(r'DEBUG:.*?\n', '', optimized)
        optimized = re.sub(r'TRACE:.*?\n', '', optimized)
        optimized = re.sub(r'\[DEBUG\].*?\n', '', optimized)

        # Remove timestamp patterns (keep date for context)
        optimized = re.sub(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3,6}', '', optimized)
        optimized = re.sub(r'\d{2}:\d{2}:\d{2}\.\d{3,6}', '', optimized)

        # Remove excessive JSON indentation while preserving structure
        optimized = re.sub(r'\n {8,}', '\n    ', optimized)

        # Remove empty JSON objects/arrays that add no semantic value
        optimized = re.sub(r'{\s*}', '{}', optimized)
        optimized = re.sub(r'\[\s*\]', '[]', optimized)

        # Normalize repeated patterns (like multiple "---" separators)
        optimized = re.sub(r'-{5,}', '-----', optimized)
        optimized = re.sub(r'={5,}', '=====', optimized)

        # Remove excessive blank lines in code/JSON blocks
        optimized = re.sub(r'(\{[^}]*)\n\n+([^}]*\})', r'\1\n\2', optimized)

    # Final cleanup
    optimized = optimized.strip()

    # Calculate statistics
    final_length = len(optimized)
    reduction = original_length - final_length
    reduction_percentage = (reduction / original_length) * 100 if original_length > 0 else 0

    stats = {
        "original_length": original_length,
        "optimized_length": final_length,
        "characters_removed": reduction,
        "reduction_percentage": round(reduction_percentage, 2),
        "aggressive_mode": aggressive,
    }

    logger.info(f"Content optimization complete: {reduction:,} characters removed ({reduction_percentage:.1f}% reduction)")
    return optimized, stats