Spaces:
Running
Running
| import os | |
| import torch | |
| import datasets | |
| import transformers | |
| import json | |
| from .schema import ActionInfo, EnvException, EnhancedJSONEncoder | |
| from reactagent.prompt2model.prompt_parser import MockPromptSpec, TaskType | |
| from reactagent.prompt2model.dataset_retriever import DescriptionDatasetRetriever | |
| from reactagent.prompt2model.dataset_generator import PromptBasedDatasetGenerator, DatasetSplit | |
| from reactagent.prompt2model.dataset_processor import TextualizeProcessor | |
| from reactagent.prompt2model.model_retriever import DescriptionModelRetriever | |
| from reactagent.prompt2model.model_trainer import GenerationModelTrainer | |
| from reactagent.prompt2model.model_executor import GenerationModelExecutor, ModelOutput | |
| from reactagent.prompt2model.model_evaluator import Seq2SeqEvaluator | |
def generate_dataset(instruction, examples, save_dir, num_train, num_valid, num_test, work_dir = '.', **kwargs):
    """Generate a train/val/test dataset dict from a prompt and save it to disk.

    Args:
        instruction: Task instruction forwarded to the prompt spec.
        examples: Example text forwarded to the prompt spec.
        save_dir: Directory (joined onto ``work_dir``) the dataset dict is saved to.
        num_train: Number of training examples; anything ``int()`` accepts.
        num_valid: Number of validation examples; anything ``int()`` accepts.
        num_test: Number of test examples; anything ``int()`` accepts.
        work_dir: Base directory that ``save_dir`` is resolved against.
        **kwargs: Accepted and ignored.

    Returns:
        A success message naming the path the dataset was saved to.

    Raises:
        EnvException: If any of the counts cannot be parsed as an integer.
    """
    requested = (num_train, num_valid, num_test)
    try:
        # Converted in train, valid, test order, so the first bad value wins.
        train_count, valid_count, test_count = [int(raw) for raw in requested]
    except ValueError:
        raise EnvException("Number of examples should be an integer")

    spec = MockPromptSpec(
        TaskType.TEXT_GENERATION,
        instruction=instruction,
        examples=examples,
    )
    dataset_generator = PromptBasedDatasetGenerator()
    split_sizes = {
        DatasetSplit.TRAIN: train_count,
        DatasetSplit.VAL: valid_count,
        DatasetSplit.TEST: test_count,
    }
    generated = dataset_generator.generate_dataset_dict(spec, split_sizes)

    target_path = os.path.join(work_dir, save_dir)
    generated.save_to_disk(target_path)
    return f"Dataset successfully generated and saved to {target_path}"
def retrieve_dataset(instruction, save_dir, work_dir = '.', **kwargs):
    """Retrieve a dataset matching ``instruction`` and save it to disk.

    Args:
        instruction: Task description used to build the prompt spec for retrieval.
        save_dir: Directory (joined onto ``work_dir``) the dataset dict is saved to.
        work_dir: Base directory that ``save_dir`` is resolved against.
        **kwargs: Accepted and ignored.

    Returns:
        A success message naming the path the dataset was saved to.

    Raises:
        EnvException: If the retriever returns no dataset.
    """
    prompt_spec = MockPromptSpec(TaskType.TEXT_GENERATION, instruction=instruction, examples="")
    retriever = DescriptionDatasetRetriever()
    dataset_dict = retriever.retrieve_dataset_dict(prompt_spec)
    # NOTE(review): upstream prompt2model types this call as returning
    # `DatasetDict | None`; without this guard a miss would surface as an
    # opaque AttributeError on `.save_to_disk`. Confirm for the vendored copy.
    if dataset_dict is None:
        raise EnvException("No suitable dataset could be retrieved for the given instruction")
    save_path = os.path.join(work_dir, save_dir)
    dataset_dict.save_to_disk(save_path)
    # The message previously said "generated", copied from generate_dataset.
    return f"Dataset successfully retrieved and saved to {save_path}"
def retrieve_model(instruction, work_dir = '.', **kwargs):
    """Return a numbered, newline-separated listing of retrieved models.

    Args:
        instruction: Task description used to build the prompt spec for retrieval.
        work_dir: Accepted for signature parity with the other actions; unused here.
        **kwargs: Accepted and ignored.

    Returns:
        A string starting with ``"Top Models:"`` followed by one
        ``"<rank>. <model>"`` line per retrieved model, ranks starting at 1.
    """
    spec = MockPromptSpec(TaskType.TEXT_GENERATION, instruction=instruction, examples="")
    model_retriever = DescriptionModelRetriever(use_bm25=True, use_HyDE=True)
    candidates = model_retriever.retrieve(spec)

    pieces = ["Top Models:\n"]
    for rank, model in enumerate(candidates, start=1):
        pieces.append(f"{rank}. {model}\n")
    return "".join(pieces)
def process_dataset(instruction, load_dirs, save_dirs, work_dir = '.', **kwargs):
    """Run the textualize processor over saved dataset dicts and save the results.

    Args:
        instruction: Task instruction used to build the prompt spec for processing.
        load_dirs: Colon-separated directories (relative to ``work_dir``) to load from.
        save_dirs: Colon-separated directories (relative to ``work_dir``) to save to,
            paired positionally with ``load_dirs``.
        work_dir: Base directory both directory lists are resolved against.
        **kwargs: Accepted and ignored.

    Returns:
        A success message containing the list of save paths.

    Raises:
        EnvException: If the number of load and save directories differ.
    """
    spec = MockPromptSpec(TaskType.TEXT_GENERATION, instruction=instruction, examples="")

    sources = load_dirs.split(':')
    targets = save_dirs.split(':')
    if len(sources) != len(targets):
        raise EnvException("Number of load directories should match number of save directories")

    source_paths = [os.path.join(work_dir, rel_dir) for rel_dir in sources]
    save_paths = [os.path.join(work_dir, rel_dir) for rel_dir in targets]

    # Load every input first, then process them together in one call.
    loaded = [datasets.load_from_disk(path) for path in source_paths]
    textualizer = TextualizeProcessor(has_encoder=True)
    processed = textualizer.process_dataset_dict(spec, loaded)

    # Results are paired with the save paths by position.
    for result, destination in zip(processed, save_paths):
        result.save_to_disk(destination)

    return f"Data successfully processed and saved to {save_paths}"
def train_model(model_name, load_dirs, result_dir, epochs, batch_size, warmup_steps, weight_decay, learning_rate, work_dir = '.', **kwargs):
    """Train a generation model on saved dataset dicts and save model and tokenizer.

    Args:
        model_name: Model name passed to ``GenerationModelTrainer``.
        load_dirs: Colon-separated directories (relative to ``work_dir``) holding
            dataset dicts; each must provide ``"train"`` and ``"val"`` splits.
        result_dir: Directory (joined onto ``work_dir``) for training output,
            the trained model and the trained tokenizer.
        epochs: Number of training epochs; anything ``int()`` accepts.
        batch_size: Per-device train batch size, also used as the executor
            batch size; anything ``int()`` accepts.
        warmup_steps: Warmup steps; anything ``int()`` accepts.
        weight_decay: Weight decay; anything ``float()`` accepts.
        learning_rate: Learning rate; anything ``float()`` accepts.
        work_dir: Base directory that ``load_dirs`` and ``result_dir`` are
            resolved against.
        **kwargs: Accepted and ignored.

    Returns:
        A success message naming where the model and tokenizer were saved.

    Raises:
        EnvException: If a numeric parameter cannot be parsed.
    """
    try:
        epochs = int(epochs)
        batch_size = int(batch_size)
        warmup_steps = int(warmup_steps)
        weight_decay = float(weight_decay)
        learning_rate = float(learning_rate)
    except ValueError:
        raise EnvException("Numerical parameters should be integers or floats as appropriate")

    load_dirs = load_dirs.split(':')
    result_dir = os.path.join(work_dir, result_dir)

    # load the datasets
    load_paths = [os.path.join(work_dir, load_dir) for load_dir in load_dirs]
    dataset_dicts = [datasets.load_from_disk(load_path) for load_path in load_paths]
    training_datasets = [dataset_dict["train"] for dataset_dict in dataset_dicts]
    validation_datasets = [dataset_dict["val"] for dataset_dict in dataset_dicts]

    trainer = GenerationModelTrainer(
        model_name,
        has_encoder=True,
        executor_batch_size=batch_size,
        tokenizer_max_length=1024,
        sequence_max_length=1280,
    )

    # This must be a plain dict. A trailing comma after the closing brace
    # previously turned it into a one-element tuple wrapping the dict.
    hparams = {
        "output_dir": os.path.join(result_dir, "training_output"),
        "save_strategy": "epoch",
        "num_train_epochs": epochs,
        "per_device_train_batch_size": batch_size,
        "evaluation_strategy": "epoch",
        "warmup_steps": warmup_steps,
        "weight_decay": weight_decay,
        "learning_rate": learning_rate,
    }

    trained_model, trained_tokenizer = trainer.train_model(
        hyperparameter_choices=hparams,
        training_datasets=training_datasets,
        validation_datasets=validation_datasets,
    )

    trained_model.save_pretrained(os.path.join(result_dir, "trained_model"))
    trained_tokenizer.save_pretrained(os.path.join(result_dir, "trained_tokenizer"))
    return f"Model and Tokenizer successfully trained and saved respectively to {result_dir}/trained_model and {result_dir}/trained_tokenizer"
def execute_model(result_dir, load_dirs, save_path, batch_size, input_column, work_dir = '.', **kwargs):
    """Run a trained model over the test splits of saved datasets and dump predictions as JSON.

    Args:
        result_dir: Directory (joined onto ``work_dir``) containing the
            ``trained_model`` and ``trained_tokenizer`` sub-directories.
        load_dirs: Colon-separated directories (relative to ``work_dir``) holding
            dataset dicts; each must provide a ``"test"`` split.
        save_path: File (joined onto ``work_dir``) the predictions are written to.
        batch_size: Executor batch size; anything ``int()`` accepts.
        input_column: Name of the dataset column fed to the model.
        work_dir: Base directory all relative paths are resolved against.
        **kwargs: Accepted and ignored.

    Returns:
        A success message naming the file the predictions were saved to.

    Raises:
        EnvException: If ``batch_size`` cannot be parsed as an integer.
    """
    load_dirs = load_dirs.split(':')
    result_dir = os.path.join(work_dir, result_dir)
    save_path = os.path.join(work_dir, save_path)

    try:
        batch_size = int(batch_size)
    except ValueError:
        raise EnvException("Batch size should be an integer")

    # load the datasets
    load_paths = [os.path.join(work_dir, load_dir) for load_dir in load_dirs]
    dataset_dicts = [datasets.load_from_disk(load_path) for load_path in load_paths]
    test_datasets = [dataset_dict["test"] for dataset_dict in dataset_dicts]
    test_dataset = datasets.concatenate_datasets(test_datasets)

    trained_model_path = os.path.join(result_dir, "trained_model")
    trained_tokenizer_path = os.path.join(result_dir, "trained_tokenizer")

    # Use the GPU when one is available, otherwise fall back to CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    trained_model = transformers.AutoModelForSeq2SeqLM.from_pretrained(trained_model_path).to(device)
    trained_tokenizer = transformers.AutoTokenizer.from_pretrained(trained_tokenizer_path)

    executor = GenerationModelExecutor(
        trained_model,
        trained_tokenizer,
        batch_size,
        tokenizer_max_length=1024,
        sequence_max_length=1280,
    )

    outputs = executor.make_prediction(
        test_set=test_dataset,
        input_column=input_column,
    )

    # Create the destination directory first so a missing folder does not
    # discard predictions that were just computed.
    parent_dir = os.path.dirname(save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    with open(save_path, 'w') as f:
        json.dump(outputs, f, cls=EnhancedJSONEncoder)

    return f"Model successfully executed on the test sets of the specified datasets and saved to {save_path}"
def evaluate_model(load_dirs, save_path, output_column, work_dir = '.', **kwargs):
    """Score saved predictions against the test splits of saved datasets.

    Args:
        load_dirs: Colon-separated directories (relative to ``work_dir``) holding
            dataset dicts; each must provide a ``"test"`` split.
        save_path: JSON file (joined onto ``work_dir``) containing a list of
            objects, each expanded as keyword arguments into ``ModelOutput``.
        output_column: Name of the dataset column holding the ground truth.
        work_dir: Base directory all relative paths are resolved against.
        **kwargs: Accepted and ignored.

    Returns:
        A string of the form ``"Evaluation metrics: <metric_values>"``.
    """
    # Resolve and load every dataset before touching the predictions file.
    dataset_paths = [os.path.join(work_dir, rel_dir) for rel_dir in load_dirs.split(':')]
    loaded = [datasets.load_from_disk(path) for path in dataset_paths]
    test_splits = [dataset_dict["test"] for dataset_dict in loaded]
    combined_test_set = datasets.concatenate_datasets(test_splits)

    predictions_file = os.path.join(work_dir, save_path)
    with open(predictions_file, 'r') as handle:
        raw_predictions = json.load(handle)

    # Rebuild ModelOutput objects from the JSON records.
    predictions = [ModelOutput(**record) for record in raw_predictions]

    metric_values = Seq2SeqEvaluator().evaluate_model(
        combined_test_set,
        gt_column=output_column,
        predictions=predictions,
        encoder_model_name="xlm-roberta-base",
    )
    return f"Evaluation metrics: {metric_values}"
# Actions registered for the agent. Only model retrieval is enabled here.
P2M_ACTIONS = [
    ActionInfo(
        name="Retrieve Model",
        description="Retrieve a suitable model based on a detailed description of the requirements. You can obtain the model given the name using the transformers.AutoModel.from_pretrained function.",
        usage=dict(
            instruction="an instruction on how to generate the output from the input",
        ),
        return_value="The observation will be a list of suitable models. You can choose one of them based on the requirements.",
        is_primitive=False,
        function=retrieve_model,
    ),
]
| # P2M_ACTIONS = [ | |
| # ActionInfo( | |
| # name="Generate Dataset", | |
| # description="Generate a dataset based on an instruction and examples. You can load the dataset later from `save_dir` using the load_from_disk function of the HuggingFace datasets library.", | |
| # usage={ | |
| # "instruction": "an instruction on how to generate the output from the input", | |
| # "examples": "examples of input-output pairs", | |
| # "save_dir": "directory to save the generated dataset dict to. We recommend saving to data/generated/", | |
| # "num_train": "number of examples to generate in the training set", | |
| # "num_valid": "number of examples to generate in the validation set", | |
| # "num_test": "number of examples to generate in the test set", | |
| # }, | |
| # return_value="The observation will be a success message if the dataset was generated successfully. Otherwise, an error message will be returned.", | |
| # is_primitive=False, | |
| # function=generate_dataset | |
| # ), | |
| # ActionInfo( | |
| # name="Retrieve Dataset", | |
| # description="Retrieve a suitable dataset based on a detailed description of the requirements. You can load the dataset later from `save_dir` using the load_from_disk function of the HuggingFace datasets library.", | |
| # usage={ | |
| # "instruction": "an instruction on how to generate the output from the input", | |
| # "save_dir": "directory to save the generated dataset dict to. We recommend saving to data/retrieved/", | |
| # }, | |
| # return_value="The observation will be a success message if the dataset was retrieved successfully. Otherwise, an error message will be returned.", | |
| # is_primitive=False, | |
| # function=retrieve_dataset | |
| # ), | |
| # ActionInfo( | |
| # name="Retrieve Model", | |
| # description="Retrieve a suitable model based on a detailed description of the requirements. You can obtain the model given the name using the transformers.AutoModelForSeq2SeqLM.from_pretrained function.", | |
| # usage={ | |
| # "instruction": "an instruction on how to generate the output from the input", | |
| # }, | |
| # return_value="The observation will be a list of suitable models. You can choose one of them based on the requirements.", | |
| # is_primitive=False, | |
| # function=retrieve_model | |
| # ), | |
| # ActionInfo( | |
| # name="Process Dataset", | |
| # description="Process dataset based on a detailed description of the requirements. You can load the processed data later from `save_dirs` using the load_from_disk function of the HuggingFace datasets library. The input text will be in the `model_input` column and the output text will be in the `model_output` column.", | |
| # usage={ | |
| # "instruction": "an instruction on how to generate the output from the input", | |
| # "load_dirs": "directories to load the dataset dicts from, separated by colons", | |
| # "save_dirs": "directories to save the processed dataset dicts to, separated by colons. The order should match the order of the loaded datasets. We recommend saving to data/processed/", | |
| # }, | |
| # return_value="The observation will be a success message if the data was processed successfully. Otherwise, an error message will be returned.", | |
| # is_primitive=False, | |
| # function=process_dataset | |
| # ), | |
| # ActionInfo( | |
| # name="Train Model", | |
| # description="Train a Seq2Seq model from HuggingFace transformers library using the processed datasets and given hyperparameters.", | |
| # usage={ | |
| # "model_name": "name of the model to train", | |
| # "load_dirs": "directories to load the dataset dicts from, separated by colons", | |
| # "result_dir": "directory to save the trained model and tokenizer to. We recommend using results/{trial_id}/. The trained model will be available as `{result_dir}/trained_model/` and the tokenizer will be available as `{result_dir}/trained_tokenizer/`.", | |
| # "epochs": "number of epochs to train the model for", | |
| # "batch_size": "batch size for training the model", | |
| # "warmup_steps": "number of warmup steps for the optimizer", | |
| # "weight_decay": "weight decay for the optimizer", | |
| # "learning_rate": "learning rate for the optimizer", | |
| # }, | |
| # return_value="The observation will be a success message if the model was trained successfully. Otherwise, an error message will be returned.", | |
| # is_primitive=False, | |
| # function=train_model | |
| # ), | |
| # ActionInfo( | |
| # name="Execute Model on Test Set", | |
| # description="Execute a trained model on the test sets of specified dataset dicts.", | |
| # usage={ | |
| # "result_dir": "directory where the trained model and tokenizer are saved", | |
| # "load_dirs": "directories to load the dataset dicts from, separated by colons", | |
| # "save_path": "file to save the results of the model execution in json format", | |
| # "batch_size": "batch size for executing the model", | |
| # "input_column": "column name of the input text", | |
| # }, | |
| # return_value="The observation will be a success message if the model was executed successfully. Otherwise, an error message will be returned.", | |
| # is_primitive=False, | |
| # function=execute_model, | |
| # ), | |
| # ActionInfo( | |
| # name="Evaluate Model", | |
| # description="Evaluate a trained model on the test sets of specified dataset dicts.", | |
| # usage={ | |
| # "load_dirs": "directories to load the dataset dicts from, separated by colons", | |
| # "save_path": "file to load the results of the model execution in json format", | |
| # "output_column": "column name of the output text", | |
| # }, | |
| # return_value="The values for various evaluation metrics will be returned.", | |
| # is_primitive=False, | |
| # function=evaluate_model, | |
| # ) | |
| # ] | |