"""
Copyright (c) 2025 Joshua Hendricks Cole (DBA: Corporation of Light). All Rights Reserved. PATENT PENDING.

Experiment Workflows - Complex Workflow Composition System

Chains together multiple experiments in sequence with data flow and dependencies.
Supports chemical synthesis workflows, analytical pipelines, and multi-step processes.
"""

from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable, Union
from datetime import datetime, timedelta
import asyncio
from enum import Enum
import json

from experiment_taxonomy import experiment_taxonomy, ExperimentTemplate
from experiment_protocols import experiment_protocols


class WorkflowStepType(Enum):
    """Types of workflow steps.

    WorkflowExecutor currently executes EXPERIMENT_EXECUTION and
    DATA_TRANSFORMATION steps; any other type is reported as unsupported.
    """

    EXPERIMENT_EXECUTION = "experiment_execution"
    DATA_TRANSFORMATION = "data_transformation"
    CONDITIONAL_BRANCHING = "conditional_branching"
    PARALLEL_EXECUTION = "parallel_execution"
    QUALITY_CHECK = "quality_check"


class DataFlow(Enum):
    """Types of data flow between workflow steps.

    WorkflowExecutor implements DIRECT_PASS and TRANSFORMATION. PARAMETER_MAPPING
    and ACCUMULATION currently forward nothing from dependencies; such steps only
    receive their own parameters plus the workflow input data.
    """

    DIRECT_PASS = "direct_pass"  # Pass result directly to next step
    PARAMETER_MAPPING = "parameter_mapping"  # Map specific parameters
    TRANSFORMATION = "transformation"  # Apply transformation function
    ACCUMULATION = "accumulation"  # Accumulate results across steps


@dataclass
class WorkflowStep:
    """Individual step in a workflow."""

    step_id: str
    name: str
    step_type: WorkflowStepType
    # Experiment to run; required for EXPERIMENT_EXECUTION steps.
    experiment_id: Optional[str] = None
    # Step-specific parameters (copied, then merged with dependency/workflow data).
    parameters: Dict[str, Any] = field(default_factory=dict)
    # How outputs of dependencies feed into this step's parameters.
    data_flow: DataFlow = DataFlow.DIRECT_PASS
    # Called with all previous step results (TRANSFORMATION flow) or with the
    # prepared parameters (DATA_TRANSFORMATION steps).
    transformation_function: Optional[Callable] = None
    dependencies: List[str] = field(default_factory=list)  # Step IDs this depends on
    # Per-attempt time limit for the experiment call; None means no limit.
    timeout_minutes: Optional[int] = None
    # Number of additional attempts made after a failed attempt.
    retry_count: int = 0
    critical_step: bool = False  # If this fails, abort workflow


@dataclass
class WorkflowResult:
    """Result of a workflow execution."""

    workflow_id: str
    success: bool
    total_steps: int
    completed_steps: int
    failed_steps: int
    execution_time_seconds: float
    # Aggregated outputs produced by WorkflowExecutor._aggregate_workflow_results.
    results: Dict[str, Any] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)
    # One entry per executed step: step_id, success, result, execution_time.
    step_results: List[Dict[str, Any]] = field(default_factory=list)
    timestamp: datetime = field(default_factory=datetime.now)


@dataclass
class ExperimentWorkflow:
    """Complex workflow combining multiple experiments."""

    workflow_id: str
    name: str
    description: str
    category: str  # e.g., "organic_synthesis", "drug_discovery", "material_characterization"
    steps: List[WorkflowStep] = field(default_factory=list)
    input_parameters: Dict[str, Any] = field(default_factory=dict)
    output_schema: Dict[str, Any] = field(default_factory=dict)
    estimated_duration: Optional[timedelta] = None
    required_equipment: List[str] = field(default_factory=list)
    safety_requirements: List[str] = field(default_factory=list)
    quality_checks: List[str] = field(default_factory=list)

    def _steps_by_id(self) -> Dict[str, WorkflowStep]:
        """Map each step ID to its first step with that ID (O(1) lookups)."""
        mapping: Dict[str, WorkflowStep] = {}
        for step in self.steps:
            # setdefault keeps the first occurrence, matching a linear first-match scan.
            mapping.setdefault(step.step_id, step)
        return mapping

    def validate_workflow(self) -> List[str]:
        """Validate workflow structure and dependencies.

        Returns:
            A list of human-readable error messages; empty when the workflow is valid.
        """
        errors: List[str] = []

        # Check for duplicate step IDs
        step_ids = [step.step_id for step in self.steps]
        known_ids = set(step_ids)
        if len(step_ids) != len(known_ids):
            errors.append("Duplicate step IDs found")

        # Check dependency validity
        for step in self.steps:
            for dep in step.dependencies:
                if dep not in known_ids:
                    errors.append(f"Step {step.step_id} depends on unknown step {dep}")

        # Check for circular dependencies
        if self._has_circular_dependencies():
            errors.append("Circular dependencies detected")

        # Validate experiment IDs exist
        for step in self.steps:
            if step.experiment_id and not experiment_taxonomy.get_experiment(step.experiment_id):
                errors.append(f"Unknown experiment ID: {step.experiment_id}")

        return errors

    def _has_circular_dependencies(self) -> bool:
        """Return True if the dependency graph contains a cycle (DFS colouring).

        Unknown dependency IDs are treated as leaves; they are reported separately
        by validate_workflow.
        """
        steps_by_id = self._steps_by_id()
        visited = set()   # fully explored, known acyclic below
        visiting = set()  # on the current DFS path

        def has_cycle(step_id: str) -> bool:
            if step_id in visiting:
                return True
            if step_id in visited:
                return False
            visiting.add(step_id)
            step = steps_by_id.get(step_id)
            if step is not None:
                for dep in step.dependencies:
                    if has_cycle(dep):
                        return True
            visiting.remove(step_id)
            visited.add(step_id)
            return False

        return any(has_cycle(step.step_id) for step in self.steps)

    def get_execution_order(self) -> List[str]:
        """Get step IDs in execution order (dependencies first; topological sort).

        Only IDs of steps that exist in the workflow are returned; unknown
        dependency IDs are skipped rather than scheduled.

        Raises:
            ValueError: if a circular dependency is encountered.
        """
        steps_by_id = self._steps_by_id()
        order: List[str] = []
        visited = set()
        visiting = set()

        def visit(step_id: str) -> None:
            if step_id in visiting:
                raise ValueError(f"Circular dependency involving {step_id}")
            if step_id in visited:
                return
            step = steps_by_id.get(step_id)
            if step is None:
                # Unknown dependency: nothing to run (validate_workflow reports it).
                return
            visiting.add(step_id)
            for dep in step.dependencies:
                visit(dep)
            visiting.remove(step_id)
            visited.add(step_id)
            order.append(step_id)

        # Visit all steps
        for step in self.steps:
            visit(step.step_id)

        return order


class WorkflowExecutor:
    """
    Executes complex workflows by orchestrating multiple experiments.
    Handles data flow, error recovery (per-step retries, critical-step aborts)
    and per-step timeouts. Steps run sequentially in dependency order.
    """

    def __init__(self, mcp_server=None):
        self.mcp_server = mcp_server
        self.active_workflows: Dict[str, WorkflowResult] = {}

    async def execute_workflow(self, workflow: ExperimentWorkflow,
                               input_data: Optional[Dict[str, Any]] = None) -> WorkflowResult:
        """Execute a complete workflow.

        Args:
            workflow: The workflow to run; it is validated first.
            input_data: Workflow-level inputs merged into every step's parameters
                (they override step-specific values with the same key).

        Returns:
            A WorkflowResult, also registered in ``self.active_workflows``.
            Unexpected exceptions are recorded in ``errors`` rather than raised.
        """
        start_time = datetime.now()
        workflow_id = f"{workflow.workflow_id}_{start_time.strftime('%Y%m%d_%H%M%S')}"

        result = WorkflowResult(
            workflow_id=workflow_id,
            success=False,
            total_steps=len(workflow.steps),
            completed_steps=0,
            failed_steps=0,
            execution_time_seconds=0,
            results={},
            errors=[]
        )
        self.active_workflows[workflow_id] = result

        try:
            validation_errors = workflow.validate_workflow()
            if validation_errors:
                result.errors.extend(validation_errors)
            else:
                await self._run_steps(workflow, result, input_data or {})
        except Exception as e:
            # Top-level boundary: report instead of propagating to the caller.
            result.errors.append(f"Workflow execution error: {str(e)}")

        result.execution_time_seconds = (datetime.now() - start_time).total_seconds()
        return result

    async def _run_steps(self, workflow: ExperimentWorkflow, result: WorkflowResult,
                         input_data: Dict[str, Any]) -> None:
        """Run the steps of a validated workflow in order, updating *result* in place."""
        execution_order = workflow.get_execution_order()
        steps_by_id = {step.step_id: step for step in workflow.steps}
        step_results: Dict[str, Dict[str, Any]] = {}

        for step_id in execution_order:
            step = steps_by_id.get(step_id)
            if step is None:
                continue

            try:
                step_params = self._prepare_step_parameters(step, step_results, input_data)
            except Exception as e:
                # A broken transformation is a failure of this step, so retry-free
                # failure accounting and critical_step handling still apply.
                step_result = {
                    'success': False,
                    'error': f"Parameter preparation failed: {e}",
                    'execution_time': 0
                }
            else:
                step_result = await self._execute_workflow_step(step, step_params)

            step_results[step_id] = step_result
            result.step_results.append({
                'step_id': step_id,
                'success': step_result.get('success', False),
                'result': step_result,
                'execution_time': step_result.get('execution_time', 0)
            })

            if step_result.get('success'):
                result.completed_steps += 1
            else:
                result.failed_steps += 1
                result.errors.append(f"Step {step_id} failed: {step_result.get('error', 'Unknown error')}")
                # Abort if critical step fails
                if step.critical_step:
                    break

        # Aggregate first so success is only reported once aggregation has succeeded.
        result.results = self._aggregate_workflow_results(workflow, step_results)
        result.success = result.failed_steps == 0

    @staticmethod
    def _experiment_results(step_result: Any) -> Dict[str, Any]:
        """Return ``step_result['result']['results']`` when every level is a dict, else {}."""
        if not isinstance(step_result, dict):
            return {}
        payload = step_result.get('result')
        if not isinstance(payload, dict):
            return {}
        exp_results = payload.get('results')
        return exp_results if isinstance(exp_results, dict) else {}

    def _prepare_step_parameters(self, step: WorkflowStep, previous_results: Dict[str, Any],
                                 input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Prepare parameters for a workflow step.

        Starts from a shallow copy of ``step.parameters``, adds data derived from
        dependencies according to ``step.data_flow``, then overlays *input_data*.
        """
        params = step.parameters.copy()

        if step.data_flow == DataFlow.DIRECT_PASS:
            # Pass the product/output of each dependency; a later dependency wins.
            for dep in step.dependencies:
                exp_results = self._experiment_results(previous_results.get(dep))
                if 'product' in exp_results:
                    params['input_data'] = exp_results['product']
                elif 'output' in exp_results:
                    params['input_data'] = exp_results['output']

        elif step.data_flow == DataFlow.PARAMETER_MAPPING:
            # No mapping rules are defined yet, so nothing is forwarded from dependencies.
            pass

        elif step.data_flow == DataFlow.TRANSFORMATION:
            # Apply transformation function to all results gathered so far.
            if step.transformation_function:
                transformed_data = step.transformation_function(previous_results)
                if transformed_data is not None:
                    params.update(transformed_data)

        # Add input data (workflow-level inputs take precedence)
        params.update(input_data)

        return params

    async def _execute_workflow_step(self, step: WorkflowStep, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a single workflow step, retrying up to ``step.retry_count`` times.

        Returns:
            A dict with ``success``, ``error`` (on failure), ``result`` (on
            execution paths that produce one) and ``execution_time`` covering all
            attempts. Exceptions are captured into the dict, never raised.
        """
        start_time = datetime.now()
        attempts = max(0, step.retry_count) + 1

        outcome: Dict[str, Any] = {}
        for _ in range(attempts):
            outcome = await self._execute_step_once(step, parameters)
            if outcome.get('success'):
                break

        outcome['execution_time'] = (datetime.now() - start_time).total_seconds()
        return outcome

    async def _execute_step_once(self, step: WorkflowStep, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Run one attempt of *step*, applying ``step.timeout_minutes`` to the experiment call."""
        start_time = datetime.now()

        def elapsed() -> float:
            return (datetime.now() - start_time).total_seconds()

        try:
            if step.step_type == WorkflowStepType.EXPERIMENT_EXECUTION:
                # Execute experiment via MCP server
                if not (self.mcp_server and step.experiment_id):
                    return {
                        'success': False,
                        'error': 'MCP server not available or experiment ID missing',
                        'execution_time': elapsed()
                    }

                # Imported lazily so the module loads without the MCP server package.
                from qulab_mcp_server import MCPRequest

                request = MCPRequest(
                    tool=f"experiment.{step.experiment_id}",
                    parameters=parameters,
                    request_id=f"workflow_{step.step_id}_{datetime.now().strftime('%H%M%S')}"
                )

                call = self.mcp_server.execute_tool(request)
                if step.timeout_minutes is not None:
                    response = await asyncio.wait_for(call, timeout=step.timeout_minutes * 60)
                else:
                    response = await call

                return {
                    'success': response.status == 'success',
                    'result': response.result or {},
                    'error': response.error,
                    'execution_time': elapsed()
                }

            if step.step_type == WorkflowStepType.DATA_TRANSFORMATION:
                # Apply data transformation
                if step.transformation_function:
                    transformed = step.transformation_function(parameters)
                    return {
                        'success': True,
                        'result': transformed,
                        'execution_time': elapsed()
                    }
                return {
                    'success': False,
                    'error': 'No transformation function provided',
                    'execution_time': elapsed()
                }

            return {
                'success': False,
                'error': f'Unsupported step type: {step.step_type}',
                'execution_time': elapsed()
            }

        except asyncio.TimeoutError:
            # str(TimeoutError()) is empty, so give the failure a readable message.
            return {
                'success': False,
                'error': f'Step {step.step_id} timed out after {step.timeout_minutes} minutes',
                'execution_time': elapsed()
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'execution_time': elapsed()
            }

    def _aggregate_workflow_results(self, workflow: ExperimentWorkflow,
                                    step_results: Dict[str, Any]) -> Dict[str, Any]:
        """Aggregate results from all workflow steps.

        ``final_outputs`` holds, for each successful step that no other step
        depends on, its ``result['results']`` value (or {} if the step's result
        is not a dict).
        """
        aggregated = {
            'workflow_name': workflow.name,
            'category': workflow.category,
            'final_outputs': {},
            'quality_metrics': {},
            'execution_summary': {
                'total_steps': len(workflow.steps),
                'successful_steps': sum(1 for r in step_results.values() if r.get('success')),
                'failed_steps': sum(1 for r in step_results.values() if not r.get('success'))
            }
        }

        # Extract key outputs from final steps (steps nothing else depends on)
        depended_on = {dep for other in workflow.steps for dep in other.dependencies}
        final_step_ids = [s.step_id for s in workflow.steps if s.step_id not in depended_on]

        for step_id in final_step_ids:
            step_result = step_results.get(step_id)
            if not step_result or not step_result.get('success') or 'result' not in step_result:
                continue
            payload = step_result['result']
            # DATA_TRANSFORMATION steps may return arbitrary values; only dicts carry 'results'.
            exp_result = payload.get('results', {}) if isinstance(payload, dict) else {}
            aggregated['final_outputs'][step_id] = exp_result

        return aggregated


class WorkflowTemplates:
    """
    Pre-defined workflow templates for common scientific processes.
    """

    @staticmethod
    def create_organic_synthesis_workflow(target_molecule: str) -> ExperimentWorkflow:
        """Create a multi-step organic synthesis workflow"""
        return ExperimentWorkflow(
            workflow_id="organic_synthesis_workflow",
            name=f"Synthesis of {target_molecule}",
            description=f"Multi-step synthesis workflow for {target_molecule}",
            category="organic_synthesis",
            steps=[
                WorkflowStep(
                    step_id="retrosynthesis",
                    name="Retrosynthetic Analysis",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="organic_synthesis",
                    parameters={"target": target_molecule, "strategy": "retrosynthesis"},
                    critical_step=True
                ),
                WorkflowStep(
                    step_id="step1_reaction",
                    name="First Synthetic Step",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="aldol_condensation",
                    parameters={"temperature": 0, "solvent": "ethanol"},
                    dependencies=["retrosynthesis"],
                    data_flow=DataFlow.PARAMETER_MAPPING
                ),
                WorkflowStep(
                    step_id="purification",
                    name="Product Purification",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="recrystallization",
                    dependencies=["step1_reaction"],
                    data_flow=DataFlow.DIRECT_PASS
                ),
                WorkflowStep(
                    step_id="characterization",
                    name="Product Characterization",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="nmr_spectroscopy",
                    dependencies=["purification"],
                    data_flow=DataFlow.DIRECT_PASS
                )
            ],
            estimated_duration=timedelta(hours=8),
            required_equipment=["reaction_vessel", "spectrometer", "chromatography_system"],
            safety_requirements=["chemical_handling", "instrument_safety"],
            quality_checks=["purity_check", "yield_calculation", "spectral_analysis"]
        )

    @staticmethod
    def create_drug_discovery_workflow() -> ExperimentWorkflow:
        """Create a drug discovery workflow"""
        return ExperimentWorkflow(
            workflow_id="drug_discovery_workflow",
            name="High-Throughput Drug Discovery",
            description="Complete drug discovery pipeline from virtual screening to optimization",
            category="drug_discovery",
            steps=[
                WorkflowStep(
                    step_id="virtual_screening",
                    name="Virtual Screening",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="molecular_docking",
                    parameters={"library_size": 1000000, "target_protein": "PDB_ID"}
                ),
                WorkflowStep(
                    step_id="hit_identification",
                    name="Hit Identification",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="binding_assay",
                    dependencies=["virtual_screening"],
                    data_flow=DataFlow.TRANSFORMATION
                ),
                WorkflowStep(
                    step_id="lead_optimization",
                    name="Lead Optimization",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="structure_activity_relationship",
                    dependencies=["hit_identification"],
                    data_flow=DataFlow.PARAMETER_MAPPING
                ),
                WorkflowStep(
                    step_id="admet_profiling",
                    name="ADMET Profiling",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="pharmacokinetic_modeling",
                    dependencies=["lead_optimization"],
                    data_flow=DataFlow.DIRECT_PASS
                )
            ],
            estimated_duration=timedelta(days=30),
            required_equipment=["computational_cluster", "high_throughput_screening", "analytical_instruments"],
            safety_requirements=["biological_safety", "chemical_handling"],
            quality_checks=["potency_assay", "selectivity_test", "toxicity_screening"]
        )


# Global workflow executor instance
workflow_executor = WorkflowExecutor()


if __name__ == '__main__':
    # Create a sample organic synthesis workflow
    synthesis_workflow = WorkflowTemplates.create_organic_synthesis_workflow("ibuprofen")

    print(f"Created workflow: {synthesis_workflow.name}")
    print(f"Steps: {len(synthesis_workflow.steps)}")
    print(f"Estimated duration: {synthesis_workflow.estimated_duration}")

    # Validate workflow
    errors = synthesis_workflow.validate_workflow()
    if errors:
        print(f"Validation errors: {errors}")
    else:
        print("Workflow validation passed")

    # Show execution order
    execution_order = synthesis_workflow.get_execution_order()
    print(f"Execution order: {execution_order}")

    print("\nWorkflow steps:")
    for step in synthesis_workflow.steps:
        print(f"  - {step.step_id}: {step.name} ({step.step_type.value})")
        if step.dependencies:
            print(f"    Dependencies: {step.dependencies}")