import threading
import contextvars
import time
import asyncio
from typing import Callable, Optional, Any, List, Union, Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed

from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio

from ..core.logging import logger
from ..core.message import Message
from ..models.base_model import BaseLLM
from ..benchmark.benchmark import Benchmark
from ..workflow.workflow import WorkFlow
from ..workflow.action_graph import ActionGraph
from ..workflow.workflow_graph import WorkFlowGraph
from ..agents.agent_manager import AgentManager


class Evaluator:
    """
    A class for evaluating the performance of a workflow.
    """

    def __init__(
        self,
        llm: BaseLLM,
        num_workers: int = 1,
        agent_manager: Optional[AgentManager] = None,
        collate_func: Optional[Callable] = None,
        output_postprocess_func: Optional[Callable] = None,
        verbose: Optional[bool] = None,
        **kwargs
    ):
""" |
|
|
Initialize the Evaluator. |
|
|
|
|
|
Args: |
|
|
llm (BaseLLM): The LLM to use for evaluation. |
|
|
num_workers (int): The number of parallel workers to use for evaluation. Default is 1. |
|
|
agent_manager (AgentManager, optional): The agent manager used to construct the workflow. Only used when the workflow graph is a WorkFlowGraph. |
|
|
collate_func (Callable, optional): A function to collate the benchmark data. |
|
|
It receives a single example from the benchmark and the output (which should be a dictionary) will serve as inputs |
|
|
to the `execute` function of an WorkFlow (or ActionGraph) instance. |
|
|
Note that the keys in the collated output should match the inputs of the workflow. |
|
|
The default is a lambda function that returns the example itself. |
|
|
output_postprocess_func (Callable, optional): A function to postprocess the output of the workflow. |
|
|
It receives the output of an WorkFlow instance (str) or an ActionGraph instance (dict) as input |
|
|
and the output will be passed to the `evaluate` function of the benchmark. |
|
|
The default is a lambda function that returns the output itself. |
|
|
verbose (bool, optional): Whether to print the evaluation progress. |
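
        Example:
            A minimal sketch of custom collate and postprocess hooks; the field name
            "question" and the "Answer:" marker are placeholders that depend on the
            benchmark and workflow at hand::

                def collate(example: dict) -> dict:
                    # map raw benchmark fields onto the workflow's input names
                    return {"problem": example["question"]}

                def postprocess(output: str) -> str:
                    # e.g. keep only the text after a final-answer marker
                    return output.split("Answer:")[-1].strip()

                evaluator = Evaluator(
                    llm=llm,
                    num_workers=4,
                    collate_func=collate,
                    output_postprocess_func=postprocess,
                )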
        """
        self.llm = llm
        self.num_workers = num_workers
        self.agent_manager = agent_manager
        self._thread_agent_managers = {}
        self._thread_lock = threading.Lock()  # guards _thread_agent_managers during parallel evaluation
        self.collate_func = collate_func or (lambda x: x)
        self.output_postprocess_func = output_postprocess_func or (lambda x: x)
        self.verbose = verbose
        self._evaluation_records = {}
        self.kwargs = kwargs

    def _get_eval_data(self, benchmark: Benchmark, eval_mode: str = "test", indices: Optional[List[int]] = None, sample_k: Optional[int] = None, seed: Optional[int] = None) -> List[dict]:
        assert eval_mode in ["test", "dev", "train"], f"Invalid eval_mode: {eval_mode}. Choices: ['test', 'dev', 'train']"
        if eval_mode == "test":
            data = benchmark.get_test_data(indices=indices, sample_k=sample_k, seed=seed)
        elif eval_mode == "dev":
            data = benchmark.get_dev_data(indices=indices, sample_k=sample_k, seed=seed)
        else:
            data = benchmark.get_train_data(indices=indices, sample_k=sample_k, seed=seed)
        return data

    def evaluate(
        self,
        graph: Union[WorkFlowGraph, ActionGraph],
        benchmark: Benchmark,
        eval_mode: str = "test",
        indices: Optional[List[int]] = None,
        sample_k: Optional[int] = None,
        seed: Optional[int] = None,
        verbose: Optional[bool] = None,
        update_agents: Optional[bool] = False,
        **kwargs
    ) -> dict:
        """
        Evaluate the performance of the workflow on the benchmark.

        Args:
            graph (WorkFlowGraph or ActionGraph): The workflow to evaluate.
            benchmark (Benchmark): The benchmark to evaluate the workflow on.
            eval_mode (str): Which split of the benchmark to evaluate the workflow on. Choices: ["test", "dev", "train"].
            indices (List[int], optional): The indices of the data to evaluate the workflow on.
            sample_k (int, optional): The number of examples to evaluate the workflow on. If provided, a random sample of size `sample_k` will be used.
            seed (int, optional): The random seed used when sampling `sample_k` examples.
            verbose (bool, optional): Whether to print the evaluation progress. If not provided, `self.verbose` will be used.
            update_agents (bool, optional): Whether to update the agents in the agent manager. Only used when the graph is a WorkFlowGraph.

        Returns:
            dict: The average metrics of the workflow evaluation.
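
        Example:
            A sketch of a typical call, assuming `llm`, `agent_manager`, `graph`
            (a WorkFlowGraph) and `benchmark` have already been constructed::

                evaluator = Evaluator(llm=llm, agent_manager=agent_manager, num_workers=4, verbose=True)
                metrics = evaluator.evaluate(graph=graph, benchmark=benchmark, eval_mode="dev", sample_k=50, seed=42)
                print(metrics)  # e.g. {"accuracy": 0.82}; the keys depend on the benchmark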
        """
        self._evaluation_records.clear()

        if isinstance(graph, WorkFlowGraph) and update_agents:
            if self.agent_manager is None:
                raise ValueError(f"`agent_manager` is not provided in {type(self).__name__}. Please provide an agent manager when evaluating a WorkFlowGraph.")
            self.agent_manager.update_agents_from_workflow(workflow_graph=graph, llm_config=self.llm.config, **kwargs)

        data = self._get_eval_data(benchmark=benchmark, eval_mode=eval_mode, indices=indices, sample_k=sample_k, seed=seed)
        results = self._evaluate_graph(graph=graph, data=data, benchmark=benchmark, verbose=verbose, **kwargs)
        return results

    def _execute_workflow_graph(self, graph: WorkFlowGraph, inputs: dict, return_trajectory: bool = False, **kwargs) -> Union[str, Tuple[str, List[Message]]]:
        """
        Execute the workflow graph and return the output.

        Args:
            graph (WorkFlowGraph): The workflow graph to execute
            inputs (dict): The inputs to the workflow graph
            return_trajectory (bool): Whether to also return the messages produced during execution. Defaults to False.
            **kwargs: Additional arguments for workflow graph execution

        Returns:
            str or Tuple[str, List[Message]]: The output of the workflow graph, together with the execution trajectory if `return_trajectory` is True
        """
        if self.agent_manager is None:
            raise ValueError(f"`agent_manager` is not provided in {type(self).__name__}. Please provide an agent manager when evaluating a WorkFlowGraph.")

        # execute a fresh copy of the graph so repeated evaluations do not share execution state
        graph_copy = WorkFlowGraph(goal=graph.goal, graph=graph)
        graph_copy.reset_graph()
        workflow = WorkFlow(llm=self.llm, graph=graph_copy, agent_manager=self.agent_manager, **kwargs)
        output: str = workflow.execute(inputs=inputs, **kwargs)
        if return_trajectory:
            return output, workflow.environment.get()
        return output

    def _execute_action_graph(self, graph: ActionGraph, inputs: dict, **kwargs) -> dict:
        """
        Execute the action graph and return the output.

        Args:
            graph (ActionGraph): The action graph to execute
            inputs (dict): The inputs to the action graph
            **kwargs: Additional arguments for action graph execution

        Returns:
            dict: The output of the action graph
        """
        output: dict = graph.execute(**inputs, **kwargs)
        return output

    def _evaluate_single_example(self, graph: Union[WorkFlowGraph, ActionGraph], example: dict, benchmark: Benchmark, **kwargs) -> Optional[dict]:
        """
        Evaluate a single data example through the workflow and save the evaluation metrics to the evaluation records.

        Args:
            graph (WorkFlowGraph or ActionGraph): The workflow to execute
            example (dict): Single input data example
            benchmark (Benchmark): The benchmark used to obtain the label and compute the metrics
            **kwargs: Additional arguments for workflow execution

        Returns:
            Optional[dict]: Evaluation metrics for this example, None if failed
        """
        try:
            inputs: dict = self.collate_func(example)
            if not isinstance(inputs, dict):
                raise ValueError(f"The collate_func should return a dictionary. Got {type(inputs)}.")

            if isinstance(graph, ActionGraph):
                output: dict = self._execute_action_graph(graph=graph, inputs=inputs, **kwargs)
            elif isinstance(graph, WorkFlowGraph):
                workflow_graph_outputs = self._execute_workflow_graph(graph=graph, inputs=inputs, return_trajectory=True, **kwargs)
                output: str = workflow_graph_outputs[0]
                trajectory: List[Message] = workflow_graph_outputs[1]
            else:
                raise ValueError(f"Invalid workflow type: {type(graph)}. Must be WorkFlowGraph or ActionGraph.")

            output = self.output_postprocess_func(output)
            label = benchmark.get_label(example)
            metrics = benchmark.evaluate(prediction=output, label=label)
            logger.debug(f"metrics: {metrics}")

            example_id = benchmark.get_id(example=example)
            self._evaluation_records[example_id] = {
                "prediction": output,
                "label": label,
                "metrics": metrics
            }
            if isinstance(graph, WorkFlowGraph):
                self._evaluation_records[example_id]["trajectory"] = trajectory
        except Exception as e:
            logger.warning(f"Error evaluating example; setting its metrics to None:\nExample: {example}\nError: {str(e)}")
            return None

        return metrics

    def _single_evaluate(self, graph: Union[WorkFlowGraph, ActionGraph], data: List[dict], benchmark: Benchmark, verbose: Optional[bool] = None, **kwargs) -> List[dict]:
        """
        Evaluate the workflow on the data using a single thread.

        Args:
            graph (WorkFlowGraph or ActionGraph): The workflow to evaluate
            data (List[dict]): List of input data
            benchmark (Benchmark): The benchmark to evaluate the workflow on
            verbose (bool): Whether to show a progress bar
            **kwargs: Additional arguments for workflow execution

        Returns:
            List[dict]: List of evaluation metrics, with None entries for failed examples
        """
        if not data:
            logger.warning("No data to evaluate. Return an empty list.")
            return []

        results = []
        if verbose:
            progress_bar = tqdm(desc="Evaluating workflow", total=len(data))
        for example in data:
            result = self._evaluate_single_example(graph, example, benchmark, **kwargs)
            # keep None results so that failed examples still count toward the average
            results.append(result)
            if verbose:
                progress_bar.update(1)
        if verbose:
            progress_bar.close()
        return results

    def _create_new_agent_manager(self) -> Optional[AgentManager]:
        """Create a new agent manager with the same configuration but new locks"""
        if self.agent_manager is None:
            return None

        new_manager = AgentManager(agents=self.agent_manager.agents, storage_handler=self.agent_manager.storage_handler)
        return new_manager

    def _get_thread_agent_manager(self) -> Optional[AgentManager]:
        """Get or create a thread-specific agent manager"""
        if self.agent_manager is None:
            return None
        thread_id = threading.get_ident()
        # creation is guarded by a lock so concurrent workers do not race on the cache
        with self._thread_lock:
            if thread_id not in self._thread_agent_managers:
                self._thread_agent_managers[thread_id] = self._create_new_agent_manager()
            return self._thread_agent_managers[thread_id]

    def _evaluate_single_example_with_context(self, graph: Union[WorkFlowGraph, ActionGraph], example: dict, benchmark: Benchmark, **kwargs) -> Optional[dict]:
        """Wrapper that sets up a thread-specific agent manager before running the evaluation"""
        thread_agent_manager = self._get_thread_agent_manager()
        if thread_agent_manager is None:
            return self._evaluate_single_example(graph, example, benchmark, **kwargs)

        # Note: `self.agent_manager` is a shared instance attribute, so this swap is
        # visible to all threads for the duration of the call; it only provides
        # best-effort isolation between workers.
        original_agent_manager = self.agent_manager
        try:
            self.agent_manager = thread_agent_manager
            return self._evaluate_single_example(graph, example, benchmark, **kwargs)
        finally:
            self.agent_manager = original_agent_manager

    def _parallel_evaluate(self, graph: Union[WorkFlowGraph, ActionGraph], data: List[dict], benchmark: Benchmark, verbose: Optional[bool] = None, **kwargs) -> List[dict]:
        """
        Evaluate the workflow on the data using a thread pool with `self.num_workers` workers.

        Args:
            graph (WorkFlowGraph or ActionGraph): The workflow to evaluate
            data (List[dict]): List of input data
            benchmark (Benchmark): The benchmark to evaluate the workflow on
            verbose (bool): Whether to show a progress bar
            **kwargs: Additional arguments for workflow execution

        Returns:
            List[dict]: List of evaluation metrics, with None entries for failed examples
        """
        if not data:
            logger.warning("No data to evaluate. Return an empty list.")
            return []

        results = []
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            # run each evaluation in a copy of the current context so context variables
            # set by one worker do not leak into another
            futures = {
                executor.submit(
                    contextvars.copy_context().run,
                    self._evaluate_single_example_with_context,
                    graph, example, benchmark, **kwargs
                ): example
                for example in data
            }

            if verbose:
                progress_bar = tqdm(desc="Evaluating workflow", total=len(futures))

            for future in as_completed(futures):
                # keep None results so that failed examples still count toward the
                # average, matching the single-threaded path
                results.append(future.result())
                if verbose:
                    progress_bar.update(1)

            if verbose:
                progress_bar.close()
        return results

    def _calculate_average_score(self, scores: List[dict]) -> dict:
        """
        Calculate the average score from a list of scores.

        Failed examples (represented by None entries) still count toward the
        denominator, so failures lower the average.

        Args:
            scores (List[dict]): List of evaluation scores, possibly containing None entries

        Returns:
            dict: Average metrics
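
        Example:
            A small illustration of how None entries are averaged in::

                >>> evaluator._calculate_average_score([{"accuracy": 1.0}, None])
                {'accuracy': 0.5}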
        """
        if not scores:
            logger.warning("No scores found. Return an empty dictionary.")
            return {}
        num_total_items = len(scores)
        first_valid_score = None
        for score in scores:
            if score is not None:
                first_valid_score = score
                break
        if first_valid_score is None:
            logger.warning("No valid scores found. Return an empty dictionary.")
            return {}

        return {k: sum(d[k] for d in scores if d is not None) / num_total_items for k in first_valid_score}

    def _evaluate_graph(self, graph: Union[WorkFlowGraph, ActionGraph], data: List[dict], benchmark: Benchmark, verbose: Optional[bool] = None, **kwargs) -> dict:
        """
        Evaluate the workflow on the data.

        Args:
            graph (WorkFlowGraph or ActionGraph): The workflow to evaluate
            data (List[dict]): List of input data to evaluate
            benchmark (Benchmark): The benchmark to evaluate the workflow on
            verbose (bool, optional): Whether to print the evaluation progress. If not provided, `self.verbose` will be used.
            **kwargs: Additional arguments passed to workflow execution

        Returns:
            dict: The average metrics of the workflow evaluation
        """
        if not data:
            logger.warning("No data to evaluate. Return an empty dictionary.")
            return {}

        verbose = verbose if verbose is not None else self.verbose
        if self.num_workers > 1:
            results = self._parallel_evaluate(graph, data, benchmark, verbose, **kwargs)
        else:
            results = self._single_evaluate(graph, data, benchmark, verbose, **kwargs)

        return self._calculate_average_score(results)

    def get_example_evaluation_record(self, benchmark: Benchmark, example: Any) -> Optional[dict]:
        """
        Get the evaluation record for a given example.
        """
        example_id = benchmark.get_id(example=example)
        return self._evaluation_records.get(example_id, None)

    def get_evaluation_record_by_id(self, benchmark: Benchmark, example_id: str, eval_mode: str = "test") -> Optional[dict]:
        """
        Get the evaluation record for a given example id.
        """
        example = benchmark.get_example_by_id(example_id=example_id, mode=eval_mode)
        return self.get_example_evaluation_record(benchmark=benchmark, example=example)

    def get_all_evaluation_records(self) -> dict:
        """
        Get all the evaluation records.
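
        Example:
            A sketch of inspecting per-example records after an `evaluate` call; each
            record stores the prediction, label and metrics (plus the trajectory for
            WorkFlowGraph runs)::

                evaluator.evaluate(graph=graph, benchmark=benchmark)
                for example_id, record in evaluator.get_all_evaluation_records().items():
                    print(example_id, record["metrics"])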
        """
        return self._evaluation_records.copy()

    async def async_evaluate(
        self,
        graph: Union[WorkFlowGraph, ActionGraph],
        benchmark: Benchmark,
        eval_mode: str = "test",
        indices: Optional[List[int]] = None,
        sample_k: Optional[int] = None,
        seed: Optional[int] = None,
        verbose: Optional[bool] = None,
        **kwargs
    ) -> dict:
        """
        Asynchronously evaluate the performance of the workflow on the benchmark.
        At most `self.num_workers` examples are evaluated concurrently.

        Args:
            graph (WorkFlowGraph or ActionGraph): The workflow to evaluate.
            benchmark (Benchmark): The benchmark to evaluate the workflow on.
            eval_mode (str): Which split of the benchmark to evaluate the workflow on. Choices: ["test", "dev", "train"].
            indices (List[int], optional): The indices of the data to evaluate the workflow on.
            sample_k (int, optional): The number of examples to evaluate the workflow on. If provided, a random sample of size `sample_k` will be used.
            seed (int, optional): The random seed used when sampling `sample_k` examples.
            verbose (bool, optional): Whether to print the evaluation progress. If not provided, `self.verbose` will be used.

        Returns:
            dict: The average metrics of the workflow evaluation.
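
        Example:
            A sketch of running the async path from synchronous code, assuming the
            same objects as in `evaluate`::

                import asyncio

                metrics = asyncio.run(
                    evaluator.async_evaluate(graph=graph, benchmark=benchmark, eval_mode="dev")
                )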
        """
        self._evaluation_records.clear()
        data = self._get_eval_data(benchmark=benchmark, eval_mode=eval_mode, indices=indices, sample_k=sample_k, seed=seed)

        if not data:
            logger.warning("No data to evaluate. Return an empty dictionary.")
            return {}

        verbose = verbose if verbose is not None else self.verbose

        # bound the number of concurrently running evaluations by num_workers
        sem = asyncio.Semaphore(self.num_workers)

        async def process_with_semaphore(example):
            async with sem:
                try:
                    return await self._async_evaluate_single_example(
                        graph=graph,
                        example=example,
                        benchmark=benchmark,
                        **kwargs
                    )
                except Exception as e:
                    logger.warning(f"Async evaluation failed for an example: {str(e)}")
                    return None

        tasks = [process_with_semaphore(example) for example in data]

        if verbose:
            results = await tqdm_asyncio.gather(*tasks, desc=f"Evaluating {benchmark.name}", total=len(data))
        else:
            results = await asyncio.gather(*tasks)

        return self._calculate_average_score(results)

    async def _async_evaluate_single_example(self, graph: Union[WorkFlowGraph, ActionGraph], example: dict, benchmark: Benchmark, **kwargs) -> Optional[dict]:
        """
        Asynchronously evaluate a single example.
        """
        try:
            inputs: dict = self.collate_func(example)
            if not isinstance(inputs, dict):
                raise ValueError(f"The collate_func should return a dictionary. Got {type(inputs)}.")

            t1 = time.time()
            if isinstance(graph, ActionGraph):
                output: dict = await self._async_execute_action_graph(graph=graph, inputs=inputs, **kwargs)
            elif isinstance(graph, WorkFlowGraph):
                workflow_graph_outputs = await self._async_execute_workflow_graph(graph=graph, inputs=inputs, return_trajectory=True, **kwargs)
                output: str = workflow_graph_outputs[0]
                trajectory: List[Message] = workflow_graph_outputs[1]
            else:
                raise ValueError(f"Invalid workflow type: {type(graph)}. Must be WorkFlowGraph or ActionGraph.")

            output = self.output_postprocess_func(output)
            t2 = time.time()
            logger.debug(f"Workflow execution took {t2 - t1:.2f}s")

            label = benchmark.get_label(example)

            t1 = time.time()
            # prefer the benchmark's async evaluation if it provides one
            if hasattr(benchmark, 'async_evaluate') and callable(getattr(benchmark, 'async_evaluate')):
                metrics = await benchmark.async_evaluate(prediction=output, label=label)
            else:
                metrics = benchmark.evaluate(prediction=output, label=label)
            t2 = time.time()
            logger.debug(f"Benchmark evaluation took {t2 - t1:.2f}s")

            example_id = benchmark.get_id(example=example)
            self._evaluation_records[example_id] = {
                "prediction": output,
                "label": label,
                "metrics": metrics
            }
            if isinstance(graph, WorkFlowGraph):
                self._evaluation_records[example_id]["trajectory"] = trajectory
        except Exception as e:
            logger.warning(f"Error evaluating example; setting its metrics to None:\nExample: {example}\nError: {str(e)}")
            return None
        return metrics

    async def _async_execute_action_graph(self, graph: ActionGraph, inputs: dict, **kwargs) -> dict:
        """
        Asynchronously execute the action graph.
        """
        return await graph.async_execute(**inputs, **kwargs)

    async def _async_execute_workflow_graph(self, graph: WorkFlowGraph, inputs: dict, return_trajectory: bool = False, **kwargs) -> Union[str, Tuple[str, List[Message]]]:
        """
        Asynchronously execute the workflow graph.
        """
        if self.agent_manager is None:
            raise ValueError("`agent_manager` is not provided. Please provide an agent manager when evaluating a WorkFlowGraph.")

        # execute a fresh copy of the graph so concurrent evaluations do not share execution state
        graph_copy = WorkFlowGraph(goal=graph.goal, graph=graph)
        graph_copy.reset_graph()

        # use a local agent manager so concurrent executions do not share agent state
        local_agent_manager = AgentManager(
            agents=self.agent_manager.agents,
            storage_handler=self.agent_manager.storage_handler
        )

        workflow = WorkFlow(
            llm=self.llm,
            graph=graph_copy,
            agent_manager=local_agent_manager,
            **kwargs
        )

        output: str = await workflow.async_execute(inputs=inputs, **kwargs)
        if return_trajectory:
            return output, workflow.environment.get()
        return output