fantasy-draft-demo / core /real_a2a_draft.py
alexmec's picture
Upload folder using huggingface_hub
a27a4ef verified
#!/usr/bin/env python3
"""
Real A2A Implementation for Fantasy Draft Agents
This shows how to properly implement Agent-to-Agent communication
using the any-agent framework's A2A protocol.
"""
import asyncio
from typing import Dict, List, Optional
from pydantic import BaseModel
from any_agent import AgentConfig, AnyAgent
from any_agent.serving import A2AServingConfig
from any_agent.tools import a2a_tool_async
from any_agent.serving.config import default_history_formatter
import httpx
from a2a.client import A2AClient
from a2a.types import MessageSendParams, SendMessageRequest
from uuid import uuid4
# Define structured outputs for agents
class DraftPick(BaseModel):
player_name: str
position: str
reasoning: str
confidence: float
class DraftComment(BaseModel):
is_commenting: bool
comment: Optional[str] = None
target_team: Optional[str] = None
sentiment: Optional[str] = None # "positive", "negative", "neutral"
class DraftResponse(BaseModel):
should_respond: bool
response: Optional[str] = None
# Base A2A Draft Agent
class A2ADraftAgent:
"""Base class for A2A-enabled draft agents."""
def __init__(self, team_name: str, strategy: str, port: int):
self.team_name = team_name
self.strategy = strategy
self.port = port
self.picks = []
self.agent = None
self.serving_task = None
async def initialize(self):
"""Initialize the agent with A2A capabilities."""
self.agent = await AnyAgent.create_async(
"openai", # Using OpenAI for better structured outputs
AgentConfig(
name=f"{self.team_name.lower().replace(' ', '_')}_agent",
model_id="gpt-4o-mini",
instructions=self._get_instructions(),
description=f"{self.team_name} - {self.strategy} draft strategy",
output_type=DraftPick | DraftComment | DraftResponse,
agent_args={"temperature": 0.8} # More personality
)
)
def _get_instructions(self) -> str:
"""Get agent-specific instructions."""
return f"""You are {self.team_name}, a fantasy football team manager.
Your draft strategy: {self.strategy}
Your personality: Competitive, confident in your strategy, willing to trash talk.
When making picks:
- Output a DraftPick with your selection and reasoning
- Be confident about your strategy
- Show personality in your reasoning
When asked to comment on another team's pick:
- Output a DraftComment
- Set is_commenting=true if you want to comment (about 50% of the time)
- Be competitive - trash talk is encouraged!
- Target the team that made the pick
- Express strong opinions about their choice
When asked to respond to a comment:
- Output a DraftResponse
- Set should_respond=true if you want to respond (about 70% of the time)
- Defend your choices aggressively
- Fire back at critics
Remember: This is a competition. Show confidence and personality!"""
async def serve(self):
"""Serve this agent via A2A protocol."""
self.serving_task = asyncio.create_task(
self.agent.serve_async(
A2AServingConfig(
port=self.port,
task_timeout_minutes=60, # 1 hour draft session
history_formatter=default_history_formatter,
endpoint=f"/{self.team_name.lower().replace(' ', '_')}"
)
)
)
print(f"๐Ÿš€ {self.team_name} agent serving on port {self.port}")
async def shutdown(self):
"""Shutdown the agent server."""
if self.serving_task:
self.serving_task.cancel()
try:
await self.serving_task
except asyncio.CancelledError:
pass
# Specific Strategy Implementations
class ZeroRBA2AAgent(A2ADraftAgent):
"""Zero RB strategy agent with A2A."""
def __init__(self, port: int):
super().__init__("Team 1", "Zero RB Strategy", port)
def _get_instructions(self) -> str:
base = super()._get_instructions()
return base + """
Specific to your Zero RB strategy:
- ALWAYS prioritize WRs in early rounds
- Mock teams that take RBs early
- Talk about injury risk and RB volatility
- Get defensive when criticized about ignoring RBs
- Your catchphrase: "RBs get injured, WRs win championships!"
"""
class RobustRBA2AAgent(A2ADraftAgent):
"""Robust RB strategy agent with A2A."""
def __init__(self, port: int):
super().__init__("Team 3", "Robust RB Strategy", port)
def _get_instructions(self) -> str:
base = super()._get_instructions()
return base + """
Specific to your Robust RB strategy:
- ALWAYS take RBs in rounds 1-2
- Mock "fancy" WR strategies as risky
- Emphasize the importance of a strong RB foundation
- Be old-school and traditional in your approach
- Your catchphrase: "Championships are won in the trenches with RBs!"
"""
# Multi-Agent Draft Coordinator
class A2ADraftCoordinator:
"""Coordinates a draft using real A2A communication."""
def __init__(self):
self.agents: Dict[int, A2ADraftAgent] = {}
self.draft_board: Dict[int, List[str]] = {}
self.available_players = [] # Would be populated from data
self.task_ids: Dict[str, str] = {} # Track task IDs for each agent
async def setup_agents(self):
"""Initialize and serve all draft agents."""
# Create agents with different strategies
self.agents[1] = ZeroRBA2AAgent(port=5001)
self.agents[2] = A2ADraftAgent("Team 2", "Best Player Available", port=5002)
self.agents[3] = RobustRBA2AAgent(port=5003)
self.agents[5] = A2ADraftAgent("Team 5", "Upside Hunter", port=5005)
# Initialize and serve all agents
for agent in self.agents.values():
await agent.initialize()
await agent.serve()
# Wait for servers to start
await asyncio.sleep(2)
print("โœ… All agents initialized and serving via A2A")
async def execute_draft_turn(self, team_num: int, round_num: int) -> Dict:
"""Execute a draft turn using A2A communication."""
if team_num not in self.agents:
return {"error": "Team not found"}
agent = self.agents[team_num]
results = {"pick": None, "comments": [], "responses": []}
async with httpx.AsyncClient() as client:
# 1. Get the agent's pick via A2A
try:
# Create A2A client for this agent
a2a_client = await A2AClient.get_client_from_agent_card_url(
client, f"http://localhost:{agent.port}"
)
# Build pick request
pick_prompt = f"""Round {round_num} - Make your pick!
Available top players: {', '.join(self.available_players[:10])}
Your previous picks: {', '.join(agent.picks)}
Output a DraftPick with your selection."""
# Send pick request
pick_request = SendMessageRequest(
id=str(uuid4()),
params=MessageSendParams(
message={
"role": "user",
"parts": [{"kind": "text", "text": pick_prompt}],
"messageId": str(uuid4()),
"contextId": f"draft_session_{team_num}"
}
)
)
# Get pick response
pick_response = await a2a_client.send_message(pick_request)
# Extract task ID for this agent if first interaction
if team_num not in self.task_ids:
self.task_ids[team_num] = pick_response.root.result.id
results["pick"] = pick_response.root.result
# 2. Get comments from other agents
for other_team, other_agent in self.agents.items():
if other_team == team_num:
continue
# Create client for other agent
other_client = await A2AClient.get_client_from_agent_card_url(
client, f"http://localhost:{other_agent.port}"
)
# Ask for comment
comment_prompt = f"""{agent.team_name} just picked {results['pick']['player_name']} in round {round_num}.
Should you comment on this pick? Output a DraftComment."""
comment_request = SendMessageRequest(
id=str(uuid4()),
params=MessageSendParams(
message={
"role": "user",
"parts": [{"kind": "text", "text": comment_prompt}],
"messageId": str(uuid4()),
"contextId": f"draft_session_{other_team}",
"taskId": self.task_ids.get(other_team) # Continue conversation
}
}
)
comment_response = await other_client.send_message(comment_request)
# Update task ID
if other_team not in self.task_ids:
self.task_ids[other_team] = comment_response.root.result.id
# Add comment if agent chose to comment
if comment_response.root.result.get("is_commenting"):
results["comments"].append({
"from": other_agent.team_name,
"comment": comment_response.root.result.get("comment")
})
# 3. Get responses to comments
for comment in results["comments"]:
response_prompt = f"""{comment['from']} said about your pick: "{comment['comment']}"
Do you want to respond? Output a DraftResponse."""
response_request = SendMessageRequest(
id=str(uuid4()),
params=MessageSendParams(
message={
"role": "user",
"parts": [{"kind": "text", "text": response_prompt}],
"messageId": str(uuid4()),
"contextId": f"draft_session_{team_num}",
"taskId": self.task_ids[team_num] # Continue conversation
}
)
)
response_response = await a2a_client.send_message(response_request)
if response_response.root.result.get("should_respond"):
results["responses"].append({
"to": comment['from'],
"response": response_response.root.result.get("response")
})
except Exception as e:
results["error"] = str(e)
return results
async def run_mock_draft(self):
"""Run a complete mock draft with A2A communication."""
await self.setup_agents()
# Simulate 3 rounds
for round_num in range(1, 4):
print(f"\n๐Ÿˆ ROUND {round_num}")
# Snake draft order
if round_num % 2 == 1:
order = [1, 2, 3, 4, 5, 6] # 4 is human, 6 is another AI
else:
order = [6, 5, 4, 3, 2, 1]
for team in order:
if team == 4:
print("๐Ÿ‘ค Your turn! (simulated)")
continue
if team in self.agents:
results = await self.execute_draft_turn(team, round_num)
# Display results
if "pick" in results and results["pick"]:
pick = results["pick"]
print(f"\n๐Ÿ“‹ {self.agents[team].team_name} selects: {pick.get('player_name')}")
print(f" Reasoning: {pick.get('reasoning')}")
# Show comments
for comment in results.get("comments", []):
print(f"\n๐Ÿ’ฌ {comment['from']}: {comment['comment']}")
# Show responses
for response in results.get("responses", []):
print(f" โ†ฉ๏ธ {self.agents[team].team_name}: {response['response']}")
# Shutdown all agents
for agent in self.agents.values():
await agent.shutdown()
print("\nโœ… Draft complete! All agents shut down.")
# Example usage
async def main():
"""Run the A2A draft demo."""
coordinator = A2ADraftCoordinator()
# In a real implementation, populate available players
coordinator.available_players = [
"Christian McCaffrey", "Tyreek Hill", "Justin Jefferson",
"CeeDee Lamb", "Austin Ekeler", "Bijan Robinson",
"Ja'Marr Chase", "Cooper Kupp", "Stefon Diggs"
]
await coordinator.run_mock_draft()
if __name__ == "__main__":
# Run the async main function
asyncio.run(main())