Baladithya Balamurugan Claude Opus 4.8 (1M context) commited on
Commit
bd37412
·
1 Parent(s): 50e3f0e

Wave 20: close F4 s3:// gap — live-S3 DiLoCo allreduce smoke (AWS_SMOKE-gated)

Browse files

ObjectStoreAllReduce's S3 branches (_init_fsspec/_put/_exists/_get over
s3fs) previously had only mock coverage; F4 flagged "s3:// never
exercised against real S3" as the highest-value unproven gap.

Adds test_s3_rendezvous_allreduce_across_replicas to test_serverless_local.py
(torchft-free, so it runs on Apple Silicon CI too): 2 OS processes call
ObjectStoreAllReduce.allreduce() over an s3:// rendezvous, every rank must
end with the cross-rank mean. Reuses the existing spawn-importable
_replica_compute_and_sync. Gated on AWS_SMOKE=1; skips cleanly otherwise.

Verified live 2026-06-09 against
s3://amazon-sagemaker-386931836011-us-west-2-7597bf4d9a3d/diloco-rdv/:
both ranks converged to identical weights (max|diff|=0.00e+00), both
round_000000/rank_000{0,1}.pt objects present in S3 — proving the
PUT->poll->GET->mean path and S3 strong read-after-write consistency
hold cross-process. Full file: 10 passed, 1 skipped.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

composer_replication/diloco/serverless/tests/test_serverless_local.py CHANGED
@@ -182,6 +182,105 @@ def test_local_executor_handles_multiple_rounds():
182
  assert all(abs(v - 100.0) < 1e-4 for v in r["result"]["avg2"])
183
 
184
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
  def _replica_that_raises(rendezvous_uri: str, world_size: int) -> dict:
186
  """Simulates a replica that crashes mid-run."""
187
  rank = int(os.environ["REPLICA_RANK"])
 
182
  assert all(abs(v - 100.0) < 1e-4 for v in r["result"]["avg2"])
183
 
184
 
185
+ # ---------------------------------------------------------------------
186
+ # Live-S3 smoke (F4 step 1): the file:// → s3:// transport gap.
187
+ #
188
+ # ObjectStoreAllReduce's S3 branches (_init_fsspec/_put/_exists/_get over
189
+ # s3fs) only have mock coverage; this exercises them against REAL S3 with
190
+ # concurrent OS processes, relying on S3's strong read-after-write
191
+ # consistency (the poll loop's _exists()→_get() assumption). Gated on
192
+ # AWS_SMOKE=1 so it never runs in ordinary CI / on machines without creds.
193
+ #
194
+ # Run it with:
195
+ # AWS_SMOKE=1 AWS_REGION=us-west-2 \
196
+ # DILOCO_S3_RENDEZVOUS=s3://<sagemaker-bucket>/diloco-rdv \
197
+ # pytest composer_replication/diloco/serverless/tests/test_serverless_local.py \
198
+ # -k s3_rendezvous -s
199
+ #
200
+ # Use a sagemaker-named bucket: stock AmazonSageMakerFullAccess only grants
201
+ # S3 on buckets whose name contains "sagemaker"/"aws-glue" — a custom-named
202
+ # bucket would 403 the first PUT and hang every peer until timeout_s (F4 §3).
203
+ # Verified PASS 2026-06-09 against
204
+ # s3://amazon-sagemaker-386931836011-us-west-2-7597bf4d9a3d/diloco-rdv/.
205
+ # ---------------------------------------------------------------------
206
+
207
+
208
+ def _s3_smoke_enabled() -> bool:
209
+ return os.environ.get("AWS_SMOKE") == "1"
210
+
211
+
212
+ @pytest.mark.skipif(
213
+ not _s3_smoke_enabled(),
214
+ reason="live-S3 smoke; set AWS_SMOKE=1 (+ AWS creds, DILOCO_S3_RENDEZVOUS) to run",
215
+ )
216
+ @pytest.mark.parametrize("n_replicas", [2])
217
+ def test_s3_rendezvous_allreduce_across_replicas(n_replicas):
218
+ """Real-S3 analogue of test_local_executor_runs_allreduce_across_replicas.
219
+
220
+ Same property (N processes call allreduce, every rank ends with the
221
+ cross-rank mean) but over an ``s3://`` rendezvous instead of a tmp dir,
222
+ so it actually drives s3fs PUT/poll/GET and depends on S3 strong
223
+ read-after-write consistency. This is the cheapest (≈$0, no GPU) closure
224
+ of F4's documented "ObjectStoreAllReduce over s3:// never exercised
225
+ against real S3" gap.
226
+ """
227
+ import uuid
228
+
229
+ pytest.importorskip("s3fs", reason="s3fs required for the live-S3 smoke")
230
+ import s3fs
231
+
232
+ base = os.environ.get(
233
+ "DILOCO_S3_RENDEZVOUS",
234
+ "s3://amazon-sagemaker-386931836011-us-west-2-7597bf4d9a3d/diloco-rdv",
235
+ ).rstrip("/")
236
+ rendezvous = f"{base}/smoke-{uuid.uuid4().hex[:8]}/"
237
+
238
+ executor = LocalProcessExecutor()
239
+ handles = executor.launch_replicas(
240
+ n_replicas=n_replicas,
241
+ entrypoint=f"{__name__}._replica_compute_and_sync",
242
+ entrypoint_args={
243
+ "rendezvous_uri": rendezvous,
244
+ "world_size": n_replicas,
245
+ "rank_value": 10.0,
246
+ "rank_env": "REPLICA_RANK",
247
+ },
248
+ timeout=300,
249
+ )
250
+ try:
251
+ results = executor.collect(handles, timeout=300)
252
+
253
+ for r in results:
254
+ assert r["status"] == "succeeded", (
255
+ f"rank {r['rank']} failed (S3 rendezvous {rendezvous}): "
256
+ f"{r.get('error')}"
257
+ )
258
+
259
+ # Every rank must agree on the mean — only possible if each read the
260
+ # SAME peer objects through S3 (proves the cross-process exchange).
261
+ N = n_replicas
262
+ expected_mean = 10.0 * (N * (N + 1) / 2) / N
263
+ for r in results:
264
+ for v in r["result"]["post"]:
265
+ assert abs(v - expected_mean) < 1e-4, (
266
+ f"rank {r['rank']}: expected S3-averaged mean {expected_mean}, "
267
+ f"got {v}"
268
+ )
269
+
270
+ # Both ranks' pseudo-gradient objects must be present in S3.
271
+ fs = s3fs.S3FileSystem()
272
+ listing = fs.ls(rendezvous.replace("s3://", "") + "round_000000/")
273
+ got = {os.path.basename(p) for p in listing}
274
+ expected = {f"rank_{r:04d}.pt" for r in range(n_replicas)}
275
+ assert expected <= got, f"missing rank objects in S3: {expected - got}"
276
+ finally:
277
+ # Best-effort cleanup so repeated smokes don't accrete prefixes.
278
+ try:
279
+ s3fs.S3FileSystem().rm(rendezvous.replace("s3://", ""), recursive=True)
280
+ except Exception:
281
+ pass
282
+
283
+
284
  def _replica_that_raises(rendezvous_uri: str, world_size: int) -> dict:
285
  """Simulates a replica that crashes mid-run."""
286
  rank = int(os.environ["REPLICA_RANK"])