alexmec commited on
Commit
ec124cd
Β·
verified Β·
1 Parent(s): afafd1a

Upload folder using huggingface_hub

Browse files
Files changed (1) hide show
  1. core/dynamic_a2a_manager.py +120 -12
core/dynamic_a2a_manager.py CHANGED
@@ -58,6 +58,7 @@ class DynamicA2AAgentManager:
58
  self.allocated_ports = []
59
  self.custom_prompts = custom_prompts or {} # Store custom prompts
60
  self.request_counts = {} # Track requests per agent for memory management
 
61
 
62
  def _generate_session_id(self) -> str:
63
  """Generate a unique session ID."""
@@ -245,7 +246,8 @@ BE LOUD! BE PROUD! BE UNFORGETTABLE! 🎯"""
245
  agent_args={
246
  "temperature": 0.8,
247
  "response_format": {"type": "json_object"},
248
- "max_tokens": 200, # Limit response size
 
249
  }
250
  )
251
  )
@@ -278,8 +280,11 @@ BE LOUD! BE PROUD! BE UNFORGETTABLE! 🎯"""
278
  self.serve_tasks.append(serve_task)
279
  print(f"βœ… Started {config['team_name']} on port {config['port']} (session: {self.session_id})")
280
 
 
 
 
281
  # Wait for this specific server to be ready
282
- await self._wait_for_server(config['port'], config['team_name'])
283
 
284
  except Exception as e:
285
  print(f"❌ Failed to create/serve {config['team_name']}: {e}")
@@ -353,6 +358,86 @@ BE LOUD! BE PROUD! BE UNFORGETTABLE! 🎯"""
353
 
354
  print(f"βœ… A2A agents stopped and ports released for session {self.session_id}")
355
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
356
  async def check_and_restart_agent(self, team_num: int) -> bool:
357
  """Check if agent is alive and restart if needed."""
358
  if team_num not in self.agent_tools:
@@ -503,24 +588,33 @@ Your picks: {', '.join(recent_picks)}
503
  MUST output JSON: {{"type":"pick","player_name":"[FROM LIST]","reasoning":"[WHY]","trash_talk":"[OPTIONAL]"}}
504
  Pick NOW! πŸ’ͺ"""
505
 
506
- # Retry logic with reduced delays for HF Spaces
507
- max_retries = 2
508
- retry_delay = 0.5
509
 
510
  for attempt in range(max_retries):
511
  try:
512
  # Track request count for memory management
513
  self.request_counts[team_num] = self.request_counts.get(team_num, 0) + 1
514
 
515
- # Reset task_id every 10 requests to prevent memory buildup
516
- if self.request_counts[team_num] > 10:
517
  self.task_ids.pop(team_num, None)
518
  self.request_counts[team_num] = 0
519
  print(f"πŸ”„ Resetting context for Team {team_num} to prevent memory buildup")
520
 
 
 
 
 
 
 
521
  # Use task_id if we have one for this agent
522
  task_id = self.task_ids.get(team_num)
523
- result = await self.agent_tools[team_num](prompt, task_id=task_id)
 
 
 
524
 
525
  # Extract and store task_id
526
  new_task_id = extract_task_id(result)
@@ -550,10 +644,21 @@ Pick NOW! πŸ’ͺ"""
550
 
551
  except Exception as e:
552
  error_name = type(e).__name__
553
- is_timeout = "timeout" in str(e).lower() or "readtimeout" in error_name.lower()
 
 
 
554
 
555
- if attempt < max_retries - 1 and is_timeout:
556
- print(f"⚠️ Timeout for Team {team_num} (attempt {attempt + 1}/{max_retries}), retrying quickly...")
 
 
 
 
 
 
 
 
557
  await asyncio.sleep(retry_delay)
558
  continue
559
  else:
@@ -582,7 +687,10 @@ COMMENT? {{"type":"comment","should_comment":true/false,"comment":"[ROAST THEM!]
582
  try:
583
  # Use task_id for continuity
584
  task_id = self.task_ids.get(commenting_team)
585
- result = await self.agent_tools[commenting_team](prompt, task_id=task_id)
 
 
 
586
 
587
  # Extract and store task_id
588
  task_id = extract_task_id(result)
 
58
  self.allocated_ports = []
59
  self.custom_prompts = custom_prompts or {} # Store custom prompts
60
  self.request_counts = {} # Track requests per agent for memory management
61
+ self._request_semaphore = asyncio.Semaphore(1) # Limit concurrent A2A requests
62
 
63
  def _generate_session_id(self) -> str:
64
  """Generate a unique session ID."""
 
246
  agent_args={
247
  "temperature": 0.8,
248
  "response_format": {"type": "json_object"},
249
+ "max_tokens": 150, # Reduced response size
250
+ "timeout": 60, # Add explicit timeout
251
  }
252
  )
253
  )
 
280
  self.serve_tasks.append(serve_task)
281
  print(f"βœ… Started {config['team_name']} on port {config['port']} (session: {self.session_id})")
282
 
283
+ # Stagger server startups to avoid overwhelming the system
284
+ await asyncio.sleep(1.5)
285
+
286
  # Wait for this specific server to be ready
287
+ await self._wait_for_server(config['port'], config['team_name'], max_attempts=15)
288
 
289
  except Exception as e:
290
  print(f"❌ Failed to create/serve {config['team_name']}: {e}")
 
358
 
359
  print(f"βœ… A2A agents stopped and ports released for session {self.session_id}")
360
 
361
+ async def restart_agent_server(self, team_num: int) -> bool:
362
+ """Force restart a specific agent's server."""
363
+ if team_num not in self.agents:
364
+ return False
365
+
366
+ print(f"πŸ”§ Force restarting Team {team_num} server...")
367
+
368
+ # Get port for this team
369
+ port_idx = self._get_port_index(team_num)
370
+ if port_idx >= len(self.allocated_ports):
371
+ return False
372
+
373
+ port = self.allocated_ports[port_idx]
374
+
375
+ # Cancel the existing serve task
376
+ if port_idx < len(self.serve_tasks):
377
+ task = self.serve_tasks[port_idx]
378
+ if not task.done():
379
+ task.cancel()
380
+ try:
381
+ await task
382
+ except asyncio.CancelledError:
383
+ pass
384
+
385
+ # Remove from agent tools
386
+ self.agent_tools.pop(team_num, None)
387
+
388
+ # Wait for port to be released
389
+ await asyncio.sleep(1.0)
390
+
391
+ # Get the agent
392
+ agent = self.agents.get(team_num)
393
+ if not agent:
394
+ return False
395
+
396
+ try:
397
+ # Restart serving
398
+ host = "0.0.0.0" if os.getenv("SPACE_ID") else "localhost"
399
+ serving_config = A2AServingConfig(
400
+ port=port,
401
+ host=host,
402
+ task_timeout_minutes=30,
403
+ )
404
+
405
+ async def monitored_serve(agent, serving_config):
406
+ """Wrapper to monitor serve task for crashes."""
407
+ try:
408
+ await agent.serve_async(serving_config)
409
+ except Exception as e:
410
+ print(f"❌ Server crashed for Team {team_num}: {type(e).__name__}: {e}")
411
+ self.agent_tools.pop(team_num, None)
412
+
413
+ serve_task = asyncio.create_task(
414
+ monitored_serve(agent, serving_config)
415
+ )
416
+
417
+ # Replace the serve task
418
+ if port_idx < len(self.serve_tasks):
419
+ self.serve_tasks[port_idx] = serve_task
420
+
421
+ # Wait for server to be ready
422
+ await self._wait_for_server(port, f"Team {team_num}", max_attempts=10)
423
+
424
+ # Recreate tool
425
+ tool_url = f"http://127.0.0.1:{port}"
426
+ import httpx
427
+ timeout_config = httpx.Timeout(timeout=90.0, connect=30.0, read=90.0, write=30.0, pool=30.0)
428
+
429
+ self.agent_tools[team_num] = await a2a_tool_async(
430
+ tool_url,
431
+ http_kwargs={"timeout": timeout_config}
432
+ )
433
+
434
+ print(f"βœ… Successfully restarted Team {team_num} server")
435
+ return True
436
+
437
+ except Exception as e:
438
+ print(f"❌ Failed to restart Team {team_num}: {e}")
439
+ return False
440
+
441
  async def check_and_restart_agent(self, team_num: int) -> bool:
442
  """Check if agent is alive and restart if needed."""
443
  if team_num not in self.agent_tools:
 
588
  MUST output JSON: {{"type":"pick","player_name":"[FROM LIST]","reasoning":"[WHY]","trash_talk":"[OPTIONAL]"}}
589
  Pick NOW! πŸ’ͺ"""
590
 
591
+ # Retry logic with exponential backoff for HF Spaces
592
+ max_retries = 3
593
+ base_delay = 1.0
594
 
595
  for attempt in range(max_retries):
596
  try:
597
  # Track request count for memory management
598
  self.request_counts[team_num] = self.request_counts.get(team_num, 0) + 1
599
 
600
+ # Reset task_id every 5 requests to prevent memory buildup
601
+ if self.request_counts[team_num] > 5:
602
  self.task_ids.pop(team_num, None)
603
  self.request_counts[team_num] = 0
604
  print(f"πŸ”„ Resetting context for Team {team_num} to prevent memory buildup")
605
 
606
+ # Check if we should restart the server before making request
607
+ if self.request_counts[team_num] == 1 and round_num > 1:
608
+ # Restart server at the beginning of each round after round 1
609
+ print(f"πŸ”„ Restarting Team {team_num} server for round {round_num}")
610
+ await self.restart_agent_server(team_num)
611
+
612
  # Use task_id if we have one for this agent
613
  task_id = self.task_ids.get(team_num)
614
+
615
+ # Limit concurrent requests to prevent overloading
616
+ async with self._request_semaphore:
617
+ result = await self.agent_tools[team_num](prompt, task_id=task_id)
618
 
619
  # Extract and store task_id
620
  new_task_id = extract_task_id(result)
 
644
 
645
  except Exception as e:
646
  error_name = type(e).__name__
647
+ error_str = str(e)
648
+ is_timeout = "timeout" in error_str.lower() or "readtimeout" in error_name.lower()
649
+ is_503 = "503" in error_str or "service unavailable" in error_str.lower()
650
+ is_network = "A2AClientHTTPError" in error_name or is_503
651
 
652
+ if attempt < max_retries - 1 and (is_timeout or is_network):
653
+ # Exponential backoff for retries
654
+ retry_delay = base_delay * (2 ** attempt)
655
+ print(f"⚠️ {'Network error' if is_network else 'Timeout'} for Team {team_num} (attempt {attempt + 1}/{max_retries}), retrying in {retry_delay}s...")
656
+
657
+ # If it's a 503 error, try to restart the server
658
+ if is_503 and attempt == 0:
659
+ print(f"πŸ”§ Attempting to restart Team {team_num} server due to 503 error")
660
+ await self.restart_agent_server(team_num)
661
+
662
  await asyncio.sleep(retry_delay)
663
  continue
664
  else:
 
687
  try:
688
  # Use task_id for continuity
689
  task_id = self.task_ids.get(commenting_team)
690
+
691
+ # Limit concurrent requests
692
+ async with self._request_semaphore:
693
+ result = await self.agent_tools[commenting_team](prompt, task_id=task_id)
694
 
695
  # Extract and store task_id
696
  task_id = extract_task_id(result)