fantasy-draft-demo / core /dynamic_a2a_manager.py
alexmec's picture
Upload folder using huggingface_hub
ec124cd verified
"""
Dynamic A2A Manager with multi-user support through dynamic port allocation.
"""
import asyncio
import socket
import hashlib
import os
from typing import List, Dict, Optional, Tuple
from pydantic import BaseModel
from any_agent import AgentConfig, AnyAgent
from any_agent.serving import A2AServingConfig
from any_agent.tools import a2a_tool_async
from core.constants import (
AGENT_CONFIGS,
AGENT_START_DELAY,
AGENT_STARTUP_WAIT,
DEFAULT_TIMEOUT,
MAX_COMMENTS_PER_PICK,
RIVAL_PAIRS
)
from core.data import TOP_PLAYERS
from core.a2a_helpers import (
parse_a2a_response,
extract_task_id,
format_available_players
)
# A2A Output model (same as in apps/app.py)
class A2AOutput(BaseModel):
"""Combined output type for A2A agents."""
type: str # "pick" or "comment"
# Pick fields
player_name: Optional[str] = None
reasoning: Optional[str] = None
trash_talk: Optional[str] = None
# Comment fields
should_comment: Optional[bool] = None
comment: Optional[str] = None
class DynamicA2AAgentManager:
"""A2A Agent Manager with dynamic port allocation for multi-user support."""
# Class-level tracking of used ports
_used_ports = set()
_port_lock = asyncio.Lock()
def __init__(self, session_id: str = None, max_comments_per_pick=MAX_COMMENTS_PER_PICK, custom_prompts=None):
self.session_id = session_id or self._generate_session_id()
self.agents = {}
self.agent_tools = {}
self.serve_tasks = []
self.is_running = False
self.task_ids = {}
self.max_comments_per_pick = max_comments_per_pick
self.allocated_ports = []
self.custom_prompts = custom_prompts or {} # Store custom prompts
self.request_counts = {} # Track requests per agent for memory management
self._request_semaphore = asyncio.Semaphore(1) # Limit concurrent A2A requests
def _generate_session_id(self) -> str:
"""Generate a unique session ID."""
import time
import random
return hashlib.md5(f"{time.time()}{random.random()}".encode()).hexdigest()[:8]
def _get_port_index(self, team_num: int) -> int:
"""Get the port index for a team number."""
agent_teams = [1, 2, 3, 5, 6]
return agent_teams.index(team_num)
async def _find_available_ports(self, count: int = 5, start: int = 5000, end: int = 9000) -> List[int]:
"""Find available consecutive ports for agents."""
async with self._port_lock:
# On HF Spaces, try different port ranges
port_ranges = [(8000, 9000), (5000, 6000), (7000, 8000), (9000, 10000)] if os.getenv("SPACE_ID") else [(start, end)]
for range_start, range_end in port_ranges:
# Try to find a consecutive range
for base_port in range(range_start, range_end - count, 10):
ports = list(range(base_port, base_port + count))
# Check if any port in range is already used
if any(p in self._used_ports for p in ports):
continue
# Check if ports are actually available on the system
if await self._check_ports_available(ports):
# Reserve these ports
self._used_ports.update(ports)
self.allocated_ports = ports
print(f"βœ… Found available ports in range {range_start}-{range_end}: {ports}")
return ports
raise RuntimeError(f"Could not find {count} available consecutive ports in any range")
async def _check_ports_available(self, ports: List[int]) -> bool:
"""Check if a list of ports is available on the system."""
for port in ports:
try:
# Try to bind to the port
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# On HF Spaces, try different bind addresses
bind_addresses = ['localhost', '127.0.0.1', '0.0.0.0']
bound = False
for addr in bind_addresses:
try:
sock.bind((addr, port))
bound = True
break
except OSError:
continue
sock.close()
if not bound:
print(f"Port {port} not available on any interface")
return False
except Exception as e:
print(f"Error checking port {port}: {e}")
return False
return True
async def _release_ports(self):
"""Release allocated ports when done."""
async with self._port_lock:
self._used_ports.difference_update(self.allocated_ports)
self.allocated_ports = []
async def _wait_for_server(self, port: int, team_name: str, max_attempts: int = 20, poll_interval: float = 0.5) -> None:
"""Wait for a server to be ready, using pattern from any-agent helpers."""
import httpx
host = "127.0.0.1" if os.getenv("SPACE_ID") else "localhost"
server_url = f"http://{host}:{port}"
attempts = 0
async with httpx.AsyncClient() as client:
while attempts < max_attempts:
try:
# Try to make a basic GET request to check if server is responding
await client.get(f"{server_url}/.well-known/agent.json", timeout=1.0)
print(f"βœ… {team_name} server ready on port {port}")
return
except (httpx.RequestError, httpx.TimeoutException):
# Server not ready yet, continue polling
attempts += 1
if attempts < max_attempts:
await asyncio.sleep(poll_interval)
continue
print(f"⚠️ {team_name} server on port {port} slow to start, continuing anyway...")
def _create_dynamic_agent_configs(self, ports: List[int]) -> List[Dict]:
"""Create agent configs with dynamic ports."""
configs = []
# Only use teams 1, 2, 3, 5, 6 (skip team 4 which is the user)
agent_teams = [1, 2, 3, 5, 6]
for i, team_num in enumerate(agent_teams):
# Find the original config for this team
original_config = next(c for c in AGENT_CONFIGS if c['team_num'] == team_num)
# Create a copy with the new port
config = original_config.copy()
config['port'] = ports[i]
config['session_id'] = self.session_id # Add session tracking
configs.append(config)
return configs
async def start_agents(self):
"""Start all A2A agent servers with dynamic ports."""
if self.is_running:
return
print(f"πŸš€ Starting A2A agents for session {self.session_id}...")
try:
# Find available ports
ports = await self._find_available_ports()
print(f"πŸ“ Allocated ports: {ports}")
# Create configs with dynamic ports
agent_configs = self._create_dynamic_agent_configs(ports)
# Create and serve all agents
for config in agent_configs:
try:
# Critical instructions that MUST be included
critical_instructions = """CRITICAL OUTPUT INSTRUCTIONS (DO NOT MODIFY):
For picks: You MUST return a JSON object with type="pick", player_name (from available list), reasoning, and optional trash_talk.
For comments: You MUST return a JSON object with type="comment", should_comment (true/false), and comment.
NEVER respond with plain text when asked to make a pick! Always use the structured format!
Example pick format:
{"type": "pick", "player_name": "CeeDee Lamb", "reasoning": "Elite WR with huge upside!", "trash_talk": "Your RBs will be crying!"}
Example comment format:
{"type": "comment", "should_comment": true, "comment": "Terrible pick! He's overrated!"}
---END CRITICAL INSTRUCTIONS---
"""
# Use custom prompt if provided, otherwise use default
team_num = config['team_num']
if team_num in self.custom_prompts:
# Use custom prompt BUT always prepend critical instructions
personality_prompt = self.custom_prompts[team_num]
instructions = critical_instructions + personality_prompt
else:
# Use default prompt with critical instructions
default_personality = f"""You are {config['team_name']}, a fantasy football manager with {config['strategy']} strategy.
PERSONALITY & STRATEGY:
- Use LOTS of emojis that match your strategy! πŸ”₯
- Be EXTREMELY dramatic and over-the-top!
- Take your philosophy to the EXTREME!
- MOCK other strategies viciously!
- Use CAPS for emphasis!
- Make BOLD predictions!
- Reference previous interactions with SPITE!
- Build INTENSE rivalries!
- Your responses should be ENTERTAINING and MEMORABLE!
Your EXTREME philosophy: {config['philosophy']}
BE LOUD! BE PROUD! BE UNFORGETTABLE! 🎯"""
instructions = critical_instructions + default_personality
# Create agent with memory-efficient settings
agent = await AnyAgent.create_async(
"tinyagent",
AgentConfig(
name=f"team_{config['team_num']}_agent_{self.session_id}",
model_id="gpt-4o-mini",
description=f"{config['team_name']} - {config['strategy']} fantasy football team manager",
instructions=instructions,
output_type=A2AOutput,
# Force JSON output mode and limit context
agent_args={
"temperature": 0.8,
"response_format": {"type": "json_object"},
"max_tokens": 150, # Reduced response size
"timeout": 60, # Add explicit timeout
}
)
)
self.agents[config['team_num']] = agent
# Serve agent on dynamic port
# On HF Spaces, we might need to bind to 0.0.0.0
host = "0.0.0.0" if os.getenv("SPACE_ID") else "localhost"
# Serving config for HF Spaces
serving_config = A2AServingConfig(
port=config['port'],
host=host,
task_timeout_minutes=30,
)
async def monitored_serve(agent, config, serving_config):
"""Wrapper to monitor serve task for crashes."""
try:
await agent.serve_async(serving_config)
except Exception as e:
print(f"❌ Server crashed for {config['team_name']}: {type(e).__name__}: {e}")
# Remove from active tools to prevent timeout attempts
self.agent_tools.pop(config['team_num'], None)
serve_task = asyncio.create_task(
monitored_serve(agent, config, serving_config)
)
self.serve_tasks.append(serve_task)
print(f"βœ… Started {config['team_name']} on port {config['port']} (session: {self.session_id})")
# Stagger server startups to avoid overwhelming the system
await asyncio.sleep(1.5)
# Wait for this specific server to be ready
await self._wait_for_server(config['port'], config['team_name'], max_attempts=15)
except Exception as e:
print(f"❌ Failed to create/serve {config['team_name']}: {e}")
# Additional wait for all servers to stabilize
await asyncio.sleep(AGENT_START_DELAY)
# Create tools for each agent
for config in agent_configs:
team_num = config['team_num']
if team_num not in self.agents:
continue
try:
# On HF Spaces, we might need to use 127.0.0.1 for internal communication
host = "127.0.0.1" if os.getenv("SPACE_ID") else "localhost"
tool_url = f"http://{host}:{config['port']}"
# Use httpx timeout for HF Spaces
import httpx
timeout_config = httpx.Timeout(
timeout=DEFAULT_TIMEOUT,
connect=30.0, # Connection timeout
read=90.0, # Read timeout
write=30.0, # Write timeout
pool=30.0 # Pool timeout
)
# Note: limits and http2 settings can't be passed to a2a_tool_async
# They would need to be set at the client creation level
self.agent_tools[team_num] = await a2a_tool_async(
tool_url,
http_kwargs={"timeout": timeout_config}
)
print(f"βœ… Created A2A tool for Team {team_num} at {tool_url}")
except Exception as e:
print(f"❌ Failed to create tool for Team {team_num}: {e}")
self.is_running = len(self.agent_tools) > 0
if self.is_running:
print(f"βœ… A2A agents ready for session {self.session_id}! ({len(self.agent_tools)} agents active)")
else:
print(f"❌ Failed to start any A2A agents for session {self.session_id}")
except Exception as e:
print(f"❌ Failed to allocate ports or start agents: {e}")
await self._release_ports()
raise
async def stop_agents(self):
"""Stop all A2A agent servers and release ports."""
if not self.is_running:
return
print(f"πŸ›‘ Stopping A2A agents for session {self.session_id}...")
# Cancel all serve tasks
for task in self.serve_tasks:
task.cancel()
await asyncio.gather(*self.serve_tasks, return_exceptions=True)
# Clear agent data
self.agents.clear()
self.agent_tools.clear()
self.serve_tasks.clear()
self.is_running = False
# Release the ports
await self._release_ports()
print(f"βœ… A2A agents stopped and ports released for session {self.session_id}")
async def restart_agent_server(self, team_num: int) -> bool:
"""Force restart a specific agent's server."""
if team_num not in self.agents:
return False
print(f"πŸ”§ Force restarting Team {team_num} server...")
# Get port for this team
port_idx = self._get_port_index(team_num)
if port_idx >= len(self.allocated_ports):
return False
port = self.allocated_ports[port_idx]
# Cancel the existing serve task
if port_idx < len(self.serve_tasks):
task = self.serve_tasks[port_idx]
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# Remove from agent tools
self.agent_tools.pop(team_num, None)
# Wait for port to be released
await asyncio.sleep(1.0)
# Get the agent
agent = self.agents.get(team_num)
if not agent:
return False
try:
# Restart serving
host = "0.0.0.0" if os.getenv("SPACE_ID") else "localhost"
serving_config = A2AServingConfig(
port=port,
host=host,
task_timeout_minutes=30,
)
async def monitored_serve(agent, serving_config):
"""Wrapper to monitor serve task for crashes."""
try:
await agent.serve_async(serving_config)
except Exception as e:
print(f"❌ Server crashed for Team {team_num}: {type(e).__name__}: {e}")
self.agent_tools.pop(team_num, None)
serve_task = asyncio.create_task(
monitored_serve(agent, serving_config)
)
# Replace the serve task
if port_idx < len(self.serve_tasks):
self.serve_tasks[port_idx] = serve_task
# Wait for server to be ready
await self._wait_for_server(port, f"Team {team_num}", max_attempts=10)
# Recreate tool
tool_url = f"http://127.0.0.1:{port}"
import httpx
timeout_config = httpx.Timeout(timeout=90.0, connect=30.0, read=90.0, write=30.0, pool=30.0)
self.agent_tools[team_num] = await a2a_tool_async(
tool_url,
http_kwargs={"timeout": timeout_config}
)
print(f"βœ… Successfully restarted Team {team_num} server")
return True
except Exception as e:
print(f"❌ Failed to restart Team {team_num}: {e}")
return False
async def check_and_restart_agent(self, team_num: int) -> bool:
"""Check if agent is alive and restart if needed."""
if team_num not in self.agent_tools:
return False
# Use pattern from any-agent helpers.py for better health checking
import httpx
port_idx = self._get_port_index(team_num)
port = self.allocated_ports[port_idx]
host = "127.0.0.1" if os.getenv("SPACE_ID") else "localhost"
server_url = f"http://{host}:{port}"
# Standard health check for all teams
max_attempts = 5
poll_interval = 0.5
attempts = 0
async with httpx.AsyncClient() as client:
while attempts < max_attempts:
try:
# Simple GET request to check if server is responding
await client.get(f"{server_url}/.well-known/agent.json", timeout=1.0)
return True # Agent is alive
except (httpx.RequestError, httpx.TimeoutException):
# Server not ready yet, continue polling
attempts += 1
if attempts < max_attempts:
await asyncio.sleep(poll_interval)
continue
except Exception:
# Other errors, break out
break
# Agent didn't respond to health check, but check if port is still in use
port_idx = self._get_port_index(team_num)
port = self.allocated_ports[port_idx]
# Check if port is actually free before attempting restart
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.1)
result = sock.connect_ex(('127.0.0.1', port))
sock.close()
if result == 0:
# Port is still in use, server is probably just slow
print(f"⚠️ Team {team_num} server slow to respond but port {port} still in use - skipping restart")
return True # Assume it's alive but slow
print(f"⚠️ Team {team_num} server not responding and port {port} is free, attempting restart...")
# First, cancel the old serve task to ensure cleanup
for i, task in enumerate(self.serve_tasks):
if task.done() or (i == self._get_port_index(team_num)):
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
except Exception:
pass
break
# Wait a bit for cleanup
await asyncio.sleep(0.5)
# Find the original config
original_config = next(c for c in AGENT_CONFIGS if c['team_num'] == team_num)
config = original_config.copy()
config['port'] = self.allocated_ports[self._get_port_index(team_num)]
try:
# Create new agent with same config
agent = self.agents.get(team_num)
if agent:
# Start serving again
host = "0.0.0.0" if os.getenv("SPACE_ID") else "localhost"
serving_config = A2AServingConfig(
port=config['port'],
host=host,
task_timeout_minutes=30,
)
async def monitored_serve(agent, config, serving_config):
"""Wrapper to monitor serve task for crashes."""
try:
await agent.serve_async(serving_config)
except Exception as e:
print(f"❌ Server crashed for {config['team_name']}: {type(e).__name__}: {e}")
self.agent_tools.pop(config['team_num'], None)
serve_task = asyncio.create_task(
monitored_serve(agent, config, serving_config)
)
# Replace old task
for i, task in enumerate(self.serve_tasks):
if not task.done():
continue
self.serve_tasks[i] = serve_task
break
# Wait for server to be ready using proper polling
await self._wait_for_server(config['port'], config['team_name'], max_attempts=15)
# Recreate tool
tool_url = f"http://127.0.0.1:{config['port']}"
import httpx
timeout_config = httpx.Timeout(timeout=90.0, connect=30.0, read=90.0, write=30.0, pool=30.0)
self.agent_tools[team_num] = await a2a_tool_async(
tool_url,
http_kwargs={"timeout": timeout_config}
)
print(f"βœ… Restarted Team {team_num} on port {config['port']}")
return True
except Exception as e:
print(f"❌ Failed to restart Team {team_num}: {e}")
self.agent_tools.pop(team_num, None)
return False
async def get_pick(self, team_num: int, available_players: List[str],
previous_picks: List[str], round_num: int = 1) -> Optional[A2AOutput]:
"""Get a pick from an A2A agent."""
if team_num not in self.agent_tools:
return None
# Check if agent is alive, restart if needed
agent_alive = await self.check_and_restart_agent(team_num)
if not agent_alive:
print(f"❌ Team {team_num} could not be restarted")
return None
# Build minimal prompts to prevent memory buildup
# Only show top 12 players to reduce context
top_available = available_players[:12]
available_str = ", ".join(top_available)
# Same prompt for all teams
recent_picks = previous_picks[-2:] if previous_picks else []
prompt = f"""Round {round_num} PICK! πŸ”₯
Available: {available_str}
Your picks: {', '.join(recent_picks)}
MUST output JSON: {{"type":"pick","player_name":"[FROM LIST]","reasoning":"[WHY]","trash_talk":"[OPTIONAL]"}}
Pick NOW! πŸ’ͺ"""
# Retry logic with exponential backoff for HF Spaces
max_retries = 3
base_delay = 1.0
for attempt in range(max_retries):
try:
# Track request count for memory management
self.request_counts[team_num] = self.request_counts.get(team_num, 0) + 1
# Reset task_id every 5 requests to prevent memory buildup
if self.request_counts[team_num] > 5:
self.task_ids.pop(team_num, None)
self.request_counts[team_num] = 0
print(f"πŸ”„ Resetting context for Team {team_num} to prevent memory buildup")
# Check if we should restart the server before making request
if self.request_counts[team_num] == 1 and round_num > 1:
# Restart server at the beginning of each round after round 1
print(f"πŸ”„ Restarting Team {team_num} server for round {round_num}")
await self.restart_agent_server(team_num)
# Use task_id if we have one for this agent
task_id = self.task_ids.get(team_num)
# Limit concurrent requests to prevent overloading
async with self._request_semaphore:
result = await self.agent_tools[team_num](prompt, task_id=task_id)
# Extract and store task_id
new_task_id = extract_task_id(result)
if new_task_id:
self.task_ids[team_num] = new_task_id
# Parse the response
output = parse_a2a_response(result, A2AOutput)
if output:
print(f"βœ… Team {team_num} pick: {output.player_name}")
return output
else:
print(f"❌ Failed to parse response from Team {team_num}")
if isinstance(result, str):
print(f" Raw response: {result[:200]}...")
# Try to extract player name from plain text as fallback
for player in available_players[:20]:
if player in str(result):
print(f" Fallback: Found {player} in response, creating pick")
return A2AOutput(
type="pick",
player_name=player,
reasoning="[Agent response was not in correct format]",
trash_talk=None
)
return None
except Exception as e:
error_name = type(e).__name__
error_str = str(e)
is_timeout = "timeout" in error_str.lower() or "readtimeout" in error_name.lower()
is_503 = "503" in error_str or "service unavailable" in error_str.lower()
is_network = "A2AClientHTTPError" in error_name or is_503
if attempt < max_retries - 1 and (is_timeout or is_network):
# Exponential backoff for retries
retry_delay = base_delay * (2 ** attempt)
print(f"⚠️ {'Network error' if is_network else 'Timeout'} for Team {team_num} (attempt {attempt + 1}/{max_retries}), retrying in {retry_delay}s...")
# If it's a 503 error, try to restart the server
if is_503 and attempt == 0:
print(f"πŸ”§ Attempting to restart Team {team_num} server due to 503 error")
await self.restart_agent_server(team_num)
await asyncio.sleep(retry_delay)
continue
else:
print(f"❌ Error getting pick from Team {team_num}: {error_name}")
if attempt == 0: # Only print traceback on first failure
import traceback
traceback.print_exc()
return None
return None
async def get_comment(self, commenting_team: int, picking_team: int,
player_picked: str, round_num: int = 1) -> Optional[str]:
"""Get a comment from an A2A agent about a pick."""
if commenting_team not in self.agent_tools:
return None
# Minimal comment prompt
prompt = f"""Team {picking_team} picked {player_picked}!
COMMENT? {{"type":"comment","should_comment":true/false,"comment":"[ROAST THEM!]"}}"""
# Retry logic for network issues
max_retries = 2 # Fewer retries for comments to avoid delays
for attempt in range(max_retries):
try:
# Use task_id for continuity
task_id = self.task_ids.get(commenting_team)
# Limit concurrent requests
async with self._request_semaphore:
result = await self.agent_tools[commenting_team](prompt, task_id=task_id)
# Extract and store task_id
task_id = extract_task_id(result)
if task_id:
self.task_ids[commenting_team] = task_id
# Parse the response
output = parse_a2a_response(result, A2AOutput)
if output and hasattr(output, 'should_comment') and output.should_comment and output.comment:
return output.comment
return None
except Exception as e:
if attempt < max_retries - 1 and "timeout" in str(e).lower():
await asyncio.sleep(1.0) # Short retry for comments
continue
else:
print(f"Error getting comment from Team {commenting_team}: {e}")
return None
return None
# Cleanup function for session end
async def cleanup_session(manager: DynamicA2AAgentManager):
"""Clean up resources when a session ends."""
try:
await manager.stop_agents()
except Exception as e:
print(f"Error during cleanup for session {manager.session_id}: {e}")