DoNothingFlowModule / DoNothingAtomicFlow.py
Haolong Li
Update DoNothingAtomicFlow.py
c3cfca3
raw
history blame contribute delete
589 Bytes
import traceback
from copy import deepcopy
from typing import Dict, Any
from flows.base_flows import AtomicFlow
class DoNothingAtomicFlow(AtomicFlow):
    """Atomic flow whose ``run`` hands back a shallow copy of its input."""

    def __init__(self, **kwargs):
        """Forward every keyword argument unchanged to the parent initializer."""
        super().__init__(**kwargs)

    @classmethod
    def instantiate_from_config(cls, config):
        """Build an instance of ``cls`` from ``config``.

        ``config`` is deep-copied first; the copy, not the caller's object,
        is passed to the constructor as the ``flow_config`` keyword argument.
        """
        return cls(flow_config=deepcopy(config))

    def run(self, input_data: Dict[str, Any]):
        """Return a copy of ``input_data`` without modifying it.

        The copy is made with ``.copy()``, so only the top-level mapping is
        duplicated; nested values are shared with the caller's object.
        """
        return input_data.copy()