#!/usr/bin/env python
"""
Pure Trace Analysis Functions
This module provides pure functions for analyzing trace content characteristics
without any database dependencies. These functions can be used independently
or by the backend services for trace management.
"""
import re
import logging
from typing import Dict, Any, Tuple
# Module-level logger; handler/level configuration is left to the host application.
logger = logging.getLogger(__name__)
def analyze_trace_characteristics(
    trace_content: str,
    optimize_content: bool = True
) -> Dict[str, Any]:
    """
    Analyze the trace content to determine its characteristics and optimal parameters
    for sliding window processing.

    Args:
        trace_content: The full content of the trace
        optimize_content: Whether to apply content optimization for cost savings

    Returns:
        Dictionary with trace characteristics and recommended parameters
    """
    # Local binding of the module logger keeps this function self-contained;
    # logging.getLogger returns the same singleton instance per name.
    logger = logging.getLogger(__name__)
    logger.info("Analyzing trace characteristics...")

    # Apply content optimization if requested
    optimization_stats = None
    if optimize_content:
        logger.info("Applying content optimization for cost savings...")
        trace_content, optimization_stats = preprocess_content_for_cost_optimization(trace_content, aggressive=True)

    # Basic content metrics (computed on the optimized content)
    total_length = len(trace_content)
    lines = trace_content.split('\n')
    line_count = len(lines)
    avg_line_length = sum(len(line) for line in lines) / max(1, line_count)

    # Content-density heuristics, each normalized per 100 characters.
    # Code density: common programming keywords and syntax.
    code_pattern = r'function|class|def|return|import|export|const|var|let|async|await|if|else|for|while'
    code_matches = len(re.findall(code_pattern, trace_content))
    code_density = code_matches / max(1, total_length / 100)

    # Structured-data density: JSON punctuation and XML-like tags.
    struct_pattern = r'[{}\[\],":]+|<\/?[a-z]+>|<\/[a-z]+>'
    struct_matches = len(re.findall(struct_pattern, trace_content))
    struct_density = struct_matches / max(1, total_length / 100)

    # Entity density: camelCase, PascalCase and snake_case identifiers.
    entity_pattern = r'\b[A-Z][a-z]+[A-Z][a-zA-Z]*\b|\b[a-z]+_[a-z_]+\b|\b[a-z]+[A-Z][a-zA-Z]*\b'
    entity_matches = len(re.findall(entity_pattern, trace_content))
    entity_density = entity_matches / max(1, total_length / 100)

    # Detect complete interaction blocks so the window sizing below can avoid
    # splitting a single interaction across two windows.
    interaction_sizes = []
    conversation_turn_count = 0

    # First preference: structured JSON-style "input"/"output" blocks.
    input_output_pattern = r'"input":.*?"output":'
    io_blocks = re.findall(input_output_pattern, trace_content, re.DOTALL)
    if io_blocks:
        interaction_sizes = [len(block) for block in io_blocks]
        avg_interaction_size = sum(interaction_sizes) / len(interaction_sizes)
        logger.info(f"Found {len(io_blocks)} input/output blocks with average size of {avg_interaction_size:.0f} characters")

    # Fallback: free-form conversation turns ("user:" / "assistant:" / "system:").
    # BUGFIX: the previous version used re.findall with capturing groups, which
    # returns tuples of the *groups* — so only the role marker (e.g. "user:")
    # was measured instead of the full turn text — and the pattern consumed the
    # following marker, skipping alternating turns. finditer with a lookahead
    # terminator measures every complete turn without consuming the next marker.
    if not interaction_sizes:
        conversation_pattern = r'(?:user:|assistant:|system:).*?(?=user:|assistant:|system:|\Z)'
        conversation_blocks = [m.group(0) for m in re.finditer(conversation_pattern, trace_content, re.DOTALL)]
        if conversation_blocks:
            interaction_sizes = [len(block) for block in conversation_blocks]
            avg_interaction_size = sum(interaction_sizes) / len(interaction_sizes)
            conversation_turn_count = len(conversation_blocks)
            logger.info(f"Found {len(conversation_blocks)} conversation blocks with average size of {avg_interaction_size:.0f} characters")

    # Role-marker counts for conversation analysis (case-insensitive).
    human_turns = len(re.findall(r'\buser:', trace_content, re.IGNORECASE))
    ai_turns = len(re.findall(r'\bassistant:', trace_content, re.IGNORECASE))
    system_messages = len(re.findall(r'\bsystem:', trace_content, re.IGNORECASE))

    # Base parameters (ultra-aggressive optimization for the 1M token limit).
    base_window = 350000       # ~87K tokens per window
    base_overlap_ratio = 0.05  # 5% overlap (ultra-minimal)

    # Choose window parameters from the dominant content characteristic.
    # trace_type stays None when the chosen branch does not imply a type;
    # it is inferred after sizing in that case.
    trace_type = None
    if interaction_sizes:
        # Use at least 2x the largest interaction so every window can hold
        # one complete interaction.
        max_interaction_size = max(interaction_sizes)
        window_size = max(base_window, int(max_interaction_size * 2))
        logger.info(f"Adjusting window size based on max interaction size ({max_interaction_size} chars): {window_size}")
        overlap_ratio = 0.08  # 8% overlap so interactions are not split
    elif struct_density > 0.5:
        # High structured-data density (JSON, XML): maximum windows.
        window_size = 400000
        overlap_ratio = 0.03
        trace_type = "structured_data"
    elif code_density > 0.5:
        # High code density: maximum windows to capture entire modules.
        window_size = 400000
        overlap_ratio = 0.05
        trace_type = "code"
    elif entity_density > 0.5:
        # High entity density: very large windows with minimal overlap.
        window_size = int(base_window * 1.14)  # ~400K window size
        overlap_ratio = 0.05
        trace_type = "entity_rich"
    elif avg_line_length > 150:
        # Very long lines (likely logs): maximum window, minimal overlap.
        window_size = 400000
        overlap_ratio = 0.03
        trace_type = "log"
    elif avg_line_length < 50:
        # Short lines (conversation logs): still very large windows.
        window_size = int(base_window * 1.14)  # ~400K window size
        overlap_ratio = 0.08
        trace_type = "conversation"
    else:
        # Default case: base parameters.
        window_size = base_window
        overlap_ratio = base_overlap_ratio
        trace_type = "general"

    # Ultra-aggressive scaling for large documents overrides the above.
    if total_length > 800000:
        window_size = 400000  # maximum possible
        overlap_ratio = 0.03
        logger.info(f"Ultra-large document detected ({total_length:,} chars): using maximum window size with minimal overlap")
    elif total_length > 400000:
        window_size = min(400000, int(total_length * 0.6))  # 60% of total or max
        overlap_ratio = 0.04
        logger.info(f"Large document detected ({total_length:,} chars): using near-maximum window size")
    elif total_length > 200000:
        window_size = min(400000, int(total_length * 0.8))  # 80% of total or max
        overlap_ratio = 0.05

    # Derive the absolute overlap, then clamp both values to hard limits:
    # 100K-400K chars per window (25K-100K tokens), 2K-20K chars of overlap.
    overlap_size = int(window_size * overlap_ratio)
    window_size = max(100000, min(400000, window_size))
    overlap_size = max(2000, min(20000, overlap_size))

    # Estimate how many sliding windows the trace will need.
    estimated_windows = max(1, int((total_length - overlap_size) / (window_size - overlap_size) + 1))

    # Infer a trace type when no sizing branch implied one.
    if trace_type is None:
        if human_turns > 0 and ai_turns > 0:
            trace_type = "conversation"
        elif code_density > 0.2:
            trace_type = "code"
        elif struct_density > 0.2:
            trace_type = "structured_data"
        else:
            trace_type = "general"

    # Return comprehensive trace analysis
    return {
        "total_length": total_length,
        "line_count": line_count,
        "avg_line_length": round(avg_line_length, 1),
        "code_density": round(code_density, 2),
        "struct_density": round(struct_density, 2),
        "entity_density": round(entity_density, 2),
        "interaction_count": len(interaction_sizes) if interaction_sizes else conversation_turn_count,
        "avg_interaction_size": round(sum(interaction_sizes) / len(interaction_sizes), 1) if interaction_sizes else 0,
        "human_turns": human_turns,
        "ai_turns": ai_turns,
        "system_messages": system_messages,
        "trace_type": trace_type,
        "recommended_window_size": window_size,
        "recommended_overlap_size": overlap_size,
        "estimated_windows": estimated_windows,
        "processing_complexity": "high" if estimated_windows > 10 else "medium" if estimated_windows > 3 else "low",
        "optimization_stats": optimization_stats
    }
def display_trace_summary(analysis: Dict[str, Any]) -> None:
    """
    Print a formatted, human-readable summary of a trace analysis.

    Args:
        analysis: Dictionary containing trace analysis data, as produced by
            analyze_trace_characteristics().
    """
    divider = "=" * 80
    print("\n" + divider)
    print("TRACE ANALYSIS SUMMARY")
    print(divider)

    # Basic size statistics.
    print("\nBASIC STATISTICS:")
    print(f" Total length: {analysis['total_length']:,} characters")
    print(f" Line count: {analysis['line_count']:,} lines")
    print(f" Average line length: {analysis['avg_line_length']:.1f} characters")

    # Detected content type, with turn breakdown for conversations.
    print("\nCONTENT CHARACTERISTICS:")
    print(f" Detected trace type: {analysis['trace_type'].replace('_', ' ').title()}")
    if analysis['trace_type'] == 'conversation':
        print(f" Conversation turns: {analysis['interaction_count']} ")
        print(f" - Human messages: {analysis['human_turns']}")
        print(f" - AI messages: {analysis['ai_turns']}")
        print(f" - System messages: {analysis['system_messages']}")
    if analysis['avg_interaction_size'] > 0:
        print(f" Average message size: {analysis['avg_interaction_size']:.1f} characters")
    print(" Content density metrics:")
    print(f" - Code density: {analysis['code_density']:.2f}")
    print(f" - Structured data density: {analysis['struct_density']:.2f}")
    print(f" - Entity density: {analysis['entity_density']:.2f}")

    # Recommended sliding-window parameters.
    print("\nPROCESSING RECOMMENDATIONS:")
    print(f" Recommended window size: {analysis['recommended_window_size']:,} characters")
    print(f" Recommended overlap size: {analysis['recommended_overlap_size']:,} characters")
    print(f" Estimated number of windows: {analysis['estimated_windows']}")
    print(f" Processing complexity: {analysis['processing_complexity'].title()}")

    # Rough wall-clock estimate: a per-window baseline scaled by complexity.
    per_window_seconds = 15
    scale = {'high': 1.5, 'medium': 1.0}.get(analysis['processing_complexity'], 0.8)
    total_seconds = int(per_window_seconds * analysis['estimated_windows'] * scale)
    minutes, seconds = divmod(total_seconds, 60)
    print(f" Estimated processing time: ~{minutes}m {seconds}s")
    print("\n" + divider)
def preprocess_content_for_cost_optimization(content: str, aggressive: bool = True) -> Tuple[str, Dict[str, Any]]:
    """
    Preprocess content to reduce character count while preserving semantic meaning.

    This can significantly reduce API costs by removing redundant data such as
    excess whitespace, debug logging lines and timestamp noise.

    Args:
        content: Original content
        aggressive: Whether to apply aggressive optimization (may slightly
            reduce semantic richness in exchange for larger savings)

    Returns:
        Tuple of (optimized_content, optimization_stats)
    """
    # Local binding of the module logger keeps this function self-contained;
    # logging.getLogger returns the same singleton instance per name.
    logger = logging.getLogger(__name__)

    original_length = len(content)
    optimized = content

    # Basic optimizations: normalize whitespace without losing structure.
    optimized = re.sub(r'\n\s*\n\s*\n+', '\n\n', optimized)  # 3+ newlines -> 2
    optimized = re.sub(r'[ \t]+', ' ', optimized)            # runs of spaces/tabs -> one space
    optimized = re.sub(r' +\n', '\n', optimized)             # strip trailing spaces

    if aggressive:
        # More aggressive optimizations that may slightly reduce semantic
        # richness but significantly reduce costs.

        # Remove debug/verbose logging lines entirely.
        optimized = re.sub(r'DEBUG:.*?\n', '', optimized)
        optimized = re.sub(r'TRACE:.*?\n', '', optimized)
        optimized = re.sub(r'\[DEBUG\].*?\n', '', optimized)

        # Strip time-of-day noise but keep the date for context.
        # BUGFIX: the previous pattern deleted the entire
        # "YYYY-MM-DD HH:MM:SS.ffffff" stamp — date included — contradicting
        # the stated intent, and missed stamps without fractional seconds.
        optimized = re.sub(r'(\d{4}-\d{2}-\d{2}) \d{2}:\d{2}:\d{2}(?:\.\d{3,6})?', r'\1', optimized)
        optimized = re.sub(r'\d{2}:\d{2}:\d{2}\.\d{3,6}', '', optimized)

        # Reduce excessive JSON indentation while preserving structure.
        optimized = re.sub(r'\n {8,}', '\n ', optimized)

        # Collapse empty JSON objects/arrays that add no semantic value.
        optimized = re.sub(r'{\s*}', '{}', optimized)
        optimized = re.sub(r'\[\s*\]', '[]', optimized)

        # Normalize repeated separator runs (e.g. long "---"/"===" dividers).
        optimized = re.sub(r'-{5,}', '-----', optimized)
        optimized = re.sub(r'={5,}', '=====', optimized)

        # Remove excessive blank lines inside code/JSON blocks.
        optimized = re.sub(r'(\{[^}]*)\n\n+([^}]*\})', r'\1\n\2', optimized)

    # Final cleanup
    optimized = optimized.strip()

    # Statistics describing how much was removed.
    final_length = len(optimized)
    reduction = original_length - final_length
    reduction_percentage = (reduction / original_length) * 100 if original_length > 0 else 0

    stats = {
        "original_length": original_length,
        "optimized_length": final_length,
        "characters_removed": reduction,
        "reduction_percentage": round(reduction_percentage, 2),
        "aggressive_mode": aggressive
    }

    logger.info(f"Content optimization complete: {reduction:,} characters removed ({reduction_percentage:.1f}% reduction)")
    return optimized, stats