Bc-AI commited on
Commit
bca1298
Β·
verified Β·
1 Parent(s): bd200f0

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +328 -391
app.py CHANGED
@@ -1,59 +1,53 @@
1
  """
2
- SharePUTERβ„’ v2 Head Node β€” v2.0.3 FIXED
3
- Key fix: completion check happens IN submit_result, not background task.
4
  """
5
 
6
- from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
7
  from fastapi.responses import HTMLResponse, JSONResponse
8
  from fastapi.middleware.cors import CORSMiddleware
9
  from pydantic import BaseModel
10
  from typing import Optional, Dict, Any
11
  import httpx, asyncio, uuid, os, json, time, ast, math
12
- from datetime import datetime, timedelta
13
 
14
- app = FastAPI(title="SharePUTERβ„’ v2 Head Node")
15
  app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
16
 
17
- DB_URL = os.environ.get("SACCP_DB_URL", "https://saccpshareputer1.pythonanywhere.com/")
18
  DB_SECRET = os.environ.get("SACCP_DB_SECRET", "saccp-v2-master-key")
19
  HEAD_ID = os.environ.get("HEAD_NODE_ID", f"head-{uuid.uuid4().hex[:8]}")
20
 
21
  UNIT_PRICING = {
22
  "cpu": 0.5, "ram": 0.1,
23
  "gpu_t4": 4.0, "gpu_a10": 10.0, "gpu_a100": 25.0, "gpu_generic": 5.0,
 
 
24
  }
25
- NODE_PAY_RATES = {"RAM": 0.3, "CPU": 1.5, "GPU": 5.0}
26
  active_nodes: Dict[str, dict] = {}
27
 
 
28
 
29
- def db_headers():
30
- return {"X-SACCP-Secret": DB_SECRET, "Content-Type": "application/json"}
31
-
32
-
33
- async def db_req(method: str, path: str, **kwargs):
34
  try:
35
- async with httpx.AsyncClient(timeout=30.0) as c:
36
- resp = await getattr(c, method)(f"{DB_URL}{path}", headers=db_headers(), **kwargs)
37
- return resp
38
  except Exception as e:
39
- print(f"[HEAD] DB ERROR {method} {path}: {e}")
40
  return None
41
 
42
 
43
- # ─── Models (Fixed Optional types for Pydantic v2) ─────────────────────────
44
  class UserCreate(BaseModel):
45
  username: str
46
  password: str
47
 
48
-
49
  class TaskSubmit(BaseModel):
50
  api_key: str
51
  code: str
52
  task_type: str = "general"
53
  config: dict = {}
54
  required_libs: list = []
55
- ml_config: Optional[dict] = None
56
-
57
 
58
  class NodeRegister(BaseModel):
59
  node_type: str
@@ -61,97 +55,149 @@ class NodeRegister(BaseModel):
61
  owner: str = "anonymous"
62
  specs: dict = {}
63
  installed_libs: list = []
64
- sandbox_enabled: bool = True
65
 
66
 
67
- # ─── Homepage ───────────────────────────────────────────────────────────────
68
  @app.get("/", response_class=HTMLResponse)
69
  async def homepage():
70
- try:
71
- sr = await db_req("get", "/stats")
72
- s = sr.json() if sr and sr.status_code == 200 else {}
73
- except:
74
- s = {}
75
  return HTMLResponse(f"""<!DOCTYPE html>
76
- <html><head><title>SharePUTERβ„’ v2</title>
77
- <style>body{{font-family:system-ui;background:#0a0a0f;color:#ddd;padding:40px;max-width:900px;margin:0 auto}}
78
- h1{{color:#00ff88}}.st{{display:inline-block;background:#111;padding:20px;margin:8px;border-radius:12px;
79
- min-width:120px;text-align:center}}.st .n{{font-size:2rem;color:#00ff88;font-weight:bold}}
80
- .st .l{{color:#888;font-size:.8rem}}</style></head><body>
81
- <h1>🍽️ SharePUTERβ„’ v2.0.3</h1><p>Head Node β€” {HEAD_ID}</p>
82
- <div>
83
- <div class="st"><div class="n">{s.get('online_nodes',0)}</div><div class="l">Nodes</div></div>
84
- <div class="st"><div class="n">{s.get('total_users',0)}</div><div class="l">Users</div></div>
85
- <div class="st"><div class="n">{s.get('active_tasks',0)}</div><div class="l">Active</div></div>
86
- <div class="st"><div class="n">{s.get('completed_tasks',0)}</div><div class="l">Done</div></div>
87
- <div class="st"><div class="n">{s.get('pending_fragments',0)}</div><div class="l">Pending</div></div>
88
- <div class="st"><div class="n">{s.get('completed_fragments',0)}</div><div class="l">Frags Done</div></div>
89
- </div></body></html>""")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
 
91
 
92
  # ─── Auth ───────────────────────────────────────────────────────────────────
93
  @app.post("/api/register")
94
- async def register(user: UserCreate):
95
- r = await db_req("post", "/users", json={"username": user.username, "password": user.password})
96
- if not r or r.status_code != 201:
97
- raise HTTPException(r.status_code if r else 500, r.json().get("error") if r else "DB error")
98
  return r.json()
99
 
100
-
101
  @app.post("/api/login")
102
- async def login(user: UserCreate):
103
- r = await db_req("post", "/users/auth", json={"username": user.username, "password": user.password})
104
- if not r or r.status_code != 200:
105
- raise HTTPException(401, "Invalid credentials")
106
  return r.json()
107
 
108
-
109
  @app.get("/api/balance")
110
  async def balance(api_key: str):
111
- r = await db_req("post", "/users/by_api_key", json={"api_key": api_key})
112
- if not r or r.status_code != 200:
113
- raise HTTPException(401, "Invalid API key")
114
- u = r.json()
115
- return {"username": u["username"], "balance": u["balance"]}
116
-
117
 
118
  @app.get("/api/capabilities")
119
- async def capabilities():
120
- r = await db_req("get", "/nodes/capabilities")
121
- return r.json() if r and r.status_code == 200 else {}
122
-
123
 
124
  @app.get("/api/pricing")
125
- async def pricing():
126
- return UNIT_PRICING
127
-
128
 
129
  @app.get("/api/available_libs")
130
- async def available_libs():
131
- r = await db_req("get", "/libs/available")
132
- return r.json() if r and r.status_code == 200 else {}
133
 
134
 
135
  # ─── Cost ───────────────────────────────────────────────────────────────────
136
- def calc_cost(config: dict) -> float:
137
- cpus = config.get("cpus", 1)
138
- ram = config.get("ram_gb", 4)
139
- gpus = config.get("gpus", 0)
140
- gt = config.get("gpu_type", "generic")
141
- gk = f"gpu_{gt.lower()}" if f"gpu_{gt.lower()}" in UNIT_PRICING else "gpu_generic"
142
- return round(cpus * UNIT_PRICING["cpu"] + ram * UNIT_PRICING["ram"] +
143
- (gpus * UNIT_PRICING.get(gk, 5) if gpus else 0), 4)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
144
 
145
 
146
  # ─── Fragmentation ─────────────────────────────────────────────────────────
147
- def fragment_code(code: str, config: dict, task_id: str, required_libs: list) -> list:
148
  frags = []
149
- cpus = max(1, config.get("cpus", 1))
150
- needs_gpu = config.get("gpus", 0) > 0
151
- ram = config.get("ram_gb", 4)
152
  target = max(2, min(cpus * 2, 8))
153
 
154
- # Find splittable range loop
155
  loop = None
156
  try:
157
  tree = ast.parse(code)
@@ -161,412 +207,303 @@ def fragment_code(code: str, config: dict, task_id: str, required_libs: list) ->
161
  if isinstance(fn, ast.Name) and fn.id == "range":
162
  args = node.iter.args
163
  try:
164
- if len(args) == 1:
165
- loop = {"var": node.target.id if isinstance(node.target, ast.Name) else "i",
166
- "start": 0, "end": ast.literal_eval(args[0])}
167
- elif len(args) >= 2:
168
- loop = {"var": node.target.id if isinstance(node.target, ast.Name) else "i",
169
- "start": ast.literal_eval(args[0]), "end": ast.literal_eval(args[1])}
170
  if loop:
 
171
  break
172
- except:
173
- loop = None
174
- except:
175
- pass
176
 
177
- if loop and (loop["end"] - loop["start"]) >= 10:
178
- total = loop["end"] - loop["start"]
179
  n = min(target, total)
180
  chunk = math.ceil(total / n)
181
-
182
- # Build range patterns to search-replace
183
- if loop["start"] == 0:
184
- patterns = [f"range({loop['end']})", f"range(0, {loop['end']})",
185
- f"range(0,{loop['end']})"]
186
- else:
187
- patterns = [f"range({loop['start']}, {loop['end']})",
188
- f"range({loop['start']},{loop['end']})"]
189
 
190
  for idx in range(n):
191
- cs = loop["start"] + idx * chunk
192
- ce = min(loop["start"] + (idx + 1) * chunk, loop["end"])
193
- if cs >= loop["end"]:
194
- break
195
-
196
  mod = code
197
- replacement = f"range({cs}, {ce})"
198
- replaced = False
199
  for pat in patterns:
200
  if pat in mod:
201
- mod = mod.replace(pat, replacement, 1)
202
- replaced = True
203
  break
204
 
205
- if not replaced:
206
- mod = f"# SACCP chunk: {cs}-{ce}\n{code}"
207
-
 
 
 
208
  frags.append({
209
- "fragment_id": f"{task_id}_frag_{idx}",
210
- "task_id": task_id,
211
- "fragment_index": idx,
212
- "fragment_type": "compute",
213
- "code": mod,
214
- "input_data": json.dumps({"chunk": [cs, ce]}),
215
- "required_libs": required_libs,
216
- "required_gpu": needs_gpu,
217
- "min_ram_gb": max(1, ram // n),
218
- "timeout_seconds": 600,
219
  })
220
 
221
  if not frags:
 
 
 
 
222
  frags.append({
223
- "fragment_id": f"{task_id}_frag_0",
224
- "task_id": task_id,
225
- "fragment_index": 0,
226
- "fragment_type": "compute",
227
- "code": code,
228
- "input_data": json.dumps({"type": "full"}),
229
- "required_libs": required_libs,
230
- "required_gpu": needs_gpu,
231
- "min_ram_gb": ram,
232
- "timeout_seconds": 600,
233
  })
234
 
235
- print(f"[HEAD] Fragmented task {task_id[:8]} into {len(frags)} pieces")
236
  return frags
237
 
238
 
239
- # ─── Task Submission ─────────────────────────────────────────────────────────
240
  @app.post("/api/submit_task")
241
- async def submit_task(task: TaskSubmit):
242
- ur = await db_req("post", "/users/by_api_key", json={"api_key": task.api_key})
243
- if not ur or ur.status_code != 200:
244
- raise HTTPException(401, "Invalid API key")
245
  user = ur.json()
246
-
247
- config = task.config
248
- hourly = calc_cost(config)
249
- if user["balance"] < hourly * 0.001:
250
- raise HTTPException(402, "Insufficient balance")
251
 
252
  task_id = str(uuid.uuid4())
253
-
254
- tr = await db_req("post", "/tasks", json={
255
  "task_id": task_id, "owner": user["username"], "code": task.code,
256
- "task_type": task.task_type, "config": config, "required_libs": task.required_libs,
257
  })
258
- if not tr or tr.status_code != 201:
259
- raise HTTPException(500, "Failed to create task")
260
-
261
- fragments = fragment_code(task.code, config, task_id, task.required_libs)
262
 
263
- fr = await db_req("post", "/fragments/batch", json={"fragments": fragments})
264
- if not fr or fr.status_code != 201:
265
- raise HTTPException(500, "Failed to create fragments")
266
-
267
- await db_req("patch", f"/tasks/{task_id}", json={
268
- "status": "running", "total_fragments": len(fragments)
269
- })
270
 
271
- print(f"[HEAD] βœ… Task {task_id[:8]} submitted: {len(fragments)} fragments, {hourly}/hr")
272
- return {
273
- "task_id": task_id, "status": "running",
274
- "total_fragments": len(fragments), "hourly_cost": hourly,
275
- }
276
 
277
 
278
- # ─── THE KEY FIX: Assemble directly when results come in ────────────────────
279
- async def try_complete_task(task_id: str):
280
- """Check if all fragments are done. If so, assemble and mark complete."""
281
-
282
- fr = await db_req("get", f"/fragments/by_task/{task_id}")
283
- if not fr or fr.status_code != 200:
284
- return
285
-
286
  fragments = fr.json()
287
- if not fragments:
288
- return
289
-
290
- total = len(fragments)
291
  completed = [f for f in fragments if f["status"] == "completed"]
292
  failed = [f for f in fragments if f["status"] == "failed"]
293
- done_count = len(completed) + len(failed)
294
-
295
- # Update progress
296
- await db_req("patch", f"/tasks/{task_id}", json={
297
- "completed_fragments": len(completed),
298
- "failed_fragments": len(failed),
299
- })
300
-
301
- print(f"[HEAD] Task {task_id[:8]} progress: {done_count}/{total}")
302
-
303
- # Not done yet
304
- if done_count < total:
305
- return
306
-
307
- # ═══ ALL FRAGMENTS DONE β€” ASSEMBLE! ═══
308
- print(f"[HEAD] Task {task_id[:8]} ALL DONE β€” assembling results!")
309
-
310
- fragments.sort(key=lambda x: x["fragment_index"])
311
-
312
- results = []
313
- stdouts = []
314
- errors = []
315
-
316
- for f in fragments:
317
- idx = f["fragment_index"]
318
- if f["status"] == "completed":
319
- if f.get("result") is not None:
320
- results.append(f["result"])
321
- if f.get("stdout"):
322
- stdouts.append(f"[Fragment {idx}]\n{f['stdout']}")
323
- elif f["status"] == "failed":
324
- errors.append(f"[Fragment {idx}] {f.get('error', 'Unknown error')}")
325
-
326
  final = {
327
- "fragments_total": total,
 
 
328
  "fragments_ok": len(completed),
329
  "fragments_failed": len(failed),
330
- "results": results,
331
- "stdout": "\n\n".join(stdouts),
332
  "errors": errors,
 
333
  }
334
-
335
- status = "failed" if len(failed) == total else "completed"
336
-
337
- # Calculate cost
338
- task_resp = await db_req("get", f"/tasks/{task_id}")
339
- elapsed_hours = 0.01
340
- config = {}
341
- owner = "unknown"
342
-
343
- if task_resp and task_resp.status_code == 200:
344
- task = task_resp.json()
345
- owner = task.get("owner", "unknown")
346
- config = task.get("config", {})
347
  try:
348
- created_str = task["created_at"].split("+")[0].split("Z")[0]
349
- created = datetime.fromisoformat(created_str)
350
- elapsed_hours = max((datetime.utcnow() - created).total_seconds() / 3600, 0.001)
351
- except:
352
- elapsed_hours = 0.01
353
-
354
- cost = round(calc_cost(config) * elapsed_hours, 6)
355
-
356
- # Charge user
357
  if owner != "unknown":
358
- await db_req("patch", f"/users/{owner}/balance", json={
359
- "amount": -cost, "reason": f"task_{task_id}"
360
- })
361
-
362
- # Mark task complete
363
- await db_req("patch", f"/tasks/{task_id}", json={
364
- "status": status,
365
- "result": json.dumps(final),
366
- "cost": cost,
367
  "completed_at": datetime.utcnow().isoformat(),
368
  "error": "\n".join(errors) if errors else None,
369
  })
370
-
371
- print(f"[HEAD] πŸŽ‰ Task {task_id[:8]} β†’ {status} | Cost: {cost} SACCP | "
372
- f"{len(results)} results | {len(errors)} errors")
373
 
374
 
375
- # ─── Submit Result (NO background task dependency!) ──────────────────────────
376
  @app.post("/api/submit_result")
377
  async def submit_result(request: Request):
378
- """Worker submits fragment result. Uses raw request to avoid Pydantic issues."""
379
-
380
- try:
381
- data = await request.json()
382
- except Exception as e:
383
- print(f"[HEAD] submit_result: bad JSON: {e}")
384
- raise HTTPException(400, "Invalid JSON")
385
-
386
- fragment_id = data.get("fragment_id", "")
387
- node_id = data.get("node_id", "")
388
- status = data.get("status", "failed")
389
- result_val = data.get("result")
390
- error_val = data.get("error")
391
- stdout_val = data.get("stdout", "")
392
- resource_usage = data.get("resource_usage", {})
393
-
394
- if not fragment_id:
395
- raise HTTPException(400, "fragment_id required")
396
-
397
- print(f"[HEAD] πŸ“₯ Result received: {fragment_id[:24]}... β†’ {status}")
398
-
399
- # Update fragment in database
400
- update_resp = await db_req("patch", f"/fragments/{fragment_id}", json={
401
- "status": status,
402
- "result": result_val,
403
- "error": error_val if error_val else None,
404
- "stdout": stdout_val if stdout_val else "",
405
- "completed_at": datetime.utcnow().isoformat(),
406
- "resource_usage": resource_usage,
407
  })
408
-
409
- if not update_resp or update_resp.status_code != 200:
410
- print(f"[HEAD] ⚠️ Failed to update fragment {fragment_id[:16]} in DB!")
411
- print(f"[HEAD] DB response: {update_resp.text if update_resp else 'None'}")
412
- else:
413
- print(f"[HEAD] βœ… Fragment {fragment_id[:16]} updated in DB")
414
-
415
- # Pay node
416
- if status == "completed" and node_id:
417
- ninfo = active_nodes.get(node_id, {})
418
- ntype = ninfo.get("node_type", "CPU")
419
- pay = NODE_PAY_RATES.get(ntype, 0) / 360
420
- if pay > 0:
421
- await db_req("post", f"/nodes/{node_id}/pay", json={"amount": round(pay, 6)})
422
-
423
- # ═══ KEY: Get task_id and check if task is complete ═══
424
- frag_resp = await db_req("get", f"/fragments/{fragment_id}")
425
- if frag_resp and frag_resp.status_code == 200:
426
- frag_data = frag_resp.json()
427
- task_id = frag_data.get("task_id", "")
428
- if task_id:
429
- print(f"[HEAD] Checking if task {task_id[:8]} is complete...")
430
- await try_complete_task(task_id)
431
- else:
432
- print(f"[HEAD] ⚠️ Could not fetch fragment {fragment_id[:16]} to get task_id")
433
-
434
  return {"ok": True}
435
 
436
 
437
- # ─── Node Management ────────────────────────────────────────────────────────
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
438
  @app.post("/api/register_node")
439
- async def register_node(node: NodeRegister):
440
  nid = f"node-{uuid.uuid4().hex[:12]}"
441
- await db_req("post", "/nodes", json={
442
  "node_id": nid, "node_type": node.node_type.upper(),
443
  "node_url": node.node_url, "owner": node.owner,
444
  "specs": node.specs, "installed_libs": node.installed_libs,
445
  })
446
  active_nodes[nid] = {"node_type": node.node_type.upper(), "last_hb": time.time()}
447
- print(f"[HEAD] Node registered: {nid} ({node.node_type})")
448
  return {"node_id": nid, "pay_rate": f"{NODE_PAY_RATES.get(node.node_type.upper(), 0)} SACCP/hr"}
449
 
450
-
451
  @app.post("/api/node_heartbeat")
452
- async def heartbeat(data: dict):
453
  nid = data.get("node_id", "")
454
- if nid in active_nodes:
455
- active_nodes[nid]["last_hb"] = time.time()
456
- await db_req("post", f"/nodes/{nid}/heartbeat", json=data)
457
  return {"ok": True}
458
 
459
-
460
  @app.get("/api/get_work")
461
- async def get_work(node_id: str, node_type: str = "CPU", has_gpu: bool = False,
462
- ram_gb: float = 4, libs: str = ""):
463
- r = await db_req("get", "/fragments/pending?limit=10")
464
- if not r or r.status_code != 200:
465
- return {"work": None}
466
-
467
- pending = r.json()
468
- if not pending:
469
- return {"work": None}
470
-
471
- for frag in pending:
472
- if frag.get("required_gpu") and not has_gpu:
473
- continue
474
- if frag.get("min_ram_gb", 0) > ram_gb > 0:
475
- continue
476
-
477
- frag_id = frag["fragment_id"]
478
- await db_req("patch", f"/fragments/{frag_id}", json={
479
- "status": "running",
480
- "assigned_node": node_id,
481
- "started_at": datetime.utcnow().isoformat()
482
- })
483
-
484
- print(f"[HEAD] πŸ“€ Assigned {frag_id[:24]}... β†’ {node_id}")
485
- return {"work": frag}
486
-
487
  return {"work": None}
488
 
489
-
490
- # ─── Queries ────────────────────────────────────────────────────────────────
491
  @app.get("/api/task/{task_id}")
492
  async def get_task(task_id: str):
493
- r = await db_req("get", f"/tasks/{task_id}")
494
- if not r or r.status_code != 200:
495
- raise HTTPException(404, "Not found")
496
  return r.json()
497
 
498
-
499
  @app.get("/api/task/{task_id}/fragments")
500
- async def task_fragments(task_id: str):
501
- r = await db_req("get", f"/fragments/by_task/{task_id}")
502
  return r.json() if r and r.status_code == 200 else []
503
 
504
-
505
  @app.get("/api/my_tasks")
506
  async def my_tasks(api_key: str):
507
- ur = await db_req("post", "/users/by_api_key", json={"api_key": api_key})
508
- if not ur or ur.status_code != 200:
509
- raise HTTPException(401, "Invalid API key")
510
- r = await db_req("get", f"/tasks?owner={ur.json()['username']}")
511
  return r.json() if r and r.status_code == 200 else []
512
 
513
-
514
  @app.get("/api/stats")
515
  async def stats():
516
- r = await db_req("get", "/stats")
517
- return r.json() if r and r.status_code == 200 else {}
518
-
519
 
520
  @app.get("/health")
521
  async def health():
522
- return {"status": "healthy", "head_id": HEAD_ID, "version": "2.0.3",
523
- "active_nodes": len(active_nodes)}
524
 
525
-
526
- # ─── Safety Net: catch stuck tasks ──────────────────────────────────────────
527
  @app.on_event("startup")
528
  async def startup():
529
- print(f"[HEAD] SharePUTER v2.0.3 starting β€” {HEAD_ID}")
530
- asyncio.create_task(stuck_task_checker())
531
- asyncio.create_task(stale_node_cleaner())
532
-
533
 
534
- async def stuck_task_checker():
535
- """Every 15 seconds, check for tasks where all fragments done but task still running."""
536
  while True:
537
  await asyncio.sleep(15)
538
  try:
539
- r = await db_req("get", "/tasks")
540
- if not r or r.status_code != 200:
541
- continue
542
- for task in r.json():
543
- if task["status"] not in ("running", "pending"):
544
- continue
545
- tid = task["task_id"]
546
- fr = await db_req("get", f"/fragments/by_task/{tid}")
547
- if not fr or fr.status_code != 200:
548
- continue
549
- frags = fr.json()
550
- if not frags:
551
- continue
552
- done = len([f for f in frags if f["status"] in ("completed", "failed")])
553
- if done >= len(frags):
554
- print(f"[HEAD] πŸ”§ Stuck task found: {tid[:8]}, fixing...")
555
- await try_complete_task(tid)
556
- except Exception as e:
557
- print(f"[HEAD] Stuck checker error: {e}")
558
-
559
-
560
- async def stale_node_cleaner():
561
  while True:
562
  await asyncio.sleep(60)
563
- cutoff = time.time() - 120
564
- dead = [n for n, d in list(active_nodes.items()) if d.get("last_hb", 0) < cutoff]
565
  for n in dead:
566
  active_nodes.pop(n, None)
567
- await db_req("post", f"/nodes/{n}/heartbeat", json={"status": "offline"})
568
-
569
 
570
  if __name__ == "__main__":
571
- import uvicorn
572
- uvicorn.run(app, host="0.0.0.0", port=7860)
 
1
  """
2
+ SharePUTERβ„’ v2.1 Head Node
3
+ Result fusion, live streaming, IPU/TPU, fixed stats display
4
  """
5
 
6
+ from fastapi import FastAPI, HTTPException, Request
7
  from fastapi.responses import HTMLResponse, JSONResponse
8
  from fastapi.middleware.cors import CORSMiddleware
9
  from pydantic import BaseModel
10
  from typing import Optional, Dict, Any
11
  import httpx, asyncio, uuid, os, json, time, ast, math
12
+ from datetime import datetime
13
 
14
+ app = FastAPI(title="SharePUTERβ„’ v2.1")
15
  app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
16
 
17
+ DB_URL = os.environ.get("SACCP_DB_URL", "http://localhost:5000")
18
  DB_SECRET = os.environ.get("SACCP_DB_SECRET", "saccp-v2-master-key")
19
  HEAD_ID = os.environ.get("HEAD_NODE_ID", f"head-{uuid.uuid4().hex[:8]}")
20
 
21
  UNIT_PRICING = {
22
  "cpu": 0.5, "ram": 0.1,
23
  "gpu_t4": 4.0, "gpu_a10": 10.0, "gpu_a100": 25.0, "gpu_generic": 5.0,
24
+ "tpu_v2": 15.0, "tpu_v4": 40.0,
25
+ "ipu_pod4": 20.0, "ipu_pod16": 60.0,
26
  }
27
+ NODE_PAY_RATES = {"RAM": 0.3, "CPU": 1.5, "GPU": 5.0, "TPU": 8.0, "IPU": 7.0}
28
  active_nodes: Dict[str, dict] = {}
29
 
30
+ def db_h(): return {"X-SACCP-Secret": DB_SECRET, "Content-Type": "application/json"}
31
 
32
+ async def db(method, path, **kw):
 
 
 
 
33
  try:
34
+ async with httpx.AsyncClient(timeout=30) as c:
35
+ return await getattr(c, method)(f"{DB_URL}{path}", headers=db_h(), **kw)
 
36
  except Exception as e:
37
+ print(f"[HEAD] DB err {method} {path}: {e}")
38
  return None
39
 
40
 
 
41
  class UserCreate(BaseModel):
42
  username: str
43
  password: str
44
 
 
45
  class TaskSubmit(BaseModel):
46
  api_key: str
47
  code: str
48
  task_type: str = "general"
49
  config: dict = {}
50
  required_libs: list = []
 
 
51
 
52
  class NodeRegister(BaseModel):
53
  node_type: str
 
55
  owner: str = "anonymous"
56
  specs: dict = {}
57
  installed_libs: list = []
 
58
 
59
 
60
+ # ─── Homepage (JS-loaded stats so page always renders) ─────────────────────
61
  @app.get("/", response_class=HTMLResponse)
62
  async def homepage():
 
 
 
 
 
63
  return HTMLResponse(f"""<!DOCTYPE html>
64
+ <html><head><meta charset="UTF-8"><title>SharePUTERβ„’ v2.1</title>
65
+ <style>
66
+ body{{font-family:system-ui;background:#08080d;color:#ddd;margin:0;padding:40px;max-width:1000px;margin:0 auto}}
67
+ h1{{font-size:2.5rem;background:linear-gradient(135deg,#00ff88,#00aaff);-webkit-background-clip:text;-webkit-text-fill-color:transparent}}
68
+ .g{{display:grid;grid-template-columns:repeat(auto-fill,minmax(140px,1fr));gap:12px;margin:24px 0}}
69
+ .c{{background:#111;border:1px solid #222;border-radius:12px;padding:20px;text-align:center}}
70
+ .c .n{{font-size:1.8rem;font-weight:700;color:#00ff88;font-family:monospace}}
71
+ .c .l{{color:#888;font-size:.75rem;text-transform:uppercase;letter-spacing:1px;margin-top:4px}}
72
+ .pulse{{display:inline-block;width:8px;height:8px;border-radius:50%;background:#00ff88;
73
+ animation:p 2s infinite;margin-right:6px}}
74
+ @keyframes p{{0%,100%{{box-shadow:0 0 0 0 rgba(0,255,136,.4)}}50%{{box-shadow:0 0 0 6px rgba(0,255,136,0)}}}}
75
+ </style></head><body>
76
+ <h1>SharePUTERβ„’ v2.1</h1>
77
+ <p><span class="pulse"></span>Head Node <code>{HEAD_ID}</code> β€” Online</p>
78
+ <div class="g" id="stats"></div>
79
+ <script>
80
+ async function load(){{try{{const r=await fetch('/api/stats');const s=await r.json();
81
+ const items=[
82
+ ['online_nodes','Nodes Online','#00ff88'],['cpu_nodes_online','CPU','#00aaff'],
83
+ ['gpu_nodes_online','GPU','#aa66ff'],['tpu_nodes_online','TPU','#ff8844'],
84
+ ['ipu_nodes_online','IPU','#00eeff'],['total_users','Users','#ffcc00'],
85
+ ['active_tasks','Active Tasks','#00aaff'],['completed_tasks','Completed','#00ff88'],
86
+ ['pending_fragments','Pending Frags','#ffcc00'],['completed_fragments','Done Frags','#00ff88'],
87
+ ];
88
+ document.getElementById('stats').innerHTML=items.map(([k,l,c])=>
89
+ `<div class="c"><div class="n" style="color:${{c}}">${{s[k]||0}}</div><div class="l">${{l}}</div></div>`).join('')
90
+ }}catch(e){{document.getElementById('stats').innerHTML='<p style="color:#f44">Stats unavailable</p>'}}}}
91
+ load();setInterval(load,5000);
92
+ </script>
93
+ </body></html>""")
94
 
95
 
96
  # ─── Auth ───────────────────────────────────────────────────────────────────
97
  @app.post("/api/register")
98
+ async def register(u: UserCreate):
99
+ r = await db("post", "/users", json={"username": u.username, "password": u.password})
100
+ if not r or r.status_code != 201: raise HTTPException(r.status_code if r else 500, r.json().get("error") if r else "DB error")
 
101
  return r.json()
102
 
 
103
  @app.post("/api/login")
104
+ async def login(u: UserCreate):
105
+ r = await db("post", "/users/auth", json={"username": u.username, "password": u.password})
106
+ if not r or r.status_code != 200: raise HTTPException(401, "Invalid credentials")
 
107
  return r.json()
108
 
 
109
  @app.get("/api/balance")
110
  async def balance(api_key: str):
111
+ r = await db("post", "/users/by_api_key", json={"api_key": api_key})
112
+ if not r or r.status_code != 200: raise HTTPException(401, "Invalid API key")
113
+ u = r.json(); return {"username": u["username"], "balance": u["balance"]}
 
 
 
114
 
115
  @app.get("/api/capabilities")
116
+ async def caps():
117
+ r = await db("get", "/nodes/capabilities"); return r.json() if r and r.status_code == 200 else {}
 
 
118
 
119
  @app.get("/api/pricing")
120
+ async def pricing(): return UNIT_PRICING
 
 
121
 
122
  @app.get("/api/available_libs")
123
+ async def avlibs():
124
+ r = await db("get", "/libs/available"); return r.json() if r and r.status_code == 200 else {}
 
125
 
126
 
127
  # ─── Cost ───────────────────────────────────────────────────────────────────
128
+ def calc_cost(cfg):
129
+ c = cfg.get("cpus", 1) * UNIT_PRICING["cpu"] + cfg.get("ram_gb", 4) * UNIT_PRICING["ram"]
130
+ gpus = cfg.get("gpus", 0)
131
+ gt = cfg.get("gpu_type", "generic")
132
+ if gpus: c += gpus * UNIT_PRICING.get(f"gpu_{gt}", 5)
133
+ tpus = cfg.get("tpus", 0)
134
+ tt = cfg.get("tpu_type", "v2")
135
+ if tpus: c += tpus * UNIT_PRICING.get(f"tpu_{tt}", 15)
136
+ ipus = cfg.get("ipus", 0)
137
+ it = cfg.get("ipu_type", "pod4")
138
+ if ipus: c += ipus * UNIT_PRICING.get(f"ipu_{it}", 20)
139
+ return round(c, 4)
140
+
141
+
142
+ # ─── Result Fusion Engine ──────────────────────────────────────────────────
143
+ def fuse_results(results):
144
+ """Merge fragment results into one unified result."""
145
+ if not results: return None
146
+ if len(results) == 1: return results[0]
147
+
148
+ # All dicts β†’ merge keys (sum numbers, concat lists)
149
+ if all(isinstance(r, dict) for r in results):
150
+ merged = {}
151
+ all_keys = set()
152
+ for r in results: all_keys.update(r.keys())
153
+ for key in all_keys:
154
+ vals = [r[key] for r in results if key in r]
155
+ if not vals: continue
156
+ if all(isinstance(v, (int, float)) for v in vals):
157
+ merged[key] = sum(vals)
158
+ elif all(isinstance(v, list) for v in vals):
159
+ merged[key] = [item for v in vals for item in v]
160
+ elif all(isinstance(v, str) for v in vals):
161
+ merged[key] = "\n".join(vals)
162
+ else:
163
+ merged[key] = vals[-1] # Take last value
164
+ return merged
165
+
166
+ # All lists β†’ concatenate
167
+ if all(isinstance(r, list) for r in results):
168
+ return [item for r in results for item in r]
169
+
170
+ # All numbers β†’ sum
171
+ if all(isinstance(r, (int, float)) for r in results):
172
+ return sum(results)
173
+
174
+ # All strings β†’ merge (remove redundant lines)
175
+ if all(isinstance(r, str) for r in results):
176
+ return "\n".join(results)
177
+
178
+ return results
179
+
180
+
181
+ def fuse_stdout(fragments):
182
+ """Merge fragment stdout into one continuous stream."""
183
+ merged = []
184
+ for f in sorted(fragments, key=lambda x: x["fragment_index"]):
185
+ stdout = f.get("stdout", "")
186
+ if stdout:
187
+ for line in stdout.split('\n'):
188
+ if line.strip():
189
+ merged.append(line)
190
+ return "\n".join(merged)
191
 
192
 
193
  # ─── Fragmentation ─────────────────────────────────────────────────────────
194
+ def fragment_code(code, cfg, task_id, libs):
195
  frags = []
196
+ cpus = max(1, cfg.get("cpus", 1))
197
+ needs_gpu = cfg.get("gpus", 0) > 0
198
+ ram = cfg.get("ram_gb", 4)
199
  target = max(2, min(cpus * 2, 8))
200
 
 
201
  loop = None
202
  try:
203
  tree = ast.parse(code)
 
207
  if isinstance(fn, ast.Name) and fn.id == "range":
208
  args = node.iter.args
209
  try:
210
+ if len(args) == 1: loop = {"s": 0, "e": ast.literal_eval(args[0])}
211
+ elif len(args) >= 2: loop = {"s": ast.literal_eval(args[0]), "e": ast.literal_eval(args[1])}
 
 
 
 
212
  if loop:
213
+ loop["var"] = node.target.id if isinstance(node.target, ast.Name) else "i"
214
  break
215
+ except: loop = None
216
+ except: pass
 
 
217
 
218
+ if loop and (loop["e"] - loop["s"]) >= 20:
219
+ total = loop["e"] - loop["s"]
220
  n = min(target, total)
221
  chunk = math.ceil(total / n)
222
+ patterns = ([f"range({loop['e']})"] if loop["s"] == 0 else []) + [
223
+ f"range({loop['s']}, {loop['e']})", f"range({loop['s']},{loop['e']})"]
224
+ if loop["s"] == 0: patterns.append(f"range(0, {loop['e']})")
 
 
 
 
 
225
 
226
  for idx in range(n):
227
+ cs = loop["s"] + idx * chunk
228
+ ce = min(loop["s"] + (idx + 1) * chunk, loop["e"])
229
+ if cs >= loop["e"]: break
 
 
230
  mod = code
 
 
231
  for pat in patterns:
232
  if pat in mod:
233
+ mod = mod.replace(pat, f"range({cs}, {ce})", 1)
 
234
  break
235
 
236
+ # Inject SACCP metadata
237
+ header = f"""# ═══ SACCP Fragment {idx}/{n} ═══
238
+ __saccp_rank__ = {idx}
239
+ __saccp_world_size__ = {n}
240
+ __saccp_chunk__ = ({cs}, {ce})
241
+ """
242
  frags.append({
243
+ "fragment_id": f"{task_id}_frag_{idx}", "task_id": task_id,
244
+ "fragment_index": idx, "fragment_type": "compute",
245
+ "code": header + mod,
246
+ "input_data": json.dumps({"rank": idx, "world_size": n, "chunk": [cs, ce]}),
247
+ "required_libs": libs, "required_gpu": needs_gpu,
248
+ "min_ram_gb": max(1, ram // n), "timeout_seconds": 600,
 
 
 
 
249
  })
250
 
251
  if not frags:
252
+ header = """# ═══ SACCP Single Fragment ═══
253
+ __saccp_rank__ = 0
254
+ __saccp_world_size__ = 1
255
+ """
256
  frags.append({
257
+ "fragment_id": f"{task_id}_frag_0", "task_id": task_id,
258
+ "fragment_index": 0, "fragment_type": "compute",
259
+ "code": header + code,
260
+ "input_data": json.dumps({"rank": 0, "world_size": 1}),
261
+ "required_libs": libs, "required_gpu": needs_gpu,
262
+ "min_ram_gb": ram, "timeout_seconds": 600,
 
 
 
 
263
  })
264
 
265
+ print(f"[HEAD] Task {task_id[:8]} β†’ {len(frags)} fragments")
266
  return frags
267
 
268
 
269
+ # ─── Task Submit ─────────────────────────────────────────────────────────────
270
  @app.post("/api/submit_task")
271
+ async def submit(task: TaskSubmit):
272
+ ur = await db("post", "/users/by_api_key", json={"api_key": task.api_key})
273
+ if not ur or ur.status_code != 200: raise HTTPException(401, "Invalid API key")
 
274
  user = ur.json()
275
+ cfg = task.config
276
+ hourly = calc_cost(cfg)
277
+ if user["balance"] < hourly * 0.001: raise HTTPException(402, "Insufficient balance")
 
 
278
 
279
  task_id = str(uuid.uuid4())
280
+ tr = await db("post", "/tasks", json={
 
281
  "task_id": task_id, "owner": user["username"], "code": task.code,
282
+ "task_type": task.task_type, "config": cfg, "required_libs": task.required_libs,
283
  })
284
+ if not tr or tr.status_code != 201: raise HTTPException(500, "Failed to create task")
 
 
 
285
 
286
+ fragments = fragment_code(task.code, cfg, task_id, task.required_libs)
287
+ fr = await db("post", "/fragments/batch", json={"fragments": fragments})
288
+ if not fr or fr.status_code != 201: raise HTTPException(500, "Failed to create fragments")
 
 
 
 
289
 
290
+ await db("patch", f"/tasks/{task_id}", json={"status": "running", "total_fragments": len(fragments)})
291
+ print(f"[HEAD] βœ… Task {task_id[:8]} submitted: {len(fragments)} frags, {hourly}/hr")
292
+ return {"task_id": task_id, "status": "running", "total_fragments": len(fragments), "hourly_cost": hourly}
 
 
293
 
294
 
295
+ # ─── Completion Check (called from submit_result) ──────────────────────────
296
+ async def try_complete(task_id):
297
+ fr = await db("get", f"/fragments/by_task/{task_id}")
298
+ if not fr or fr.status_code != 200: return
 
 
 
 
299
  fragments = fr.json()
300
+ if not fragments: return
301
+
 
 
302
  completed = [f for f in fragments if f["status"] == "completed"]
303
  failed = [f for f in fragments if f["status"] == "failed"]
304
+
305
+ await db("patch", f"/tasks/{task_id}", json={
306
+ "completed_fragments": len(completed), "failed_fragments": len(failed)})
307
+
308
+ if len(completed) + len(failed) < len(fragments): return
309
+
310
+ # ═══ ALL DONE β€” FUSE RESULTS ═══
311
+ print(f"[HEAD] Task {task_id[:8]} complete β€” fusing {len(fragments)} fragments")
312
+
313
+ # Collect raw results and parse JSON strings
314
+ raw_results = []
315
+ for f in sorted(completed, key=lambda x: x["fragment_index"]):
316
+ r = f.get("result")
317
+ if r is not None:
318
+ if isinstance(r, str):
319
+ try: r = json.loads(r)
320
+ except: pass
321
+ raw_results.append(r)
322
+
323
+ # Fuse results into one unified output
324
+ fused = fuse_results(raw_results)
325
+ merged_stdout = fuse_stdout(fragments)
326
+
327
+ errors = [f"[Fragment {f['fragment_index']}] {f.get('error', '?')}" for f in failed]
328
+ status = "failed" if len(failed) == len(fragments) else "completed"
329
+
 
 
 
 
 
 
 
330
  final = {
331
+ "fused_result": fused,
332
+ "stdout": merged_stdout,
333
+ "fragments_total": len(fragments),
334
  "fragments_ok": len(completed),
335
  "fragments_failed": len(failed),
 
 
336
  "errors": errors,
337
+ "per_fragment": raw_results,
338
  }
339
+
340
+ # Cost
341
+ tr = await db("get", f"/tasks/{task_id}")
342
+ owner, cfg = "unknown", {}
343
+ elapsed = 0.01
344
+ if tr and tr.status_code == 200:
345
+ t = tr.json(); owner = t.get("owner", "unknown"); cfg = t.get("config", {})
 
 
 
 
 
 
346
  try:
347
+ created = datetime.fromisoformat(t["created_at"].split("+")[0].split("Z")[0])
348
+ elapsed = max((datetime.utcnow() - created).total_seconds() / 3600, 0.001)
349
+ except: pass
350
+
351
+ cost = round(calc_cost(cfg) * elapsed, 6)
 
 
 
 
352
  if owner != "unknown":
353
+ await db("patch", f"/users/{owner}/balance", json={"amount": -cost, "reason": f"task_{task_id}"})
354
+
355
+ await db("patch", f"/tasks/{task_id}", json={
356
+ "status": status, "result": json.dumps(final), "cost": cost,
 
 
 
 
 
357
  "completed_at": datetime.utcnow().isoformat(),
358
  "error": "\n".join(errors) if errors else None,
359
  })
360
+ print(f"[HEAD] πŸŽ‰ Task {task_id[:8]} β†’ {status} | Cost: {cost}")
 
 
361
 
362
 
363
+ # ─── Submit Result ──────────────────────────────────────────────────────────
364
  @app.post("/api/submit_result")
365
  async def submit_result(request: Request):
366
+ data = await request.json()
367
+ fid = data.get("fragment_id", "")
368
+ if not fid: raise HTTPException(400, "fragment_id required")
369
+
370
+ await db("patch", f"/fragments/{fid}", json={
371
+ "status": data.get("status", "failed"),
372
+ "result": data.get("result"), "error": data.get("error") or None,
373
+ "stdout": data.get("stdout") or "", "completed_at": datetime.utcnow().isoformat(),
374
+ "resource_usage": data.get("resource_usage", {}),
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
375
  })
376
+ print(f"[HEAD] πŸ“₯ {fid[:20]}... β†’ {data.get('status')}")
377
+
378
+ nid = data.get("node_id", "")
379
+ if data.get("status") == "completed" and nid:
380
+ nt = active_nodes.get(nid, {}).get("node_type", "CPU")
381
+ pay = NODE_PAY_RATES.get(nt, 0) / 360
382
+ if pay > 0: await db("post", f"/nodes/{nid}/pay", json={"amount": round(pay, 6)})
383
+
384
+ # Check completion
385
+ fg = await db("get", f"/fragments/{fid}")
386
+ if fg and fg.status_code == 200:
387
+ tid = fg.json().get("task_id")
388
+ if tid: await try_complete(tid)
389
+
 
 
 
 
 
 
 
 
 
 
 
 
390
  return {"ok": True}
391
 
392
 
393
+ # ─── Live Task Stream ──────────────���───────────────────────────────────────
394
+ @app.get("/api/task/{task_id}/live")
395
+ async def live_task(task_id: str):
396
+ """Return current state with merged stdout for live streaming."""
397
+ tr = await db("get", f"/tasks/{task_id}")
398
+ if not tr or tr.status_code != 200: raise HTTPException(404, "Not found")
399
+ task = tr.json()
400
+
401
+ fr = await db("get", f"/fragments/by_task/{task_id}")
402
+ fragments = fr.json() if fr and fr.status_code == 200 else []
403
+
404
+ merged_stdout = fuse_stdout(fragments)
405
+ completed = len([f for f in fragments if f["status"] == "completed"])
406
+ running = len([f for f in fragments if f["status"] == "running"])
407
+ total = len(fragments)
408
+
409
+ return {
410
+ "task_id": task_id, "status": task["status"],
411
+ "completed": completed, "running": running, "total": total,
412
+ "stdout": merged_stdout, "result": task.get("result"),
413
+ "cost": task.get("cost", 0),
414
+ }
415
+
416
+
417
+ # ─── Node + Task Endpoints ─────────────────────────────────────────────────
418
  @app.post("/api/register_node")
419
+ async def reg_node(node: NodeRegister):
420
  nid = f"node-{uuid.uuid4().hex[:12]}"
421
+ await db("post", "/nodes", json={
422
  "node_id": nid, "node_type": node.node_type.upper(),
423
  "node_url": node.node_url, "owner": node.owner,
424
  "specs": node.specs, "installed_libs": node.installed_libs,
425
  })
426
  active_nodes[nid] = {"node_type": node.node_type.upper(), "last_hb": time.time()}
427
+ print(f"[HEAD] Node {nid} ({node.node_type})")
428
  return {"node_id": nid, "pay_rate": f"{NODE_PAY_RATES.get(node.node_type.upper(), 0)} SACCP/hr"}
429
 
 
430
  @app.post("/api/node_heartbeat")
431
+ async def hb(data: dict):
432
  nid = data.get("node_id", "")
433
+ if nid in active_nodes: active_nodes[nid]["last_hb"] = time.time()
434
+ await db("post", f"/nodes/{nid}/heartbeat", json=data)
 
435
  return {"ok": True}
436
 
 
437
  @app.get("/api/get_work")
438
+ async def get_work(node_id: str, node_type: str = "CPU", has_gpu: bool = False, ram_gb: float = 4, libs: str = ""):
439
+ r = await db("get", "/fragments/pending?limit=10")
440
+ if not r or r.status_code != 200: return {"work": None}
441
+ for f in r.json():
442
+ if f.get("required_gpu") and not has_gpu: continue
443
+ if f.get("min_ram_gb", 0) > ram_gb > 0: continue
444
+ fid = f["fragment_id"]
445
+ await db("patch", f"/fragments/{fid}", json={
446
+ "status": "running", "assigned_node": node_id, "started_at": datetime.utcnow().isoformat()})
447
+ print(f"[HEAD] πŸ“€ {fid[:20]}... β†’ {node_id}")
448
+ return {"work": f}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
449
  return {"work": None}
450
 
 
 
451
  @app.get("/api/task/{task_id}")
452
  async def get_task(task_id: str):
453
+ r = await db("get", f"/tasks/{task_id}")
454
+ if not r or r.status_code != 200: raise HTTPException(404, "Not found")
 
455
  return r.json()
456
 
 
457
  @app.get("/api/task/{task_id}/fragments")
458
+ async def get_frags(task_id: str):
459
+ r = await db("get", f"/fragments/by_task/{task_id}")
460
  return r.json() if r and r.status_code == 200 else []
461
 
 
462
  @app.get("/api/my_tasks")
463
  async def my_tasks(api_key: str):
464
+ ur = await db("post", "/users/by_api_key", json={"api_key": api_key})
465
+ if not ur or ur.status_code != 200: raise HTTPException(401, "Invalid API key")
466
+ r = await db("get", f"/tasks?owner={ur.json()['username']}")
 
467
  return r.json() if r and r.status_code == 200 else []
468
 
 
469
  @app.get("/api/stats")
470
  async def stats():
471
+ r = await db("get", "/stats"); return r.json() if r and r.status_code == 200 else {}
 
 
472
 
473
  @app.get("/health")
474
  async def health():
475
+ return {"status": "healthy", "head_id": HEAD_ID, "version": "2.1", "nodes": len(active_nodes)}
 
476
 
477
+ # ─── Background ─────────────────────────────────────────────────────────────
 
478
  @app.on_event("startup")
479
  async def startup():
480
+ print(f"[HEAD] SharePUTER v2.1 β€” {HEAD_ID}")
481
+ asyncio.create_task(stuck_checker())
482
+ asyncio.create_task(node_cleaner())
 
483
 
484
+ async def stuck_checker():
 
485
  while True:
486
  await asyncio.sleep(15)
487
  try:
488
+ r = await db("get", "/tasks")
489
+ if not r or r.status_code != 200: continue
490
+ for t in r.json():
491
+ if t["status"] not in ("running", "pending"): continue
492
+ fr = await db("get", f"/fragments/by_task/{t['task_id']}")
493
+ if not fr or fr.status_code != 200: continue
494
+ fs = fr.json()
495
+ if fs and len([f for f in fs if f["status"] in ("completed", "failed")]) >= len(fs):
496
+ print(f"[HEAD] πŸ”§ Unsticking {t['task_id'][:8]}")
497
+ await try_complete(t["task_id"])
498
+ except: pass
499
+
500
+ async def node_cleaner():
 
 
 
 
 
 
 
 
 
501
  while True:
502
  await asyncio.sleep(60)
503
+ dead = [n for n, d in list(active_nodes.items()) if d.get("last_hb", 0) < time.time() - 120]
 
504
  for n in dead:
505
  active_nodes.pop(n, None)
506
+ await db("post", f"/nodes/{n}/heartbeat", json={"status": "offline"})
 
507
 
508
  if __name__ == "__main__":
509
+ import uvicorn; uvicorn.run(app, host="0.0.0.0", port=7860)