from dataclasses import asdict, dataclass, field import os import pickle from typing import Dict, List, Optional, Any import gradio as gr import json import tempfile import asyncio import uuid from dataclasses import dataclass, asdict from typing import List, Dict, Optional, Any from openai import AsyncOpenAI import base64 # Complete Hierarchical Component definitions with Implementation Details COMPONENT_INFO = { "SYSTEM": { "description": "Top-level system architecture containing all components", "color": "#333333", "icon": "🌐", "shape": "folder", "sub_components": ["AGENT", "USER", "TOOL", "DATA", "PROCESSOR", "ROUTER", "INFRASTRUCTURE", "CONFIG"] }, # =================================== # AGENT: Autonomous reasoning units # =================================== "AGENT": { "description": "Autonomous reasoning and decision-making units", "color": "#4CAF50", "icon": "🤖", "shape": "rect", "sub_components": ["REASONING_AGENT", "ACTION_AGENT", "PLANNER_AGENT", "REACT_AGENT", "MULTI_AGENT"] }, "REASONING_AGENT": { "shape": "rect", "color": "#4CAF50", "icon": "🧠", "description": [ "• Performs complex reasoning tasks", "• Uses chain-of-thought or tree-of-thought", "• Can break down complex problems", "• Maintains reasoning traces" ], "implementation": { "python_snippet": """ class ReasoningAgent: def __init__(self, model, tools=None): self.model = model self.tools = tools or [] self.reasoning_history = [] async def process(self, query, context=None): # Chain-of-thought reasoning reasoning_steps = await self.generate_reasoning_steps(query, context) self.reasoning_history.extend(reasoning_steps) # Final answer generation answer = await self.synthesize_answer(reasoning_steps) return answer async def generate_reasoning_steps(self, query, context): prompt = f\"\"\"Analyze this problem step by step: Query: {query} Context: {context} Break down your reasoning:\"\"\" return await self.model.generate(prompt) """, "prompt_template": """ You are a reasoning agent. 
Analyze the user's query step by step: Query: {user_input} Context: {context} Please: 1. Break down the problem into logical steps 2. Consider different perspectives 3. Evaluate evidence and constraints 4. Synthesize a comprehensive answer Reasoning steps: """, "dependencies": ["openai", "langchain", "pydantic"], "config": { "model": "gpt-4", "temperature": 0.1, "max_tokens": 2000 } } }, "ACTION_AGENT": { "shape": "rect", "color": "#4CAF50", "icon": "⚡", "description": [ "• Executes actions using available tools", "• Monitors action outcomes", "• Handles errors and retries", "• Updates state after actions" ], "implementation": { "python_snippet": """ class ActionAgent: def __init__(self, tools, model): self.tools = {tool.name: tool for tool in tools} self.model = model async def execute_action(self, action_request): tool_name, parameters = self.parse_action(action_request) if tool_name in self.tools: return await self.tools[tool_name].execute(parameters) else: raise ValueError(f"Unknown tool: {tool_name}") def parse_action(self, action_request): # Parse action from model response return action_request['tool'], action_request['parameters'] """, "dependencies": ["pydantic", "asyncio"], "config": { "retry_attempts": 3, "timeout_seconds": 30 } } }, "PLANNER_AGENT": { "shape": "rect", "color": "#4CAF50", "icon": "📋", "description": [ "• Creates multi-step plans to achieve goals", "• Decomposes complex tasks", "• Optimizes execution order", "• Monitors plan progress" ], "implementation": { "python_snippet": """ class PlannerAgent: def __init__(self, model): self.model = model self.plans = [] async def create_plan(self, goal, context): plan_prompt = f\"\"\"Create a step-by-step plan to achieve: Goal: {goal} Context: {context} Return a list of actionable steps:\"\"\" plan_steps = await self.model.generate(plan_prompt) plan = Plan(steps=plan_steps, goal=goal) self.plans.append(plan) return plan """, "dependencies": ["pydantic"], "config": { "max_steps": 20, 
"planning_temperature": 0.3 } } }, "REACT_AGENT": { "shape": "rect", "color": "#4CAF50", "icon": "🔄", "description": [ "• Implements ReAct (Reason + Act) framework", "• Alternates reasoning and action steps", "• Maintains conversation history", "• Handles tool interactions" ], "implementation": { "python_snippet": """ class ReActAgent: def __init__(self, model, tools): self.model = model self.tools = tools self.conversation_history = [] async def step(self, input_text): # Generate thought thought = await self.generate_thought(input_text) # Decide on action action = await self.decide_action(thought) # Execute action if needed if action: observation = await self.execute_action(action) return {"thought": thought, "action": action, "observation": observation} else: return {"thought": thought, "answer": await self.generate_answer(thought)} """, "dependencies": ["asyncio", "langchain"], "config": { "max_iterations": 10, "react_temperature": 0.7 } } }, "MULTI_AGENT": { "shape": "rect", "color": "#4CAF50", "icon": "👥", "description": [ "• Coordinates multiple specialized agents", "• Manages agent communication", "• Distributes tasks among agents", "• Aggregates results from agents" ], "implementation": { "python_snippet": """ class MultiAgentSystem: def __init__(self, agents, orchestrator): self.agents = {agent.name: agent for agent in agents} self.orchestrator = orchestrator async def coordinate(self, task): # Assign task to appropriate agents agent_assignments = await self.orchestrator.assign(task) # Execute in parallel results = await asyncio.gather(*[ self.agents[agent_name].process(subtask) for agent_name, subtask in agent_assignments.items() ]) return self.orchestrator.aggregate(results) """, "dependencies": ["asyncio", "concurrent.futures"], "config": { "max_concurrent_agents": 10, "communication_protocol": "message_queue" } } }, # =================================== # USER: Interaction interfaces # =================================== "USER": { "description": "User 
interaction points and interfaces", "color": "#9C27B0", "icon": "👤", "shape": "ellipse", "sub_components": ["USER_INPUT", "USER_OUTPUT", "MULTIMODAL_INTERFACE"] }, "USER_INPUT": { "shape": "ellipse", "color": "#9C27B0", "icon": "⌨️", "description": [ "• Accepts text, voice, or gesture input", "• Validates and sanitizes input", "• Converts to structured format", "• Handles multiple input channels" ], "implementation": { "python_snippet": """ class UserInputHandler: def __init__(self): self.input_validators = { 'text': self.validate_text, 'voice': self.validate_voice, 'gesture': self.validate_gesture } async def process_input(self, input_type, raw_input): validator = self.input_validators.get(input_type) if validator: return await validator(raw_input) else: raise ValueError(f"Unsupported input type: {input_type}") async def validate_text(self, text): # Sanitize and structure text input return {"type": "text", "content": text.strip()} """, "dependencies": ["validators", "pydantic"], "config": { "max_input_length": 10000, "allowed_input_types": ["text", "voice", "gesture"] } } }, "USER_OUTPUT": { "shape": "ellipse", "color": "#9C27B0", "icon": "🔊", "description": [ "• Formats responses for user consumption", "• Supports multiple output formats", "• Handles accessibility features", "• Manages response timing" ], "implementation": { "python_snippet": """ class UserOutputHandler: def __init__(self): self.formatters = { 'text': self.format_text, 'audio': self.format_audio, 'visual': self.format_visual } async def deliver_response(self, response_data, output_format): formatter = self.formatters.get(output_format) if formatter: formatted_response = await formatter(response_data) return await self.send_to_user(formatted_response) async def format_text(self, data): # Format response as structured text return {"format": "text", "content": data} """, "dependencies": ["jinja2", "markdown"], "config": { "default_format": "text", "max_response_length": 5000 } } }, 
"MULTIMODAL_INTERFACE": { "shape": "ellipse", "color": "#9C27B0", "icon": "🖼️", "description": [ "• Handles multiple input/output modalities", "• Integrates text, image, audio, video", "• Manages modality conversion", "• Supports rich media responses" ], "implementation": { "python_snippet": """ class MultimodalInterface: def __init__(self): self.input_processors = { 'image': ImageProcessor(), 'audio': AudioProcessor(), 'text': TextProcessor() } self.output_formatters = { 'rich_text': RichTextFormatter(), 'multimedia': MultimediaFormatter() } async def process_multimodal_input(self, inputs): processed_inputs = {} for input_type, input_data in inputs.items(): processor = self.input_processors.get(input_type) if processor: processed_inputs[input_type] = await processor.process(input_data) return processed_inputs """, "dependencies": ["pillow", "pyaudio", "opencv-python"], "config": { "supported_modalities": ["text", "image", "audio", "video"], "max_file_size_mb": 50 } } }, # =================================== # TOOL: External functions and capabilities # =================================== "TOOL": { "description": "External functions and capabilities", "color": "#795548", "icon": "🔧", "shape": "hexagon", "sub_components": ["MCP_TOOL", "API_TOOL", "LOCAL_TOOL", "AGENT_TOOL", "FUNCTION_TOOL"] }, "MCP_TOOL": { "shape": "hexagon", "color": "#795548", "icon": "🔌", "description": [ "• Model Context Protocol server", "• Standardized tool interface", "• Dynamic tool discovery", "• Secure resource access" ], "implementation": { "python_snippet": """ # MCP Server implementation from mcp import MCPServer, Tool class FileSystemTool: @Tool async def read_file(self, path: str) -> str: \"\"\"Read content from a file\"\"\" with open(path, 'r') as f: return f.read() @Tool async def write_file(self, path: str, content: str) -> str: \"\"\"Write content to a file\"\"\" with open(path, 'w') as f: f.write(content) return f"Written to {path}" # MCP Client usage async def 
use_mcp_tool(agent, tool_name, parameters): result = await agent.use_tool(tool_name, parameters) return result """, "protocol_spec": { "version": "1.0", "transport": ["stdio", "sse"], "authentication": ["none", "bearer"] }, "example_tools": ["filesystem", "calculator", "web_search", "database"] } }, "API_TOOL": { "shape": "hexagon", "color": "#795548", "icon": "🔗", "description": [ "• Wraps external REST/gRPC APIs", "• Handles authentication and rate limits", "• Manages request/response mapping", "• Provides error handling and retries" ], "implementation": { "python_snippet": """ class APITool: def __init__(self, base_url, auth_token=None, rate_limit=10): self.base_url = base_url self.auth_token = auth_token self.rate_limit = rate_limit self.session = aiohttp.ClientSession() async def call(self, endpoint, method='GET', data=None): headers = {"Authorization": f"Bearer {self.auth_token}"} if self.auth_token else {} url = f"{self.base_url}/{endpoint}" async with self.session.request(method, url, json=data, headers=headers) as response: return await response.json() """, "dependencies": ["aiohttp", "requests"], "config": { "timeout": 30, "max_retries": 3, "retry_delay": 1.0 } } }, "LOCAL_TOOL": { "shape": "hexagon", "color": "#795548", "icon": "💻", "description": [ "• Locally executed utility functions", "• File operations, math calculations", "• System utilities and helpers", "• Fast execution without network calls" ], "implementation": { "python_snippet": """ class LocalTool: @staticmethod async def file_operations(action, **kwargs): if action == 'read': with open(kwargs['path'], 'r') as f: return f.read() elif action == 'write': with open(kwargs['path'], 'w') as f: f.write(kwargs['content']) return f"File written to {kwargs['path']}" @staticmethod async def math_operations(operation, **kwargs): if operation == 'add': return kwargs['a'] + kwargs['b'] elif operation == 'multiply': return kwargs['a'] * kwargs['b'] """, "dependencies": ["os", "math"], "config": { 
"max_execution_time": 5.0, "allowed_operations": ["file", "math", "system"] } } }, "AGENT_TOOL": { "shape": "hexagon", "color": "#795548", "icon": "🛠️", "description": [ "• Allows one agent to act as a tool for another", "• Wraps agent functionality for external use", "• Handles agent-to-agent communication", "• Manages agent state and context" ], "implementation": { "python_snippet": """ class AgentTool: def __init__(self, agent): self.agent = agent async def execute(self, query, context=None): # Wrap agent execution as a tool call result = await self.agent.process(query, context) return { "result": result, "agent_name": self.agent.name, "execution_time": time.time() } """, "dependencies": ["asyncio", "time"], "config": { "max_concurrent_calls": 5, "timeout_seconds": 60 } } }, "FUNCTION_TOOL": { "shape": "hexagon", "color": "#795548", "icon": "🧮", "description": [ "• Generic callable function exposed to agents", "• Wraps Python functions for tool use", "• Handles parameter validation", "• Provides type safety and documentation" ], "implementation": { "python_snippet": """ from pydantic import BaseModel, create_model class FunctionTool: def __init__(self, func, description, param_schema=None): self.func = func self.description = description self.param_schema = param_schema or self._infer_schema(func) async def execute(self, **kwargs): validated_params = self.param_schema(**kwargs) return await self.func(**validated_params.dict()) def _infer_schema(self, func): # Infer schema from function signature sig = inspect.signature(func) fields = {} for name, param in sig.parameters.items(): fields[name] = (param.annotation, param.default if param.default != param.empty else ...) 
return create_model(f"{func.__name__}Params", **fields) """, "dependencies": ["pydantic", "inspect"], "config": { "max_params": 10, "validation_enabled": True } } }, # =================================== # DATA: Storage and knowledge systems # =================================== "DATA": { "description": "Data sources and storage systems", "color": "#009688", "icon": "💾", "shape": "cylinder", "sub_components": ["KNOWLEDGE_BASE", "VECTOR_DB", "DOCUMENT_STORE", "CACHE", "MEMORY"] }, "KNOWLEDGE_BASE": { "shape": "cylinder", "color": "#009688", "icon": "📘", "description": [ "• Curated domain-specific facts and rules", "• Structured knowledge representation", "• Supports inference and reasoning", "• Maintains consistency and accuracy" ], "implementation": { "python_snippet": """ class KnowledgeBase: def __init__(self, storage_backend): self.storage = storage_backend self.index = {} async def query(self, query_text, context=None): # Query knowledge base with optional context results = await self.storage.search(query_text) return self._format_results(results) async def update(self, fact, metadata=None): # Add or update knowledge fact await self.storage.insert(fact, metadata) self._update_index(fact) """, "dependencies": ["sqlite3", "nltk"], "config": { "max_facts": 100000, "update_frequency": "daily" } } }, "VECTOR_DB": { "shape": "cylinder", "color": "#009688", "icon": "🔍", "description": [ "• Embedding-based database for semantic search", "• Stores vector representations of text", "• Enables similarity-based retrieval", "• Supports semantic understanding" ], "implementation": { "python_snippet": """ import numpy as np from sentence_transformers import SentenceTransformer class VectorDB: def __init__(self, embedding_model="all-MiniLM-L6-v2"): self.model = SentenceTransformer(embedding_model) self.vectors = {} self.metadata = {} async def add_document(self, doc_id, text, metadata=None): embedding = self.model.encode(text) self.vectors[doc_id] = embedding 
self.metadata[doc_id] = metadata or {} async def search(self, query, top_k=5): query_embedding = self.model.encode(query) similarities = [] for doc_id, vector in self.vectors.items(): similarity = np.dot(query_embedding, vector) / ( np.linalg.norm(query_embedding) * np.linalg.norm(vector) ) similarities.append((doc_id, similarity)) return sorted(similarities, key=lambda x: x[1], reverse=True)[:top_k] """, "dependencies": ["sentence-transformers", "numpy"], "config": { "embedding_model": "all-MiniLM-L6-v2", "max_documents": 10000 } } }, "DOCUMENT_STORE": { "shape": "cylinder", "color": "#009688", "icon": "🗂️", "description": [ "• Raw document repository (PDFs, web pages, etc.)", "• Handles various document formats", "• Provides document parsing and extraction", "• Manages document lifecycle and metadata" ], "implementation": { "python_snippet": """ class DocumentStore: def __init__(self, storage_path): self.storage_path = storage_path self.parsers = { '.pdf': self._parse_pdf, '.txt': self._parse_text, '.docx': self._parse_docx } async def store_document(self, filename, content): # Parse and store document with metadata ext = os.path.splitext(filename)[1].lower() parser = self.parsers.get(ext) if parser: parsed_content = await parser(content) # Store in database with metadata return await self._save_to_db(filename, parsed_content) async def _parse_pdf(self, content): # Extract text from PDF import PyPDF2 pdf_reader = PyPDF2.PdfReader(content) text = "" for page in pdf_reader.pages: text += page.extract_text() return text """, "dependencies": ["PyPDF2", "python-docx"], "config": { "supported_formats": [".pdf", ".txt", ".docx", ".html"], "max_file_size_mb": 100 } } }, "CACHE": { "shape": "cylinder", "color": "#009688", "icon": "⏱️", "description": [ "• Temporary fast-access storage for responses or embeddings", "• Implements LRU or TTL eviction policies", "• Reduces computation and API costs", "• Improves response times" ], "implementation": { "python_snippet": """ 
import time from collections import OrderedDict class Cache: def __init__(self, max_size=1000, ttl_seconds=3600): self.cache = OrderedDict() self.max_size = max_size self.ttl = ttl_seconds async def get(self, key): if key in self.cache: value, timestamp = self.cache[key] if time.time() - timestamp < self.ttl: return value else: del self.cache[key] return None async def set(self, key, value): if len(self.cache) >= self.max_size: self.cache.popitem(last=False) self.cache[key] = (value, time.time()) """, "dependencies": ["time", "collections"], "config": { "max_size": 1000, "ttl_seconds": 3600, "eviction_policy": "lru" } } }, "MEMORY": { "shape": "cylinder", "color": "#009688", "icon": "🧠", "description": [ "• Short-term context memory (conversation history, scratchpad)", "• Maintains session state and context", "• Supports conversation continuity", "• Manages memory lifecycle" ], "implementation": { "python_snippet": """ class Memory: def __init__(self, max_context_length=2000): self.conversation_history = [] self.scratchpad = {} self.max_context_length = max_context_length async def add_interaction(self, user_input, agent_response): interaction = { "timestamp": time.time(), "user": user_input, "agent": agent_response } self.conversation_history.append(interaction) self._trim_history() def _trim_history(self): # Trim history to maintain context length total_length = sum(len(str(item)) for item in self.conversation_history) while total_length > self.max_context_length and len(self.conversation_history) > 1: removed = self.conversation_history.pop(0) total_length -= len(str(removed)) """, "dependencies": ["time"], "config": { "max_context_length": 2000, "history_retention_hours": 24 } } }, # =================================== # PROCESSOR: Data processing units # =================================== "PROCESSOR": { "description": "Data processing and transformation units", "color": "#2196F3", "icon": "⚙️", "shape": "rect", "sub_components": ["QUERY_PROCESSOR", 
"CONTENT_RETRIEVAL", "PROMPT_TEMPLATE", "RESPONSE_FORMATTER"] }, "QUERY_PROCESSOR": { "shape": "rect", "color": "#2196F3", "icon": "🔎", "description": [ "• Parses and enriches incoming queries", "• Extracts intent and entities", "• Normalizes query structure", "• Handles query validation" ], "implementation": { "python_snippet": """ class QueryProcessor: def __init__(self): self.intent_classifier = IntentClassifier() self.entity_extractor = EntityExtractor() async def process_query(self, query_text): # Classify intent and extract entities intent = await self.intent_classifier.classify(query_text) entities = await self.entity_extractor.extract(query_text) return { "original_query": query_text, "intent": intent, "entities": entities, "processed_query": self._normalize_query(query_text, entities) } def _normalize_query(self, query, entities): # Normalize query for downstream processing normalized = query for entity, value in entities.items(): normalized = normalized.replace(value, f"[{entity}]") return normalized """, "dependencies": ["spacy", "transformers"], "config": { "max_query_length": 1000, "confidence_threshold": 0.7 } } }, "CONTENT_RETRIEVAL": { "shape": "rect", "color": "#2196F3", "icon": "📤", "description": [ "• Fetches relevant content from data stores", "• Implements semantic and keyword search", "• Ranks and filters retrieved content", "• Handles multi-source retrieval" ], "implementation": { "python_snippet": """ class ContentRetrieval: def __init__(self, data_sources): self.data_sources = data_sources async def retrieve(self, query, top_k=5, sources=None): all_results = [] for source_name, source in self.data_sources.items(): if sources is None or source_name in sources: results = await source.search(query, top_k) all_results.extend(results) # Rank and deduplicate results ranked_results = self._rank_results(all_results, query) return ranked_results[:top_k] def _rank_results(self, results, query): # Implement ranking algorithm return sorted(results, 
key=lambda x: x.get('relevance_score', 0), reverse=True) """, "dependencies": ["numpy", "scikit-learn"], "config": { "top_k": 5, "max_sources": 10, "relevance_threshold": 0.5 } } }, "PROMPT_TEMPLATE": { "shape": "rect", "color": "#2196F3", "icon": "📝", "description": [ "• Template-based prompt construction", "• Supports variable substitution", "• Handles different prompt formats", "• Manages prompt versioning" ], "implementation": { "python_snippet": """ from jinja2 import Template class PromptTemplate: def __init__(self, template_string): self.template = Template(template_string) async def format(self, **kwargs): return self.template.render(**kwargs) @classmethod def load_from_file(cls, file_path): with open(file_path, 'r') as f: template_string = f.read() return cls(template_string) def validate_variables(self, required_vars): # Validate that all required variables are provided pass """, "dependencies": ["jinja2"], "config": { "default_template": "You are a helpful assistant. User: {query}", "max_template_length": 5000 } } }, "RESPONSE_FORMATTER": { "shape": "rect", "color": "#2196F3", "icon": "📄", "description": [ "• Structures final output (JSON, XML, markdown, etc.)", "• Applies formatting rules and styles", "• Validates response structure", "• Supports multiple output formats" ], "implementation": { "python_snippet": """ class ResponseFormatter: def __init__(self): self.formatters = { 'json': self._format_json, 'xml': self._format_xml, 'markdown': self._format_markdown, 'text': self._format_text } async def format(self, data, format_type='json'): formatter = self.formatters.get(format_type) if formatter: return formatter(data) else: raise ValueError(f"Unsupported format: {format_type}") def _format_json(self, data): import json return json.dumps(data, indent=2) """, "dependencies": ["json", "xml.etree.ElementTree"], "config": { "default_format": "json", "max_output_length": 10000 } } }, # =================================== # ROUTER: Decision points and 
workflow routing # =================================== "ROUTER": { "description": "Decision points and workflow routing", "color": "#FF9800", "icon": "🎯", "shape": "diamond", "sub_components": ["INTENT_DISCOVERY", "MODEL_SELECTOR", "WORKFLOW_ROUTER", "VALIDATOR"] }, "INTENT_DISCOVERY": { "shape": "diamond", "color": "#FF9800", "icon": "🎯", "description": [ "• Identifies user intent from input", "• Uses machine learning classification", "• Handles intent confidence scoring", "• Supports intent hierarchy" ], "implementation": { "python_snippet": """ class IntentDiscovery: def __init__(self, model_path): self.model = self.load_model(model_path) async def discover_intent(self, text): # Classify intent using trained model predictions = await self.model.predict(text) top_intent = max(predictions, key=predictions.get) confidence = predictions[top_intent] return { "intent": top_intent, "confidence": confidence, "all_predictions": predictions } """, "dependencies": ["transformers", "torch"], "config": { "confidence_threshold": 0.8, "fallback_intent": "unknown" } } }, "MODEL_SELECTOR": { "shape": "diamond", "color": "#FF9800", "icon": "🧠", "description": [ "• Selects appropriate model based on task", "• Considers task complexity and cost", "• Handles model availability and load", "• Supports A/B testing of models" ], "implementation": { "python_snippet": """ class ModelSelector: def __init__(self, models): self.models = models self.model_performance = {} async def select_model(self, task_description, context=None): # Select best model based on task requirements suitable_models = self._filter_suitable_models(task_description) # Choose based on performance metrics and availability best_model = self._select_best_model(suitable_models) return best_model def _filter_suitable_models(self, task_description): # Filter models based on task compatibility return [model for model in self.models if model.can_handle(task_description)] """, "dependencies": ["numpy"], "config": { 
"selection_strategy": "performance_weighted", "max_model_candidates": 5 } } }, "WORKFLOW_ROUTER": { "shape": "diamond", "color": "#FF9800", "icon": "🔄", "description": [ "• Routes requests through appropriate workflows", "• Manages workflow state and transitions", "• Handles parallel and sequential execution", "• Supports workflow versioning" ], "implementation": { "python_snippet": """ class WorkflowRouter: def __init__(self, workflows): self.workflows = workflows self.current_executions = {} async def route(self, request, workflow_name=None): if workflow_name: workflow = self.workflows.get(workflow_name) else: workflow = await self._auto_select_workflow(request) execution_id = str(uuid.uuid4()) self.current_executions[execution_id] = workflow result = await workflow.execute(request) del self.current_executions[execution_id] return result """, "dependencies": ["uuid", "asyncio"], "config": { "max_concurrent_workflows": 100, "workflow_timeout": 300 } } }, "VALIDATOR": { "shape": "diamond", "color": "#FF9800", "icon": "✅", "description": [ "• Validates inputs, outputs, and intermediate results", "• Implements schema and business rule validation", "• Handles data quality checks", "• Provides validation feedback" ], "implementation": { "python_snippet": """ from pydantic import BaseModel, ValidationError class Validator: def __init__(self, schema_class: BaseModel): self.schema_class = schema_class async def validate(self, data): try: validated_data = self.schema_class(**data) return { "valid": True, "data": validated_data.dict(), "errors": [] } except ValidationError as e: return { "valid": False, "data": None, "errors": e.errors() } """, "dependencies": ["pydantic"], "config": { "strict_validation": True, "validation_timeout": 10 } } }, # =================================== # INFRASTRUCTURE: System services # =================================== "INFRASTRUCTURE": { "description": "System infrastructure and services", "color": "#FF5722", "icon": "🌐", "shape": "rect", 
"sub_components": ["PROVIDER", "MONITOR", "FALLBACK", "ORCHESTRATOR"] }, "PROVIDER": { "shape": "rect", "color": "#FF5722", "icon": "🌐", "description": [ "• API connection to LLM service", "• Manages authentication and rate limits", "• Handles retries and error recovery", "• Tracks usage and costs" ], "implementation": { "python_snippet": """ class LLMProvider: def __init__(self, base_url: str, api_key: str = None, model: str = "default"): self.base_url = base_url self.api_key = api_key self.model = model self.client = AsyncOpenAI(base_url=base_url, api_key=api_key) self.usage_tracker = UsageTracker() async def generate(self, prompt: str, **kwargs) -> str: try: response = await self.client.chat.completions.create( model=self.model, messages=[{"role": "user", "content": prompt}], **kwargs ) self.usage_tracker.record_usage(response.usage) return response.choices[0].message.content except Exception as e: raise ProviderError(f"Generation failed: {e}") def get_cost_estimate(self) -> float: return self.usage_tracker.calculate_cost() """, "supported_providers": { "openai": {"base_url": "https://api.openai.com/v1", "models": ["gpt-4", "gpt-3.5-turbo"]}, "anthropic": {"base_url": "https://api.anthropic.com/v1", "models": ["claude-3", "claude-2"]}, "local": {"base_url": "http://localhost:1234/v1", "models": ["local-model"]}, "azure": {"base_url": "https://your-resource.openai.azure.com/", "models": ["gpt-4", "gpt-35-turbo"]} }, "config_template": { "base_url": "https://api.openai.com/v1", "api_key": "your-api-key-here", "model": "gpt-4", "max_retries": 3, "timeout": 30 } } }, "MONITOR": { "shape": "rect", "color": "#FF5722", "icon": "📊", "description": [ "• Tracks system performance and metrics", "• Monitors resource usage and errors", "• Provides health checks and alerts", "• Supports logging and analytics" ], "implementation": { "python_snippet": """ import time import logging from collections import defaultdict class Monitor: def __init__(self): self.metrics = 
defaultdict(list) self.logger = logging.getLogger(__name__) async def record_metric(self, name, value, timestamp=None): if timestamp is None: timestamp = time.time() self.metrics[name].append((timestamp, value)) async def get_health_status(self): recent_errors = [m for m in self.metrics['errors'] if time.time() - m[0] < 300] avg_response_time = self._calculate_avg_time('response_time', 300) return { "status": "healthy" if len(recent_errors) == 0 else "degraded", "recent_errors": len(recent_errors), "avg_response_time": avg_response_time } """, "dependencies": ["logging", "time"], "config": { "metrics_retention_hours": 24, "alert_thresholds": {"error_rate": 0.05, "response_time": 5.0} } } }, "FALLBACK": { "shape": "rect", "color": "#FF5722", "icon": "🔄", "description": [ "• Provides alternative execution paths", "• Handles primary system failures", "• Implements graceful degradation", "• Maintains service availability" ], "implementation": { "python_snippet": """ class FallbackHandler: def __init__(self, primary_handler, fallback_handlers): self.primary = primary_handler self.fallbacks = fallback_handlers async def execute_with_fallback(self, *args, **kwargs): try: return await self.primary(*args, **kwargs) except PrimaryError as e: self.logger.warning(f"Primary failed: {e}, trying fallbacks") for fallback in self.fallbacks: try: return await fallback(*args, **kwargs) except FallbackError: continue raise ServiceUnavailableError("All fallbacks exhausted") """, "dependencies": ["logging"], "config": { "max_fallback_attempts": 3, "fallback_timeout": 10 } } }, "ORCHESTRATOR": { "shape": "rect", "color": "#FF5722", "icon": "🎬", "description": [ "• Coordinates complex multi-step processes", "• Manages component interactions", "• Handles state and error propagation", "• Supports distributed execution" ], "implementation": { "python_snippet": """ class Orchestrator: def __init__(self, components): self.components = components self.state = {} async def orchestrate(self, 
workflow_definition, input_data): current_state = input_data.copy() for step in workflow_definition.steps: component = self.components[step.component] step_result = await component.execute(current_state, step.config) current_state.update(step_result) return current_state """, "dependencies": ["asyncio"], "config": { "max_workflow_steps": 100, "step_timeout": 60 } } } } # Hierarchical Component definitions COMPONENT_HIERARCHY = { "HIGH_LEVEL": { "AGENT": { "description": "Autonomous reasoning and decision-making units", "color": "#4CAF50", "icon": "🤖", "shape": "rect", "sub_components": ["REASONING_AGENT", "ACTION_AGENT", "PLANNER_AGENT", "REACT_AGENT", "MULTI_AGENT"] }, "USER": { "description": "User interaction points and interfaces", "color": "#9C27B0", "icon": "👤", "shape": "ellipse", "sub_components": ["USER_INPUT", "USER_OUTPUT", "MULTIMODAL_INTERFACE"] }, "TOOL": { "description": "External functions and capabilities", "color": "#795548", "icon": "🔧", "shape": "hexagon", "sub_components": ["MCP_TOOL", "API_TOOL", "LOCAL_TOOL", "AGENT_TOOL", "FUNCTION_TOOL"] }, "DATA": { "description": "Data sources and storage systems", "color": "#009688", "icon": "💾", "shape": "cylinder", "sub_components": ["KNOWLEDGE_BASE", "VECTOR_DB", "DOCUMENT_STORE", "CACHE", "MEMORY"] }, "PROCESSOR": { "description": "Data processing and transformation units", "color": "#2196F3", "icon": "⚙️", "shape": "rect", "sub_components": ["QUERY_PROCESSOR", "CONTENT_RETRIEVAL", "PROMPT_TEMPLATE", "RESPONSE_FORMATTER"] }, "ROUTER": { "description": "Decision points and workflow routing", "color": "#FF9800", "icon": "🎯", "shape": "diamond", "sub_components": ["INTENT_DISCOVERY", "MODEL_SELECTOR", "WORKFLOW_ROUTER", "VALIDATOR"] }, "INFRASTRUCTURE": { "description": "System infrastructure and services", "color": "#FF5722", "icon": "🌐", "shape": "rect", "sub_components": ["PROVIDER", "MONITOR", "FALLBACK", "ORCHESTRATOR"] } } } # Enhanced Example workflows EXAMPLE_WORKFLOWS = { "Simple Chat Agent": { 
"description": "Basic conversational agent with single LLM call", "nodes": [ {"id": "user_1", "type": "USER_INPUT", "x": 150, "y": 200}, {"id": "agent_1", "type": "REASONING_AGENT", "x": 400, "y": 200}, {"id": "provider_1", "type": "PROVIDER", "x": 650, "y": 200}, {"id": "output_1", "type": "USER_OUTPUT", "x": 900, "y": 200} ], "connections": [ {"from": "user_1", "to": "agent_1"}, {"from": "agent_1", "to": "provider_1"}, {"from": "provider_1", "to": "output_1"} ] }, "Intent-Driven Routing": { "description": "Routes to specialized agents based on user intent", "nodes": [ {"id": "user_1", "type": "USER_INPUT", "x": 150, "y": 300}, {"id": "intent_1", "type": "INTENT_DISCOVERY", "x": 400, "y": 300}, {"id": "agent_1", "type": "REASONING_AGENT", "x": 650, "y": 150}, {"id": "agent_2", "type": "ACTION_AGENT", "x": 650, "y": 450}, {"id": "output_1", "type": "USER_OUTPUT", "x": 900, "y": 300} ], "connections": [ {"from": "user_1", "to": "intent_1"}, {"from": "intent_1", "to": "agent_1"}, {"from": "intent_1", "to": "agent_2"}, {"from": "agent_1", "to": "output_1"}, {"from": "agent_2", "to": "output_1"} ] }, "RAG Pipeline": { "description": "Retrieval-Augmented Generation with context", "nodes": [ {"id": "user_1", "type": "USER_INPUT", "x": 100, "y": 250}, {"id": "query_1", "type": "QUERY_PROCESSOR", "x": 250, "y": 250}, {"id": "content_1", "type": "CONTENT_RETRIEVAL", "x": 400, "y": 250}, {"id": "prompt_1", "type": "PROMPT_TEMPLATE", "x": 550, "y": 250}, {"id": "agent_1", "type": "REASONING_AGENT", "x": 700, "y": 250}, {"id": "output_1", "type": "USER_OUTPUT", "x": 900, "y": 250} ], "connections": [ {"from": "user_1", "to": "query_1"}, {"from": "query_1", "to": "content_1"}, {"from": "content_1", "to": "prompt_1"}, {"from": "prompt_1", "to": "agent_1"}, {"from": "agent_1", "to": "output_1"} ] }, "Multi-Agent with Tools": { "description": "Coordinated agents with tool access and validation", "nodes": [ {"id": "user_1", "type": "USER_INPUT", "x": 100, "y": 300}, {"id": 
"intent_1", "type": "INTENT_DISCOVERY", "x": 280, "y": 300}, {"id": "agent_1", "type": "REASONING_AGENT", "x": 460, "y": 150}, {"id": "agent_2", "type": "ACTION_AGENT", "x": 460, "y": 450}, {"id": "tool_1", "type": "MCP_TOOL", "x": 640, "y": 150}, {"id": "tool_2", "type": "API_TOOL", "x": 640, "y": 450}, {"id": "validator_1", "type": "VALIDATOR", "x": 820, "y": 300}, {"id": "output_1", "type": "USER_OUTPUT", "x": 980, "y": 300} ], "connections": [ {"from": "user_1", "to": "intent_1"}, {"from": "intent_1", "to": "agent_1"}, {"from": "intent_1", "to": "agent_2"}, {"from": "agent_1", "to": "tool_1"}, {"from": "agent_2", "to": "tool_2"}, {"from": "tool_1", "to": "validator_1"}, {"from": "tool_2", "to": "validator_1"}, {"from": "validator_1", "to": "output_1"} ] }, "Advanced RAG with Cache": { "description": "Enhanced RAG with caching and monitoring", "nodes": [ {"id": "user_1", "type": "USER_INPUT", "x": 100, "y": 200}, {"id": "query_1", "type": "QUERY_PROCESSOR", "x": 250, "y": 200}, {"id": "cache_1", "type": "CACHE", "x": 400, "y": 100}, {"id": "knowledge_1", "type": "KNOWLEDGE_BASE", "x": 400, "y": 300}, {"id": "prompt_1", "type": "PROMPT_TEMPLATE", "x": 550, "y": 200}, {"id": "agent_1", "type": "REASONING_AGENT", "x": 700, "y": 200}, {"id": "monitor_1", "type": "MONITOR", "x": 850, "y": 100}, {"id": "output_1", "type": "USER_OUTPUT", "x": 850, "y": 300} ], "connections": [ {"from": "user_1", "to": "query_1"}, {"from": "query_1", "to": "cache_1"}, {"from": "query_1", "to": "knowledge_1"}, {"from": "cache_1", "to": "prompt_1"}, {"from": "knowledge_1", "to": "prompt_1"}, {"from": "prompt_1", "to": "agent_1"}, {"from": "agent_1", "to": "monitor_1"}, {"from": "agent_1", "to": "output_1"} ] }, "MCP Tool Agent": { "description": "Agent using MCP tools for extended capabilities", "nodes": [ {"id": "user_1", "type": "USER_INPUT", "x": 100, "y": 250}, {"id": "agent_1", "type": "REACT_AGENT", "x": 300, "y": 250}, {"id": "mcp_tool_1", "type": "MCP_TOOL", "x": 500, "y": 150}, 
{"id": "mcp_tool_2", "type": "MCP_TOOL", "x": 500, "y": 350}, {"id": "memory_1", "type": "MEMORY", "x": 700, "y": 250}, {"id": "output_1", "type": "USER_OUTPUT", "x": 900, "y": 250} ], "connections": [ {"from": "user_1", "to": "agent_1"}, {"from": "agent_1", "to": "mcp_tool_1"}, {"from": "agent_1", "to": "mcp_tool_2"}, {"from": "mcp_tool_1", "to": "agent_1"}, {"from": "mcp_tool_2", "to": "agent_1"}, {"from": "agent_1", "to": "memory_1"}, {"from": "agent_1", "to": "output_1"} ] } } @dataclass class ComponentData: """Complete component information""" type: str shape: str color: str icon: str description: List[str] category: Optional[str] = None sub_category: Optional[str] = None @dataclass class AgentNode: id: str type: str x: int y: int component_data: ComponentData = field(default_factory=lambda: ComponentData("", "", "", "", [])) @dataclass class Connection: from_node: str to_node: str class CustomNodeManager: def __init__(self, storage_path: str = "custom_nodes.pkl"): self.storage_path = storage_path self.custom_nodes: Dict[str, Dict[str, Any]] = {} self.load_custom_nodes() def load_custom_nodes(self): """Load custom nodes from storage""" if os.path.exists(self.storage_path): try: with open(self.storage_path, 'rb') as f: self.custom_nodes = pickle.load(f) except Exception as e: print(f"Error loading custom nodes: {e}") self.custom_nodes = {} def save_custom_nodes(self): """Save custom nodes to storage""" try: with open(self.storage_path, 'wb') as f: pickle.dump(self.custom_nodes, f) except Exception as e: print(f"Error saving custom nodes: {e}") def create_custom_node(self, name: str, config: Dict[str, Any]): """Create a new custom node""" node_id = f"custom_{name.lower().replace(' ', '_')}" self.custom_nodes[node_id] = { "id": node_id, "name": name, "type": "CUSTOM", "config": config, "created_at": __import__('datetime').datetime.now().isoformat() } self.save_custom_nodes() return node_id def get_custom_node_info(self, node_id: str) -> Dict[str, Any]: """Get 
information for a custom node""" return self.custom_nodes.get(node_id, {}) def delete_custom_node(self, node_id: str): """Delete a custom node""" if node_id in self.custom_nodes: del self.custom_nodes[node_id] self.save_custom_nodes() # Initialize custom node manager custom_node_manager = CustomNodeManager() class WorkflowDesigner: def __init__(self): self.nodes: Dict[str, AgentNode] = {} self.connections: List[Connection] = [] self.node_counter = 0 self.selected_node: Optional[str] = None def select_node(self, node_id: str) -> None: """Select a node and deselect others""" self.selected_node = node_id if node_id in self.nodes else None def move_selected_node(self, dx: int, dy: int) -> None: """Move selected node by delta""" if self.selected_node and self.selected_node in self.nodes: node = self.nodes[self.selected_node] node.x = max(0, node.x + dx) node.y = max(0, node.y + dy) def add_custom_node(self, custom_config: Dict[str, Any]) -> AgentNode: """Add a custom node to the workflow""" self.node_counter += 1 node_id = f"custom_{self.node_counter}" # Create custom node configuration custom_node_config = { "shape": custom_config.get("shape", "rect"), "color": custom_config.get("color", "#666666"), "icon": custom_config.get("icon", "🔧"), "description": custom_config.get("description", ["Custom node"]), "implementation": custom_config.get("implementation", {}) } # Add to COMPONENT_INFO for rendering COMPONENT_INFO[node_id] = custom_node_config col = len(self.nodes) % 3 row = len(self.nodes) // 3 x_pos = 200 + (col * 350) y_pos = 150 + (row * 200) node = AgentNode( id=node_id, type=node_id, # Use node_id as type for custom nodes x=x_pos, y=y_pos ) self.nodes[node_id] = node self.selected_node = node_id return node def get_workflow_json(self) -> Dict[str, Any]: """Get complete workflow data including component implementations""" nodes_data = [] for node in self.nodes.values(): node_info = COMPONENT_INFO.get(node.type, {}) nodes_data.append({ "id": node.id, "type": 
node.type, "x": node.x, "y": node.y, "component_info": node_info, "implementation": node_info.get("implementation", {}) }) return { "nodes": nodes_data, "connections": [asdict(c) for c in self.connections], "selected_node": self.selected_node, "metadata": { "total_nodes": len(self.nodes), "total_connections": len(self.connections), "generated_at": __import__('datetime').datetime.now().isoformat() } } def add_node(self, node_type: str) -> AgentNode: self.node_counter += 1 node_id = f"{node_type}_{self.node_counter}" col = len(self.nodes) % 3 row = len(self.nodes) // 3 x_pos = 200 + (col * 350) y_pos = 150 + (row * 200) # Get complete component information component_info = COMPONENT_INFO.get(node_type, { "shape": "rect", "color": "#666666", "icon": "❓", "description": ["Unknown component type"] }) # Create component data with full information component_data = ComponentData( type=node_type, shape=component_info["shape"], color=component_info["color"], icon=component_info["icon"], description=component_info["description"], category=self._find_component_category(node_type), sub_category=self._find_component_sub_category(node_type) ) node = AgentNode( id=node_id, type=node_type, x=x_pos, y=y_pos, component_data=component_data ) self.nodes[node_id] = node self.selected_node = node_id return node def _find_component_category(self, node_type: str) -> Optional[str]: """Find which high-level category this component belongs to""" for category, components in COMPONENT_HIERARCHY["HIGH_LEVEL"].items(): if node_type == category or node_type in components.get('sub_components', []): return category return None def _find_component_sub_category(self, node_type: str) -> Optional[str]: """Determine if this is a high-level or sub-component""" for category, components in COMPONENT_HIERARCHY["HIGH_LEVEL"].items(): if node_type == category: return "HIGH_LEVEL" elif node_type in components.get('sub_components', []): return "SUB_COMPONENT" return None def load_example(self, example_name: 
str): if example_name not in EXAMPLE_WORKFLOWS: return example = EXAMPLE_WORKFLOWS[example_name] self.nodes.clear() self.connections.clear() for node_data in example["nodes"]: node_type = node_data["type"] # Get complete component information for example nodes too component_info = COMPONENT_INFO.get(node_type, { "shape": "rect", "color": "#666666", "icon": "❓", "description": ["Unknown component type"] }) component_data = ComponentData( type=node_type, shape=component_info["shape"], color=component_info["color"], icon=component_info["icon"], description=component_info["description"], category=self._find_component_category(node_type), sub_category=self._find_component_sub_category(node_type) ) node = AgentNode( id=node_data["id"], type=node_type, x=node_data["x"], y=node_data["y"], component_data=component_data ) self.nodes[node.id] = node for conn_data in example["connections"]: conn = Connection( from_node=conn_data["from"], to_node=conn_data["to"] ) self.connections.append(conn) if self.nodes: self.selected_node = list(self.nodes.keys())[0] def get_workflow_json(self) -> Dict[str, Any]: """Get complete workflow data including full component information""" return { "metadata": { "total_nodes": len(self.nodes), "total_connections": len(self.connections), "selected_node": self.selected_node, "generated_with": "Agent Workflow Designer" }, "nodes": [ { "id": node.id, "type": node.type, "position": {"x": node.x, "y": node.y}, "component_data": { "type": node.component_data.type, "shape": node.component_data.shape, "color": node.component_data.color, "icon": node.component_data.icon, "description": node.component_data.description, "category": node.component_data.category, "sub_category": node.component_data.sub_category } } for node in self.nodes.values() ], "connections": [ { "from": conn.from_node, "to": conn.to_node } for conn in self.connections ] } def render_svg(self) -> str: """Render workflow as beautiful SVG with selection support""" if not self.nodes: return 
''' 🚀 Start Building Your Workflow Add components from the library on the left ''' width = 1200 height = max(600, max([n.y for n in self.nodes.values()], default=0) + 200) svg_parts = [ f'', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '' ] # Draw connections with glow for conn in self.connections: if conn.from_node in self.nodes and conn.to_node in self.nodes: from_node = self.nodes[conn.from_node] to_node = self.nodes[conn.to_node] from_x = from_node.x + 85 from_y = from_node.y + 60 to_x = to_node.x + 15 to_y = to_node.y + 60 mid_x = (from_x + to_x) / 2 # Glow path svg_parts.append( f'' ) # Main path svg_parts.append( f'' ) # Draw nodes with selection support for node in self.nodes.values(): # Use the stored component data instead of looking it up shape = node.component_data.shape color = node.component_data.color icon = node.component_data.icon cx = node.x + 85 cy = node.y + 60 label = node.id.replace("_", " ").title() is_selected = (node.id == self.selected_node) selection_glow = 'filter="url(#selected-glow)"' if is_selected else 'filter="url(#shadow)"' selection_stroke = "6" if is_selected else "4" # Node background with selection highlight if shape == "ellipse": svg_parts.append( f'' ) elif shape == "diamond": size = 70 points = f"{cx},{cy-size} {cx+size},{cy} {cx},{cy+size} {cx-size},{cy}" svg_parts.append( f'' ) elif shape == "hexagon": w, h = 70, 50 points = f"{cx-w},{cy-h/2} {cx-w/2},{cy-h} {cx+w/2},{cy-h} {cx+w},{cy-h/2} {cx+w},{cy+h/2} {cx+w/2},{cy+h} {cx-w/2},{cy+h} {cx-w},{cy+h/2}" svg_parts.append( f'' ) elif shape == "cylinder": svg_parts.append( f'' ) svg_parts.append( f'' ) svg_parts.append( f'' ) svg_parts.append( f'' ) svg_parts.append( f'' ) else: # rect svg_parts.append( f'' ) # Icon svg_parts.append( f'{icon}' ) # Label svg_parts.append( f'{label}' ) # Add JavaScript for drag and drop svg_parts.append(''' ''') svg_parts.append('') return '\n'.join(svg_parts) workflow = WorkflowDesigner() # Report generation 
class WorkflowReporter:
    """Produce a system-design report for a workflow via a local LM Studio server."""

    def __init__(self):
        # Define the attribute up front so a failed init cannot surface later
        # as an AttributeError inside generate_report.
        self.client = None
        try:
            self.client = AsyncOpenAI(base_url="http://localhost:1234/v1", api_key="lm-studio")
        except Exception as e:
            print("LM Studio client init failed:", e)

    async def generate_report(self, workflow_json: str) -> str:
        """Return the model's report for *workflow_json*, or an error message.

        Failures (missing client, request errors) are reported in the
        returned string rather than raised.
        """
        if self.client is None:
            return "Error generating report: LM Studio client is not available"

        prompt = f"""
        Generate a comprehensive system design report based on the following workflow:
        {workflow_json}
        The report should include a detailed repost and system breif with full examples and implimentations where possible and explanaion of requirement in cases where the workflow is complexed and need further deconstruction, as well as example usages :
        1. A high-level system overview
        2. User stories for each component or connection expetation
        3. Use case briefs for each component interaction and component relationship
        4. Pseudocode for the implementation for each component and for the overall workflow
        5. Component responsibilities and interfaces
        6. Data flow description and example use-cases
        """
        try:
            response = await self.client.chat.completions.create(
                model="leroydyer/qwen/qwen3-0.6b-q4_k_m.gguf",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=2048
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error generating report: {str(e)}"


# Initialize reporter
reporter = WorkflowReporter()


def _write_temp_file(content: str, suffix: str) -> str:
    """Write *content* to a new UTF-8 temporary file and return its path.

    The file is created atomically (unlike the deprecated tempfile.mktemp)
    and is left on disk so the UI can serve it as a download.
    """
    with tempfile.NamedTemporaryFile(
        "w", suffix=suffix, delete=False, encoding="utf-8"
    ) as handle:
        handle.write(content)
        return handle.name


def create_workflow_ui():
    """Build the Gradio Blocks app for the workflow designer and return it."""
    with gr.Blocks(title="Agent Workflow Designer", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🎓 Agentic System Workflow Designer")
        gr.Markdown("**Educational tool for planning and understanding agent architectures**")

        # Hidden components for JavaScript communication
        select_node_trigger = gr.Textbox(visible=False)
        move_node_trigger = gr.Textbox(visible=False)
        # Distance in pixels applied by the arrow buttons; the Fine/Coarse
        # buttons change it.
        move_step = gr.State(20)

        # Define all UI components first
        with gr.Row():
            # Left Sidebar - Component Library
            with gr.Column(scale=1):
                gr.Markdown("## 📚 Component Library")

                # Store component buttons for later connection
                component_buttons = []

                # High-level component accordions
                for category, components in COMPONENT_HIERARCHY["HIGH_LEVEL"].items():
                    with gr.Accordion(f"{components['icon']} {category}", open=False):
                        # High-level component button
                        high_level_btn = gr.Button(
                            f"{components['icon']} {category}",
                            size="sm",
                            variant="primary"
                        )
                        component_buttons.append((high_level_btn, category))

                        # Sub-components
                        if components['sub_components']:
                            gr.Markdown("**Sub-components:**")
                            for sub_comp in components['sub_components']:
                                # A sub-component listed in the hierarchy but
                                # missing from COMPONENT_INFO still gets a button.
                                sub_icon = COMPONENT_INFO.get(sub_comp, {}).get("icon", "❓")
                                sub_btn = gr.Button(
                                    f"{sub_icon} {sub_comp.replace('_', ' ').title()}",
                                    size="sm"
                                )
                                component_buttons.append((sub_btn, sub_comp))

                gr.Markdown("---")
                gr.Markdown("## 🔗 Connect Nodes")
                from_node = gr.Dropdown(label="From", choices=[], interactive=True)
                to_node = gr.Dropdown(label="To", choices=[], interactive=True)
                connect_btn = gr.Button("➡️ Connect", variant="secondary")

                gr.Markdown("---")
                gr.Markdown("## 📋 Examples")
                example_dropdown = gr.Dropdown(
                    choices=list(EXAMPLE_WORKFLOWS.keys()),
                    label="Load Example Workflow",
                    interactive=True
                )
                load_example_btn = gr.Button("📥 Load Example")

                gr.Markdown("---")
                with gr.Row():
                    download_json_btn = gr.Button("💾 Download JSON", variant="primary", size="sm")
                    download_svg_btn = gr.Button("🖼️ Download SVG", variant="primary", size="sm")
                    clear_btn = gr.Button("🗑️ Clear All", variant="stop", size="sm")

                # Output for multiple downloadable files
                download_files = gr.Files(label="📥 Download Files", visible=True)

            # Center - Canvas
            with gr.Column(scale=3):
                gr.Markdown("## 🎨 Workflow Canvas")
                gr.Markdown("**💡 Tip:** Click nodes to select, then drag or use arrow keys")
                canvas = gr.HTML()

                gr.Markdown("## 📖 Component Information")
                component_info = gr.Markdown("Select a component to see its description")

            # Right Sidebar - Movement Controls
            with gr.Column(scale=1):
                gr.Markdown("## 🎯 Selection & Movement")

                gr.Markdown("**Navigation:**")
                with gr.Row():
                    select_prev_btn = gr.Button("⬅️ Prev", size="sm")
                    select_next_btn = gr.Button("➡️ Next", size="sm")
                    deselect_btn = gr.Button("❌ Deselect", size="sm")

                gr.Markdown("**Selected Node:**")
                selected_node_info = gr.Markdown("No node selected")

                gr.Markdown("**Move Selected:**")
                with gr.Row():
                    move_left_btn = gr.Button("⬅️", size="sm")
                    move_up_btn = gr.Button("⬆️", size="sm")
                    move_down_btn = gr.Button("⬇️", size="sm")
                    move_right_btn = gr.Button("➡️", size="sm")

                gr.Markdown("**Movement Modes:**")
                with gr.Row():
                    move_fine_btn = gr.Button("🎯 Fine (5px)", size="sm")
                    move_coarse_btn = gr.Button("🚀 Coarse (50px)", size="sm")

                gr.Markdown("---")
                with gr.Accordion("🗑️ Delete Selected", open=False):
                    delete_selected_btn = gr.Button("❌ Delete Selected Node", variant="stop", size="sm")

                gr.Markdown("---")
                with gr.Accordion("📊 Workflow Data", open=False):
                    json_output = gr.Code(language="json", label="Workflow JSON", lines=10)

                gr.Markdown("---")
                with gr.Accordion("📋 Generate Report", open=False):
                    report_btn = gr.Button("📄 Generate System Report", variant="primary")
                    report_output = gr.Textbox(label="System Design Report", lines=15, interactive=False)
                    download_report_btn = gr.Button("📝 Download Report", variant="secondary", size="sm")

        # Output groups shared by the event bindings below.
        selection_outputs = [canvas, json_output, selected_node_info, component_info]
        graph_outputs = [canvas, json_output, from_node, to_node, selected_node_info, component_info]

        # Now define all the handler functions after UI components are defined
        def workflow_snapshot_json():
            """Serialise nodes, connections and selection for display/reporting."""
            return json.dumps({
                "nodes": [asdict(n) for n in workflow.nodes.values()],
                "connections": [asdict(c) for c in workflow.connections],
                "selected_node": workflow.selected_node
            }, indent=2)

        def get_full_state():
            """Return (svg, json, node ids, selected-node text, component text)."""
            svg = workflow.render_svg()
            node_choices = list(workflow.nodes.keys())
            workflow_json = workflow_snapshot_json()

            selected_info = "**No node selected**"
            comp_info = "Select a component to see its description"

            node = workflow.nodes.get(workflow.selected_node) if workflow.selected_node else None
            if node is not None:
                # Prefer the registry entry, fall back to the data stored on
                # the node, so an unregistered type cannot raise KeyError.
                info = COMPONENT_INFO.get(node.type, {})
                icon = info.get("icon") or node.component_data.icon or "❓"
                description = info.get("description") or node.component_data.description or []
                # High-level entries store a single string; joining a string
                # would put every character on its own line.
                if isinstance(description, str):
                    description = [description]
                title = node.type.replace('_', ' ').title()
                selected_info = f"**Selected:** `{node.id}` ({icon} {title}) at position ({node.x}, {node.y})"
                comp_info = f"### {title} {icon}\n\n" + "\n".join(description)

            return svg, workflow_json, node_choices, selected_info, comp_info

        def selection_view():
            """State tuple matching selection_outputs."""
            svg, wf_json, _, selected_info, comp_info = get_full_state()
            return svg, wf_json, selected_info, comp_info

        def graph_view():
            """State tuple matching graph_outputs (refreshes both dropdowns)."""
            svg, wf_json, choices, selected_info, comp_info = get_full_state()
            return (
                svg, wf_json,
                gr.Dropdown(choices=choices), gr.Dropdown(choices=choices),
                selected_info, comp_info
            )

        def add_node_handler(node_type):
            workflow.add_node(node_type)
            return graph_view()

        # Connect all component buttons
        for btn, comp_type in component_buttons:
            btn.click(
                # Bind comp_type as a default so each button keeps its own type.
                lambda ct=comp_type: add_node_handler(ct),
                outputs=graph_outputs
            )

        # Selection handlers
        def select_node_handler(node_id):
            if node_id:
                workflow.select_node(node_id)
            return selection_view()

        def cycle_selection(step):
            """Move the selection forward (+1) or backward (-1) through the nodes."""
            node_ids = list(workflow.nodes)
            if node_ids:
                if workflow.selected_node in workflow.nodes:
                    idx = (node_ids.index(workflow.selected_node) + step) % len(node_ids)
                else:
                    # Nothing (or a stale id) selected: start from the matching end.
                    idx = 0 if step > 0 else -1
                workflow.selected_node = node_ids[idx]
            return selection_view()

        def select_next_node():
            return cycle_selection(1)

        def select_prev_node():
            return cycle_selection(-1)

        def deselect_all():
            workflow.selected_node = None
            return selection_view()

        # Delete selected node
        def delete_selected_node():
            target = workflow.selected_node
            if target and target in workflow.nodes:
                workflow.connections = [
                    c for c in workflow.connections
                    if c.from_node != target and c.to_node != target
                ]
                del workflow.nodes[target]
                workflow.selected_node = None
            return graph_view()

        # Movement handlers
        def move_selected_node(dx, dy):
            # WorkflowDesigner.move_selected_node ignores the call when
            # nothing is selected, so the current state is returned either way.
            workflow.move_selected_node(dx, dy)
            return selection_view()

        # Connect selection events
        select_node_trigger.change(
            select_node_handler,
            inputs=[select_node_trigger],
            outputs=selection_outputs
        )
        select_next_btn.click(select_next_node, outputs=selection_outputs)
        select_prev_btn.click(select_prev_node, outputs=selection_outputs)
        deselect_btn.click(deselect_all, outputs=selection_outputs)

        # Movement buttons: direction comes from the button, distance from the
        # step size currently held in move_step.
        for move_btn, (unit_x, unit_y) in (
            (move_left_btn, (-1, 0)),
            (move_right_btn, (1, 0)),
            (move_up_btn, (0, -1)),
            (move_down_btn, (0, 1)),
        ):
            move_btn.click(
                lambda step, ux=unit_x, uy=unit_y: move_selected_node(ux * step, uy * step),
                inputs=[move_step],
                outputs=selection_outputs
            )

        # Movement modes select the step size for the arrow buttons instead of
        # nudging the node to the left.
        move_fine_btn.click(lambda: 5, outputs=[move_step])
        move_coarse_btn.click(lambda: 50, outputs=[move_step])

        # Delete button
        delete_selected_btn.click(delete_selected_node, outputs=graph_outputs)

        # Drag handler
        def handle_node_drag(move_data):
            """Apply a drag event encoded as JSON with node_id, dx and dy."""
            if move_data:
                try:
                    data = json.loads(move_data)
                    node_id = data.get('node_id')
                    # Deltas from the browser may be fractional; node
                    # coordinates are kept as integers.
                    dx = int(round(float(data.get('dx', 0))))
                    dy = int(round(float(data.get('dy', 0))))
                except (TypeError, ValueError, AttributeError) as e:
                    print("Drag error:", e)
                else:
                    if isinstance(node_id, str) and node_id in workflow.nodes:
                        workflow.select_node(node_id)
                        workflow.move_selected_node(dx, dy)
            return selection_view()

        move_node_trigger.change(
            handle_node_drag,
            inputs=[move_node_trigger],
            outputs=selection_outputs
        )

        # Connection handler
        def connect_nodes(from_n, to_n):
            """Add a from→to edge unless it is a self-loop, duplicate or dangling."""
            endpoints_ok = (
                from_n in workflow.nodes
                and to_n in workflow.nodes
                and from_n != to_n
            )
            if endpoints_ok and not any(
                c.from_node == from_n and c.to_node == to_n
                for c in workflow.connections
            ):
                workflow.connections.append(Connection(from_node=from_n, to_node=to_n))
            svg, wf_json, _, selected_info, _ = get_full_state()
            return svg, wf_json, selected_info

        connect_btn.click(
            connect_nodes,
            inputs=[from_node, to_node],
            outputs=[canvas, json_output, selected_node_info]
        )

        # Example loading
        def load_example_handler(example_name):
            example = EXAMPLE_WORKFLOWS.get(example_name) if example_name else None
            if example is None:
                # Nothing chosen: keep the current graph and dropdown choices.
                return graph_view()
            workflow.load_example(example_name)
            svg, wf_json, from_choices, to_choices, selected_info, _ = graph_view()
            info_text = f"### {example_name}\n\n{example['description']}"
            return svg, wf_json, from_choices, to_choices, selected_info, info_text

        load_example_btn.click(
            load_example_handler,
            inputs=[example_dropdown],
            outputs=graph_outputs
        )

        # Download JSON only (full component data)
        def download_json():
            payload = json.dumps(workflow.get_workflow_json(), indent=2)
            return [_write_temp_file(payload, ".json")]

        # Download SVG only
        def download_svg():
            return [_write_temp_file(workflow.render_svg(), ".svg")]

        # Unified download handler (returns list of files)
        def download_all_files():
            return download_json() + download_svg()

        # Report generation. Async handlers run on the server's event loop, so
        # the shared AsyncOpenAI client is not reused across short-lived loops
        # created by asyncio.run().
        async def generate_report_handler():
            try:
                return await reporter.generate_report(workflow_snapshot_json())
            except Exception as e:
                return f"Failed to generate report: {e}"

        async def download_report(current_report):
            """Save the displayed report, generating one only if none is shown."""
            from datetime import datetime

            report_text = current_report or await generate_report_handler()
            header = f"Agentic Workflow Design Report\nGenerated on: {datetime.now()}\n\n"
            return [_write_temp_file(header + report_text, "_report.txt")]

        # Attach handlers
        download_json_btn.click(download_json, outputs=[download_files])
        download_svg_btn.click(download_svg, outputs=[download_files])
        report_btn.click(generate_report_handler, outputs=[report_output])
        download_report_btn.click(
            download_report,
            inputs=[report_output],
            outputs=[download_files]
        )

        # Clear handler
        def clear_all():
            workflow.nodes.clear()
            workflow.connections.clear()
            workflow.node_counter = 0
            workflow.selected_node = None
            svg = workflow.render_svg()
            return (
                svg,
                "{}",
                gr.Dropdown(choices=[]),
                gr.Dropdown(choices=[]),
                "No node selected",
                "Canvas cleared. Ready to build!"
            )

        clear_btn.click(clear_all, outputs=graph_outputs)

        # Initialize with JavaScript support
        def init_app():
            svg = workflow.render_svg()
            # NOTE(review): the client-side script literal is blank in the
            # reviewed copy; select_node_trigger and move_node_trigger are only
            # useful once a script that writes to them is supplied here.
            js_code = ''' '''
            return svg + js_code

        demo.load(init_app, outputs=[canvas])

    return demo


if __name__ == "__main__":
    demo = create_workflow_ui()
    demo.launch()