Bc-AI commited on
Commit
41df2ce
Β·
verified Β·
1 Parent(s): 3987421

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +143 -29
app.py CHANGED
@@ -191,14 +191,88 @@ def fuse_stdout(fragments):
191
 
192
 
193
  # ─── Fragmentation ─────────────────────────────────────────────────────────
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
194
  def fragment_code(code, cfg, task_id, libs):
 
 
 
195
  frags = []
196
  cpus = max(1, cfg.get("cpus", 1))
197
- needs_gpu = cfg.get("gpus", 0) > 0
 
 
198
  ram = cfg.get("ram_gb", 4)
199
- target = max(2, min(cpus * 2, 8))
200
-
201
  loop = None
 
 
202
  try:
203
  tree = ast.parse(code)
204
  for node in ast.walk(tree):
@@ -207,62 +281,102 @@ def fragment_code(code, cfg, task_id, libs):
207
  if isinstance(fn, ast.Name) and fn.id == "range":
208
  args = node.iter.args
209
  try:
210
- if len(args) == 1: loop = {"s": 0, "e": ast.literal_eval(args[0])}
211
- elif len(args) >= 2: loop = {"s": ast.literal_eval(args[0]), "e": ast.literal_eval(args[1])}
 
 
 
212
  if loop:
213
  loop["var"] = node.target.id if isinstance(node.target, ast.Name) else "i"
 
 
 
 
 
 
 
 
 
214
  break
215
- except: loop = None
216
- except: pass
217
-
218
- if loop and (loop["e"] - loop["s"]) >= 20:
 
 
 
219
  total = loop["e"] - loop["s"]
220
- n = min(target, total)
221
- chunk = math.ceil(total / n)
222
- patterns = ([f"range({loop['e']})"] if loop["s"] == 0 else []) + [
223
- f"range({loop['s']}, {loop['e']})", f"range({loop['s']},{loop['e']})"]
224
- if loop["s"] == 0: patterns.append(f"range(0, {loop['e']})")
225
-
 
 
 
 
 
 
 
226
  for idx in range(n):
227
  cs = loop["s"] + idx * chunk
228
  ce = min(loop["s"] + (idx + 1) * chunk, loop["e"])
229
- if cs >= loop["e"]: break
 
 
230
  mod = code
231
  for pat in patterns:
232
  if pat in mod:
233
  mod = mod.replace(pat, f"range({cs}, {ce})", 1)
234
  break
235
-
236
- # Inject SACCP metadata
237
  header = f"""# ═══ SACCP Fragment {idx}/{n} ═══
238
  __saccp_rank__ = {idx}
239
  __saccp_world_size__ = {n}
240
  __saccp_chunk__ = ({cs}, {ce})
 
241
  """
 
242
  frags.append({
243
- "fragment_id": f"{task_id}_frag_{idx}", "task_id": task_id,
244
- "fragment_index": idx, "fragment_type": "compute",
 
 
245
  "code": header + mod,
246
  "input_data": json.dumps({"rank": idx, "world_size": n, "chunk": [cs, ce]}),
247
- "required_libs": libs, "required_gpu": needs_gpu,
248
- "min_ram_gb": max(1, ram // n), "timeout_seconds": 600,
 
 
249
  })
250
-
 
251
  if not frags:
 
 
 
252
  header = """# ═══ SACCP Single Fragment ═══
253
  __saccp_rank__ = 0
254
  __saccp_world_size__ = 1
 
255
  """
 
256
  frags.append({
257
- "fragment_id": f"{task_id}_frag_0", "task_id": task_id,
258
- "fragment_index": 0, "fragment_type": "compute",
 
 
259
  "code": header + code,
260
  "input_data": json.dumps({"rank": 0, "world_size": 1}),
261
- "required_libs": libs, "required_gpu": needs_gpu,
262
- "min_ram_gb": ram, "timeout_seconds": 600,
 
 
263
  })
264
-
265
- print(f"[HEAD] Task {task_id[:8]} β†’ {len(frags)} fragments")
266
  return frags
267
 
268
 
 
191
 
192
 
193
  # ─── Fragmentation ─────────────────────────────────────────────────────────
194
+ # Add this to your head node fragmentation logic
195
+
196
+ def analyze_gpu_requirements(code):
197
+ """
198
+ Detect if code needs GPU by looking for:
199
+ 1. @gpu_required decorator
200
+ 2. torch.cuda usage
201
+ 3. device="cuda" patterns
202
+ """
203
+
204
+ # Check for explicit decorator
205
+ if "@gpu_required" in code or "@requires_gpu" in code:
206
+ return True
207
+
208
+ # Check for torch GPU patterns
209
+ gpu_patterns = [
210
+ "torch.cuda",
211
+ 'device="cuda"',
212
+ "device='cuda'",
213
+ ".cuda()",
214
+ "torch.device('cuda')",
215
+ 'torch.device("cuda")',
216
+ ]
217
+
218
+ return any(pattern in code for pattern in gpu_patterns)
219
+
220
+
221
+ def is_parallelizable_safely(code, loop_info):
222
+ """
223
+ Determine if a loop can be safely parallelized.
224
+ Returns: (can_parallelize: bool, reason: str)
225
+ """
226
+
227
+ # Don't parallelize GPU training loops
228
+ gpu_training_keywords = [
229
+ "model.train()",
230
+ "optimizer.step()",
231
+ "loss.backward()",
232
+ ".backward()",
233
+ "torch.nn",
234
+ "nn.Module",
235
+ ]
236
+
237
+ if any(kw in code for kw in gpu_training_keywords):
238
+ return False, "GPU training loop (stateful)"
239
+
240
+ # Don't parallelize if it has shared state
241
+ stateful_keywords = [
242
+ "global ",
243
+ "nonlocal ",
244
+ ".append(", # modifying shared list
245
+ "self.", # class methods
246
+ ]
247
+
248
+ if any(kw in code for kw in stateful_keywords):
249
+ return False, "Stateful operations detected"
250
+
251
+ # Safe to parallelize: map-reduce style loops
252
+ safe_patterns = [
253
+ "for i in range",
254
+ "for idx in range",
255
+ "for chunk in range",
256
+ ]
257
+
258
+ return True, "Independent iterations"
259
+
260
+
261
  def fragment_code(code, cfg, task_id, libs):
262
+ """
263
+ IMPROVED fragmentation with GPU awareness
264
+ """
265
  frags = []
266
  cpus = max(1, cfg.get("cpus", 1))
267
+
268
+ # Detect GPU requirements
269
+ needs_gpu = cfg.get("gpus", 0) > 0 or analyze_gpu_requirements(code)
270
  ram = cfg.get("ram_gb", 4)
271
+
272
+ # Find loops
273
  loop = None
274
+ loop_safe = False
275
+
276
  try:
277
  tree = ast.parse(code)
278
  for node in ast.walk(tree):
 
281
  if isinstance(fn, ast.Name) and fn.id == "range":
282
  args = node.iter.args
283
  try:
284
+ if len(args) == 1:
285
+ loop = {"s": 0, "e": ast.literal_eval(args[0])}
286
+ elif len(args) >= 2:
287
+ loop = {"s": ast.literal_eval(args[0]), "e": ast.literal_eval(args[1])}
288
+
289
  if loop:
290
  loop["var"] = node.target.id if isinstance(node.target, ast.Name) else "i"
291
+
292
+ # Check if safe to parallelize
293
+ can_parallelize, reason = is_parallelizable_safely(code, loop)
294
+ if can_parallelize:
295
+ loop_safe = True
296
+ print(f"[HEAD] Loop is safe to parallelize: {reason}")
297
+ else:
298
+ print(f"[HEAD] Loop NOT safe to parallelize: {reason}")
299
+ loop = None
300
  break
301
+ except:
302
+ loop = None
303
+ except:
304
+ pass
305
+
306
+ # Only fragment if safe AND big enough
307
+ if loop and loop_safe and (loop["e"] - loop["s"]) >= 50: # higher threshold
308
  total = loop["e"] - loop["s"]
309
+ n = min(cpus * 2, total // 10) # at least 10 iterations per fragment
310
+ chunk = max(10, total // n)
311
+
312
+ print(f"[HEAD] Fragmenting loop: {total} iterations β†’ {n} fragments ({chunk} each)")
313
+
314
+ # Build pattern matching
315
+ patterns = []
316
+ if loop["s"] == 0:
317
+ patterns.append(f"range({loop['e']})")
318
+ patterns.append(f"range(0, {loop['e']})")
319
+ patterns.append(f"range({loop['s']}, {loop['e']})")
320
+ patterns.append(f"range({loop['s']},{loop['e']})")
321
+
322
  for idx in range(n):
323
  cs = loop["s"] + idx * chunk
324
  ce = min(loop["s"] + (idx + 1) * chunk, loop["e"])
325
+ if cs >= loop["e"]:
326
+ break
327
+
328
  mod = code
329
  for pat in patterns:
330
  if pat in mod:
331
  mod = mod.replace(pat, f"range({cs}, {ce})", 1)
332
  break
333
+
334
+ # Inject metadata
335
  header = f"""# ═══ SACCP Fragment {idx}/{n} ═══
336
  __saccp_rank__ = {idx}
337
  __saccp_world_size__ = {n}
338
  __saccp_chunk__ = ({cs}, {ce})
339
+ __saccp_is_fragment__ = True
340
  """
341
+
342
  frags.append({
343
+ "fragment_id": f"{task_id}_frag_{idx}",
344
+ "task_id": task_id,
345
+ "fragment_index": idx,
346
+ "fragment_type": "compute",
347
  "code": header + mod,
348
  "input_data": json.dumps({"rank": idx, "world_size": n, "chunk": [cs, ce]}),
349
+ "required_libs": libs,
350
+ "required_gpu": needs_gpu,
351
+ "min_ram_gb": max(1, ram // n),
352
+ "timeout_seconds": 600,
353
  })
354
+
355
+ # Fallback: single fragment
356
  if not frags:
357
+ reason = "single" if not loop else ("unsafe" if not loop_safe else "too small")
358
+ print(f"[HEAD] No fragmentation ({reason}) β€” single fragment")
359
+
360
  header = """# ═══ SACCP Single Fragment ═══
361
  __saccp_rank__ = 0
362
  __saccp_world_size__ = 1
363
+ __saccp_is_fragment__ = False
364
  """
365
+
366
  frags.append({
367
+ "fragment_id": f"{task_id}_frag_0",
368
+ "task_id": task_id,
369
+ "fragment_index": 0,
370
+ "fragment_type": "compute",
371
  "code": header + code,
372
  "input_data": json.dumps({"rank": 0, "world_size": 1}),
373
+ "required_libs": libs,
374
+ "required_gpu": needs_gpu,
375
+ "min_ram_gb": ram,
376
+ "timeout_seconds": 600,
377
  })
378
+
379
+ print(f"[HEAD] Task {task_id[:8]} β†’ {len(frags)} fragments (GPU required: {needs_gpu})")
380
  return frags
381
 
382