# qulab-infinite / experiment_workflows.py
# (Repository header, preserved as comments so the module parses:
#  commit 91994bf — "QuLab MCP Server: Complete Experiment Taxonomy Deployment",
#  author: workofarttattoo)
"""
Copyright (c) 2025 Joshua Hendricks Cole (DBA: Corporation of Light). All Rights Reserved. PATENT PENDING.
Experiment Workflows - Complex Workflow Composition System
Chains together multiple experiments in sequence with data flow and dependencies.
Supports chemical synthesis workflows, analytical pipelines, and multi-step processes.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Callable, Union
from datetime import datetime, timedelta
import asyncio
from enum import Enum
import json
from experiment_taxonomy import experiment_taxonomy, ExperimentTemplate
from experiment_protocols import experiment_protocols
class WorkflowStepType(Enum):
    """Kinds of steps a workflow may contain.

    NOTE(review): WorkflowExecutor currently implements only
    EXPERIMENT_EXECUTION and DATA_TRANSFORMATION; other types are reported
    as unsupported at execution time.
    """
    EXPERIMENT_EXECUTION = "experiment_execution"  # Run an experiment from the taxonomy via the MCP server
    DATA_TRANSFORMATION = "data_transformation"  # Apply a Python transformation function to step parameters
    CONDITIONAL_BRANCHING = "conditional_branching"  # Declared but not yet executed by WorkflowExecutor
    PARALLEL_EXECUTION = "parallel_execution"  # Declared but not yet executed by WorkflowExecutor
    QUALITY_CHECK = "quality_check"  # Declared but not yet executed by WorkflowExecutor
class DataFlow(Enum):
    """How a step receives data from the steps it depends on.

    NOTE(review): ACCUMULATION is declared but not handled by
    WorkflowExecutor._prepare_step_parameters.
    """
    DIRECT_PASS = "direct_pass"  # Pass result directly to next step
    PARAMETER_MAPPING = "parameter_mapping"  # Map specific parameters
    TRANSFORMATION = "transformation"  # Apply transformation function
    ACCUMULATION = "accumulation"  # Accumulate results across steps
@dataclass
class WorkflowStep:
    """Individual step in a workflow."""
    step_id: str  # Unique identifier within the workflow
    name: str  # Human-readable step name
    step_type: WorkflowStepType  # What kind of work this step performs
    experiment_id: Optional[str] = None  # Taxonomy experiment to run (EXPERIMENT_EXECUTION steps)
    parameters: Dict[str, Any] = field(default_factory=dict)  # Static parameters supplied to the step
    data_flow: DataFlow = DataFlow.DIRECT_PASS  # How dependency outputs feed into this step
    transformation_function: Optional[Callable] = None  # Used by TRANSFORMATION flow and DATA_TRANSFORMATION steps
    dependencies: List[str] = field(default_factory=list)  # Step IDs this depends on
    timeout_minutes: Optional[int] = None  # NOTE(review): not enforced by WorkflowExecutor — confirm intent
    retry_count: int = 0  # NOTE(review): retries are not implemented by WorkflowExecutor
    critical_step: bool = False  # If this fails, abort workflow
@dataclass
class WorkflowResult:
    """Result of a workflow execution."""
    workflow_id: str  # Timestamped run identifier (workflow id + start time)
    success: bool  # True only when no step failed
    total_steps: int  # Number of steps defined in the workflow
    completed_steps: int  # Steps that reported success
    failed_steps: int  # Steps that reported failure
    execution_time_seconds: float  # Wall-clock duration of the run
    results: Dict[str, Any] = field(default_factory=dict)  # Aggregated outputs (built by the executor)
    errors: List[str] = field(default_factory=list)  # Human-readable error messages, in occurrence order
    step_results: List[Dict[str, Any]] = field(default_factory=list)  # Per-step outcome records, in execution order
    timestamp: datetime = field(default_factory=datetime.now)  # When this result object was created
@dataclass
class ExperimentWorkflow:
    """A multi-step workflow that chains experiments with data dependencies."""
    workflow_id: str
    name: str
    description: str
    category: str  # e.g., "organic_synthesis", "drug_discovery", "material_characterization"
    steps: List[WorkflowStep] = field(default_factory=list)
    input_parameters: Dict[str, Any] = field(default_factory=dict)
    output_schema: Dict[str, Any] = field(default_factory=dict)
    estimated_duration: Optional[timedelta] = None
    required_equipment: List[str] = field(default_factory=list)
    safety_requirements: List[str] = field(default_factory=list)
    quality_checks: List[str] = field(default_factory=list)

    def validate_workflow(self) -> List[str]:
        """Check workflow structure; return a list of problems (empty == valid)."""
        problems: List[str] = []
        known_ids = [s.step_id for s in self.steps]
        # Duplicate IDs make dependency resolution ambiguous.
        if len(set(known_ids)) != len(known_ids):
            problems.append("Duplicate step IDs found")
        # Every dependency must name a step that exists in this workflow.
        for s in self.steps:
            problems.extend(
                f"Step {s.step_id} depends on unknown step {dep}"
                for dep in s.dependencies
                if dep not in known_ids
            )
        if self._has_circular_dependencies():
            problems.append("Circular dependencies detected")
        # Experiment-backed steps must reference a registered experiment.
        for s in self.steps:
            if s.experiment_id and not experiment_taxonomy.get_experiment(s.experiment_id):
                problems.append(f"Unknown experiment ID: {s.experiment_id}")
        return problems

    def _first_match_index(self) -> Dict[str, "WorkflowStep"]:
        # Map each step_id to its FIRST occurrence (mirrors a linear first-match scan,
        # which matters when duplicate IDs slip through).
        index: Dict[str, WorkflowStep] = {}
        for s in self.steps:
            index.setdefault(s.step_id, s)
        return index

    def _has_circular_dependencies(self) -> bool:
        """Detect dependency cycles with a depth-first search."""
        by_id = self._first_match_index()
        done = set()     # fully explored, known cycle-free
        in_path = set()  # nodes on the current DFS path

        def dfs(node: str) -> bool:
            if node in in_path:
                return True  # back-edge: we returned to the active path
            if node in done:
                return False
            in_path.add(node)
            current = by_id.get(node)
            if current is not None:
                if any(dfs(dep) for dep in current.dependencies):
                    return True
            in_path.discard(node)
            done.add(node)
            return False

        return any(dfs(s.step_id) for s in self.steps)

    def get_execution_order(self) -> List[str]:
        """Return step IDs topologically sorted so dependencies come first.

        Raises:
            ValueError: if a circular dependency is encountered.
        """
        by_id = self._first_match_index()
        ordered: List[str] = []
        done = set()
        in_path = set()

        def expand(node: str) -> None:
            if node in in_path:
                raise ValueError(f"Circular dependency involving {node}")
            if node in done:
                return
            in_path.add(node)
            current = by_id.get(node)
            if current is not None:
                for dep in current.dependencies:
                    expand(dep)
            in_path.discard(node)
            done.add(node)
            ordered.append(node)

        for s in self.steps:
            if s.step_id not in done:
                expand(s.step_id)
        return ordered
class WorkflowExecutor:
    """
    Executes complex workflows by orchestrating multiple experiments.
    Handles data flow, error recovery, and parallel execution.

    NOTE(review): steps are executed sequentially in topological order;
    PARALLEL_EXECUTION / CONDITIONAL_BRANCHING / QUALITY_CHECK step types are
    rejected as unsupported by _execute_workflow_step.
    """
    def __init__(self, mcp_server=None):
        # Optional MCP server used to run EXPERIMENT_EXECUTION steps; when None,
        # such steps fail with an explanatory error instead of raising.
        self.mcp_server = mcp_server
        # Results of every started run, keyed by the timestamped run ID.
        self.active_workflows: Dict[str, WorkflowResult] = {}
    async def execute_workflow(self, workflow: ExperimentWorkflow,
                              input_data: Optional[Dict[str, Any]] = None) -> WorkflowResult:
        """Execute a complete workflow.

        Validates the workflow, runs its steps in dependency order, records
        per-step outcomes, and aggregates final outputs.

        Args:
            workflow: The workflow definition to run.
            input_data: Extra parameters merged into every step's parameters
                last, so they override data-flow-derived values.

        Returns:
            A WorkflowResult; success is True only when no step failed.
        """
        start_time = datetime.now()
        # Timestamped run ID so repeated executions of the same workflow are distinct.
        workflow_id = f"{workflow.workflow_id}_{start_time.strftime('%Y%m%d_%H%M%S')}"
        result = WorkflowResult(
            workflow_id=workflow_id,
            success=False,
            total_steps=len(workflow.steps),
            completed_steps=0,
            failed_steps=0,
            execution_time_seconds=0,
            results={},
            errors=[]
        )
        self.active_workflows[workflow_id] = result
        try:
            # Validate structure before doing any work; bail out with the errors.
            validation_errors = workflow.validate_workflow()
            if validation_errors:
                result.errors.extend(validation_errors)
                result.execution_time_seconds = (datetime.now() - start_time).total_seconds()
                return result
            # Topological order guarantees dependencies run before dependents.
            execution_order = workflow.get_execution_order()
            step_results = {}
            # Execute steps sequentially in that order.
            for step_id in execution_order:
                step = next((s for s in workflow.steps if s.step_id == step_id), None)
                if not step:
                    continue
                # Merge static params, dependency outputs, and caller input.
                step_params = self._prepare_step_parameters(step, step_results, input_data or {})
                # Execute step (never raises; failures come back in the dict).
                step_result = await self._execute_workflow_step(step, step_params)
                step_results[step_id] = step_result
                result.step_results.append({
                    'step_id': step_id,
                    'success': step_result.get('success', False),
                    'result': step_result,
                    'execution_time': step_result.get('execution_time', 0)
                })
                if step_result.get('success'):
                    result.completed_steps += 1
                else:
                    result.failed_steps += 1
                    result.errors.append(f"Step {step_id} failed: {step_result.get('error', 'Unknown error')}")
                    # Abort if critical step fails; non-critical failures let the
                    # workflow continue (dependent steps may then lack inputs).
                    if step.critical_step:
                        break
            # Aggregate results; success requires zero failed steps.
            result.success = result.failed_steps == 0
            result.results = self._aggregate_workflow_results(workflow, step_results)
        except Exception as e:
            # Unexpected failure: record it; result.results stays empty.
            result.errors.append(f"Workflow execution error: {str(e)}")
        result.execution_time_seconds = (datetime.now() - start_time).total_seconds()
        return result
    def _prepare_step_parameters(self, step: WorkflowStep,
                                previous_results: Dict[str, Any],
                                input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the parameter dict for a step.

        Combines the step's static parameters, its dependencies' results
        (interpreted per the step's data_flow mode), and input_data (merged
        last, so caller-supplied values take precedence).

        NOTE(review): DataFlow.ACCUMULATION has no branch here and currently
        behaves like no data flow at all.
        """
        params = step.parameters.copy()
        if step.data_flow == DataFlow.DIRECT_PASS:
            # Pass results from dependencies directly: copy the 'product' or
            # 'output' entry of each dependency's experiment results into
            # params['input_data'] (later dependencies overwrite earlier ones).
            for dep in step.dependencies:
                if dep in previous_results:
                    dep_result = previous_results[dep]
                    if 'result' in dep_result and 'results' in dep_result['result']:
                        # Extract product or key output from dependency
                        exp_results = dep_result['result']['results']
                        if 'product' in exp_results:
                            params['input_data'] = exp_results['product']
                        elif 'output' in exp_results:
                            params['input_data'] = exp_results['output']
        elif step.data_flow == DataFlow.PARAMETER_MAPPING:
            # Map specific parameters from dependencies
            for dep in step.dependencies:
                if dep in previous_results:
                    dep_result = previous_results[dep]
                    # Custom parameter mapping logic here
                    # NOTE(review): not implemented — PARAMETER_MAPPING is
                    # currently a no-op placeholder.
                    pass
        elif step.data_flow == DataFlow.TRANSFORMATION:
            # Apply transformation function to the full map of previous results;
            # its returned dict is merged into the parameters.
            if step.transformation_function:
                transformed_data = step.transformation_function(previous_results)
                params.update(transformed_data)
        # Add input data last so caller-supplied values win on key conflicts.
        params.update(input_data)
        return params
    async def _execute_workflow_step(self, step: WorkflowStep,
                                    parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a single workflow step.

        Returns a dict with 'success' (bool), 'execution_time' (seconds), and
        'result' and/or 'error'. Never raises: exceptions are captured into the
        returned dict.
        """
        start_time = datetime.now()
        try:
            if step.step_type == WorkflowStepType.EXPERIMENT_EXECUTION:
                # Execute experiment via MCP server
                if self.mcp_server and step.experiment_id:
                    # Local import avoids a module-level circular dependency
                    # with the MCP server module.
                    from qulab_mcp_server import MCPRequest
                    request = MCPRequest(
                        tool=f"experiment.{step.experiment_id}",
                        parameters=parameters,
                        request_id=f"workflow_{step.step_id}_{datetime.now().strftime('%H%M%S')}"
                    )
                    response = await self.mcp_server.execute_tool(request)
                    execution_time = (datetime.now() - start_time).total_seconds()
                    return {
                        'success': response.status == 'success',
                        'result': response.result if response.result else {},
                        'error': response.error,
                        'execution_time': execution_time
                    }
                else:
                    return {
                        'success': False,
                        'error': 'MCP server not available or experiment ID missing',
                        'execution_time': (datetime.now() - start_time).total_seconds()
                    }
            elif step.step_type == WorkflowStepType.DATA_TRANSFORMATION:
                # Apply data transformation directly to the prepared parameters.
                if step.transformation_function:
                    result = step.transformation_function(parameters)
                    return {
                        'success': True,
                        'result': result,
                        'execution_time': (datetime.now() - start_time).total_seconds()
                    }
                else:
                    return {
                        'success': False,
                        'error': 'No transformation function provided',
                        'execution_time': (datetime.now() - start_time).total_seconds()
                    }
            else:
                # Remaining WorkflowStepType values are not implemented yet.
                return {
                    'success': False,
                    'error': f'Unsupported step type: {step.step_type}',
                    'execution_time': (datetime.now() - start_time).total_seconds()
                }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
                'execution_time': (datetime.now() - start_time).total_seconds()
            }
    def _aggregate_workflow_results(self, workflow: ExperimentWorkflow,
                                   step_results: Dict[str, Any]) -> Dict[str, Any]:
        """Aggregate results from all workflow steps.

        Collects the experiment results of terminal steps (steps that no other
        step depends on) into 'final_outputs', plus a step-count summary.
        NOTE(review): 'quality_metrics' is initialized but never populated here.
        """
        aggregated = {
            'workflow_name': workflow.name,
            'category': workflow.category,
            'final_outputs': {},
            'quality_metrics': {},
            'execution_summary': {
                'total_steps': len(workflow.steps),
                'successful_steps': sum(1 for r in step_results.values() if r.get('success')),
                'failed_steps': sum(1 for r in step_results.values() if not r.get('success'))
            }
        }
        # Terminal steps: those whose step_id appears in no other step's dependencies.
        final_step_ids = [s.step_id for s in workflow.steps if not any(s.step_id in other.dependencies for other in workflow.steps)]
        for step_id in final_step_ids:
            if step_id in step_results:
                step_result = step_results[step_id]
                if step_result.get('success') and 'result' in step_result:
                    exp_result = step_result['result'].get('results', {})
                    aggregated['final_outputs'][step_id] = exp_result
        return aggregated
class WorkflowTemplates:
    """Factory methods producing pre-built workflows for common scientific processes."""

    @staticmethod
    def create_organic_synthesis_workflow(target_molecule: str) -> ExperimentWorkflow:
        """Build a four-stage synthesis workflow: plan -> react -> purify -> characterize."""
        planning = WorkflowStep(
            step_id="retrosynthesis",
            name="Retrosynthetic Analysis",
            step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
            experiment_id="organic_synthesis",
            parameters={"target": target_molecule, "strategy": "retrosynthesis"},
            critical_step=True,  # without a plan, nothing downstream makes sense
        )
        reaction = WorkflowStep(
            step_id="step1_reaction",
            name="First Synthetic Step",
            step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
            experiment_id="aldol_condensation",
            parameters={"temperature": 0, "solvent": "ethanol"},
            dependencies=["retrosynthesis"],
            data_flow=DataFlow.PARAMETER_MAPPING,
        )
        cleanup = WorkflowStep(
            step_id="purification",
            name="Product Purification",
            step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
            experiment_id="recrystallization",
            dependencies=["step1_reaction"],
            data_flow=DataFlow.DIRECT_PASS,
        )
        analysis = WorkflowStep(
            step_id="characterization",
            name="Product Characterization",
            step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
            experiment_id="nmr_spectroscopy",
            dependencies=["purification"],
            data_flow=DataFlow.DIRECT_PASS,
        )
        return ExperimentWorkflow(
            workflow_id="organic_synthesis_workflow",
            name=f"Synthesis of {target_molecule}",
            description=f"Multi-step synthesis workflow for {target_molecule}",
            category="organic_synthesis",
            steps=[planning, reaction, cleanup, analysis],
            estimated_duration=timedelta(hours=8),
            required_equipment=["reaction_vessel", "spectrometer", "chromatography_system"],
            safety_requirements=["chemical_handling", "instrument_safety"],
            quality_checks=["purity_check", "yield_calculation", "spectral_analysis"],
        )

    @staticmethod
    def create_drug_discovery_workflow() -> ExperimentWorkflow:
        """Build a drug discovery pipeline: screening -> hits -> leads -> ADMET."""
        screening = WorkflowStep(
            step_id="virtual_screening",
            name="Virtual Screening",
            step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
            experiment_id="molecular_docking",
            parameters={"library_size": 1000000, "target_protein": "PDB_ID"},
        )
        hits = WorkflowStep(
            step_id="hit_identification",
            name="Hit Identification",
            step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
            experiment_id="binding_assay",
            dependencies=["virtual_screening"],
            data_flow=DataFlow.TRANSFORMATION,
        )
        leads = WorkflowStep(
            step_id="lead_optimization",
            name="Lead Optimization",
            step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
            experiment_id="structure_activity_relationship",
            dependencies=["hit_identification"],
            data_flow=DataFlow.PARAMETER_MAPPING,
        )
        admet = WorkflowStep(
            step_id="admet_profiling",
            name="ADMET Profiling",
            step_type=WorkflowStepType.EXPERIMENT_EXECUTION,
            experiment_id="pharmacokinetic_modeling",
            dependencies=["lead_optimization"],
            data_flow=DataFlow.DIRECT_PASS,
        )
        return ExperimentWorkflow(
            workflow_id="drug_discovery_workflow",
            name="High-Throughput Drug Discovery",
            description="Complete drug discovery pipeline from virtual screening to optimization",
            category="drug_discovery",
            steps=[screening, hits, leads, admet],
            estimated_duration=timedelta(days=30),
            required_equipment=["computational_cluster", "high_throughput_screening", "analytical_instruments"],
            safety_requirements=["biological_safety", "chemical_handling"],
            quality_checks=["potency_assay", "selectivity_test", "toxicity_screening"],
        )
# Global workflow executor instance, created without an MCP server; experiment
# steps will report failure until one is attached.
workflow_executor = WorkflowExecutor()
if __name__ == '__main__':
    # Demo: build a sample synthesis workflow, validate it, and show its plan.
    demo = WorkflowTemplates.create_organic_synthesis_workflow("ibuprofen")
    print(f"Created workflow: {demo.name}")
    print(f"Steps: {len(demo.steps)}")
    print(f"Estimated duration: {demo.estimated_duration}")

    # Structural validation (duplicate IDs, unknown deps, cycles, experiment IDs).
    issues = demo.validate_workflow()
    print(f"Validation errors: {issues}" if issues else "Workflow validation passed")

    # Dependency-respecting execution order.
    print(f"Execution order: {demo.get_execution_order()}")

    print("\nWorkflow steps:")
    for item in demo.steps:
        print(f" - {item.step_id}: {item.name} ({item.step_type.value})")
        if item.dependencies:
            print(f" Dependencies: {item.dependencies}")