| import time |
| from typing import Dict, List, Any, Optional |
| from dataclasses import dataclass, asdict |
| import re |
|
|
| |
| try: |
| import networkx as nx |
| import matplotlib.pyplot as plt |
| plt.switch_backend('Agg') |
| import matplotlib |
| matplotlib.use('Agg') |
| import warnings |
| warnings.filterwarnings('ignore', category=UserWarning, module='matplotlib') |
| except ImportError: |
| print("Warning: Visualization deps missing. Install with: pip install networkx matplotlib") |
| nx = None |
| plt = None |
|
|
| @dataclass |
| class WorkflowStep: |
| step_id: str |
| step_type: str |
| timestamp: float |
| content: str |
| metadata: Dict[str, Any] |
| duration: Optional[float] = None |
| status: str = 'pending' |
| parent_step: Optional[str] = None |
| details: Optional[Dict[str, Any]] = None |
| mcp_server: Optional[str] = None |
| tool_name: Optional[str] = None |
|
|
| class EnhancedWorkflowVisualizer: |
| def __init__(self): |
| self.steps: List[WorkflowStep] = [] |
| self.current_step: Optional[WorkflowStep] = None |
| self.start_time = time.time() |
| self.step_counter = 0 |
| |
| |
| self.server_display_names = { |
| "7860": "Semantic Server", |
| "7861": "Token Counter", |
| "7862": "Sentiment Analysis", |
| "7863": "Health Monitor" |
| } |
| |
| def _extract_mcp_server_from_url(self, url_or_content: str) -> Optional[str]: |
| """Extract MCP server name from URL or content.""" |
| if not url_or_content: |
| return None |
| |
| |
| port_match = re.search(r':(\d{4})', url_or_content) |
| if port_match: |
| port = port_match.group(1) |
| return self.server_display_names.get(port, f"Port {port}") |
| |
| |
| if "semantic" in url_or_content.lower(): |
| return "Semantic Server" |
| elif "token" in url_or_content.lower(): |
| return "Token Counter" |
| elif "sentiment" in url_or_content.lower(): |
| return "Sentiment Analysis" |
| elif "health" in url_or_content.lower(): |
| return "Health Monitor" |
| |
| return None |
| |
| def _extract_tool_name(self, content: str) -> Optional[str]: |
| """Extract tool name from content.""" |
| |
| function_patterns = [ |
| |
| r'\b(sentiment_analysis)\s*\(', |
| r'\b(count_tokens_openai_gpt4)\s*\(', |
| r'\b(count_tokens_openai_gpt3)\s*\(', |
| r'\b(count_tokens_openai_davinci)\s*\(', |
| r'\b(count_tokens_bert_family)\s*\(', |
| r'\b(count_tokens_roberta_family)\s*\(', |
| r'\b(count_tokens_gpt2_family)\s*\(', |
| r'\b(count_tokens_t5_family)\s*\(', |
| r'\b(count_tokens_distilbert)\s*\(', |
| r'\b(semantic_similarity)\s*\(', |
| r'\b(find_similar_sentences)\s*\(', |
| r'\b(extract_semantic_keywords)\s*\(', |
| r'\b(semantic_search_in_text)\s*\(', |
| r'\b(health_check)\s*\(', |
| r'\b(server_status)\s*\(', |
| r'\b(get_server_info)\s*\(', |
| |
| |
| r'(\w*sentiment_analysis\w*)', |
| r'(\w*semantic_similarity\w*)', |
| r'(\w*find_similar_sentences\w*)', |
| r'(\w*extract_semantic_keywords\w*)', |
| r'(\w*semantic_search_in_text\w*)', |
| r'(\w*count_tokens_\w+)', |
| r'(\w*health_check\w*)', |
| r'(\w*server_status\w*)' |
| ] |
| |
| |
| for pattern in function_patterns: |
| match = re.search(pattern, content, re.IGNORECASE) |
| if match: |
| tool_name = match.group(1) |
| |
| if tool_name not in ['print', 'len', 'str', 'int', 'float', 'final_answer', 'sse', 'model']: |
| return tool_name |
| |
| |
| if "count_tokens_openai_gpt4" in content: |
| return "count_tokens_openai_gpt4" |
| elif "sentiment_analysis" in content: |
| return "sentiment_analysis" |
| elif "extract_semantic_keywords" in content: |
| return "extract_semantic_keywords" |
| elif "semantic_similarity" in content: |
| return "semantic_similarity" |
| elif "find_similar_sentences" in content: |
| return "find_similar_sentences" |
| elif "semantic_search_in_text" in content: |
| return "semantic_search_in_text" |
| |
| return None |
| |
| def add_step(self, step_type: str, content: str, metadata: Optional[Dict[str, Any]] = None, |
| parent_step: Optional[str] = None, details: Optional[Dict[str, Any]] = None, |
| mcp_server: Optional[str] = None, tool_name: Optional[str] = None) -> str: |
| step_id = f"{step_type}_{self.step_counter}" |
| self.step_counter += 1 |
| |
| |
| if not mcp_server: |
| mcp_server = self._extract_mcp_server_from_url(content) |
| if not tool_name: |
| tool_name = self._extract_tool_name(content) |
| |
| step = WorkflowStep( |
| step_id=step_id, |
| step_type=step_type, |
| timestamp=time.time(), |
| content=content, |
| metadata=metadata or {}, |
| status='running', |
| parent_step=parent_step, |
| details=details or {}, |
| mcp_server=mcp_server, |
| tool_name=tool_name |
| ) |
| self.steps.append(step) |
| self.current_step = step |
| return step_id |
| |
| def complete_step(self, step_id: str, status: str = 'completed', |
| additional_metadata: Optional[Dict[str, Any]] = None, |
| details: Optional[Dict[str, Any]] = None): |
| for step in self.steps: |
| if step.step_id == step_id: |
| step.status = status |
| step.duration = time.time() - step.timestamp |
| if additional_metadata and step.metadata is not None: |
| step.metadata.update(additional_metadata) |
| if details and step.details is not None: |
| step.details.update(details) |
| break |
| |
| def add_communication_step(self, from_component: str, to_component: str, |
| message_type: str, content: str, |
| parent_step: Optional[str] = None) -> str: |
| """Add a communication step between components.""" |
| step_type = f"comm_{from_component}_to_{to_component}" |
| |
| |
| mcp_server = self._extract_mcp_server_from_url(content) |
| tool_name = self._extract_tool_name(content) |
| |
| details = { |
| "from": from_component, |
| "to": to_component, |
| "message_type": message_type, |
| "content_preview": content[:100] + "..." if len(content) > 100 else content |
| } |
| return self.add_step(step_type, f"{message_type}: {from_component} → {to_component}", |
| parent_step=parent_step, details=details, |
| mcp_server=mcp_server, tool_name=tool_name) |
| |
| def add_tool_execution_step(self, tool_name: str, mcp_server: str, |
| input_data: str, parent_step: Optional[str] = None) -> str: |
| """Specialized method for tool execution steps.""" |
| content = f"Executing {tool_name} on {mcp_server}" |
| return self.add_step("tool_execution", content, |
| parent_step=parent_step, |
| mcp_server=mcp_server, |
| tool_name=tool_name, |
| details={"input_preview": input_data[:50] + "..." if len(input_data) > 50 else input_data}) |
| |
| def generate_graph(self) -> Any: |
| if nx is None: |
| return None |
| |
| G = nx.DiGraph() |
| |
| |
| color_map = { |
| 'input': '#e3f2fd', |
| 'agent_init': '#f3e5f5', |
| 'agent_process': '#e8f5e8', |
| 'comm_agent_to_mcp': '#fff3e0', |
| 'comm_mcp_to_server': '#ffebee', |
| 'comm_server_to_mcp': '#e0f2f1', |
| 'comm_mcp_to_agent': '#f9fbe7', |
| 'llm_call': '#fce4ec', |
| 'tool_execution': '#e1f5fe', |
| 'response': '#f1f8e9', |
| 'error': '#ffcdd2' |
| } |
| |
| |
| for step in self.steps: |
| color = color_map.get(step.step_type, '#f5f5f5') |
| |
| |
| duration_str = f" ({step.duration:.2f}s)" if step.duration else "" |
| |
| |
| label_parts = [] |
| |
| |
| step_display = step.step_type.replace('_', ' ').title() |
| label_parts.append(step_display) |
| |
| |
| if step.mcp_server: |
| label_parts.append(f"📡 {step.mcp_server}") |
| |
| |
| if step.tool_name: |
| label_parts.append(f"🔧 {step.tool_name}") |
| |
| |
| content_preview = step.content[:20] + "..." if len(step.content) > 20 else step.content |
| if not step.tool_name or step.tool_name.lower() not in content_preview.lower(): |
| label_parts.append(content_preview) |
| |
| |
| if duration_str: |
| label_parts.append(duration_str) |
| |
| label = "\n".join(label_parts) |
| |
| G.add_node(step.step_id, |
| label=label, |
| color=color, |
| step_type=step.step_type, |
| status=step.status, |
| mcp_server=step.mcp_server, |
| tool_name=step.tool_name) |
| |
| |
| for i, step in enumerate(self.steps): |
| if step.parent_step: |
| |
| G.add_edge(step.parent_step, step.step_id, edge_type='parent') |
| elif i > 0: |
| |
| G.add_edge(self.steps[i-1].step_id, step.step_id, edge_type='sequence') |
| |
| return G |
| |
| def create_matplotlib_visualization(self) -> str: |
| if nx is None or plt is None: |
| return "" |
| |
| G = self.generate_graph() |
| if not G or len(G.nodes()) == 0: |
| return "" |
| |
| |
| fig, ax = plt.subplots(figsize=(20, 12)) |
| |
| |
| try: |
| pos = nx.spring_layout(G, k=3, iterations=150, seed=42) |
| except: |
| pos = nx.circular_layout(G) |
| |
| |
| node_colors = [] |
| node_labels = {} |
| node_sizes = [] |
| |
| for node_id in G.nodes(): |
| step = next(s for s in self.steps if s.step_id == node_id) |
| |
| |
| if step.status == 'error': |
| color = '#ff5252' |
| elif step.status == 'completed': |
| |
| if step.mcp_server == "Semantic Server": |
| base_color = '#4caf50' |
| elif step.mcp_server == "Token Counter": |
| base_color = '#2196f3' |
| elif step.mcp_server == "Sentiment Analysis": |
| base_color = '#ff9800' |
| elif step.mcp_server == "Health Monitor": |
| base_color = '#9c27b0' |
| else: |
| |
| base_colors = { |
| 'input': '#4caf50', |
| 'agent_init': '#9c27b0', |
| 'agent_process': '#2e7d32', |
| 'comm_agent_to_mcp': '#ff9800', |
| 'comm_mcp_to_server': '#f44336', |
| 'comm_server_to_mcp': '#009688', |
| 'comm_mcp_to_agent': '#8bc34a', |
| 'llm_call': '#e91e63', |
| 'tool_execution': '#03a9f4', |
| 'response': '#4caf50' |
| } |
| base_color = base_colors.get(step.step_type, '#607d8b') |
| color = base_color |
| else: |
| color = '#bdbdbd' |
| |
| node_colors.append(color) |
| |
| |
| label_parts = [] |
| |
| |
| step_display = step.step_type.replace('_', ' ').title() |
| label_parts.append(f"**{step_display}**") |
| |
| |
| if step.mcp_server: |
| label_parts.append(f"📡 {step.mcp_server}") |
| |
| |
| if step.tool_name: |
| label_parts.append(f"🔧 **{step.tool_name}**") |
| |
| |
| if step.duration: |
| label_parts.append(f"⏱️ {step.duration:.2f}s") |
| |
| node_labels[node_id] = "\n".join(label_parts) |
| |
| |
| if step.step_type == 'tool_execution': |
| node_sizes.append(5000) |
| elif step.step_type in ['input', 'response']: |
| node_sizes.append(4000) |
| elif 'comm_' in step.step_type: |
| node_sizes.append(2500) |
| else: |
| node_sizes.append(3000) |
| |
| |
| nx.draw(G, pos, |
| node_color=node_colors, |
| node_size=node_sizes, |
| font_size=9, |
| font_weight='bold', |
| arrows=True, |
| arrowsize=20, |
| edge_color='#666666', |
| alpha=0.9, |
| ax=ax, |
| arrowstyle='->') |
| |
| |
| nx.draw_networkx_labels(G, pos, node_labels, font_size=8, ax=ax) |
| |
| |
| ax.set_title("MCP Agent Workflow: Server & Tool Execution Flow", |
| fontsize=20, pad=25, fontweight='bold') |
| ax.axis('off') |
| |
| |
| legend_elements = [ |
| plt.Rectangle((0,0),1,1, facecolor='#4caf50', label='Semantic Server'), |
| plt.Rectangle((0,0),1,1, facecolor='#2196f3', label='Token Counter Server'), |
| plt.Rectangle((0,0),1,1, facecolor='#ff9800', label='Sentiment Analysis Server'), |
| plt.Rectangle((0,0),1,1, facecolor='#9c27b0', label='Health Monitor Server'), |
| plt.Rectangle((0,0),1,1, facecolor='#e91e63', label='LLM Calls'), |
| plt.Rectangle((0,0),1,1, facecolor='#607d8b', label='Agent Processing'), |
| ] |
| ax.legend(handles=legend_elements, loc='upper left', bbox_to_anchor=(0, 1)) |
| |
| fig.set_constrained_layout(True) |
| |
| |
| import tempfile |
| temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.png') |
| plt.savefig(temp_file.name, format='png', dpi=300, bbox_inches='tight') |
| plt.close(fig) |
| |
| return temp_file.name |
| |
| def get_workflow_summary(self) -> Dict[str, Any]: |
| total_duration = time.time() - self.start_time |
| |
| |
| step_counts = {} |
| server_usage = {} |
| tool_usage = {} |
| communication_steps = [] |
| processing_steps = [] |
| |
| for step in self.steps: |
| step_counts[step.step_type] = step_counts.get(step.step_type, 0) + 1 |
| |
| |
| if step.mcp_server: |
| server_usage[step.mcp_server] = server_usage.get(step.mcp_server, 0) + 1 |
| |
| |
| if step.tool_name: |
| tool_usage[step.tool_name] = tool_usage.get(step.tool_name, 0) + 1 |
| |
| if 'comm_' in step.step_type: |
| communication_steps.append({ |
| 'step_id': step.step_id, |
| 'from': step.details.get('from', 'unknown') if step.details else 'unknown', |
| 'to': step.details.get('to', 'unknown') if step.details else 'unknown', |
| 'message_type': step.details.get('message_type', 'unknown') if step.details else 'unknown', |
| 'mcp_server': step.mcp_server, |
| 'tool_name': step.tool_name, |
| 'duration': step.duration, |
| 'status': step.status |
| }) |
| else: |
| processing_steps.append({ |
| 'step_id': step.step_id, |
| 'type': step.step_type, |
| 'content': step.content[:50] + "..." if len(step.content) > 50 else step.content, |
| 'mcp_server': step.mcp_server, |
| 'tool_name': step.tool_name, |
| 'duration': step.duration, |
| 'status': step.status |
| }) |
| |
| |
| completed_steps = [s for s in self.steps if s.duration is not None] |
| avg_duration = (sum(s.duration or 0 for s in completed_steps) / len(completed_steps)) if completed_steps else 0 |
| |
| return { |
| 'total_steps': len(self.steps), |
| 'total_duration': round(total_duration, 3), |
| 'average_step_duration': round(avg_duration, 3), |
| 'step_counts': step_counts, |
| 'server_usage': server_usage, |
| 'tool_usage': tool_usage, |
| 'communication_flow': communication_steps, |
| 'processing_steps': processing_steps, |
| 'status': 'completed' if all(s.status in ['completed', 'error'] for s in self.steps) else 'running', |
| 'error_count': sum(1 for s in self.steps if s.status == 'error'), |
| 'success_rate': round((sum(1 for s in self.steps if s.status == 'completed') / len(self.steps)) * 100, 1) if self.steps else 0, |
| 'detailed_steps': [asdict(s) for s in self.steps] |
| } |
|
|
| |
| workflow_visualizer = EnhancedWorkflowVisualizer() |
|
|
| |
| def track_workflow_step(step_type: str, content: str, metadata: Optional[Dict[str, Any]] = None, |
| parent_step: Optional[str] = None, mcp_server: Optional[str] = None, |
| tool_name: Optional[str] = None) -> str: |
| return workflow_visualizer.add_step(step_type, content, metadata, parent_step, |
| mcp_server=mcp_server, tool_name=tool_name) |
|
|
| def track_communication(from_component: str, to_component: str, message_type: str, |
| content: str, parent_step: Optional[str] = None) -> str: |
| return workflow_visualizer.add_communication_step(from_component, to_component, |
| message_type, content, parent_step) |
|
|
| def track_tool_execution(tool_name: str, mcp_server: str, input_data: str, |
| parent_step: Optional[str] = None) -> str: |
| """New helper for tracking tool executions with clear server/tool info.""" |
| return workflow_visualizer.add_tool_execution_step(tool_name, mcp_server, input_data, parent_step) |
|
|
| def complete_workflow_step(step_id: str, status: str = 'completed', |
| metadata: Optional[Dict[str, Any]] = None, |
| details: Optional[Dict[str, Any]] = None): |
| workflow_visualizer.complete_step(step_id, status, metadata, details) |
|
|
| def get_workflow_visualization() -> str: |
| return workflow_visualizer.create_matplotlib_visualization() |
|
|
| def get_workflow_summary() -> Dict[str, Any]: |
| return workflow_visualizer.get_workflow_summary() |
|
|
| def reset_workflow(): |
| global workflow_visualizer |
| workflow_visualizer = EnhancedWorkflowVisualizer() |