| import traceback | |
| from copy import deepcopy | |
| from typing import Dict, Any | |
| from flows.base_flows import AtomicFlow | |
class DoNothingAtomicFlow(AtomicFlow):
    """Atomic flow that performs no work and echoes its input back.

    ``run`` returns a shallow copy of the data it receives.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments unchanged to ``AtomicFlow.__init__``."""
        super().__init__(**kwargs)

    @classmethod
    def instantiate_from_config(cls, config):
        """Build an instance of ``cls`` from a configuration object.

        Args:
            config: The flow configuration. It is deep-copied before use, so
                the caller's object is never shared with the new flow.

        Returns:
            A new ``cls`` instance constructed with ``flow_config`` set to the
            copied configuration.
        """
        # Deep copy so later mutations of the flow's config cannot leak back
        # into the object the caller passed in (and vice versa).
        flow_config = deepcopy(config)
        kwargs = {"flow_config": flow_config}

        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a shallow copy of ``input_data``.

        Args:
            input_data: The data handed to the flow.

        Returns:
            A new top-level dict with the same keys and values. The copy is
            shallow, so nested mutable values are still shared with the input.
        """
        response = input_data.copy()
        return response