Spaces:
Running
Running
| #!/usr/bin/env python | |
| """ | |
| Pure Trace Analysis Functions | |
| This module provides pure functions for analyzing trace content characteristics | |
| without any database dependencies. These functions can be used independently | |
| or by the backend services for trace management. | |
| """ | |
| import re | |
| import logging | |
| from typing import Dict, Any, Tuple | |
| logger = logging.getLogger(__name__) | |
def analyze_trace_characteristics(
    trace_content: str,
    optimize_content: bool = True
) -> Dict[str, Any]:
    """
    Analyze trace content to determine its characteristics and the optimal
    parameters for sliding-window processing.

    Args:
        trace_content: The full content of the trace.
        optimize_content: Whether to apply content optimization for cost
            savings before analysis.

    Returns:
        Dictionary with trace characteristics (lengths, densities,
        conversation statistics) and recommended window/overlap parameters.
    """
    logger.info("Analyzing trace characteristics...")

    # Apply content optimization if requested; every metric below is
    # computed on the (possibly) optimized content.
    optimization_stats = None
    if optimize_content:
        logger.info("Applying content optimization for cost savings...")
        trace_content, optimization_stats = preprocess_content_for_cost_optimization(
            trace_content, aggressive=True
        )

    # Basic content metrics.
    total_length = len(trace_content)
    lines = trace_content.split('\n')
    line_count = len(lines)
    avg_line_length = sum(len(line) for line in lines) / max(1, line_count)

    # Densities are normalized per 100 characters of content.
    per_100_chars = max(1, total_length / 100)

    # Code density: common programming keywords and syntax.
    # BUGFIX: keywords are now anchored at word boundaries so that e.g.
    # "if" inside "notify" or "for" inside "before" no longer inflate
    # the count.
    code_pattern = (
        r'\b(?:function|class|def|return|import|export|const|var|let'
        r'|async|await|if|else|for|while)\b'
    )
    code_density = len(re.findall(code_pattern, trace_content)) / per_100_chars

    # Structured-data density (JSON punctuation, XML-ish tags).
    # The redundant '<\/[a-z]+>' alternative was removed: it was fully
    # shadowed by '<\/?[a-z]+>' and could never match first.
    struct_pattern = r'[{}\[\],":]+|<\/?[a-z]+>'
    struct_density = len(re.findall(struct_pattern, trace_content)) / per_100_chars

    # Entity density: camelCase, PascalCase and snake_case identifiers.
    entity_pattern = r'\b[A-Z][a-z]+[A-Z][a-zA-Z]*\b|\b[a-z]+_[a-z_]+\b|\b[a-z]+[A-Z][a-zA-Z]*\b'
    entity_density = len(re.findall(entity_pattern, trace_content)) / per_100_chars

    # Identify complete interaction blocks so windows can be sized to
    # contain whole interactions.
    interaction_sizes = []
    conversation_turn_count = 0

    # First, try to identify input/output blocks in JSON format.
    io_blocks = re.findall(r'"input":.*?"output":', trace_content, re.DOTALL)
    if io_blocks:
        interaction_sizes = [len(block) for block in io_blocks]
        avg_size = sum(interaction_sizes) / len(interaction_sizes)
        logger.info(f"Found {len(io_blocks)} input/output blocks with average size of {avg_size:.0f} characters")

    # If no structured blocks were found, fall back to conversation turns.
    if not interaction_sizes:
        conversation_pattern = r'(user:|assistant:|system:).*?(user:|assistant:|system:|\Z)'
        # BUGFIX: the original used re.findall() here; with capture groups
        # in the pattern, findall() returns only the group tuples (e.g.
        # ('user:', 'assistant:')), so the "interaction size" measured was
        # just the length of the role marker itself. finditer() with
        # group(0) measures the full matched turn.
        turns = [m.group(0) for m in re.finditer(conversation_pattern, trace_content, re.DOTALL)]
        if turns:
            interaction_sizes = [len(turn) for turn in turns]
            conversation_turn_count = len(turns)
            avg_size = sum(interaction_sizes) / len(interaction_sizes)
            logger.info(f"Found {len(turns)} conversation blocks with average size of {avg_size:.0f} characters")

    # Conversation statistics.
    human_turns = len(re.findall(r'\buser:', trace_content, re.IGNORECASE))
    ai_turns = len(re.findall(r'\bassistant:', trace_content, re.IGNORECASE))
    system_messages = len(re.findall(r'\bsystem:', trace_content, re.IGNORECASE))

    # Base parameters (ultra-aggressive optimization for the 1M token limit).
    base_window = 350000       # ~87K tokens
    base_overlap_ratio = 0.05  # 5% overlap (ultra-minimal)

    # Explicit sentinel replaces the fragile "'trace_type' in locals()"
    # check the original relied on.
    trace_type = None

    # Adjust window size based on content characteristics.
    if interaction_sizes:
        # Use at least 2x the max interaction size so that a window can
        # always hold a complete interaction.
        max_interaction_size = max(interaction_sizes)
        window_size = max(base_window, int(max_interaction_size * 2))
        logger.info(f"Adjusting window size based on max interaction size ({max_interaction_size} chars): {window_size}")
        overlap_ratio = 0.08  # 8% overlap so interactions are not split
    elif struct_density > 0.5:
        # High structured-data density (JSON, XML) - use maximum windows.
        window_size = 400000
        overlap_ratio = 0.03
        trace_type = "structured_data"
    elif code_density > 0.5:
        # High code density - maximum windows to capture entire modules.
        window_size = 400000
        overlap_ratio = 0.05
        trace_type = "code"
    elif entity_density > 0.5:
        # High entity density - very large windows, minimal overlap.
        window_size = int(base_window * 1.14)  # ~400K window size
        overlap_ratio = 0.05
        trace_type = "entity_rich"
    elif avg_line_length > 150:
        # Very long lines (likely logs) - maximum window, minimal overlap.
        window_size = 400000
        overlap_ratio = 0.03
        trace_type = "log"
    elif avg_line_length < 50:
        # Short lines (conversation logs) - still very large windows.
        window_size = int(base_window * 1.14)  # ~400K window size
        overlap_ratio = 0.08
        trace_type = "conversation"
    else:
        # Default case - base parameters.
        window_size = base_window
        overlap_ratio = base_overlap_ratio
        trace_type = "general"

    # Ultra-aggressive scaling for large documents (overrides the above).
    if total_length > 800000:
        window_size = 400000
        overlap_ratio = 0.03
        logger.info(f"Ultra-large document detected ({total_length:,} chars): using maximum window size with minimal overlap")
    elif total_length > 400000:
        window_size = min(400000, int(total_length * 0.6))  # 60% of total or max
        overlap_ratio = 0.04
        logger.info(f"Large document detected ({total_length:,} chars): using near-maximum window size")
    elif total_length > 200000:
        window_size = min(400000, int(total_length * 0.8))  # 80% of total or max
        overlap_ratio = 0.05

    # Derive the overlap in characters, then clamp both values to the
    # supported ranges (order preserved from the original: the overlap is
    # computed from the pre-clamp window size).
    overlap_size = int(window_size * overlap_ratio)
    window_size = max(100000, min(400000, window_size))   # 100K-400K chars
    overlap_size = max(2000, min(20000, overlap_size))    # 2K-20K chars

    # Estimate the number of sliding windows required.
    estimated_windows = max(1, int((total_length - overlap_size) / (window_size - overlap_size) + 1))

    # Classify the trace if no branch above already did.
    if trace_type is None:
        if human_turns > 0 and ai_turns > 0:
            trace_type = "conversation"
        elif code_density > 0.2:
            trace_type = "code"
        elif struct_density > 0.2:
            trace_type = "structured_data"
        else:
            trace_type = "general"

    avg_interaction_size = (
        round(sum(interaction_sizes) / len(interaction_sizes), 1)
        if interaction_sizes else 0
    )

    return {
        "total_length": total_length,
        "line_count": line_count,
        "avg_line_length": round(avg_line_length, 1),
        "code_density": round(code_density, 2),
        "struct_density": round(struct_density, 2),
        "entity_density": round(entity_density, 2),
        "interaction_count": len(interaction_sizes) if interaction_sizes else conversation_turn_count,
        "avg_interaction_size": avg_interaction_size,
        "human_turns": human_turns,
        "ai_turns": ai_turns,
        "system_messages": system_messages,
        "trace_type": trace_type,
        "recommended_window_size": window_size,
        "recommended_overlap_size": overlap_size,
        "estimated_windows": estimated_windows,
        "processing_complexity": "high" if estimated_windows > 10 else "medium" if estimated_windows > 3 else "low",
        "optimization_stats": optimization_stats
    }
def display_trace_summary(analysis: Dict[str, Any]) -> None:
    """
    Print a human-readable summary of a trace analysis to stdout.

    Args:
        analysis: Dictionary of trace characteristics as produced by
            analyze_trace_characteristics().
    """
    divider = "=" * 80
    print("\n" + divider)
    print("TRACE ANALYSIS SUMMARY")
    print(divider)

    # Basic statistics.
    print("\nBASIC STATISTICS:")
    print(f" Total length: {analysis['total_length']:,} characters")
    print(f" Line count: {analysis['line_count']:,} lines")
    print(f" Average line length: {analysis['avg_line_length']:.1f} characters")

    # Content type and densities.
    print("\nCONTENT CHARACTERISTICS:")
    print(f" Detected trace type: {analysis['trace_type'].replace('_', ' ').title()}")
    if analysis['trace_type'] == 'conversation':
        print(f" Conversation turns: {analysis['interaction_count']} ")
        print(f" - Human messages: {analysis['human_turns']}")
        print(f" - AI messages: {analysis['ai_turns']}")
        print(f" - System messages: {analysis['system_messages']}")
    if analysis['avg_interaction_size'] > 0:
        print(f" Average message size: {analysis['avg_interaction_size']:.1f} characters")
    print(" Content density metrics:")
    print(f" - Code density: {analysis['code_density']:.2f}")
    print(f" - Structured data density: {analysis['struct_density']:.2f}")
    print(f" - Entity density: {analysis['entity_density']:.2f}")

    # Recommended sliding-window parameters.
    print("\nPROCESSING RECOMMENDATIONS:")
    print(f" Recommended window size: {analysis['recommended_window_size']:,} characters")
    print(f" Recommended overlap size: {analysis['recommended_overlap_size']:,} characters")
    print(f" Estimated number of windows: {analysis['estimated_windows']}")
    print(f" Processing complexity: {analysis['processing_complexity'].title()}")

    # Rough time estimate: 15s per window, scaled by complexity
    # (high -> 1.5x, medium -> 1.0x, anything else -> 0.8x).
    complexity_factors = {'high': 1.5, 'medium': 1.0}
    time_factor = complexity_factors.get(analysis['processing_complexity'], 0.8)
    estimated_time = 15 * analysis['estimated_windows'] * time_factor
    minutes = int(estimated_time / 60)
    seconds = int(estimated_time % 60)
    print(f" Estimated processing time: ~{minutes}m {seconds}s")
    print("\n" + divider)
def preprocess_content_for_cost_optimization(content: str, aggressive: bool = True) -> Tuple[str, Dict[str, Any]]:
    """
    Shrink content to cut API costs while preserving semantic meaning.

    Args:
        content: Original content.
        aggressive: Whether to also apply the aggressive rewrite rules,
            which may slightly reduce semantic richness.

    Returns:
        Tuple of (optimized_content, optimization_stats).
    """
    original_length = len(content)

    # Rewrite rules as ordered (pattern, replacement) pairs; order matters
    # because each rule operates on the output of the previous one.
    basic_rules = [
        (r'\n\s*\n\s*\n+', '\n\n'),  # collapse 3+ newlines to 2
        (r'[ \t]+', ' '),            # runs of spaces/tabs -> single space
        (r' +\n', '\n'),             # trailing spaces before newlines
    ]
    aggressive_rules = [
        # Debug/verbose logging lines.
        (r'DEBUG:.*?\n', ''),
        (r'TRACE:.*?\n', ''),
        (r'\[DEBUG\].*?\n', ''),
        # Timestamps (full date-time form, then time-only form).
        # NOTE(review): the first rule removes the date as well, despite
        # the original comment claiming dates were kept - confirm intent.
        (r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3,6}', ''),
        (r'\d{2}:\d{2}:\d{2}\.\d{3,6}', ''),
        # Deep JSON indentation.
        (r'\n {8,}', '\n '),
        # Empty JSON objects/arrays carrying no semantic value.
        (r'{\s*}', '{}'),
        (r'\[\s*\]', '[]'),
        # Normalize repeated separator runs.
        (r'-{5,}', '-----'),
        (r'={5,}', '====='),
        # Blank lines inside {...} blocks.
        (r'(\{[^}]*)\n\n+([^}]*\})', r'\1\n\2'),
    ]

    optimized = content
    for pattern, replacement in basic_rules:
        optimized = re.sub(pattern, replacement, optimized)
    if aggressive:
        for pattern, replacement in aggressive_rules:
            optimized = re.sub(pattern, replacement, optimized)
    optimized = optimized.strip()

    # Reduction statistics.
    final_length = len(optimized)
    reduction = original_length - final_length
    reduction_percentage = (reduction / original_length) * 100 if original_length > 0 else 0
    stats = {
        "original_length": original_length,
        "optimized_length": final_length,
        "characters_removed": reduction,
        "reduction_percentage": round(reduction_percentage, 2),
        "aggressive_mode": aggressive
    }
    logger.info(f"Content optimization complete: {reduction:,} characters removed ({reduction_percentage:.1f}% reduction)")
    return optimized, stats