# NOTE: spreadsheet-export artifact lines removed (they were not part of the source).
"""
Copyright (c) 2025 Joshua Hendricks Cole (DBA: Corporation of Light). All Rights Reserved. PATENT PENDING.

Experiment Workflows - Complex Workflow Composition System

Chains together multiple experiments in sequence with data flow and dependencies.
Supports chemical synthesis workflows, analytical pipelines, and multi-step processes.
"""
| from dataclasses import dataclass, field | |
| from typing import Dict, List, Any, Optional, Callable, Union | |
| from datetime import datetime, timedelta | |
| import asyncio | |
| from enum import Enum | |
| import json | |
| from experiment_taxonomy import experiment_taxonomy, ExperimentTemplate | |
| from experiment_protocols import experiment_protocols | |
class WorkflowStepType(Enum):
    """Types of workflow steps.

    NOTE: only EXPERIMENT_EXECUTION and DATA_TRANSFORMATION are currently
    handled by WorkflowExecutor._execute_workflow_step; the other member
    types result in an 'Unsupported step type' failure when executed.
    """
    EXPERIMENT_EXECUTION = "experiment_execution"    # run a registered experiment via the MCP server
    DATA_TRANSFORMATION = "data_transformation"      # run the step's transformation_function
    CONDITIONAL_BRANCHING = "conditional_branching"  # placeholder -- not yet executable
    PARALLEL_EXECUTION = "parallel_execution"        # placeholder -- not yet executable
    QUALITY_CHECK = "quality_check"                  # placeholder -- not yet executable
class DataFlow(Enum):
    """Types of data flow between workflow steps."""
    DIRECT_PASS = "direct_pass"              # Pass dependency 'product'/'output' result directly to next step
    PARAMETER_MAPPING = "parameter_mapping"  # Map specific parameters (mapping logic is currently a stub)
    TRANSFORMATION = "transformation"        # Apply the step's transformation function to prior results
    ACCUMULATION = "accumulation"            # Accumulate results across steps (not yet implemented in the executor)
@dataclass
class WorkflowStep:
    """Individual step in a workflow.

    Describes what to run (step_type / experiment_id), how data flows in
    from dependency steps (data_flow / transformation_function), and the
    execution policy (timeout, retries, criticality).

    Fix: the ``@dataclass`` decorator was missing. Without it the
    ``field(default_factory=...)`` defaults are inert Field objects and the
    keyword construction used by WorkflowTemplates (``WorkflowStep(step_id=...)``)
    raises TypeError.
    """
    step_id: str                                   # unique identifier within the workflow
    name: str                                      # human-readable step name
    step_type: WorkflowStepType
    experiment_id: Optional[str] = None            # taxonomy experiment to execute, if any
    parameters: Dict[str, Any] = field(default_factory=dict)  # static parameters for the step
    data_flow: DataFlow = DataFlow.DIRECT_PASS     # how dependency results feed this step
    transformation_function: Optional[Callable] = None  # used by TRANSFORMATION data flow
    dependencies: List[str] = field(default_factory=list)  # Step IDs this depends on
    timeout_minutes: Optional[int] = None
    retry_count: int = 0
    critical_step: bool = False  # If this fails, abort workflow
@dataclass
class WorkflowResult:
    """Result of a workflow execution.

    Fix: the ``@dataclass`` decorator was missing. Without it the
    ``field(default_factory=...)`` defaults are inert Field objects and the
    keyword construction in WorkflowExecutor.execute_workflow raises TypeError.
    """
    workflow_id: str                  # unique run ID (workflow ID + timestamp suffix)
    success: bool                     # True only when no step failed
    total_steps: int
    completed_steps: int
    failed_steps: int
    execution_time_seconds: float
    results: Dict[str, Any] = field(default_factory=dict)      # aggregated workflow-level outputs
    errors: List[str] = field(default_factory=list)            # human-readable error messages
    step_results: List[Dict[str, Any]] = field(default_factory=list)  # per-step execution records
    timestamp: datetime = field(default_factory=datetime.now)  # creation time of this result
@dataclass
class ExperimentWorkflow:
    """Complex workflow combining multiple experiments.

    Fix: the ``@dataclass`` decorator was missing, so ``field(...)`` defaults
    were inert and keyword construction (used by WorkflowTemplates) raised
    TypeError. Also replaced O(n) linear step lookups with dict/set lookups.
    """
    workflow_id: str
    name: str
    description: str
    category: str  # e.g., "organic_synthesis", "drug_discovery", "material_characterization"
    steps: List[WorkflowStep] = field(default_factory=list)
    input_parameters: Dict[str, Any] = field(default_factory=dict)
    output_schema: Dict[str, Any] = field(default_factory=dict)
    estimated_duration: Optional[timedelta] = None
    required_equipment: List[str] = field(default_factory=list)
    safety_requirements: List[str] = field(default_factory=list)
    quality_checks: List[str] = field(default_factory=list)

    def validate_workflow(self) -> List[str]:
        """Validate workflow structure and dependencies.

        Returns:
            A list of human-readable error strings; empty means valid.
        """
        errors = []
        step_ids = [step.step_id for step in self.steps]
        # Check for duplicate step IDs
        if len(step_ids) != len(set(step_ids)):
            errors.append("Duplicate step IDs found")
        # Check dependency validity (set membership instead of list scan)
        known_ids = set(step_ids)
        for step in self.steps:
            for dep in step.dependencies:
                if dep not in known_ids:
                    errors.append(f"Step {step.step_id} depends on unknown step {dep}")
        # Check for circular dependencies
        if self._has_circular_dependencies():
            errors.append("Circular dependencies detected")
        # Validate experiment IDs exist in the taxonomy
        for step in self.steps:
            if step.experiment_id and not experiment_taxonomy.get_experiment(step.experiment_id):
                errors.append(f"Unknown experiment ID: {step.experiment_id}")
        return errors

    def _has_circular_dependencies(self) -> bool:
        """Check for circular dependencies via DFS with grey/black marking.

        ``visiting`` holds nodes on the current DFS path (grey); re-entering
        one means a cycle. Dependencies on unknown step IDs are ignored here
        (they are reported separately by validate_workflow).
        """
        by_id = {s.step_id: s for s in self.steps}
        visited = set()
        visiting = set()

        def has_cycle(step_id: str) -> bool:
            if step_id in visiting:
                return True
            if step_id in visited:
                return False
            visiting.add(step_id)
            step = by_id.get(step_id)
            if step:
                for dep in step.dependencies:
                    if has_cycle(dep):
                        return True
            visiting.remove(step_id)
            visited.add(step_id)
            return False

        return any(has_cycle(step.step_id) for step in self.steps)

    def get_execution_order(self) -> List[str]:
        """Return step IDs in a valid execution order (topological sort).

        Raises:
            ValueError: if a circular dependency is encountered.
        """
        by_id = {s.step_id: s for s in self.steps}
        result: List[str] = []
        visited = set()
        visiting = set()

        def visit(step_id: str):
            if step_id in visiting:
                raise ValueError(f"Circular dependency involving {step_id}")
            if step_id in visited:
                return
            visiting.add(step_id)
            step = by_id.get(step_id)
            if step:
                for dep in step.dependencies:
                    visit(dep)
            visiting.remove(step_id)
            visited.add(step_id)
            result.append(step_id)

        # Visit all steps (dependencies are appended before their dependents)
        for step in self.steps:
            if step.step_id not in visited:
                visit(step.step_id)
        return result
class WorkflowExecutor:
    """
    Executes complex workflows by orchestrating multiple experiments.

    Handles parameter preparation / data flow between steps, error recording,
    and result aggregation. EXPERIMENT_EXECUTION steps are delegated to an
    MCP server; DATA_TRANSFORMATION steps run a user-supplied function.
    Steps run sequentially in topological order (no parallel execution yet).
    """

    def __init__(self, mcp_server=None):
        """
        Args:
            mcp_server: Optional server exposing ``execute_tool``; required
                for EXPERIMENT_EXECUTION steps to succeed.
        """
        self.mcp_server = mcp_server
        # Results of runs started by this executor, keyed by unique run ID.
        self.active_workflows: Dict[str, WorkflowResult] = {}

    async def execute_workflow(self, workflow: ExperimentWorkflow,
                               input_data: Optional[Dict[str, Any]] = None) -> WorkflowResult:
        """Execute a complete workflow.

        A failing step marks the workflow unsuccessful; a failing *critical*
        step additionally aborts the remaining steps.

        Args:
            workflow: The workflow definition to execute.
            input_data: Extra parameters merged into every step's parameters
                (they take precedence over step/dependency-derived values).

        Returns:
            A WorkflowResult with per-step records and aggregated outputs.
        """
        start_time = datetime.now()
        # Unique run ID so repeated executions of the same workflow are distinct.
        workflow_id = f"{workflow.workflow_id}_{start_time.strftime('%Y%m%d_%H%M%S')}"
        result = WorkflowResult(
            workflow_id=workflow_id,
            success=False,
            total_steps=len(workflow.steps),
            completed_steps=0,
            failed_steps=0,
            execution_time_seconds=0,
            results={},
            errors=[]
        )
        self.active_workflows[workflow_id] = result
        try:
            # Refuse to run structurally invalid workflows.
            validation_errors = workflow.validate_workflow()
            if validation_errors:
                result.errors.extend(validation_errors)
                result.execution_time_seconds = (datetime.now() - start_time).total_seconds()
                return result
            execution_order = workflow.get_execution_order()
            steps_by_id = {s.step_id: s for s in workflow.steps}
            step_results = {}
            # Execute steps sequentially in dependency order.
            for step_id in execution_order:
                step = steps_by_id.get(step_id)
                if not step:
                    continue
                # Merge static parameters, dependency outputs, and caller input.
                step_params = self._prepare_step_parameters(step, step_results, input_data or {})
                step_result = await self._execute_workflow_step(step, step_params)
                step_results[step_id] = step_result
                result.step_results.append({
                    'step_id': step_id,
                    'success': step_result.get('success', False),
                    'result': step_result,
                    'execution_time': step_result.get('execution_time', 0)
                })
                if step_result.get('success'):
                    result.completed_steps += 1
                else:
                    result.failed_steps += 1
                    result.errors.append(f"Step {step_id} failed: {step_result.get('error', 'Unknown error')}")
                    # A failed critical step aborts the remaining steps.
                    if step.critical_step:
                        break
            result.success = result.failed_steps == 0
            result.results = self._aggregate_workflow_results(workflow, step_results)
        except Exception as e:
            result.errors.append(f"Workflow execution error: {str(e)}")
        result.execution_time_seconds = (datetime.now() - start_time).total_seconds()
        return result

    def _prepare_step_parameters(self, step: WorkflowStep,
                                 previous_results: Dict[str, Any],
                                 input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the parameter dict for one step.

        Starts from the step's static parameters, applies its data-flow policy
        against dependency results, then overlays ``input_data`` (which
        therefore takes precedence over derived values).
        """
        params = step.parameters.copy()
        if step.data_flow == DataFlow.DIRECT_PASS:
            # Forward the key output of each dependency as 'input_data'.
            # NOTE: with multiple dependencies, the last one providing a
            # 'product'/'output' value wins.
            for dep in step.dependencies:
                if dep in previous_results:
                    dep_result = previous_results[dep]
                    if 'result' in dep_result and 'results' in dep_result['result']:
                        exp_results = dep_result['result']['results']
                        if 'product' in exp_results:
                            params['input_data'] = exp_results['product']
                        elif 'output' in exp_results:
                            params['input_data'] = exp_results['output']
        elif step.data_flow == DataFlow.PARAMETER_MAPPING:
            # TODO: parameter mapping is not implemented yet; dependency
            # results are currently ignored for this data-flow mode.
            pass
        elif step.data_flow == DataFlow.TRANSFORMATION:
            # Let the step's transformation function derive parameters from
            # all results produced so far.
            if step.transformation_function:
                transformed_data = step.transformation_function(previous_results)
                params.update(transformed_data)
        # Caller-supplied input overrides everything above.
        params.update(input_data)
        return params

    async def _execute_workflow_step(self, step: WorkflowStep,
                                     parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a single workflow step.

        Never raises: failures are reported via the returned dict, which
        always contains 'success' and 'execution_time' (seconds).
        """
        start_time = datetime.now()
        try:
            if step.step_type == WorkflowStepType.EXPERIMENT_EXECUTION:
                if self.mcp_server and step.experiment_id:
                    # Imported lazily so the executor can be used without the
                    # MCP server package installed.
                    from qulab_mcp_server import MCPRequest
                    request = MCPRequest(
                        tool=f"experiment.{step.experiment_id}",
                        parameters=parameters,
                        request_id=f"workflow_{step.step_id}_{datetime.now().strftime('%H%M%S')}"
                    )
                    response = await self.mcp_server.execute_tool(request)
                    return {
                        'success': response.status == 'success',
                        'result': response.result or {},
                        'error': response.error,
                        'execution_time': (datetime.now() - start_time).total_seconds()
                    }
                return {
                    'success': False,
                    'error': 'MCP server not available or experiment ID missing',
                    'execution_time': (datetime.now() - start_time).total_seconds()
                }
            if step.step_type == WorkflowStepType.DATA_TRANSFORMATION:
                if step.transformation_function:
                    return {
                        'success': True,
                        'result': step.transformation_function(parameters),
                        'execution_time': (datetime.now() - start_time).total_seconds()
                    }
                return {
                    'success': False,
                    'error': 'No transformation function provided',
                    'execution_time': (datetime.now() - start_time).total_seconds()
                }
            # CONDITIONAL_BRANCHING, PARALLEL_EXECUTION and QUALITY_CHECK are
            # not implemented yet.
            return {
                'success': False,
                'error': f'Unsupported step type: {step.step_type}',
                'execution_time': (datetime.now() - start_time).total_seconds()
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'execution_time': (datetime.now() - start_time).total_seconds()
            }

    def _aggregate_workflow_results(self, workflow: ExperimentWorkflow,
                                    step_results: Dict[str, Any]) -> Dict[str, Any]:
        """Aggregate per-step results into a workflow-level summary.

        'final_outputs' collects the experiment results of terminal steps
        (steps that no other step lists as a dependency).
        """
        aggregated = {
            'workflow_name': workflow.name,
            'category': workflow.category,
            'final_outputs': {},
            'quality_metrics': {},
            'execution_summary': {
                'total_steps': len(workflow.steps),
                'successful_steps': sum(1 for r in step_results.values() if r.get('success')),
                'failed_steps': sum(1 for r in step_results.values() if not r.get('success'))
            }
        }
        # Terminal steps: those no other step depends on (set built once
        # instead of the original O(n^2) scan).
        depended_on = {dep for s in workflow.steps for dep in s.dependencies}
        for step_id in (s.step_id for s in workflow.steps if s.step_id not in depended_on):
            step_result = step_results.get(step_id)
            if step_result and step_result.get('success') and 'result' in step_result:
                aggregated['final_outputs'][step_id] = step_result['result'].get('results', {})
        return aggregated
class WorkflowTemplates:
    """
    Pre-defined workflow templates for common scientific processes.

    Fix: the factory methods took no ``self`` parameter but were not declared
    ``@staticmethod``; calling them on an instance (rather than on the class)
    would have passed the instance as the first argument and failed.
    """

    @staticmethod
    def create_organic_synthesis_workflow(target_molecule: str) -> ExperimentWorkflow:
        """Create a multi-step organic synthesis workflow.

        Args:
            target_molecule: Name of the molecule to synthesize; used in the
                workflow title and passed to the retrosynthesis step.
        """
        return ExperimentWorkflow(
            workflow_id="organic_synthesis_workflow",
            name=f"Synthesis of {target_molecule}",
            description=f"Multi-step synthesis workflow for {target_molecule}",
            category="organic_synthesis",
            steps=[
                WorkflowStep(
                    step_id="retrosynthesis",
                    name="Retrosynthetic Analysis",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="organic_synthesis",
                    parameters={"target": target_molecule, "strategy": "retrosynthesis"},
                    # The whole synthesis is pointless without a route plan.
                    critical_step=True
                ),
                WorkflowStep(
                    step_id="step1_reaction",
                    name="First Synthetic Step",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="aldol_condensation",
                    parameters={"temperature": 0, "solvent": "ethanol"},
                    dependencies=["retrosynthesis"],
                    data_flow=DataFlow.PARAMETER_MAPPING
                ),
                WorkflowStep(
                    step_id="purification",
                    name="Product Purification",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="recrystallization",
                    dependencies=["step1_reaction"],
                    data_flow=DataFlow.DIRECT_PASS
                ),
                WorkflowStep(
                    step_id="characterization",
                    name="Product Characterization",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="nmr_spectroscopy",
                    dependencies=["purification"],
                    data_flow=DataFlow.DIRECT_PASS
                )
            ],
            estimated_duration=timedelta(hours=8),
            required_equipment=["reaction_vessel", "spectrometer", "chromatography_system"],
            safety_requirements=["chemical_handling", "instrument_safety"],
            quality_checks=["purity_check", "yield_calculation", "spectral_analysis"]
        )

    @staticmethod
    def create_drug_discovery_workflow() -> ExperimentWorkflow:
        """Create a drug discovery workflow.

        Pipeline: virtual screening -> hit identification -> lead
        optimization -> ADMET profiling, each step feeding the next.
        """
        return ExperimentWorkflow(
            workflow_id="drug_discovery_workflow",
            name="High-Throughput Drug Discovery",
            description="Complete drug discovery pipeline from virtual screening to optimization",
            category="drug_discovery",
            steps=[
                WorkflowStep(
                    step_id="virtual_screening",
                    name="Virtual Screening",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="molecular_docking",
                    parameters={"library_size": 1000000, "target_protein": "PDB_ID"}
                ),
                WorkflowStep(
                    step_id="hit_identification",
                    name="Hit Identification",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="binding_assay",
                    dependencies=["virtual_screening"],
                    data_flow=DataFlow.TRANSFORMATION
                ),
                WorkflowStep(
                    step_id="lead_optimization",
                    name="Lead Optimization",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="structure_activity_relationship",
                    dependencies=["hit_identification"],
                    data_flow=DataFlow.PARAMETER_MAPPING
                ),
                WorkflowStep(
                    step_id="admet_profiling",
                    name="ADMET Profiling",
                    step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
                    experiment_id="pharmacokinetic_modeling",
                    dependencies=["lead_optimization"],
                    data_flow=DataFlow.DIRECT_PASS
                )
            ],
            estimated_duration=timedelta(days=30),
            required_equipment=["computational_cluster", "high_throughput_screening", "analytical_instruments"],
            safety_requirements=["biological_safety", "chemical_handling"],
            quality_checks=["potency_assay", "selectivity_test", "toxicity_screening"]
        )
# Global workflow executor instance (constructed without an MCP server;
# importers share this single executor).
workflow_executor = WorkflowExecutor()
if __name__ == '__main__':
    # Demo: build, validate, and describe a sample synthesis workflow.
    demo_workflow = WorkflowTemplates.create_organic_synthesis_workflow("ibuprofen")
    print(f"Created workflow: {demo_workflow.name}")
    print(f"Steps: {len(demo_workflow.steps)}")
    print(f"Estimated duration: {demo_workflow.estimated_duration}")

    # Report validation outcome.
    issues = demo_workflow.validate_workflow()
    print(f"Validation errors: {issues}" if issues else "Workflow validation passed")

    # Show the topologically sorted execution order.
    print(f"Execution order: {demo_workflow.get_execution_order()}")

    # List every step with its type and dependencies.
    print("\nWorkflow steps:")
    for workflow_step in demo_workflow.steps:
        print(f" - {workflow_step.step_id}: {workflow_step.name} ({workflow_step.step_type.value})")
        if workflow_step.dependencies:
            print(f" Dependencies: {workflow_step.dependencies}")