JatinAutonomousLabs commited on
Commit
aa83392
·
verified ·
1 Parent(s): d2a4a5a

Update app_gradio.py

Browse files
Files changed (1) hide show
  1. app_gradio.py +33 -2
app_gradio.py CHANGED
@@ -405,19 +405,50 @@ def execute_main_task(history, state, budget):
405
  aggregated_state["status_update"] = str(node_output)
406
  continue
407
 
 
 
408
  for key, value in node_output.items():
 
409
  if key == 'execution_path':
410
  current_path = aggregated_state.get(key, [])
411
  if isinstance(current_path, list):
412
- # Ensure we are always extending a list
413
  new_items = value if isinstance(value, list) else [value]
414
  aggregated_state[key] = current_path + new_items
415
  else:
416
- # Recover from corrupted state
417
  aggregated_state[key] = list(current_path) + (value if isinstance(value, list) else [value])
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
418
  else:
419
  aggregated_state[key] = value
420
 
 
421
  # --- Real-time Budget Check ---
422
  current_cost = cost_tracker.get_total_cost()
423
  aggregated_state["current_cost"] = current_cost
 
405
  aggregated_state["status_update"] = str(node_output)
406
  continue
407
 
408
+ # --- Robust State Aggregation (namespace status_update to avoid concurrent writes) ---
409
+ # Prevent multiple agents in the same step from clobbering the same top-level key
410
  for key, value in node_output.items():
411
+ # Normalize execution_path (keep existing logic)
412
  if key == 'execution_path':
413
  current_path = aggregated_state.get(key, [])
414
  if isinstance(current_path, list):
 
415
  new_items = value if isinstance(value, list) else [value]
416
  aggregated_state[key] = current_path + new_items
417
  else:
 
418
  aggregated_state[key] = list(current_path) + (value if isinstance(value, list) else [value])
419
+ continue
420
+
421
+ # Namespace status_update and other commonly-colliding keys from agents
422
+ if key == "status_update":
423
+ # rename to a node-scoped key so LangGraph doesn't see concurrent writes to the same key
424
+ node_status_key = f"status_update__{node_name}"
425
+ # keep the node-scoped status
426
+ aggregated_state[node_status_key] = value
427
+ # also set a global 'status_update' if not already present, prefer explicit global writes
428
+ if "status_update" not in aggregated_state or not aggregated_state.get("status_update"):
429
+ aggregated_state["status_update"] = value
430
+ else:
431
+ # if there is already a global status, prefer the most recent node's status (override)
432
+ aggregated_state["status_update"] = value
433
+ continue
434
+
435
+ # If key already exists and is not a list, convert to list and append (safe merging)
436
+ if key in aggregated_state and key != "execution_path":
437
+ existing = aggregated_state[key]
438
+ # if both are lists, concatenate; otherwise turn into list of values
439
+ if isinstance(existing, list) and isinstance(value, list):
440
+ aggregated_state[key] = existing + value
441
+ elif isinstance(existing, list):
442
+ aggregated_state[key] = existing + [value]
443
+ elif isinstance(value, list):
444
+ aggregated_state[key] = [existing] + value
445
+ else:
446
+ # keep both values in list to avoid overwrite semantics
447
+ aggregated_state[key] = [existing, value]
448
  else:
449
  aggregated_state[key] = value
450
 
451
+
452
  # --- Real-time Budget Check ---
453
  current_cost = cost_tracker.get_total_cost()
454
  aggregated_state["current_cost"] = current_cost