koichi12 commited on
Commit
e549173
·
verified ·
1 Parent(s): f39d59b

Add files using upload-large-folder tool

Browse files
This view is limited to 50 files because it contains too many changes.   See raw diff
Files changed (50) hide show
  1. .gitattributes +1 -0
  2. .venv/lib/python3.11/site-packages/ray/train/__pycache__/_checkpoint.cpython-311.pyc +0 -0
  3. .venv/lib/python3.11/site-packages/ray/train/__pycache__/trainer.cpython-311.pyc +0 -0
  4. .venv/lib/python3.11/site-packages/ray/train/examples/__pycache__/__init__.cpython-311.pyc +0 -0
  5. .venv/lib/python3.11/site-packages/ray/train/examples/__pycache__/mlflow_simple_example.cpython-311.pyc +0 -0
  6. .venv/lib/python3.11/site-packages/ray/train/examples/horovod/__init__.py +0 -0
  7. .venv/lib/python3.11/site-packages/ray/train/examples/horovod/__pycache__/__init__.cpython-311.pyc +0 -0
  8. .venv/lib/python3.11/site-packages/ray/train/examples/horovod/__pycache__/horovod_cifar_pbt_example.cpython-311.pyc +0 -0
  9. .venv/lib/python3.11/site-packages/ray/train/examples/horovod/__pycache__/horovod_example.cpython-311.pyc +0 -0
  10. .venv/lib/python3.11/site-packages/ray/train/examples/horovod/__pycache__/horovod_pytorch_example.cpython-311.pyc +0 -0
  11. .venv/lib/python3.11/site-packages/ray/train/examples/horovod/__pycache__/horovod_tune_example.cpython-311.pyc +0 -0
  12. .venv/lib/python3.11/site-packages/ray/train/examples/horovod/horovod_cifar_pbt_example.py +210 -0
  13. .venv/lib/python3.11/site-packages/ray/train/examples/horovod/horovod_example.py +286 -0
  14. .venv/lib/python3.11/site-packages/ray/train/examples/horovod/horovod_pytorch_example.py +270 -0
  15. .venv/lib/python3.11/site-packages/ray/train/examples/horovod/horovod_tune_example.py +139 -0
  16. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__init__.py +0 -0
  17. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/__init__.cpython-311.pyc +0 -0
  18. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/torch_fashion_mnist_example.cpython-311.pyc +0 -0
  19. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/torch_linear_example.cpython-311.pyc +0 -0
  20. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/torch_quick_start.cpython-311.pyc +0 -0
  21. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/torch_regression_example.cpython-311.pyc +0 -0
  22. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/tune_cifar_torch_pbt_example.cpython-311.pyc +0 -0
  23. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/tune_torch_regression_example.cpython-311.pyc +0 -0
  24. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_data_prefetch_benchmark/__init__.py +0 -0
  25. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_data_prefetch_benchmark/__pycache__/__init__.cpython-311.pyc +0 -0
  26. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_data_prefetch_benchmark/__pycache__/auto_pipeline_for_host_to_device_data_transfer.cpython-311.pyc +0 -0
  27. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_data_prefetch_benchmark/auto_pipeline_for_host_to_device_data_transfer.py +161 -0
  28. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_fashion_mnist_example.py +152 -0
  29. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_linear_example.py +147 -0
  30. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_quick_start.py +110 -0
  31. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_regression_example.py +160 -0
  32. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/tune_cifar_torch_pbt_example.py +253 -0
  33. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch/tune_torch_regression_example.py +82 -0
  34. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch_geometric/__init__.py +0 -0
  35. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch_geometric/__pycache__/__init__.cpython-311.pyc +0 -0
  36. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch_geometric/__pycache__/distributed_sage_example.cpython-311.pyc +0 -0
  37. .venv/lib/python3.11/site-packages/ray/train/examples/pytorch_geometric/distributed_sage_example.py +228 -0
  38. .venv/lib/python3.11/site-packages/ray/train/examples/tf/__init__.py +0 -0
  39. .venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/__init__.cpython-311.pyc +0 -0
  40. .venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tensorflow_autoencoder_example.cpython-311.pyc +0 -0
  41. .venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tensorflow_mnist_example.cpython-311.pyc +0 -0
  42. .venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tensorflow_quick_start.cpython-311.pyc +0 -0
  43. .venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tensorflow_regression_example.cpython-311.pyc +0 -0
  44. .venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tune_tensorflow_autoencoder_example.cpython-311.pyc +0 -0
  45. .venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tune_tensorflow_mnist_example.cpython-311.pyc +0 -0
  46. .venv/lib/python3.11/site-packages/ray/train/examples/tf/tensorflow_autoencoder_example.py +174 -0
  47. .venv/lib/python3.11/site-packages/ray/train/examples/tf/tensorflow_mnist_example.py +135 -0
  48. .venv/lib/python3.11/site-packages/ray/train/examples/tf/tensorflow_quick_start.py +87 -0
  49. .venv/lib/python3.11/site-packages/ray/train/examples/tf/tensorflow_regression_example.py +111 -0
  50. .venv/lib/python3.11/site-packages/ray/train/examples/tf/tune_tensorflow_mnist_example.py +80 -0
.gitattributes CHANGED
@@ -179,3 +179,4 @@ tuning-competition-baseline/.venv/lib/python3.11/site-packages/torch/_inductor/_
179
  .venv/lib/python3.11/site-packages/ray/rllib/algorithms/__pycache__/algorithm.cpython-311.pyc filter=lfs diff=lfs merge=lfs -text
180
  .venv/lib/python3.11/site-packages/ray/rllib/algorithms/__pycache__/algorithm_config.cpython-311.pyc filter=lfs diff=lfs merge=lfs -text
181
  .venv/lib/python3.11/site-packages/ray/rllib/env/__pycache__/multi_agent_episode.cpython-311.pyc filter=lfs diff=lfs merge=lfs -text
 
 
179
  .venv/lib/python3.11/site-packages/ray/rllib/algorithms/__pycache__/algorithm.cpython-311.pyc filter=lfs diff=lfs merge=lfs -text
180
  .venv/lib/python3.11/site-packages/ray/rllib/algorithms/__pycache__/algorithm_config.cpython-311.pyc filter=lfs diff=lfs merge=lfs -text
181
  .venv/lib/python3.11/site-packages/ray/rllib/env/__pycache__/multi_agent_episode.cpython-311.pyc filter=lfs diff=lfs merge=lfs -text
182
+ .venv/lib/python3.11/site-packages/ray/tune/execution/__pycache__/tune_controller.cpython-311.pyc filter=lfs diff=lfs merge=lfs -text
.venv/lib/python3.11/site-packages/ray/train/__pycache__/_checkpoint.cpython-311.pyc ADDED
Binary file (21.1 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/__pycache__/trainer.cpython-311.pyc ADDED
Binary file (9.04 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/__pycache__/__init__.cpython-311.pyc ADDED
Binary file (191 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/__pycache__/mlflow_simple_example.cpython-311.pyc ADDED
Binary file (1.45 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/horovod/__init__.py ADDED
File without changes
.venv/lib/python3.11/site-packages/ray/train/examples/horovod/__pycache__/__init__.cpython-311.pyc ADDED
Binary file (199 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/horovod/__pycache__/horovod_cifar_pbt_example.cpython-311.pyc ADDED
Binary file (9.37 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/horovod/__pycache__/horovod_example.cpython-311.pyc ADDED
Binary file (12.9 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/horovod/__pycache__/horovod_pytorch_example.cpython-311.pyc ADDED
Binary file (12.5 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/horovod/__pycache__/horovod_tune_example.cpython-311.pyc ADDED
Binary file (7.94 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/horovod/horovod_cifar_pbt_example.py ADDED
@@ -0,0 +1,210 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import tempfile
3
+
4
+ import numpy as np
5
+ import torch
6
+ import torch.nn as nn
7
+ import torchvision
8
+ import torchvision.transforms as transforms
9
+ from torch.utils.data import DataLoader
10
+ from torchvision.models import resnet18
11
+
12
+ import ray
13
+ import ray.cloudpickle as cpickle
14
+ import ray.train.torch
15
+ from ray import train, tune
16
+ from ray.train import (
17
+ Checkpoint,
18
+ CheckpointConfig,
19
+ FailureConfig,
20
+ RunConfig,
21
+ ScalingConfig,
22
+ )
23
+ from ray.train.horovod import HorovodTrainer
24
+ from ray.tune.schedulers import create_scheduler
25
+ from ray.tune.tune_config import TuneConfig
26
+ from ray.tune.tuner import Tuner
27
+ from ray.tune.utils.release_test_util import ProgressCallback
28
+
29
+ # The long running version starts 4 trials while only 2 can be run at a time.
30
+ # Thus trials are paused and restored at all times so that every trial can make
31
+ # progress. The PBT scheduler also applies perturbation and mutation,
32
+ # which also involves pausing and restoring.
33
+ # The intention is to stress test the pausing and restoring of trials,
34
+ # especially that there should be no GPU memory leak.
35
+
36
+ # TODO(ml-team): This test is very low signal at the moment.
37
+ # We should further trim it down.
38
+
39
+ CIFAR10_STATS = {
40
+ "mean": (0.4914, 0.4822, 0.4465),
41
+ "std": (0.2023, 0.1994, 0.2010),
42
+ }
43
+
44
+
45
+ def train_loop_per_worker(config):
46
+ import horovod.torch as hvd
47
+
48
+ hvd.init()
49
+ device = ray.train.torch.get_device()
50
+ net = resnet18().to(device)
51
+ optimizer = torch.optim.SGD(
52
+ net.parameters(),
53
+ lr=config["lr"],
54
+ )
55
+ epoch = 0
56
+
57
+ checkpoint = train.get_checkpoint()
58
+ if checkpoint:
59
+ with checkpoint.as_directory() as checkpoint_dir:
60
+ with open(os.path.join(checkpoint_dir, "data.ckpt"), "rb") as fp:
61
+ checkpoint_dict = cpickle.load(fp)
62
+
63
+ model_state = checkpoint_dict["model_state"]
64
+ optimizer_state = checkpoint_dict["optimizer_state"]
65
+ epoch = checkpoint_dict["epoch"] + 1
66
+
67
+ net.load_state_dict(model_state)
68
+ optimizer.load_state_dict(optimizer_state)
69
+
70
+ criterion = nn.CrossEntropyLoss()
71
+ optimizer = hvd.DistributedOptimizer(optimizer)
72
+ np.random.seed(1 + hvd.rank())
73
+ torch.manual_seed(1234)
74
+ # To ensure consistent initialization across workers,
75
+ hvd.broadcast_parameters(net.state_dict(), root_rank=0)
76
+ hvd.broadcast_optimizer_state(optimizer, root_rank=0)
77
+
78
+ trainset = ray.get(config["data"])
79
+
80
+ train_sampler = torch.utils.data.distributed.DistributedSampler(
81
+ trainset, num_replicas=hvd.size(), rank=hvd.rank()
82
+ )
83
+
84
+ # Note, don't set `num_workers` in DataLoader (not even 1),
85
+ # as that will separately start multiple processes (each corresponding to 1 worker)
86
+ # to load the data. This is known to cause issues with Ray.
87
+ trainloader = DataLoader(
88
+ trainset, batch_size=int(config["batch_size"]), sampler=train_sampler
89
+ )
90
+
91
+ for current_epoch in range(epoch, 40): # loop over the dataset multiple times
92
+ running_loss = 0.0
93
+ epoch_steps = 0
94
+ for i, data in enumerate(trainloader):
95
+ # get the inputs; data is a list of [inputs, labels]
96
+ inputs, labels = data
97
+ inputs, labels = inputs.to(device), labels.to(device)
98
+
99
+ # zero the parameter gradients
100
+ optimizer.zero_grad()
101
+
102
+ # forward + backward + optimize
103
+ outputs = net(inputs)
104
+ loss = criterion(outputs, labels)
105
+ loss.backward()
106
+ optimizer.step()
107
+
108
+ # print statistics
109
+ running_loss += loss.item()
110
+ epoch_steps += 1
111
+
112
+ if i % 2000 == 1999: # print every 2000 mini-batches
113
+ print(
114
+ "[%d, %5d] loss: %.3f"
115
+ % (current_epoch + 1, i + 1, running_loss / epoch_steps)
116
+ )
117
+
118
+ if config["smoke_test"]:
119
+ break
120
+
121
+ with tempfile.TemporaryDirectory() as checkpoint_dir:
122
+ with open(os.path.join(checkpoint_dir, "data.ckpt"), "wb") as fp:
123
+ cpickle.dump(
124
+ dict(
125
+ model_state=net.state_dict(),
126
+ optimizer_state=optimizer.state_dict(),
127
+ epoch=current_epoch,
128
+ ),
129
+ fp,
130
+ )
131
+ checkpoint = Checkpoint.from_directory(checkpoint_dir)
132
+ train.report(dict(loss=running_loss / epoch_steps), checkpoint=checkpoint)
133
+
134
+
135
+ if __name__ == "__main__":
136
+ import argparse
137
+
138
+ parser = argparse.ArgumentParser()
139
+ parser.add_argument(
140
+ "--smoke-test", action="store_true", help=("Finish quickly for testing.")
141
+ )
142
+ args = parser.parse_args()
143
+
144
+ if args.smoke_test:
145
+ ray.init()
146
+ else:
147
+ ray.init(address="auto") # assumes ray is started with ray up
148
+
149
+ transform_train = transforms.Compose(
150
+ [
151
+ transforms.RandomCrop(32, padding=4),
152
+ transforms.RandomHorizontalFlip(),
153
+ transforms.ToTensor(),
154
+ transforms.Normalize(CIFAR10_STATS["mean"], CIFAR10_STATS["std"]),
155
+ ]
156
+ ) # meanstd transformation
157
+
158
+ dataset = torchvision.datasets.CIFAR10(
159
+ root="/tmp/data_cifar", train=True, download=True, transform=transform_train
160
+ )
161
+
162
+ horovod_trainer = HorovodTrainer(
163
+ train_loop_per_worker=train_loop_per_worker,
164
+ scaling_config=ScalingConfig(
165
+ use_gpu=False if args.smoke_test else True,
166
+ num_workers=2,
167
+ ),
168
+ train_loop_config={"batch_size": 64, "data": ray.put(dataset)},
169
+ )
170
+
171
+ # ensure that checkpointing works.
172
+ pbt = create_scheduler(
173
+ "pbt",
174
+ perturbation_interval=1, # To make perturb more often.
175
+ hyperparam_mutations={
176
+ "train_loop_config": {"lr": tune.uniform(0.001, 0.1)},
177
+ },
178
+ )
179
+
180
+ tuner = Tuner(
181
+ horovod_trainer,
182
+ param_space={
183
+ "train_loop_config": {
184
+ "lr": 0.1
185
+ if args.smoke_test
186
+ else tune.grid_search([0.1 * i for i in range(1, 5)]), # 4 trials
187
+ "smoke_test": args.smoke_test,
188
+ }
189
+ },
190
+ tune_config=TuneConfig(
191
+ num_samples=2 if args.smoke_test else 1,
192
+ metric="loss",
193
+ mode="min",
194
+ scheduler=pbt,
195
+ ),
196
+ run_config=RunConfig(
197
+ stop={"training_iteration": 1} if args.smoke_test else None,
198
+ failure_config=FailureConfig(fail_fast=False),
199
+ checkpoint_config=CheckpointConfig(num_to_keep=1),
200
+ callbacks=[ProgressCallback()],
201
+ ),
202
+ )
203
+
204
+ result_grid = tuner.fit()
205
+
206
+ # Make sure trials do not fail.
207
+ for result in result_grid:
208
+ assert not result.error
209
+
210
+ print("Best hyperparameters found were: ", result_grid.get_best_result().config)
.venv/lib/python3.11/site-packages/ray/train/examples/horovod/horovod_example.py ADDED
@@ -0,0 +1,286 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+ import os
3
+
4
+ import horovod.torch as hvd
5
+ import torch.nn as nn
6
+ import torch.nn.functional as F
7
+ import torch.optim as optim
8
+ import torch.utils.data.distributed
9
+ from filelock import FileLock
10
+ from torchvision import datasets, transforms
11
+
12
+ import ray
13
+ from ray import train
14
+ from ray.train import ScalingConfig
15
+ from ray.train.horovod import HorovodTrainer
16
+
17
+
18
+ def metric_average(val, name):
19
+ tensor = torch.tensor(val)
20
+ avg_tensor = hvd.allreduce(tensor, name=name)
21
+ return avg_tensor.item()
22
+
23
+
24
+ class Net(nn.Module):
25
+ def __init__(self):
26
+ super(Net, self).__init__()
27
+ self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
28
+ self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
29
+ self.conv2_drop = nn.Dropout2d()
30
+ self.fc1 = nn.Linear(320, 50)
31
+ self.fc2 = nn.Linear(50, 10)
32
+
33
+ def forward(self, x):
34
+ x = F.relu(F.max_pool2d(self.conv1(x), 2))
35
+ x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
36
+ x = x.view(-1, 320)
37
+ x = F.relu(self.fc1(x))
38
+ x = F.dropout(x, training=self.training)
39
+ x = self.fc2(x)
40
+ return F.log_softmax(x)
41
+
42
+
43
+ def setup(config):
44
+ data_dir = config.get("data_dir", None)
45
+ seed = config.get("seed", 42)
46
+ batch_size = config.get("batch_size", 64)
47
+ use_adasum = config.get("use_adasum", False)
48
+ lr = config.get("lr", 0.01)
49
+ momentum = config.get("momentum", 0.5)
50
+ use_cuda = config.get("use_cuda", False)
51
+
52
+ # Horovod: initialize library.
53
+ hvd.init()
54
+ torch.manual_seed(seed)
55
+
56
+ if use_cuda:
57
+ # Horovod: pin GPU to local rank.
58
+ torch.cuda.set_device(hvd.local_rank())
59
+ torch.cuda.manual_seed(seed)
60
+
61
+ # Horovod: limit # of CPU threads to be used per worker.
62
+ torch.set_num_threads(1)
63
+
64
+ kwargs = {"num_workers": 1, "pin_memory": True} if use_cuda else {}
65
+ data_dir = data_dir or "~/data"
66
+ with FileLock(os.path.expanduser("~/.horovod_lock")):
67
+ train_dataset = datasets.MNIST(
68
+ data_dir,
69
+ train=True,
70
+ download=True,
71
+ transform=transforms.Compose(
72
+ [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
73
+ ),
74
+ )
75
+ # Horovod: use DistributedSampler to partition the training data.
76
+ train_sampler = torch.utils.data.distributed.DistributedSampler(
77
+ train_dataset, num_replicas=hvd.size(), rank=hvd.rank()
78
+ )
79
+ train_loader = torch.utils.data.DataLoader(
80
+ train_dataset, batch_size=batch_size, sampler=train_sampler, **kwargs
81
+ )
82
+
83
+ model = Net()
84
+
85
+ # By default, Adasum doesn't need scaling up learning rate.
86
+ lr_scaler = hvd.size() if not use_adasum else 1
87
+
88
+ if use_cuda:
89
+ # Move model to GPU.
90
+ model.cuda()
91
+ # If using GPU Adasum allreduce, scale learning rate by local_size.
92
+ if use_adasum and hvd.nccl_built():
93
+ lr_scaler = hvd.local_size()
94
+
95
+ # Horovod: scale learning rate by lr_scaler.
96
+ optimizer = optim.SGD(model.parameters(), lr=lr * lr_scaler, momentum=momentum)
97
+
98
+ # Horovod: wrap optimizer with DistributedOptimizer.
99
+ optimizer = hvd.DistributedOptimizer(
100
+ optimizer,
101
+ named_parameters=model.named_parameters(),
102
+ op=hvd.Adasum if use_adasum else hvd.Average,
103
+ )
104
+
105
+ return model, optimizer, train_loader, train_sampler
106
+
107
+
108
+ def train_epoch(
109
+ model, optimizer, train_sampler, train_loader, epoch, log_interval, use_cuda
110
+ ):
111
+ loss = None
112
+ model.train()
113
+ # Horovod: set epoch to sampler for shuffling.
114
+ train_sampler.set_epoch(epoch)
115
+ for batch_idx, (data, target) in enumerate(train_loader):
116
+ if use_cuda:
117
+ data, target = data.cuda(), target.cuda()
118
+ optimizer.zero_grad()
119
+ output = model(data)
120
+ loss = F.nll_loss(output, target)
121
+ loss.backward()
122
+ optimizer.step()
123
+ if batch_idx % log_interval == 0:
124
+ # Horovod: use train_sampler to determine the number of
125
+ # examples in this worker's partition.
126
+ print(
127
+ "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
128
+ epoch,
129
+ batch_idx * len(data),
130
+ len(train_sampler),
131
+ 100.0 * batch_idx / len(train_loader),
132
+ loss.item(),
133
+ )
134
+ )
135
+ return loss.item() if loss else None
136
+
137
+
138
+ # Horovod function API.
139
+
140
+
141
+ def train_func(config):
142
+ num_epochs = config.get("num_epochs", 10)
143
+ log_interval = config.get("log_interval", 10)
144
+ use_cuda = config.get("use_cuda", False)
145
+
146
+ model, optimizer, train_loader, train_sampler = setup(config)
147
+
148
+ for epoch in range(num_epochs):
149
+ loss = train_epoch(
150
+ model, optimizer, train_sampler, train_loader, epoch, log_interval, use_cuda
151
+ )
152
+ train.report(dict(loss=loss))
153
+
154
+
155
+ def main(num_workers, use_gpu, kwargs):
156
+ trainer = HorovodTrainer(
157
+ train_func,
158
+ train_loop_config=kwargs,
159
+ scaling_config=ScalingConfig(use_gpu=use_gpu, num_workers=num_workers),
160
+ )
161
+ results = trainer.fit()
162
+ print(results.metrics)
163
+
164
+
165
+ # Horovod Class API.
166
+
167
+
168
+ class HorovodTrainClass:
169
+ def __init__(self, config):
170
+ self.log_interval = config.get("log_interval", 10)
171
+ self.use_cuda = config.get("use_cuda", False)
172
+
173
+ if self.use_cuda:
174
+ torch.cuda.set_device(hvd.local_rank())
175
+
176
+ self.model, self.optimizer, self.train_loader, self.train_sampler = setup(
177
+ config
178
+ )
179
+
180
+ def train(self, epoch):
181
+ loss = train_epoch(
182
+ self.model,
183
+ self.optimizer,
184
+ self.train_sampler,
185
+ self.train_loader,
186
+ epoch,
187
+ self.log_interval,
188
+ self.use_cuda,
189
+ )
190
+ return loss
191
+
192
+
193
+ if __name__ == "__main__":
194
+ # Training settings
195
+ parser = argparse.ArgumentParser(
196
+ description="PyTorch MNIST Example",
197
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter,
198
+ )
199
+ parser.add_argument(
200
+ "--batch-size",
201
+ type=int,
202
+ default=64,
203
+ metavar="N",
204
+ help="input batch size for training (default: 64)",
205
+ )
206
+ parser.add_argument(
207
+ "--num-epochs",
208
+ type=int,
209
+ default=5,
210
+ metavar="N",
211
+ help="number of epochs to train (default: 10)",
212
+ )
213
+ parser.add_argument(
214
+ "--lr",
215
+ type=float,
216
+ default=0.01,
217
+ metavar="LR",
218
+ help="learning rate (default: 0.01)",
219
+ )
220
+ parser.add_argument(
221
+ "--momentum",
222
+ type=float,
223
+ default=0.5,
224
+ metavar="M",
225
+ help="SGD momentum (default: 0.5)",
226
+ )
227
+ parser.add_argument(
228
+ "--use-gpu", action="store_true", default=False, help="enables CUDA training"
229
+ )
230
+ parser.add_argument(
231
+ "--seed", type=int, default=42, metavar="S", help="random seed (default: 42)"
232
+ )
233
+ parser.add_argument(
234
+ "--log-interval",
235
+ type=int,
236
+ default=10,
237
+ metavar="N",
238
+ help="how many batches to wait before logging training status",
239
+ )
240
+ parser.add_argument(
241
+ "--use-adasum",
242
+ action="store_true",
243
+ default=False,
244
+ help="use adasum algorithm to do reduction",
245
+ )
246
+ parser.add_argument(
247
+ "--num-workers",
248
+ type=int,
249
+ default=2,
250
+ help="Number of Ray workers to use for training.",
251
+ )
252
+ parser.add_argument(
253
+ "--data-dir",
254
+ help="location of the training dataset in the local filesystem ("
255
+ "will be downloaded if needed)",
256
+ )
257
+ parser.add_argument(
258
+ "--address",
259
+ required=False,
260
+ type=str,
261
+ default=None,
262
+ help="Address of Ray cluster.",
263
+ )
264
+
265
+ args = parser.parse_args()
266
+
267
+ if args.address:
268
+ ray.init(args.address)
269
+ else:
270
+ ray.init()
271
+
272
+ use_cuda = args.use_gpu if args.use_gpu is not None else False
273
+
274
+ kwargs = {
275
+ "data_dir": args.data_dir,
276
+ "seed": args.seed,
277
+ "use_cuda": use_cuda,
278
+ "batch_size": args.batch_size,
279
+ "use_adasum": args.use_adasum if args.use_adasum else False,
280
+ "lr": args.lr,
281
+ "momentum": args.momentum,
282
+ "num_epochs": args.num_epochs,
283
+ "log_interval": args.log_interval,
284
+ }
285
+
286
+ main(num_workers=args.num_workers, use_gpu=use_cuda, kwargs=kwargs)
.venv/lib/python3.11/site-packages/ray/train/examples/horovod/horovod_pytorch_example.py ADDED
@@ -0,0 +1,270 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+ import os
3
+ import tempfile
4
+
5
+ import horovod.torch as hvd
6
+ import torch.nn as nn
7
+ import torch.nn.functional as F
8
+ import torch.optim as optim
9
+ import torch.utils.data.distributed
10
+ from filelock import FileLock
11
+ from torchvision import datasets, transforms
12
+
13
+ import ray.train.torch
14
+ from ray import train
15
+ from ray.train import Checkpoint, ScalingConfig
16
+ from ray.train.horovod import HorovodTrainer
17
+
18
+
19
+ def metric_average(val, name):
20
+ tensor = torch.tensor(val)
21
+ avg_tensor = hvd.allreduce(tensor, name=name)
22
+ return avg_tensor.item()
23
+
24
+
25
+ class Net(nn.Module):
26
+ def __init__(self):
27
+ super(Net, self).__init__()
28
+ self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
29
+ self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
30
+ self.conv2_drop = nn.Dropout2d()
31
+ self.fc1 = nn.Linear(320, 50)
32
+ self.fc2 = nn.Linear(50, 10)
33
+
34
+ def forward(self, x):
35
+ x = F.relu(F.max_pool2d(self.conv1(x), 2))
36
+ x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
37
+ x = x.view(-1, 320)
38
+ x = F.relu(self.fc1(x))
39
+ x = F.dropout(x, training=self.training)
40
+ x = self.fc2(x)
41
+ return F.log_softmax(x)
42
+
43
+
44
+ def setup(config):
45
+ data_dir = config.get("data_dir", None)
46
+ seed = config.get("seed", 42)
47
+ batch_size = config.get("batch_size", 64)
48
+ use_adasum = config.get("use_adasum", False)
49
+ lr = config.get("lr", 0.01)
50
+ momentum = config.get("momentum", 0.5)
51
+ use_cuda = config.get("use_cuda", False)
52
+
53
+ # Horovod: initialize library.
54
+ hvd.init()
55
+ torch.manual_seed(seed)
56
+
57
+ if use_cuda:
58
+ # Horovod: pin GPU to local rank.
59
+ torch.cuda.set_device(hvd.local_rank())
60
+ torch.cuda.manual_seed(seed)
61
+
62
+ # Horovod: limit # of CPU threads to be used per worker.
63
+ torch.set_num_threads(1)
64
+
65
+ kwargs = {"pin_memory": True} if use_cuda else {}
66
+ data_dir = data_dir or "~/data"
67
+ with FileLock(os.path.expanduser("~/.horovod_lock")):
68
+ train_dataset = datasets.MNIST(
69
+ data_dir,
70
+ train=True,
71
+ download=True,
72
+ transform=transforms.Compose(
73
+ [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
74
+ ),
75
+ )
76
+ # Horovod: use DistributedSampler to partition the training data.
77
+ train_sampler = torch.utils.data.distributed.DistributedSampler(
78
+ train_dataset, num_replicas=hvd.size(), rank=hvd.rank()
79
+ )
80
+ # Note, don't set `num_workers` in DataLoader (not even 1),
81
+ # as that will separately start multiple processes (each corresponding to 1 worker)
82
+ # to load the data. This is known to cause issues with Ray.
83
+ train_loader = torch.utils.data.DataLoader(
84
+ train_dataset, batch_size=batch_size, sampler=train_sampler, **kwargs
85
+ )
86
+
87
+ model = Net()
88
+
89
+ # By default, Adasum doesn't need scaling up learning rate.
90
+ lr_scaler = hvd.size() if not use_adasum else 1
91
+
92
+ if use_cuda:
93
+ # Move model to GPU.
94
+ model.cuda()
95
+ # If using GPU Adasum allreduce, scale learning rate by local_size.
96
+ if use_adasum and hvd.nccl_built():
97
+ lr_scaler = hvd.local_size()
98
+
99
+ # Horovod: scale learning rate by lr_scaler.
100
+ optimizer = optim.SGD(model.parameters(), lr=lr * lr_scaler, momentum=momentum)
101
+
102
+ # Horovod: wrap optimizer with DistributedOptimizer.
103
+ optimizer = hvd.DistributedOptimizer(
104
+ optimizer,
105
+ named_parameters=model.named_parameters(),
106
+ op=hvd.Adasum if use_adasum else hvd.Average,
107
+ )
108
+
109
+ return model, optimizer, train_loader, train_sampler
110
+
111
+
112
+ def train_epoch(
113
+ model, optimizer, train_sampler, train_loader, epoch, log_interval, use_cuda
114
+ ):
115
+ loss = None
116
+ model.train()
117
+ # Horovod: set epoch to sampler for shuffling.
118
+ train_sampler.set_epoch(epoch)
119
+ for batch_idx, (data, target) in enumerate(train_loader):
120
+ if use_cuda:
121
+ data, target = data.cuda(), target.cuda()
122
+ optimizer.zero_grad()
123
+ output = model(data)
124
+ loss = F.nll_loss(output, target)
125
+ loss.backward()
126
+ optimizer.step()
127
+ if batch_idx % log_interval == 0:
128
+ # Horovod: use train_sampler to determine the number of
129
+ # examples in this worker's partition.
130
+ print(
131
+ "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
132
+ epoch,
133
+ batch_idx * len(data),
134
+ len(train_sampler),
135
+ 100.0 * batch_idx / len(train_loader),
136
+ loss.item(),
137
+ )
138
+ )
139
+ return loss.item() if loss else None
140
+
141
+
142
+ def train_func(config):
143
+ num_epochs = config.get("num_epochs", 10)
144
+ log_interval = config.get("log_interval", 10)
145
+ use_cuda = config.get("use_cuda", False)
146
+
147
+ model, optimizer, train_loader, train_sampler = setup(config)
148
+
149
+ results = []
150
+ for epoch in range(num_epochs):
151
+ loss = train_epoch(
152
+ model, optimizer, train_sampler, train_loader, epoch, log_interval, use_cuda
153
+ )
154
+ results.append(loss)
155
+ with tempfile.TemporaryDirectory() as tmpdir:
156
+ torch.save(model.state_dict(), os.path.join(tmpdir, "model.pt"))
157
+ train.report({"loss": loss}, checkpoint=Checkpoint.from_directory(tmpdir))
158
+
159
+ # Only used for testing.
160
+ return results
161
+
162
+
163
+ def main(num_workers, use_gpu, kwargs):
164
+ trainer = HorovodTrainer(
165
+ train_loop_per_worker=train_func,
166
+ train_loop_config={
167
+ "num_epochs": kwargs["num_epochs"],
168
+ "log_interval": kwargs["log_interval"],
169
+ "use_cuda": kwargs["use_cuda"],
170
+ },
171
+ scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
172
+ )
173
+ result = trainer.fit()
174
+ print(result)
175
+
176
+
177
+ if __name__ == "__main__":
178
+ # Training settings
179
+ parser = argparse.ArgumentParser(
180
+ description="PyTorch MNIST Example",
181
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter,
182
+ )
183
+ parser.add_argument(
184
+ "--batch-size",
185
+ type=int,
186
+ default=64,
187
+ metavar="N",
188
+ help="input batch size for training (default: 64)",
189
+ )
190
+ parser.add_argument(
191
+ "--num-epochs",
192
+ type=int,
193
+ default=5,
194
+ metavar="N",
195
+ help="number of epochs to train (default: 10)",
196
+ )
197
+ parser.add_argument(
198
+ "--lr",
199
+ type=float,
200
+ default=0.01,
201
+ metavar="LR",
202
+ help="learning rate (default: 0.01)",
203
+ )
204
+ parser.add_argument(
205
+ "--momentum",
206
+ type=float,
207
+ default=0.5,
208
+ metavar="M",
209
+ help="SGD momentum (default: 0.5)",
210
+ )
211
+ parser.add_argument(
212
+ "--use-gpu", action="store_true", default=False, help="enables CUDA training"
213
+ )
214
+ parser.add_argument(
215
+ "--seed", type=int, default=42, metavar="S", help="random seed (default: 42)"
216
+ )
217
+ parser.add_argument(
218
+ "--log-interval",
219
+ type=int,
220
+ default=10,
221
+ metavar="N",
222
+ help="how many batches to wait before logging training status",
223
+ )
224
+ parser.add_argument(
225
+ "--use-adasum",
226
+ action="store_true",
227
+ default=False,
228
+ help="use adasum algorithm to do reduction",
229
+ )
230
+ parser.add_argument(
231
+ "--num-workers",
232
+ type=int,
233
+ default=2,
234
+ help="Number of Ray workers to use for training.",
235
+ )
236
+ parser.add_argument(
237
+ "--data-dir",
238
+ help="location of the training dataset in the local filesystem ("
239
+ "will be downloaded if needed)",
240
+ )
241
+ parser.add_argument(
242
+ "--address",
243
+ required=False,
244
+ type=str,
245
+ default=None,
246
+ help="Address of Ray cluster.",
247
+ )
248
+
249
+ args = parser.parse_args()
250
+
251
+ if args.address:
252
+ ray.init(args.address)
253
+ else:
254
+ ray.init()
255
+
256
+ use_cuda = args.use_gpu if args.use_gpu is not None else False
257
+
258
+ kwargs = {
259
+ "data_dir": args.data_dir,
260
+ "seed": args.seed,
261
+ "use_cuda": use_cuda,
262
+ "batch_size": args.batch_size,
263
+ "use_adasum": args.use_adasum if args.use_adasum else False,
264
+ "lr": args.lr,
265
+ "momentum": args.momentum,
266
+ "num_epochs": args.num_epochs,
267
+ "log_interval": args.log_interval,
268
+ }
269
+
270
+ main(num_workers=args.num_workers, use_gpu=use_cuda, kwargs=kwargs)
.venv/lib/python3.11/site-packages/ray/train/examples/horovod/horovod_tune_example.py ADDED
@@ -0,0 +1,139 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import time
2
+
3
+ import numpy as np
4
+ import torch
5
+
6
+ import ray
7
+ import ray.train.torch
8
+ from ray import train, tune
9
+ from ray.train import ScalingConfig
10
+ from ray.train.horovod import HorovodTrainer
11
+ from ray.tune.tune_config import TuneConfig
12
+ from ray.tune.tuner import Tuner
13
+
14
+
15
def sq(x):
    """Evaluate the target quadratic 1.0*x^2 - 20.0*x + 50.0."""
    coeff2, coeff1, coeff0 = 1.0, -20.0, 50.0
    return coeff2 * x * x + coeff1 * x + coeff0
20
+
21
+
22
def qu(x):
    """Evaluate the target cubic 10*x^3 + 5*x^2 - 20*x - 5."""
    coeff3, coeff2, coeff1, coeff0 = 10.0, 5.0, -20.0, -5.0
    return coeff3 * x * x * x + coeff2 * x * x + coeff1 * x + coeff0
28
+
29
+
30
class Net(torch.nn.Module):
    """Learnable polynomial: quadratic (mode 0) or cubic (mode 1).

    The learnable coefficients are the lower-order terms; the leading
    coefficient is fixed (1 for the quadratic, 10 for the cubic) to match
    the ``sq``/``qu`` targets.
    """

    def __init__(self, mode="sq"):
        super(Net, self).__init__()

        if mode == "square":
            self.mode = 0
            self.param = torch.nn.Parameter(torch.FloatTensor([1.0, -1.0]))
        else:
            self.mode = 1
            self.param = torch.nn.Parameter(torch.FloatTensor([1.0, -1.0, 1.0]))

    def forward(self, x):
        # BUG FIX: the original tested `if ~self.mode:`, but `~` is bitwise
        # NOT (~0 == -1, ~1 == -2, both truthy), so the cubic branch was
        # unreachable. Test the mode value explicitly instead.
        if self.mode == 0:
            return x * x + self.param[0] * x + self.param[1]
        else:
            return_val = 10 * x * x * x
            return_val += self.param[0] * x * x
            return_val += self.param[1] * x + self.param[2]
            return return_val
49
+
50
+
51
def train_loop_per_worker(config):
    """Horovod worker loop: fit Net's coefficients to the sq/qu target."""
    import horovod.torch as hvd
    import torch

    hvd.init()
    device = ray.train.torch.get_device()
    mode = config["mode"]
    model = Net(mode).to(device)
    base_optimizer = torch.optim.SGD(
        model.parameters(),
        lr=config["lr"],
    )
    optimizer = hvd.DistributedOptimizer(base_optimizer)

    num_steps = 5
    print(hvd.size())
    # Per-rank data seed, shared model seed.
    np.random.seed(1 + hvd.rank())
    torch.manual_seed(1234)
    # To ensure consistent initialization across workers,
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

    start = time.time()
    x_max = config["x_max"]
    for step in range(1, num_steps + 1):
        # One x drawn uniformly from [-x_max, x_max].
        features = torch.Tensor(np.random.rand(1) * 2 * x_max - x_max).to(device)
        labels = sq(features) if mode == "square" else qu(features)
        optimizer.zero_grad()
        outputs = model(features)
        loss = torch.nn.MSELoss()(outputs, labels)
        loss.backward()

        optimizer.step()
        time.sleep(0.1)
        train.report(dict(loss=loss.item()))
    total = time.time() - start
    print(f"Took {total:0.3f} s. Avg: {total / num_steps:0.3f} s.")
91
+
92
+
93
def tune_horovod(num_workers, num_samples, use_gpu, mode="square", x_max=1.0):
    """Search the learning rate of the Horovod training loop with Ray Tune."""
    trainer = HorovodTrainer(
        train_loop_per_worker=train_loop_per_worker,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        train_loop_config={"mode": mode, "x_max": x_max},
    )

    # Only `lr` is tuned; mode/x_max stay fixed for every trial.
    tuner = Tuner(
        trainer,
        param_space={"train_loop_config": {"lr": tune.uniform(0.1, 1)}},
        tune_config=TuneConfig(mode="min", metric="loss", num_samples=num_samples),
        _tuner_kwargs={"fail_fast": True},
    )

    results = tuner.fit()

    print("Best hyperparameters found were: ", results.get_best_result().config)
110
+
111
+
112
if __name__ == "__main__":
    import argparse

    # CLI for the tuning example; flags and defaults are part of the interface.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--mode", type=str, default="square", choices=["square", "cubic"]
    )
    parser.add_argument(
        "--learning_rate", type=float, default=0.1, dest="learning_rate"
    )
    parser.add_argument("--x_max", type=float, default=1.0, dest="x_max")
    parser.add_argument("--gpu", action="store_true")
    parser.add_argument(
        "--smoke-test", action="store_true", help=("Finish quickly for testing.")
    )
    parser.add_argument("--num-workers", type=int, default=2)
    args, _ = parser.parse_known_args()

    if args.smoke_test:
        # Small fixed-size local cluster so the smoke test is hermetic.
        ray.init(num_cpus=3)

    tune_horovod(
        num_workers=args.num_workers,
        num_samples=2 if args.smoke_test else 10,
        use_gpu=args.gpu,
        mode=args.mode,
        x_max=args.x_max,
    )
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__init__.py ADDED
File without changes
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/__init__.cpython-311.pyc ADDED
Binary file (199 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/torch_fashion_mnist_example.cpython-311.pyc ADDED
Binary file (7.2 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/torch_linear_example.cpython-311.pyc ADDED
Binary file (8.31 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/torch_quick_start.cpython-311.pyc ADDED
Binary file (5.31 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/torch_regression_example.cpython-311.pyc ADDED
Binary file (8.81 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/tune_cifar_torch_pbt_example.cpython-311.pyc ADDED
Binary file (12.9 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/__pycache__/tune_torch_regression_example.cpython-311.pyc ADDED
Binary file (3.48 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_data_prefetch_benchmark/__init__.py ADDED
File without changes
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_data_prefetch_benchmark/__pycache__/__init__.cpython-311.pyc ADDED
Binary file (229 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_data_prefetch_benchmark/__pycache__/auto_pipeline_for_host_to_device_data_transfer.cpython-311.pyc ADDED
Binary file (8.63 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_data_prefetch_benchmark/auto_pipeline_for_host_to_device_data_transfer.py ADDED
@@ -0,0 +1,161 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # The PyTorch data transfer benchmark script.
2
+ import argparse
3
+ import warnings
4
+
5
+ import numpy as np
6
+ import torch
7
+ import torch.nn as nn
8
+
9
+ import ray.train as train
10
+ from ray.train import ScalingConfig
11
+ from ray.train.torch import TorchTrainer
12
+
13
+
14
class Net(nn.Module):
    """MLP with layer widths [in_d, *hidden, 1], applied sequentially.

    Args:
        in_d: input feature dimension.
        hidden: list of hidden-layer widths; the output dim is always 1.
    """

    def __init__(self, in_d, hidden):
        # output dim = 1
        super(Net, self).__init__()
        dims = [in_d] + hidden + [1]
        # BUG FIX: start the range at 1 so each layer maps dims[i-1] -> dims[i].
        # `range(len(dims))` also produced i=0, i.e. a bogus extra
        # nn.Linear(dims[-1], dims[0]) first layer that breaks forward().
        self.layers = nn.ModuleList(
            [nn.Linear(dims[i - 1], dims[i]) for i in range(1, len(dims))]
        )

    def forward(self, x):
        # Plain sequential application; no activations (benchmark only).
        for layer in self.layers:
            x = layer(x)
        return x
27
+
28
+
29
class BenchmarkDataset(torch.utils.data.Dataset):
    """Create a naive dataset for the benchmark"""

    def __init__(self, dim, size=1000):
        # Gaussian features (size, dim) and targets (size, 1), float32.
        features = np.random.normal(size=(size, dim))
        targets = np.random.normal(size=(size, 1))
        self.x = torch.from_numpy(features).float()
        self.y = torch.from_numpy(targets).float()
        self.size = size

    def __getitem__(self, index):
        # The extra None axis yields (1, dim) / (1, 1) shaped samples.
        return self.x[index, None], self.y[index, None]

    def __len__(self):
        return self.size
42
+
43
+
44
def train_epoch(epoch, dataloader, model, loss_fn, optimizer):
    """Run one optimization pass over ``dataloader``."""
    # Reshuffle the distributed sampler so each epoch sees a new order.
    if train.get_context().get_world_size() > 1:
        dataloader.sampler.set_epoch(epoch)

    for features, targets in dataloader:
        predictions = model(features)
        batch_loss = loss_fn(predictions, targets)

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()
57
+
58
+
59
def train_func(config):
    """Per-worker benchmark body: time `epochs` passes with or without
    the host-to-device auto-transfer pipeline, using CUDA events."""
    data_size = config.get("data_size", 4096 * 50)
    batch_size = config.get("batch_size", 4096)
    hidden_size = config.get("hidden_size", 1)
    use_auto_transfer = config.get("use_auto_transfer", False)
    lr = config.get("lr", 1e-2)
    epochs = config.get("epochs", 10)

    loader = torch.utils.data.DataLoader(
        BenchmarkDataset(4096, size=data_size), batch_size=batch_size, shuffle=True
    )
    loader = train.torch.prepare_data_loader(
        data_loader=loader, move_to_device=True, auto_transfer=use_auto_transfer
    )

    model = train.torch.prepare_model(Net(in_d=4096, hidden=[4096] * hidden_size))

    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    # CUDA events time GPU-side work; synchronize() brackets the measurement.
    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)

    choice = "with" if use_auto_transfer else "without"
    print(f"Starting the torch data prefetch benchmark {choice} auto pipeline...")

    torch.cuda.synchronize()
    start.record()
    for epoch in range(epochs):
        train_epoch(epoch, loader, model, loss_fn, optimizer)
    end.record()
    torch.cuda.synchronize()

    print(
        f"Finished the torch data prefetch benchmark {choice} "
        f"auto pipeline: {start.elapsed_time(end)} ms."
    )

    return "Experiment done."
101
+
102
+
103
def train_linear(num_workers=1, num_hidden_layers=1, use_auto_transfer=True, epochs=3):
    """Launch the benchmark on GPU workers and return the fit results."""
    trainer = TorchTrainer(
        train_func,
        train_loop_config={
            "lr": 1e-2,
            "hidden_size": num_hidden_layers,
            "batch_size": 4096,
            "epochs": epochs,
            "use_auto_transfer": use_auto_transfer,
        },
        scaling_config=ScalingConfig(use_gpu=True, num_workers=num_workers),
    )
    results = trainer.fit()

    print(results.metrics)
    return results
120
+
121
+
122
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--address", required=False, type=str, help="the address to use for Ray"
    )
    parser.add_argument(
        "--epochs", type=int, default=1, help="Number of epochs to train for."
    )
    parser.add_argument(
        "--num_hidden_layers",
        type=int,
        default=1,
        # Fixed copy-pasted help text (previously "Number of epochs to train for.").
        help="Number of hidden layers in the model.",
    )

    args, _ = parser.parse_known_args()

    import ray

    ray.init(address=args.address)

    if not torch.cuda.is_available():
        warnings.warn("GPU is not available. Skip the test using auto pipeline.")
    else:
        # Run once with the auto-transfer pipeline...
        train_linear(
            num_workers=1,
            num_hidden_layers=args.num_hidden_layers,
            use_auto_transfer=True,
            epochs=args.epochs,
        )

        # ...then once without, so the two timings can be compared.
        torch.cuda.empty_cache()
        train_linear(
            num_workers=1,
            num_hidden_layers=args.num_hidden_layers,
            use_auto_transfer=False,
            epochs=args.epochs,
        )

    ray.shutdown()
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_fashion_mnist_example.py ADDED
@@ -0,0 +1,152 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from typing import Dict
3
+
4
+ import torch
5
+ from filelock import FileLock
6
+ from torch import nn
7
+ from torch.utils.data import DataLoader
8
+ from torchvision import datasets, transforms
9
+ from torchvision.transforms import Normalize, ToTensor
10
+ from tqdm import tqdm
11
+
12
+ import ray.train
13
+ from ray.train import ScalingConfig
14
+ from ray.train.torch import TorchTrainer
15
+
16
+
17
def get_dataloaders(batch_size):
    """Download FashionMNIST (once, under a file lock) and build dataloaders."""
    # Transform to normalize the input images
    transform = transforms.Compose([ToTensor(), Normalize((0.5,), (0.5,))])

    # The lock keeps concurrent workers from racing on the download.
    with FileLock(os.path.expanduser("~/data.lock")):
        # Download training data from open datasets
        training_data = datasets.FashionMNIST(
            root="~/data",
            train=True,
            download=True,
            transform=transform,
        )

        # Download test data from open datasets
        test_data = datasets.FashionMNIST(
            root="~/data",
            train=False,
            download=True,
            transform=transform,
        )

    # Only the training loader shuffles.
    return (
        DataLoader(training_data, batch_size=batch_size, shuffle=True),
        DataLoader(test_data, batch_size=batch_size),
    )
43
+
44
+
45
+ # Model Definition
46
class NeuralNetwork(nn.Module):
    """Fully-connected classifier for 28x28 FashionMNIST images (10 outputs)."""

    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        layers = [
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 10),
            nn.ReLU(),
        ]
        self.linear_relu_stack = nn.Sequential(*layers)

    def forward(self, x):
        # Flatten (N, 1, 28, 28) -> (N, 784) before the dense stack.
        flat = self.flatten(x)
        return self.linear_relu_stack(flat)
65
+
66
+
67
def train_func_per_worker(config: Dict):
    """Per-worker loop: train on a shard, evaluate, and report metrics."""
    lr = config["lr"]
    epochs = config["epochs"]
    batch_size = config["batch_size_per_worker"]

    # Build dataloaders inside the worker so each process owns its own copies.
    train_dataloader, test_dataloader = get_dataloaders(batch_size=batch_size)

    # [1] Prepare Dataloader for distributed training
    # Shard the datasets among workers and move batches to the correct device
    # =======================================================================
    train_dataloader = ray.train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = ray.train.torch.prepare_data_loader(test_dataloader)

    # [2] Prepare and wrap your model with DistributedDataParallel
    # Move the model to the correct GPU/CPU device
    # ============================================================
    model = ray.train.torch.prepare_model(NeuralNetwork())

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    for epoch in range(epochs):
        if ray.train.get_context().get_world_size() > 1:
            # Required for the distributed sampler to shuffle properly across epochs.
            train_dataloader.sampler.set_epoch(epoch)

        # --- training pass ---
        model.train()
        for X, y in tqdm(train_dataloader, desc=f"Train Epoch {epoch}"):
            loss = loss_fn(model(X), y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # --- evaluation pass ---
        model.eval()
        test_loss, num_correct, num_total = 0, 0, 0
        with torch.no_grad():
            for X, y in tqdm(test_dataloader, desc=f"Test Epoch {epoch}"):
                pred = model(X)
                test_loss += loss_fn(pred, y).item()
                num_total += y.shape[0]
                num_correct += (pred.argmax(1) == y).sum().item()

        test_loss /= len(test_dataloader)
        accuracy = num_correct / num_total

        # [3] Report metrics to Ray Train
        # ===============================
        ray.train.report(metrics={"loss": test_loss, "accuracy": accuracy})
123
+
124
+
125
def train_fashion_mnist(num_workers=2, use_gpu=False):
    """Run distributed FashionMNIST training across ``num_workers`` workers."""
    global_batch_size = 32

    # Each worker processes an equal slice of the global batch.
    train_config = {
        "lr": 1e-3,
        "epochs": 10,
        "batch_size_per_worker": global_batch_size // num_workers,
    }

    # Configure computation resources
    scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)

    # Initialize a Ray TorchTrainer
    trainer = TorchTrainer(
        train_loop_per_worker=train_func_per_worker,
        train_loop_config=train_config,
        scaling_config=scaling_config,
    )

    # [4] Start distributed training
    # Run `train_func_per_worker` on all workers
    # =============================================
    result = trainer.fit()
    print(f"Training result: {result}")
149
+
150
+
151
if __name__ == "__main__":
    # Example entry point: 4 GPU workers.
    train_fashion_mnist(num_workers=4, use_gpu=True)
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_linear_example.py ADDED
@@ -0,0 +1,147 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+ import os
3
+ import tempfile
4
+
5
+ import numpy as np
6
+ import torch
7
+ import torch.nn as nn
8
+
9
+ import ray.train as train
10
+ from ray.train import Checkpoint, RunConfig, ScalingConfig
11
+ from ray.train.torch import TorchTrainer
12
+
13
+
14
class LinearDataset(torch.utils.data.Dataset):
    """y = a * x + b"""

    def __init__(self, a, b, size=1000):
        # `size` evenly spaced float32 points on [0, 10).
        inputs = np.arange(0, 10, 10 / size, dtype=np.float32)
        self.x = torch.from_numpy(inputs)
        self.y = torch.from_numpy(a * inputs + b)

    def __getitem__(self, index):
        # Trailing None axis: scalars become shape-(1,) feature tensors.
        return self.x[index, None], self.y[index, None]

    def __len__(self):
        return len(self.x)
27
+
28
+
29
def train_epoch(epoch, dataloader, model, loss_fn, optimizer):
    """One optimization pass over the training dataloader."""
    if train.get_context().get_world_size() > 1:
        # Reshuffle the distributed sampler for this epoch.
        dataloader.sampler.set_epoch(epoch)

    for features, targets in dataloader:
        prediction = model(features)
        loss = loss_fn(prediction, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
42
+
43
+
44
def validate_epoch(dataloader, model, loss_fn):
    """Return (CPU state_dict snapshot, mean validation loss)."""
    import copy

    model.eval()
    total = 0.0
    with torch.no_grad():
        for features, targets in dataloader:
            total += loss_fn(model(features), targets).item()
    mean_loss = total / len(dataloader)

    # Deep-copy before moving to CPU so the training model stays on device.
    snapshot = copy.deepcopy(model)
    return snapshot.cpu().state_dict(), mean_loss
57
+
58
+
59
def train_func(config):
    """Per-worker loop: fit a linear model and checkpoint every epoch."""
    data_size = config.get("data_size", 1000)
    val_size = config.get("val_size", 400)
    batch_size = config.get("batch_size", 32)
    hidden_size = config.get("hidden_size", 1)
    lr = config.get("lr", 1e-2)
    epochs = config.get("epochs", 3)

    # Same generating line (y = 2x + 5) for train and validation.
    train_loader = torch.utils.data.DataLoader(
        LinearDataset(2, 5, size=data_size), batch_size=batch_size
    )
    validation_loader = torch.utils.data.DataLoader(
        LinearDataset(2, 5, size=val_size), batch_size=batch_size
    )

    # Shard loaders / wrap model for distributed execution.
    train_loader = train.torch.prepare_data_loader(train_loader)
    validation_loader = train.torch.prepare_data_loader(validation_loader)
    model = train.torch.prepare_model(nn.Linear(1, hidden_size))

    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    results = []
    for epoch in range(epochs):
        train_epoch(epoch, train_loader, model, loss_fn, optimizer)
        state_dict, loss = validate_epoch(validation_loader, model, loss_fn)
        result = dict(loss=loss)
        results.append(result)

        # Persist weights and metrics for this epoch.
        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save(state_dict, os.path.join(tmpdir, "model.pt"))
            train.report(result, checkpoint=Checkpoint.from_directory(tmpdir))

    return results
94
+
95
+
96
def train_linear(num_workers=2, use_gpu=False, epochs=3, storage_path=None):
    """Launch distributed linear-regression training; return final metrics."""
    trainer = TorchTrainer(
        train_loop_per_worker=train_func,
        train_loop_config={
            "lr": 1e-2,
            "hidden_size": 1,
            "batch_size": 4,
            "epochs": epochs,
        },
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        run_config=RunConfig(storage_path=storage_path),
    )
    result = trainer.fit()

    print(result.metrics)
    return result.metrics
108
+
109
+
110
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--address", required=False, type=str, help="the address to use for Ray"
    )
    parser.add_argument(
        "--num-workers",
        "-n",
        type=int,
        default=2,
        help="Sets number of workers for training.",
    )
    parser.add_argument(
        "--use-gpu", action="store_true", help="Whether to use GPU for training."
    )
    parser.add_argument(
        "--epochs", type=int, default=3, help="Number of epochs to train for."
    )
    parser.add_argument(
        "--smoke-test",
        action="store_true",
        default=False,
        help="Finish quickly for testing.",
    )

    args, _ = parser.parse_known_args()

    import ray

    if args.smoke_test:
        # 2 workers + 1 for trainer.
        ray.init(num_cpus=3)
        train_linear()
    else:
        ray.init(address=args.address)
        train_linear(
            num_workers=args.num_workers, use_gpu=args.use_gpu, epochs=args.epochs
        )
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_quick_start.py ADDED
@@ -0,0 +1,110 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # ruff: noqa
2
+ # fmt: off
3
+ # isort: skip_file
4
+
5
+ # __torch_setup_begin__
6
+ import torch
7
+ import torch.nn as nn
8
+ from torch.utils.data import DataLoader
9
+ from torchvision import datasets
10
+ from torchvision.transforms import ToTensor
11
+
12
def get_dataset():
    """Download (if needed) and return the FashionMNIST training split."""
    return datasets.FashionMNIST(
        root="/tmp/data",
        train=True,
        download=True,
        transform=ToTensor(),
    )
19
+
20
class NeuralNetwork(nn.Module):
    """Three-layer MLP classifier over flattened 28x28 images (10 logits)."""

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, inputs):
        # (N, 1, 28, 28) -> (N, 784) -> (N, 10) logits
        flat = self.flatten(inputs)
        return self.linear_relu_stack(flat)
36
+ # __torch_setup_end__
37
+
38
+ # __torch_single_begin__
39
def train_func():
    """Single-process training loop over FashionMNIST."""
    num_epochs = 3
    batch_size = 64

    dataloader = DataLoader(get_dataset(), batch_size=batch_size)

    model = NeuralNetwork()

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        # Loss of the last batch is a cheap progress signal.
        print(f"epoch: {epoch}, loss: {loss.item()}")
59
+ # __torch_single_end__
60
+
61
+ # __torch_distributed_begin__
62
+ import ray.train.torch
63
+
64
def train_func_distributed():
    """Distributed variant: shards the loader and wraps the model for DDP."""
    num_epochs = 3
    batch_size = 64

    dataloader = DataLoader(get_dataset(), batch_size=batch_size, shuffle=True)
    dataloader = ray.train.torch.prepare_data_loader(dataloader)

    model = ray.train.torch.prepare_model(NeuralNetwork())

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(num_epochs):
        if ray.train.get_context().get_world_size() > 1:
            # Keep shard shuffling in sync across workers.
            dataloader.sampler.set_epoch(epoch)

        for inputs, labels in dataloader:
            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")
89
+ # __torch_distributed_end__
90
+
91
+
92
if __name__ == "__main__":
    # __torch_single_run_begin__
    train_func()
    # __torch_single_run_end__

    # __torch_trainer_begin__
    from ray.train.torch import TorchTrainer
    from ray.train import ScalingConfig

    # For GPU Training, set `use_gpu` to True.
    use_gpu = False

    trainer = TorchTrainer(
        train_func_distributed,
        scaling_config=ScalingConfig(num_workers=4, use_gpu=use_gpu)
    )

    results = trainer.fit()
    # __torch_trainer_end__
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/torch_regression_example.py ADDED
@@ -0,0 +1,160 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+ import os
3
+ import tempfile
4
+ from typing import Tuple
5
+
6
+ import pandas as pd
7
+ import torch
8
+ import torch.nn as nn
9
+
10
+ import ray
11
+ import ray.train as train
12
+ from ray.data import Dataset
13
+ from ray.train import Checkpoint, DataConfig, ScalingConfig
14
+ from ray.train.torch import TorchTrainer
15
+
16
+
17
def get_datasets(split: float = 0.7) -> Tuple[Dataset]:
    """Load the example regression CSV and split into train/validation."""
    dataset = ray.data.read_csv("s3://anonymous@air-example-data/regression.csv")

    def pack_features(batch):
        # Collapse the 100 x-columns into a single list-valued "x" column.
        return pd.DataFrame(
            {
                "x": batch[[f"x{i:03d}" for i in range(100)]].values.tolist(),
                "y": batch["y"],
            }
        )

    dataset = dataset.map_batches(pack_features, batch_format="pandas")
    train_dataset, validation_dataset = dataset.repartition(
        num_blocks=4
    ).train_test_split(split, shuffle=True)
    return train_dataset, validation_dataset
33
+
34
+
35
def train_epoch(iterable_dataset, model, loss_fn, optimizer, device):
    """One optimization pass over an iterable of (X, y) batches."""
    model.train()
    for features, targets in iterable_dataset:
        # Move each batch to the worker's device before the forward pass.
        features = features.to(device)
        targets = targets.to(device)

        loss = loss_fn(model(features), targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
49
+
50
+
51
def validate_epoch(iterable_dataset, model, loss_fn, device):
    """Return {"loss": mean batch loss} over the validation iterable."""
    model.eval()
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for features, targets in iterable_dataset:
            features = features.to(device)
            targets = targets.to(device)
            num_batches += 1
            total_loss += loss_fn(model(features), targets).item()
    return {"loss": total_loss / num_batches}
65
+
66
+
67
def train_func(config):
    """Per-worker loop: train on the "train" shard, validate on rank 0 only."""
    batch_size = config.get("batch_size", 32)
    hidden_size = config.get("hidden_size", 10)
    lr = config.get("lr", 1e-2)
    epochs = config.get("epochs", 3)

    train_dataset_shard = train.get_dataset_shard("train")
    validation_dataset = train.get_dataset_shard("validation")

    model = train.torch.prepare_model(
        nn.Sequential(
            nn.Linear(100, hidden_size), nn.ReLU(), nn.Linear(hidden_size, 1)
        )
    )

    loss_fn = nn.L1Loss()

    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    def batches_as_tensors(shard):
        # Yield (x, y) float tensor pairs from a Ray Dataset shard.
        for batch in shard.iter_torch_batches(batch_size=batch_size):
            yield batch["x"].float(), batch["y"].float()

    results = []
    for _ in range(epochs):
        train_iter = batches_as_tensors(train_dataset_shard)
        validation_iter = batches_as_tensors(validation_dataset)

        device = train.torch.get_device()

        train_epoch(train_iter, model, loss_fn, optimizer, device)
        # Only rank 0 validates; other ranks report empty metrics.
        if train.get_context().get_world_rank() == 0:
            result = validate_epoch(validation_iter, model, loss_fn, device)
        else:
            result = {}
        results.append(result)

        # Checkpoint the unwrapped (DDP inner) module each epoch.
        with tempfile.TemporaryDirectory() as tmpdir:
            torch.save(model.module.state_dict(), os.path.join(tmpdir, "model.pt"))
            train.report(result, checkpoint=Checkpoint.from_directory(tmpdir))

    return results
110
+
111
+
112
def train_regression(num_workers=2, use_gpu=False):
    """Launch distributed regression training over the S3 example dataset."""
    train_dataset, val_dataset = get_datasets()

    trainer = TorchTrainer(
        train_loop_per_worker=train_func,
        train_loop_config={
            "lr": 1e-2,
            "hidden_size": 20,
            "batch_size": 4,
            "epochs": 3,
        },
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        datasets={"train": train_dataset, "validation": val_dataset},
        # Only the training dataset is sharded across workers.
        dataset_config=DataConfig(datasets_to_split=["train"]),
    )

    result = trainer.fit()
    print(result.metrics)
    return result
127
+
128
+
129
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--address", required=False, type=str, help="the address to use for Ray"
    )
    parser.add_argument(
        "--num-workers",
        "-n",
        type=int,
        default=2,
        help="Sets number of workers for training.",
    )
    parser.add_argument(
        "--smoke-test",
        action="store_true",
        default=False,
        help="Finish quickly for testing.",
    )
    parser.add_argument(
        "--use-gpu", action="store_true", default=False, help="Use GPU for training."
    )

    args, _ = parser.parse_known_args()

    if args.smoke_test:
        # 2 workers, 1 for trainer, 1 for datasets
        ray.init(num_cpus=4)
        result = train_regression()
    else:
        ray.init(address=args.address)
        result = train_regression(num_workers=args.num_workers, use_gpu=args.use_gpu)
    print(result)
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/tune_cifar_torch_pbt_example.py ADDED
@@ -0,0 +1,253 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+ import os
3
+ import tempfile
4
+
5
+ import torch
6
+ import torch.nn as nn
7
+ import torchvision.transforms as transforms
8
+ from filelock import FileLock
9
+ from torch.utils.data import DataLoader, Subset
10
+ from torchvision.datasets import CIFAR10
11
+ from torchvision.models import resnet18
12
+
13
+ import ray
14
+ import ray.cloudpickle as cpickle
15
+ from ray import train, tune
16
+ from ray.train import Checkpoint, FailureConfig, RunConfig, ScalingConfig
17
+ from ray.train.torch import TorchTrainer
18
+ from ray.tune.schedulers import PopulationBasedTraining
19
+ from ray.tune.tune_config import TuneConfig
20
+ from ray.tune.tuner import Tuner
21
+
22
+
23
def train_epoch(epoch, dataloader, model, loss_fn, optimizer):
    """Run one training epoch over ``dataloader``.

    Args:
        epoch: Current epoch index, used to reseed the distributed sampler.
        dataloader: Training loader (already wrapped by Ray Train's
            ``prepare_data_loader``).
        model: Model being optimized.
        loss_fn: Callable mapping (predictions, targets) to a scalar loss.
        optimizer: Optimizer stepping the model parameters.
    """
    if ray.train.get_context().get_world_size() > 1:
        # DistributedSampler needs the epoch to reshuffle differently
        # each epoch.
        dataloader.sampler.set_epoch(epoch)

    # Per-worker share of the dataset; used only for progress printing.
    size = len(dataloader.dataset) // train.get_context().get_world_size()
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Print progress every 100 batches.
        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
42
+
43
+
44
def validate_epoch(dataloader, model, loss_fn):
    """Evaluate ``model`` over ``dataloader`` and return the mean loss.

    Prints accuracy and average loss for this worker's shard and returns
    ``{"loss": <mean batch loss>}`` so the caller can report it to Tune.
    """
    # Per-worker share of the dataset; normalizes the accuracy count.
    size = len(dataloader.dataset) // train.get_context().get_world_size()
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    return {"loss": test_loss}
62
+
63
+
64
def update_optimizer_config(optimizer, config):
    """Overwrite hyperparameters on every parameter group of ``optimizer``.

    Each key/value pair in ``config`` (e.g. ``lr``, ``momentum``) is written
    into every entry of ``optimizer.param_groups``. Used after a PBT
    perturbation so a checkpoint-restored optimizer picks up the newly
    mutated hyperparameter values.
    """
    for group in optimizer.param_groups:
        group.update(config)
68
+
69
+
70
def train_func(config):
    """Per-worker training loop for the CIFAR-10 PBT example.

    Trains a ResNet-18 on CIFAR-10, restoring model/optimizer state from a
    Ray Train checkpoint when one exists (PBT restarts trials from
    checkpoints with mutated hyperparameters), and reports loss plus a new
    checkpoint after every epoch.

    Args:
        config: Trial hyperparameters. Keys read here: ``epochs``, ``lr``,
            ``momentum``, ``batch_size``, ``data_dir``, ``test_mode``.
    """
    epochs = config.get("epochs", 3)

    model = resnet18()

    # Note that `prepare_model` needs to be called before setting optimizer.
    if not train.get_checkpoint():  # fresh start
        model = train.torch.prepare_model(model)

    # Create optimizer.
    optimizer_config = {
        "lr": config.get("lr"),
        "momentum": config.get("momentum"),
    }
    optimizer = torch.optim.SGD(model.parameters(), **optimizer_config)

    starting_epoch = 0
    if train.get_checkpoint():
        # Resuming: restore model/optimizer state saved by a previous trial.
        with train.get_checkpoint().as_directory() as checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "data.ckpt"), "rb") as fp:
                checkpoint_dict = cpickle.load(fp)

        # Load in model
        # NOTE(review): the checkpoint stores `model.state_dict()` of the
        # prepared model — assumes the saved keys match the raw resnet18
        # here (i.e. prepare_model did not add a DDP "module." prefix);
        # verify for multi-worker runs.
        model_state = checkpoint_dict["model"]
        model.load_state_dict(model_state)
        model = train.torch.prepare_model(model)

        # Load in optimizer
        optimizer_state = checkpoint_dict["optimizer_state_dict"]
        optimizer.load_state_dict(optimizer_state)

        # Optimizer configs (`lr`, `momentum`) are being mutated by PBT and passed in
        # through config, so we need to update the optimizer loaded from the checkpoint
        update_optimizer_config(optimizer, optimizer_config)

        # The current epoch increments the loaded epoch by 1
        checkpoint_epoch = checkpoint_dict["epoch"]
        starting_epoch = checkpoint_epoch + 1

    # Load in training and validation data.
    transform_train = transforms.Compose(
        [
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ]
    )  # meanstd transformation

    transform_test = transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ]
    )

    data_dir = config.get("data_dir", os.path.expanduser("~/data"))
    os.makedirs(data_dir, exist_ok=True)
    # FileLock: only one process per node downloads the dataset.
    with FileLock(os.path.join(data_dir, ".ray.lock")):
        train_dataset = CIFAR10(
            root=data_dir, train=True, download=True, transform=transform_train
        )
        validation_dataset = CIFAR10(
            root=data_dir, train=False, download=False, transform=transform_test
        )

    if config.get("test_mode"):
        # Smoke test: train/validate on a tiny 64-sample subset.
        train_dataset = Subset(train_dataset, list(range(64)))
        validation_dataset = Subset(validation_dataset, list(range(64)))

    # Split the global batch size evenly across workers.
    worker_batch_size = config["batch_size"] // train.get_context().get_world_size()

    train_loader = DataLoader(train_dataset, batch_size=worker_batch_size, shuffle=True)
    validation_loader = DataLoader(validation_dataset, batch_size=worker_batch_size)

    train_loader = train.torch.prepare_data_loader(train_loader)
    validation_loader = train.torch.prepare_data_loader(validation_loader)

    # Create loss.
    criterion = nn.CrossEntropyLoss()

    for epoch in range(starting_epoch, epochs):
        train_epoch(epoch, train_loader, model, criterion, optimizer)
        result = validate_epoch(validation_loader, model, criterion)

        # Checkpoint epoch, model, and optimizer state so PBT can
        # exploit/restore this trial later.
        with tempfile.TemporaryDirectory() as checkpoint_dir:
            with open(os.path.join(checkpoint_dir, "data.ckpt"), "wb") as fp:
                cpickle.dump(
                    {
                        "epoch": epoch,
                        "model": model.state_dict(),
                        "optimizer_state_dict": optimizer.state_dict(),
                    },
                    fp,
                )
            checkpoint = Checkpoint.from_directory(checkpoint_dir)
            train.report(result, checkpoint=checkpoint)
167
+
168
+
169
+ if __name__ == "__main__":
170
+ parser = argparse.ArgumentParser()
171
+ parser.add_argument(
172
+ "--address", required=False, type=str, help="The address to use for Redis."
173
+ )
174
+ parser.add_argument(
175
+ "--num-workers",
176
+ "-n",
177
+ type=int,
178
+ default=2,
179
+ help="Sets number of workers for training.",
180
+ )
181
+ parser.add_argument(
182
+ "--num-epochs", type=int, default=5, help="Number of epochs to train."
183
+ )
184
+ parser.add_argument(
185
+ "--smoke-test",
186
+ action="store_true",
187
+ default=False,
188
+ help="Finish quickly for testing.",
189
+ )
190
+ parser.add_argument(
191
+ "--use-gpu", action="store_true", default=False, help="Enables GPU training."
192
+ )
193
+ parser.add_argument(
194
+ "--data-dir",
195
+ required=False,
196
+ type=str,
197
+ default="~/data",
198
+ help="Root directory for storing downloaded dataset.",
199
+ )
200
+ parser.add_argument(
201
+ "--synch", action="store_true", default=False, help="Use synchronous PBT."
202
+ )
203
+
204
+ args, _ = parser.parse_known_args()
205
+ if args.smoke_test:
206
+ ray.init(num_cpus=4)
207
+ else:
208
+ ray.init(address=args.address)
209
+
210
+ trainer = TorchTrainer(
211
+ train_func,
212
+ scaling_config=ScalingConfig(
213
+ num_workers=args.num_workers, use_gpu=args.use_gpu
214
+ ),
215
+ )
216
+ pbt_scheduler = PopulationBasedTraining(
217
+ time_attr="training_iteration",
218
+ perturbation_interval=1,
219
+ hyperparam_mutations={
220
+ "train_loop_config": {
221
+ # distribution for resampling
222
+ "lr": tune.loguniform(0.001, 0.1),
223
+ # allow perturbations within this set of categorical values
224
+ "momentum": [0.8, 0.9, 0.99],
225
+ }
226
+ },
227
+ synch=args.synch,
228
+ )
229
+
230
+ tuner = Tuner(
231
+ trainer,
232
+ param_space={
233
+ "train_loop_config": {
234
+ "lr": tune.grid_search([0.001, 0.01, 0.05, 0.1]),
235
+ "momentum": 0.8,
236
+ "batch_size": 128 * args.num_workers,
237
+ "test_mode": args.smoke_test, # whether to to subset the data
238
+ "data_dir": args.data_dir,
239
+ "epochs": args.num_epochs,
240
+ }
241
+ },
242
+ tune_config=TuneConfig(
243
+ num_samples=1, metric="loss", mode="min", scheduler=pbt_scheduler
244
+ ),
245
+ run_config=RunConfig(
246
+ stop={"training_iteration": 3 if args.smoke_test else args.num_epochs},
247
+ failure_config=FailureConfig(max_failures=3), # used for fault tolerance
248
+ ),
249
+ )
250
+
251
+ results = tuner.fit()
252
+
253
+ print(results.get_best_result(metric="loss", mode="min"))
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch/tune_torch_regression_example.py ADDED
@@ -0,0 +1,82 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+
3
+ import ray
4
+ from ray import tune
5
+ from ray.train import DataConfig, ScalingConfig
6
+ from ray.train.examples.pytorch.torch_regression_example import get_datasets, train_func
7
+ from ray.train.torch import TorchTrainer
8
+ from ray.tune.tune_config import TuneConfig
9
+ from ray.tune.tuner import Tuner
10
+
11
+
12
def tune_linear(num_workers, num_samples, use_gpu):
    """Tune the torch regression example's hyperparameters with Ray Tune.

    Args:
        num_workers: Training workers per trial.
        num_samples: Number of hyperparameter samples (trials) to run.
        use_gpu: Whether each worker should be allocated a GPU.

    Returns:
        The best ``Result`` (lowest ``loss``) from the tuning run.
    """
    train_dataset, val_dataset = get_datasets()

    # Base config; `lr`, `batch_size`, and `epochs` are overridden by the
    # search space below.
    config = {"lr": 1e-2, "hidden_size": 1, "batch_size": 4, "epochs": 3}

    trainer = TorchTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        datasets={"train": train_dataset, "validation": val_dataset},
        dataset_config=DataConfig(datasets_to_split=["train"]),
    )

    tuner = Tuner(
        trainer,
        param_space={
            "train_loop_config": {
                "lr": tune.loguniform(1e-4, 1e-1),
                "batch_size": tune.choice([4, 16, 32]),
                "epochs": 3,
            }
        },
        tune_config=TuneConfig(num_samples=num_samples, metric="loss", mode="min"),
    )
    result_grid = tuner.fit()
    best_result = result_grid.get_best_result()
    print(best_result)
    return best_result
40
+
41
+
42
+ if __name__ == "__main__":
43
+ parser = argparse.ArgumentParser()
44
+ parser.add_argument(
45
+ "--smoke-test",
46
+ action="store_true",
47
+ default=False,
48
+ help="Finish quickly for testing.",
49
+ )
50
+ parser.add_argument(
51
+ "--address", required=False, type=str, help="the address to use for Ray"
52
+ )
53
+ parser.add_argument(
54
+ "--num-workers",
55
+ "-n",
56
+ type=int,
57
+ default=2,
58
+ help="Sets number of workers for training.",
59
+ )
60
+ parser.add_argument(
61
+ "--num-samples",
62
+ type=int,
63
+ default=2,
64
+ help="Sets number of samples for training.",
65
+ )
66
+ parser.add_argument(
67
+ "--use-gpu", action="store_true", default=False, help="Use GPU for training."
68
+ )
69
+
70
+ args = parser.parse_args()
71
+
72
+ if args.smoke_test:
73
+ # 2 workers, 1 for trainer, 1 for datasets
74
+ ray.init(num_cpus=4)
75
+ tune_linear(num_workers=2, num_samples=1, use_gpu=False)
76
+ else:
77
+ ray.init(address=args.address)
78
+ tune_linear(
79
+ num_workers=args.num_workers,
80
+ use_gpu=args.use_gpu,
81
+ num_samples=args.num_samples,
82
+ )
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch_geometric/__init__.py ADDED
File without changes
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch_geometric/__pycache__/__init__.cpython-311.pyc ADDED
Binary file (209 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch_geometric/__pycache__/distributed_sage_example.cpython-311.pyc ADDED
Binary file (12 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/pytorch_geometric/distributed_sage_example.py ADDED
@@ -0,0 +1,228 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Adapted from https://github.com/pyg-team/pytorch_geometric/blob/2.1.0
2
+ # /examples/multi_gpu/distributed_sampling.py
3
+
4
+ import argparse
5
+ import os
6
+
7
+ import torch
8
+ import torch.nn.functional as F
9
+ from filelock import FileLock
10
+ from torch_geometric.datasets import FakeDataset, Reddit
11
+ from torch_geometric.loader import NeighborSampler
12
+ from torch_geometric.nn import SAGEConv
13
+ from torch_geometric.transforms import RandomNodeSplit
14
+
15
+ from ray import train
16
+ from ray.train import ScalingConfig
17
+ from ray.train.torch import TorchTrainer
18
+
19
+
20
class SAGE(torch.nn.Module):
    """GraphSAGE node-classification model built from stacked ``SAGEConv`` layers."""

    def __init__(self, in_channels, hidden_channels, out_channels, num_layers=2):
        """Build ``num_layers`` SAGEConv layers: in -> hidden -> ... -> out."""
        super().__init__()
        self.num_layers = num_layers

        self.convs = torch.nn.ModuleList()
        self.convs.append(SAGEConv(in_channels, hidden_channels))
        for _ in range(self.num_layers - 2):
            self.convs.append(SAGEConv(hidden_channels, hidden_channels))
        self.convs.append(SAGEConv(hidden_channels, out_channels))

    def forward(self, x, adjs):
        """Forward pass over sampled subgraphs.

        ``adjs`` is one ``(edge_index, e_id, size)`` tuple per layer, as
        produced by ``NeighborSampler``. Returns per-node log-probabilities.
        """
        for i, (edge_index, _, size) in enumerate(adjs):
            x_target = x[: size[1]]  # Target nodes are always placed first.
            x = self.convs[i]((x, x_target), edge_index)
            if i != self.num_layers - 1:
                # ReLU + dropout on every layer except the last.
                x = F.relu(x)
                x = F.dropout(x, p=0.5, training=self.training)
        return x.log_softmax(dim=-1)

    @torch.no_grad()
    def test(self, x_all, subgraph_loader):
        """Layer-wise full-graph inference.

        Computes each layer's output for *all* nodes (accumulated on CPU)
        before moving to the next layer; returns the final logits.
        """
        for i in range(self.num_layers):
            xs = []
            for batch_size, n_id, adj in subgraph_loader:
                edge_index, _, size = adj
                # Pull the needed node features onto the training device.
                x = x_all[n_id.to(x_all.device)].to(train.torch.get_device())
                x_target = x[: size[1]]
                x = self.convs[i]((x, x_target), edge_index)
                if i != self.num_layers - 1:
                    x = F.relu(x)
                xs.append(x.cpu())

            x_all = torch.cat(xs, dim=0)

        return x_all
56
+
57
+
58
def train_loop_per_worker(train_loop_config):
    """Per-worker training loop for distributed GraphSAGE.

    Each worker trains on its own contiguous slice of the training node ids;
    rank 0 additionally runs full-graph validation after each epoch and all
    workers report the (rank-0-only) accuracy metrics.

    Args:
        train_loop_config: Dict with ``dataset_fn`` (zero-arg callable
            returning a PyG dataset), ``batch_size``, and ``num_epochs``.
    """
    dataset = train_loop_config["dataset_fn"]()
    batch_size = train_loop_config["batch_size"]
    num_epochs = train_loop_config["num_epochs"]

    data = dataset[0]
    # Manually shard training node indices: one contiguous chunk per worker.
    train_idx = data.train_mask.nonzero(as_tuple=False).view(-1)
    train_idx = train_idx.split(
        train_idx.size(0) // train.get_context().get_world_size()
    )[train.get_context().get_world_rank()]

    train_loader = NeighborSampler(
        data.edge_index,
        node_idx=train_idx,
        sizes=[25, 10],
        batch_size=batch_size,
        shuffle=True,
    )

    # Disable distributed sampler since the train_loader has already been split above.
    train_loader = train.torch.prepare_data_loader(train_loader, add_dist_sampler=False)

    # Do validation on rank 0 worker only.
    if train.get_context().get_world_rank() == 0:
        subgraph_loader = NeighborSampler(
            data.edge_index, node_idx=None, sizes=[-1], batch_size=2048, shuffle=False
        )
        subgraph_loader = train.torch.prepare_data_loader(
            subgraph_loader, add_dist_sampler=False
        )

    model = SAGE(dataset.num_features, 256, dataset.num_classes)
    model = train.torch.prepare_model(model)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

    x, y = data.x.to(train.torch.get_device()), data.y.to(train.torch.get_device())

    for epoch in range(num_epochs):
        model.train()

        # ``batch_size`` is the number of samples in the current batch.
        # ``n_id`` are the ids of all the nodes used in the computation. This is
        # needed to pull in the necessary features just for the current batch that is
        # being trained on.
        # ``adjs`` is a list of 3 element tuple consisting of ``(edge_index, e_id,
        # size)`` for each sample in the batch, where ``edge_index``represent the
        # edges of the sampled subgraph, ``e_id`` are the ids of the edges in the
        # sample, and ``size`` holds the shape of the subgraph.
        # See ``torch_geometric.loader.neighbor_sampler.NeighborSampler`` for more info.
        for batch_size, n_id, adjs in train_loader:
            optimizer.zero_grad()
            out = model(x[n_id], adjs)
            loss = F.nll_loss(out, y[n_id[:batch_size]])
            loss.backward()
            optimizer.step()

        if train.get_context().get_world_rank() == 0:
            # Reports the last batch's loss of this epoch.
            print(f"Epoch: {epoch:03d}, Loss: {loss:.4f}")

        # Non-rank-0 workers report None for the accuracy metrics.
        train_accuracy = validation_accuracy = test_accuracy = None

        # Do validation on rank 0 worker only.
        if train.get_context().get_world_rank() == 0:
            model.eval()
            with torch.no_grad():
                # `model.module`: unwrap the prepared (DDP-wrapped) model to
                # reach the custom `test` method.
                out = model.module.test(x, subgraph_loader)
            res = out.argmax(dim=-1) == data.y
            train_accuracy = int(res[data.train_mask].sum()) / int(
                data.train_mask.sum()
            )
            validation_accuracy = int(res[data.val_mask].sum()) / int(
                data.val_mask.sum()
            )
            test_accuracy = int(res[data.test_mask].sum()) / int(data.test_mask.sum())

        # All workers must call report() every iteration to stay in sync.
        train.report(
            dict(
                train_accuracy=train_accuracy,
                validation_accuracy=validation_accuracy,
                test_accuracy=test_accuracy,
            )
        )
140
+
141
+
142
def gen_fake_dataset():
    """Returns a function to be called on each worker that returns a Fake Dataset."""

    # For fake dataset, since the dataset is randomized, we create it once on the
    # driver, and then send the same dataset to all the training workers.
    # Use 10% of nodes for validation and 10% for testing.
    fake_dataset = FakeDataset(transform=RandomNodeSplit(num_val=0.1, num_test=0.1))

    def gen_dataset():
        # Closure: every worker gets the identical pre-built dataset.
        return fake_dataset

    return gen_dataset
154
+
155
+
156
def gen_reddit_dataset():
    """Returns a function to be called on each worker that returns Reddit Dataset."""

    # For Reddit dataset, we have to download the data on each node, so we create the
    # dataset on each training worker. The FileLock ensures only one process
    # per node downloads at a time.
    with FileLock(os.path.expanduser("~/.reddit_dataset_lock")):
        dataset = Reddit("./data/Reddit")
    return dataset
164
+
165
+
166
def train_gnn(
    num_workers=2, use_gpu=False, epochs=3, global_batch_size=32, dataset="reddit"
):
    """Launch distributed GraphSAGE training with Ray Train.

    Args:
        num_workers: Number of training workers.
        use_gpu: Whether each worker gets a GPU.
        epochs: Training epochs per worker.
        global_batch_size: Total batch size, split evenly across workers.
        dataset: ``"reddit"`` (downloaded per worker) or ``"fake"``
            (generated once, shared by all workers).
    """
    per_worker_batch_size = global_batch_size // num_workers

    trainer = TorchTrainer(
        train_loop_per_worker=train_loop_per_worker,
        train_loop_config={
            "num_epochs": epochs,
            "batch_size": per_worker_batch_size,
            # Note the asymmetry: `gen_reddit_dataset` is itself the
            # per-worker factory, while `gen_fake_dataset()` is called here
            # to build the shared dataset and return a closure over it.
            "dataset_fn": gen_reddit_dataset
            if dataset == "reddit"
            else gen_fake_dataset(),
        },
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
    )
    result = trainer.fit()
    print(result.metrics)
184
+
185
+
186
+ if __name__ == "__main__":
187
+ parser = argparse.ArgumentParser()
188
+ parser.add_argument(
189
+ "--address", required=False, type=str, help="the address to use for Ray"
190
+ )
191
+ parser.add_argument(
192
+ "--num-workers",
193
+ "-n",
194
+ type=int,
195
+ default=2,
196
+ help="Sets number of workers for training.",
197
+ )
198
+ parser.add_argument(
199
+ "--use-gpu", action="store_true", help="Whether to use GPU for training."
200
+ )
201
+ parser.add_argument(
202
+ "--epochs", type=int, default=3, help="Number of epochs to train for."
203
+ )
204
+ parser.add_argument(
205
+ "--global-batch-size",
206
+ "-b",
207
+ type=int,
208
+ default=32,
209
+ help="Global batch size to use for training.",
210
+ )
211
+ parser.add_argument(
212
+ "--dataset",
213
+ "-d",
214
+ type=str,
215
+ choices=["reddit", "fake"],
216
+ default="reddit",
217
+ help="The dataset to use. Either 'reddit' or 'fake' Defaults to 'reddit'.",
218
+ )
219
+
220
+ args, _ = parser.parse_known_args()
221
+
222
+ train_gnn(
223
+ num_workers=args.num_workers,
224
+ use_gpu=args.use_gpu,
225
+ epochs=args.epochs,
226
+ global_batch_size=args.global_batch_size,
227
+ dataset=args.dataset,
228
+ )
.venv/lib/python3.11/site-packages/ray/train/examples/tf/__init__.py ADDED
File without changes
.venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/__init__.cpython-311.pyc ADDED
Binary file (194 Bytes). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tensorflow_autoencoder_example.cpython-311.pyc ADDED
Binary file (9.45 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tensorflow_mnist_example.cpython-311.pyc ADDED
Binary file (7.07 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tensorflow_quick_start.cpython-311.pyc ADDED
Binary file (4.58 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tensorflow_regression_example.cpython-311.pyc ADDED
Binary file (5.92 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tune_tensorflow_autoencoder_example.cpython-311.pyc ADDED
Binary file (3.36 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/tf/__pycache__/tune_tensorflow_mnist_example.cpython-311.pyc ADDED
Binary file (3.45 kB). View file
 
.venv/lib/python3.11/site-packages/ray/train/examples/tf/tensorflow_autoencoder_example.py ADDED
@@ -0,0 +1,174 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # This example showcases how to use Tensorflow with Ray Train.
2
+ # Original code:
3
+ # https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
4
+ # https://blog.keras.io/building-autoencoders-in-keras.html
5
+ import argparse
6
+
7
+ import numpy as np
8
+ import pandas as pd
9
+ import tensorflow as tf
10
+ import tensorflow_datasets as tfds
11
+
12
+ import ray
13
+ from ray import train
14
+ from ray.air.integrations.keras import ReportCheckpointCallback
15
+ from ray.data.datasource import SimpleTensorFlowDatasource
16
+ from ray.data.extensions import TensorArray
17
+ from ray.train import Result, ScalingConfig
18
+ from ray.train.tensorflow import TensorflowTrainer, prepare_dataset_shard
19
+
20
+
21
+ def get_dataset(split_type="train"):
22
+ def dataset_factory():
23
+ return tfds.load("mnist", split=[split_type], as_supervised=True)[0].take(128)
24
+
25
+ dataset = ray.data.read_datasource(
26
+ SimpleTensorFlowDatasource(), dataset_factory=dataset_factory
27
+ )
28
+
29
+ def normalize_images(x):
30
+ x = np.float32(x.numpy()) / 255.0
31
+ x = np.reshape(x, (-1,))
32
+ return x
33
+
34
+ def preprocess_dataset(batch):
35
+ return [
36
+ (normalize_images(image), normalize_images(image)) for image, _ in batch
37
+ ]
38
+
39
+ dataset = dataset.map_batches(preprocess_dataset)
40
+
41
+ def convert_batch_to_pandas(batch):
42
+
43
+ images = [TensorArray(image) for image, _ in batch]
44
+ # because we did autoencoder here
45
+ df = pd.DataFrame({"image": images, "label": images})
46
+ return df
47
+
48
+ dataset = dataset.map_batches(convert_batch_to_pandas)
49
+ return dataset
50
+
51
+
52
def build_autoencoder_model() -> tf.keras.Model:
    """Build a dense autoencoder for flattened 28x28 MNIST images.

    Encoder compresses 784 -> 128 -> 64 -> 32; decoder mirrors it back to a
    784-dim sigmoid output (pixel values in [0, 1]). Model is uncompiled.
    """
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(784,)),
            # encoder
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(64, activation="relu"),
            tf.keras.layers.Dense(32, activation="relu"),
            # decoder
            tf.keras.layers.Dense(64, activation="relu"),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(784, activation="sigmoid"),
        ]
    )
    return model
67
+
68
+
69
def train_func(config: dict):
    """Per-worker training loop for the TensorFlow autoencoder example.

    Streams this worker's Ray Dataset shard into Keras under
    ``MultiWorkerMirroredStrategy``, one `fit` call per epoch, and reports
    metrics/checkpoints via ``ReportCheckpointCallback``.

    Args:
        config: Hyperparameters; keys read: ``batch_size``, ``epochs``, ``lr``.

    Returns:
        List of per-epoch Keras ``history.history`` dicts.
    """
    per_worker_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)

    dataset_shard = train.get_dataset_shard("train")

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_autoencoder_model()
        learning_rate = config.get("lr", 0.001)
        multi_worker_model.compile(
            loss=tf.keras.losses.BinaryCrossentropy(),
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            metrics=[
                "binary_crossentropy",
            ],
        )

    def to_tf_dataset(dataset, batch_size):
        # Adapt the Ray Dataset shard into a tf.data.Dataset of
        # (input, target) pairs for Keras.
        def to_tensor_iterator():
            for batch in dataset.iter_tf_batches(
                batch_size=batch_size, dtypes=tf.float32
            ):
                yield batch["image"], batch["label"]

        output_signature = (
            tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
            tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
        )
        tf_dataset = tf.data.Dataset.from_generator(
            to_tensor_iterator, output_signature=output_signature
        )
        return prepare_dataset_shard(tf_dataset)

    results = []
    for epoch in range(epochs):
        # Rebuild the tf.data pipeline each epoch (the generator is
        # consumed by each fit call).
        tf_dataset = to_tf_dataset(
            dataset=dataset_shard,
            batch_size=per_worker_batch_size,
        )
        history = multi_worker_model.fit(
            tf_dataset, callbacks=[ReportCheckpointCallback()]
        )
        results.append(history.history)
    return results
117
+
118
+
119
def train_tensorflow_mnist(
    num_workers: int = 2, use_gpu: bool = False, epochs: int = 4
) -> Result:
    """Run distributed autoencoder training on MNIST with TensorflowTrainer.

    NOTE(review): despite the generic name, this trains the autoencoder
    defined in this file (reconstruction, not classification).

    Args:
        num_workers: Number of training workers.
        use_gpu: Whether each worker gets a GPU.
        epochs: Epochs per worker.

    Returns:
        The ``Result`` from ``trainer.fit()``.
    """
    train_dataset = get_dataset(split_type="train")
    config = {"lr": 1e-3, "batch_size": 64, "epochs": epochs}
    scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)
    trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        datasets={"train": train_dataset},
        scaling_config=scaling_config,
    )

    results = trainer.fit()
    print(results.metrics)
    return results
135
+
136
+
137
+ if __name__ == "__main__":
138
+ parser = argparse.ArgumentParser()
139
+ parser.add_argument(
140
+ "--address", required=False, type=str, help="the address to use for Ray"
141
+ )
142
+ parser.add_argument(
143
+ "--num-workers",
144
+ "-n",
145
+ type=int,
146
+ default=2,
147
+ help="Sets number of workers for training.",
148
+ )
149
+ parser.add_argument(
150
+ "--use-gpu", action="store_true", default=False, help="Enables GPU training"
151
+ )
152
+ parser.add_argument(
153
+ "--epochs", type=int, default=3, help="Number of epochs to train for."
154
+ )
155
+ parser.add_argument(
156
+ "--smoke-test",
157
+ action="store_true",
158
+ default=False,
159
+ help="Finish quickly for testing.",
160
+ )
161
+
162
+ args, _ = parser.parse_known_args()
163
+
164
+ if args.smoke_test:
165
+ # 2 workers, 1 for trainer, 1 for datasets
166
+ num_gpus = args.num_workers if args.use_gpu else 0
167
+ ray.init(num_cpus=4, num_gpus=num_gpus)
168
+ result = train_tensorflow_mnist(num_workers=2, use_gpu=args.use_gpu)
169
+ else:
170
+ ray.init(address=args.address)
171
+ result = train_tensorflow_mnist(
172
+ num_workers=args.num_workers, use_gpu=args.use_gpu, epochs=args.epochs
173
+ )
174
+ print(result)
.venv/lib/python3.11/site-packages/ray/train/examples/tf/tensorflow_mnist_example.py ADDED
@@ -0,0 +1,135 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # This example showcases how to use Tensorflow with Ray Train.
2
+ # Original code:
3
+ # https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras
4
+ import argparse
5
+ import json
6
+ import os
7
+
8
+ import numpy as np
9
+ import tensorflow as tf
10
+ from filelock import FileLock
11
+
12
+ from ray.air.integrations.keras import ReportCheckpointCallback
13
+ from ray.train import Result, RunConfig, ScalingConfig
14
+ from ray.train.tensorflow import TensorflowTrainer
15
+
16
+
17
def mnist_dataset(batch_size: int) -> tf.data.Dataset:
    """Load MNIST training images as an infinite, shuffled, batched dataset.

    Args:
        batch_size: Global batch size for the returned dataset.

    Returns:
        A ``tf.data.Dataset`` of (image, label) batches that repeats forever
        (callers must bound iteration, e.g. via ``steps_per_epoch``).
    """
    # FileLock: only one process per node downloads the MNIST archive.
    with FileLock(os.path.expanduser("~/.mnist_lock")):
        (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()
        .batch(batch_size)
    )
    return train_dataset
31
+
32
+
33
def build_cnn_model() -> tf.keras.Model:
    """Build a small CNN classifier for 28x28 MNIST digits.

    Returns an uncompiled model producing 10 raw logits (no softmax) — pair
    it with a from_logits loss.
    """
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )
    return model
45
+
46
+
47
def train_func(config: dict):
    """Per-worker training loop for the multi-worker Keras MNIST example.

    Scales the per-worker batch size by the worker count (read from the
    ``TF_CONFIG`` env var that Ray Train sets), builds/compiles the model
    under ``MultiWorkerMirroredStrategy``, and fits for ``epochs`` epochs.

    Args:
        config: Hyperparameters; keys read: ``batch_size``, ``epochs``,
            ``steps_per_epoch``, ``lr``.

    Returns:
        The Keras ``history.history`` dict.
    """
    per_worker_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)
    steps_per_epoch = config.get("steps_per_epoch", 70)

    # Ray Train populates TF_CONFIG for each worker; the cluster spec tells
    # us the world size.
    tf_config = json.loads(os.environ["TF_CONFIG"])
    num_workers = len(tf_config["cluster"]["worker"])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    # Each worker loads the global batch; the strategy shards it.
    global_batch_size = per_worker_batch_size * num_workers
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_cnn_model()
        learning_rate = config.get("lr", 0.001)
        multi_worker_model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
            metrics=["accuracy"],
        )

    history = multi_worker_model.fit(
        multi_worker_dataset,
        epochs=epochs,
        # steps_per_epoch bounds the infinite (repeated) dataset.
        steps_per_epoch=steps_per_epoch,
        callbacks=[ReportCheckpointCallback()],
    )
    results = history.history
    return results
78
+
79
+
80
def train_tensorflow_mnist(
    num_workers: int = 2,
    use_gpu: bool = False,
    epochs: int = 4,
    storage_path: str = None,
) -> Result:
    """Run distributed Keras MNIST training with Ray Train.

    Args:
        num_workers: Number of training workers.
        use_gpu: Whether each worker gets a GPU.
        epochs: Epochs per worker.
        storage_path: Optional Ray Train storage path for results/checkpoints.

    Returns:
        The ``Result`` from ``trainer.fit()``.
    """
    config = {"lr": 1e-3, "batch_size": 64, "epochs": epochs}
    trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        run_config=RunConfig(storage_path=storage_path),
    )
    results = trainer.fit()
    return results
95
+
96
+
97
+ if __name__ == "__main__":
98
+ parser = argparse.ArgumentParser()
99
+ parser.add_argument(
100
+ "--address", required=False, type=str, help="the address to use for Ray"
101
+ )
102
+ parser.add_argument(
103
+ "--num-workers",
104
+ "-n",
105
+ type=int,
106
+ default=2,
107
+ help="Sets number of workers for training.",
108
+ )
109
+ parser.add_argument(
110
+ "--use-gpu", action="store_true", default=False, help="Enables GPU training"
111
+ )
112
+ parser.add_argument(
113
+ "--epochs", type=int, default=3, help="Number of epochs to train for."
114
+ )
115
+ parser.add_argument(
116
+ "--smoke-test",
117
+ action="store_true",
118
+ default=False,
119
+ help="Finish quickly for testing.",
120
+ )
121
+
122
+ args, _ = parser.parse_known_args()
123
+
124
+ import ray
125
+
126
+ if args.smoke_test:
127
+ # 2 workers, 1 for trainer, 1 for datasets
128
+ num_gpus = args.num_workers if args.use_gpu else 0
129
+ ray.init(num_cpus=4, num_gpus=num_gpus)
130
+ train_tensorflow_mnist(num_workers=2, use_gpu=args.use_gpu)
131
+ else:
132
+ ray.init(address=args.address)
133
+ train_tensorflow_mnist(
134
+ num_workers=args.num_workers, use_gpu=args.use_gpu, epochs=args.epochs
135
+ )
.venv/lib/python3.11/site-packages/ray/train/examples/tf/tensorflow_quick_start.py ADDED
@@ -0,0 +1,87 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # ruff: noqa
2
+ # fmt: off
3
+ # isort: skip_file
4
+
5
+ # __tf_setup_begin__
6
+ import sys
7
+ import numpy as np
8
+
9
+ if sys.version_info >= (3, 12):
10
+ # Tensorflow is not installed for Python 3.12 because of keras compatibility.
11
+ sys.exit(0)
12
+ else:
13
+ import tensorflow as tf
14
+
15
+ def mnist_dataset(batch_size):
16
+ (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
17
+ # The `x` arrays are in uint8 and have values in the [0, 255] range.
18
+ # You need to convert them to float32 with values in the [0, 1] range.
19
+ x_train = x_train / np.float32(255)
20
+ y_train = y_train.astype(np.int64)
21
+ train_dataset = tf.data.Dataset.from_tensor_slices(
22
+ (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
23
+ return train_dataset
24
+
25
+
26
+ def build_and_compile_cnn_model():
27
+ model = tf.keras.Sequential([
28
+ tf.keras.layers.InputLayer(input_shape=(28, 28)),
29
+ tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
30
+ tf.keras.layers.Conv2D(32, 3, activation='relu'),
31
+ tf.keras.layers.Flatten(),
32
+ tf.keras.layers.Dense(128, activation='relu'),
33
+ tf.keras.layers.Dense(10)
34
+ ])
35
+ model.compile(
36
+ loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
37
+ optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
38
+ metrics=['accuracy'])
39
+ return model
40
+ # __tf_setup_end__
41
+
42
+ # __tf_single_begin__
43
+ def train_func():
44
+ batch_size = 64
45
+ single_worker_dataset = mnist_dataset(batch_size)
46
+ single_worker_model = build_and_compile_cnn_model()
47
+ single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)
48
+ # __tf_single_end__
49
+
50
+ # __tf_distributed_begin__
51
+ import json
52
+ import os
53
+
54
+ def train_func_distributed():
55
+ per_worker_batch_size = 64
56
+ # This environment variable will be set by Ray Train.
57
+ tf_config = json.loads(os.environ['TF_CONFIG'])
58
+ num_workers = len(tf_config['cluster']['worker'])
59
+
60
+ strategy = tf.distribute.MultiWorkerMirroredStrategy()
61
+
62
+ global_batch_size = per_worker_batch_size * num_workers
63
+ multi_worker_dataset = mnist_dataset(global_batch_size)
64
+
65
+ with strategy.scope():
66
+ # Model building/compiling need to be within `strategy.scope()`.
67
+ multi_worker_model = build_and_compile_cnn_model()
68
+
69
+ multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
70
+ # __tf_distributed_end__
71
+
72
+ if __name__ == "__main__":
73
+ # __tf_single_run_begin__
74
+ train_func()
75
+ # __tf_single_run_end__
76
+
77
+ # __tf_trainer_begin__
78
+ from ray.train.tensorflow import TensorflowTrainer
79
+ from ray.train import ScalingConfig
80
+
81
+ # For GPU Training, set `use_gpu` to True.
82
+ use_gpu = False
83
+
84
+ trainer = TensorflowTrainer(train_func_distributed, scaling_config=ScalingConfig(num_workers=4, use_gpu=use_gpu))
85
+
86
+ trainer.fit()
87
+ # __tf_trainer_end__
.venv/lib/python3.11/site-packages/ray/train/examples/tf/tensorflow_regression_example.py ADDED
@@ -0,0 +1,111 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+ import sys
3
+
4
+ import ray
5
+ from ray import train
6
+ from ray.data.preprocessors import Concatenator
7
+ from ray.train import Result, ScalingConfig
8
+
9
+ if sys.version_info >= (3, 12):
10
+ # Skip this test in Python 3.12+ because TensorFlow is not supported.
11
+ sys.exit(0)
12
+ else:
13
+ import tensorflow as tf
14
+
15
+ from ray.air.integrations.keras import ReportCheckpointCallback
16
+ from ray.train.tensorflow import TensorflowTrainer
17
+
18
+
19
+ def build_model() -> tf.keras.Model:
20
+ model = tf.keras.Sequential(
21
+ [
22
+ tf.keras.layers.InputLayer(input_shape=(100,)),
23
+ tf.keras.layers.Dense(10),
24
+ tf.keras.layers.Dense(1),
25
+ ]
26
+ )
27
+ return model
28
+
29
+
30
+ def train_func(config: dict):
31
+ batch_size = config.get("batch_size", 64)
32
+ epochs = config.get("epochs", 3)
33
+
34
+ strategy = tf.distribute.MultiWorkerMirroredStrategy()
35
+ with strategy.scope():
36
+ # Model building/compiling need to be within `strategy.scope()`.
37
+ multi_worker_model = build_model()
38
+ multi_worker_model.compile(
39
+ optimizer=tf.keras.optimizers.SGD(learning_rate=config.get("lr", 1e-3)),
40
+ loss=tf.keras.losses.mean_absolute_error,
41
+ metrics=[tf.keras.metrics.mean_squared_error],
42
+ )
43
+
44
+ dataset = train.get_dataset_shard("train")
45
+
46
+ results = []
47
+ for _ in range(epochs):
48
+ tf_dataset = dataset.to_tf(
49
+ feature_columns="x", label_columns="y", batch_size=batch_size
50
+ )
51
+ history = multi_worker_model.fit(
52
+ tf_dataset, callbacks=[ReportCheckpointCallback()]
53
+ )
54
+ results.append(history.history)
55
+ return results
56
+
57
+
58
+ def train_tensorflow_regression(num_workers: int = 2, use_gpu: bool = False) -> Result:
59
+ dataset = ray.data.read_csv("s3://anonymous@air-example-data/regression.csv")
60
+ columns_to_concatenate = [f"x{i:03}" for i in range(100)]
61
+ preprocessor = Concatenator(columns=columns_to_concatenate, output_column_name="x")
62
+ dataset = preprocessor.fit_transform(dataset)
63
+
64
+ config = {"lr": 1e-3, "batch_size": 32, "epochs": 4}
65
+ scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)
66
+ trainer = TensorflowTrainer(
67
+ train_loop_per_worker=train_func,
68
+ train_loop_config=config,
69
+ scaling_config=scaling_config,
70
+ datasets={"train": dataset},
71
+ )
72
+ results = trainer.fit()
73
+ print(results.metrics)
74
+ return results
75
+
76
+
77
+ if __name__ == "__main__":
78
+ parser = argparse.ArgumentParser()
79
+ parser.add_argument(
80
+ "--address", required=False, type=str, help="the address to use for Ray"
81
+ )
82
+ parser.add_argument(
83
+ "--num-workers",
84
+ "-n",
85
+ type=int,
86
+ default=2,
87
+ help="Sets number of workers for training.",
88
+ )
89
+ parser.add_argument(
90
+ "--use-gpu", action="store_true", default=False, help="Enables GPU training"
91
+ )
92
+ parser.add_argument(
93
+ "--smoke-test",
94
+ action="store_true",
95
+ default=False,
96
+ help="Finish quickly for testing.",
97
+ )
98
+
99
+ args, _ = parser.parse_known_args()
100
+
101
+ if args.smoke_test:
102
+ # 2 workers, 1 for trainer, 1 for datasets
103
+ num_gpus = args.num_workers if args.use_gpu else 0
104
+ ray.init(num_cpus=4, num_gpus=num_gpus)
105
+ result = train_tensorflow_regression(num_workers=2, use_gpu=args.use_gpu)
106
+ else:
107
+ ray.init(address=args.address)
108
+ result = train_tensorflow_regression(
109
+ num_workers=args.num_workers, use_gpu=args.use_gpu
110
+ )
111
+ print(result)
.venv/lib/python3.11/site-packages/ray/train/examples/tf/tune_tensorflow_mnist_example.py ADDED
@@ -0,0 +1,80 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import argparse
2
+ import sys
3
+
4
+ import ray
5
+ from ray import tune
6
+ from ray.train import ScalingConfig
7
+ from ray.tune.tune_config import TuneConfig
8
+ from ray.tune.tuner import Tuner
9
+
10
+ if sys.version_info >= (3, 12):
11
+ # Skip this test in Python 3.12+ because TensorFlow is not supported.
12
+ exit(0)
13
+ else:
14
+ from ray.train.examples.tf.tensorflow_mnist_example import train_func
15
+ from ray.train.tensorflow import TensorflowTrainer
16
+
17
+
18
+ def tune_tensorflow_mnist(
19
+ num_workers: int = 2, num_samples: int = 2, use_gpu: bool = False
20
+ ):
21
+ trainer = TensorflowTrainer(
22
+ train_loop_per_worker=train_func,
23
+ scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
24
+ )
25
+ tuner = Tuner(
26
+ trainer,
27
+ tune_config=TuneConfig(num_samples=num_samples, metric="accuracy", mode="max"),
28
+ param_space={
29
+ "train_loop_config": {
30
+ "lr": tune.loguniform(1e-4, 1e-1),
31
+ "batch_size": tune.choice([32, 64, 128]),
32
+ "epochs": 3,
33
+ }
34
+ },
35
+ )
36
+ best_accuracy = tuner.fit().get_best_result().metrics["accuracy"]
37
+ print(f"Best accuracy config: {best_accuracy}")
38
+
39
+
40
+ if __name__ == "__main__":
41
+ parser = argparse.ArgumentParser()
42
+ parser.add_argument(
43
+ "--smoke-test",
44
+ action="store_true",
45
+ default=False,
46
+ help="Finish quickly for testing.",
47
+ )
48
+ parser.add_argument(
49
+ "--address", required=False, type=str, help="the address to use for Ray"
50
+ )
51
+ parser.add_argument(
52
+ "--num-workers",
53
+ "-n",
54
+ type=int,
55
+ default=2,
56
+ help="Sets number of workers for training.",
57
+ )
58
+ parser.add_argument(
59
+ "--num-samples",
60
+ type=int,
61
+ default=2,
62
+ help="Sets number of samples for training.",
63
+ )
64
+ parser.add_argument(
65
+ "--use-gpu", action="store_true", default=False, help="Enables GPU training"
66
+ )
67
+
68
+ args = parser.parse_args()
69
+
70
+ if args.smoke_test:
71
+ num_gpus = args.num_workers if args.use_gpu else 0
72
+ ray.init(num_cpus=8, num_gpus=num_gpus)
73
+ tune_tensorflow_mnist(num_workers=2, num_samples=2, use_gpu=args.use_gpu)
74
+ else:
75
+ ray.init(address=args.address)
76
+ tune_tensorflow_mnist(
77
+ num_workers=args.num_workers,
78
+ num_samples=args.num_samples,
79
+ use_gpu=args.use_gpu,
80
+ )