| """A simple script to run a Flow that can be used for development and debugging.""" |
|
|
| import os |
|
|
| import hydra |
|
|
| import flows |
| from flows.flow_launchers import FlowLauncher, ApiInfo |
| from flows.utils.general_helpers import read_yaml_file |
|
|
| from flows import logging |
| from flows.flow_cache import CACHING_PARAMETERS, clear_cache |
|
|
| CACHING_PARAMETERS.do_caching = False |
| |
|
|
| logging.set_verbosity_debug() |
|
|
|
|
| if __name__ == "__main__": |
| |
| |
| |
| |
| api_information = ApiInfo("azure", os.getenv("AZURE_OPENAI_KEY"), os.getenv("AZURE_OPENAI_ENDPOINT")) |
|
|
| |
| root_dir = "." |
| cfg_path = os.path.join(root_dir, "FlowName.yaml") |
| cfg = read_yaml_file(cfg_path) |
|
|
| flow_with_interfaces = { |
| "flow": hydra.utils.instantiate(cfg['flow'], _recursive_=False, _convert_="partial"), |
| "input_interface": ( |
| None |
| if getattr(cfg, "input_interface", None) is None |
| else hydra.utils.instantiate(cfg['input_interface'], _recursive_=False) |
| ), |
| "output_interface": ( |
| None |
| if getattr(cfg, "output_interface", None) is None |
| else hydra.utils.instantiate(cfg['output_interface'], _recursive_=False) |
| ), |
| } |
|
|
| |
| |
| data = {"id": 0} |
|
|
| |
| path_to_output_file = None |
| |
|
|
| _, outputs = FlowLauncher.launch( |
| flow_with_interfaces=flow_with_interfaces, |
| data=data, |
| path_to_output_file=path_to_output_file, |
| api_information=api_information, |
| ) |
|
|
| |
| flow_output_data = outputs[0] |
| print(flow_output_data) |
|
|