| """
|
| MCP (Model Context Protocol) service implementation.
|
| """
|
|
|
| from typing import Dict, Optional, Any
|
| from ..interfaces.service_interfaces import IMCPService
|
| from ..services.player_service import PlayerService
|
| from ..services.chat_service import ChatService
|
|
|
|
|
class MCPService(IMCPService):
    """Service for handling MCP integration and AI agent management.

    Every registered AI agent is backed by a player created through
    ``player_service``; ``ai_agents`` maps the external agent id to the id
    of that player.

    ``register_ai_agent`` returns a player id string (``""`` on failure).
    The other operations return a dict with a boolean ``"success"`` key and,
    on failure, an ``"error"`` string. Unexpected exceptions are caught,
    printed and turned into failure results instead of propagating.
    """

    # World dimensions reported by get_game_state(). Kept as class attributes
    # so they can be overridden without editing the method body.
    WORLD_WIDTH = 500
    WORLD_HEIGHT = 400

    def __init__(self, player_service: PlayerService, chat_service: ChatService):
        """Store the collaborating services and start with no agents.

        Args:
            player_service: Used to create, look up, move and remove players.
            chat_service: Used for system announcements and agent chat.
        """
        self.player_service = player_service
        self.chat_service = chat_service
        # agent_id -> player.id of the player that represents the agent
        self.ai_agents: Dict[str, str] = {}

    def _announce(self, text: str) -> None:
        """Post a system chat message; print instead of raising on failure.

        Announcements are cosmetic, so a chat failure must not change the
        reported outcome of a registration or removal that already happened.
        """
        try:
            self.chat_service.add_system_message(text)
        except Exception as e:
            print(f"[MCPService] Error posting system message: {e}")

    def register_ai_agent(self, agent_id: str, name: str, agent_type: str = "ai_agent") -> str:
        """Register AI agent via MCP and create corresponding player.

        If ``agent_id`` is already registered and its player can still be
        looked up, that player's id is returned and no second player is
        created (``name`` and ``agent_type`` are ignored in that case).

        Args:
            agent_id: External identifier of the agent.
            name: Name passed to ``player_service.create_player``.
            agent_type: Player type passed to ``create_player``.

        Returns:
            The id of the agent's player, or ``""`` if the player could not
            be added to the world or an exception occurred.
        """
        try:
            # Guard against duplicate registration: without it the previous
            # player would stay in the world with no mapping pointing at it.
            existing_id = self.ai_agents.get(agent_id)
            if existing_id and self.player_service.get_player(existing_id):
                return existing_id

            player = self.player_service.create_player(name, agent_type)
            player.ai_agent_id = agent_id

            if not self.player_service.add_player_to_world(player):
                return ""

            new_player_id = player.id
            self.ai_agents[agent_id] = new_player_id
        except Exception as e:
            print(f"[MCPService] Error registering AI agent: {e}")
            return ""

        # Outside the try block: the agent is registered at this point, so a
        # chat failure must not turn the result into "".
        self._announce(f"🤖 AI Agent '{name}' has joined the game!")
        return new_player_id

    def move_agent(self, agent_id: str, direction: str) -> Dict:
        """Move AI agent in specified direction.

        Args:
            agent_id: Identifier of a registered agent.
            direction: Passed unchanged to ``player_service.move_player``.

        Returns:
            On success ``{"success": True, "position": {...}, "message": ...}``
            where ``position`` is ``None`` if the player cannot be looked up
            after the move; otherwise ``{"success": False, "error": ...}``.
        """
        try:
            player_id = self.ai_agents.get(agent_id)
            if not player_id:
                return {"success": False, "error": "Agent not found"}

            if not self.player_service.move_player(player_id, direction):
                return {"success": False, "error": "Movement failed"}

            player = self.player_service.get_player(player_id)
            return {
                "success": True,
                "position": {"x": player.x, "y": player.y} if player else None,
                "message": f"Moved {direction}",
            }
        except Exception as e:
            print(f"[MCPService] Error moving agent: {e}")
            return {"success": False, "error": str(e)}

    def send_chat(self, agent_id: str, message: str) -> Dict:
        """Send chat message from AI agent.

        Args:
            agent_id: Identifier of a registered agent.
            message: Text passed to ``chat_service.send_public_message``.

        Returns:
            ``{"success": True, "message": "Message sent"}`` or
            ``{"success": False, "error": ...}``.
        """
        try:
            player_id = self.ai_agents.get(agent_id)
            if not player_id:
                return {"success": False, "error": "Agent not found"}

            if self.chat_service.send_public_message(player_id, message):
                return {"success": True, "message": "Message sent"}
            return {"success": False, "error": "Failed to send message"}
        except Exception as e:
            print(f"[MCPService] Error sending chat: {e}")
            return {"success": False, "error": str(e)}

    def get_game_state(self) -> Dict:
        """Get current game state for AI agents.

        Returns:
            ``{"success": True, "data": {...}}`` where ``data`` holds
            ``players`` (per-player name, type, position, level, hp and
            active flag, keyed by player id), ``recent_chat`` (the result of
            ``chat_service.get_recent_messages(10)``) and ``world_info``
            (width, height, total player count); or
            ``{"success": False, "error": ...}`` if an exception occurred.
        """
        try:
            all_players = self.player_service.get_all_players()
            recent_chat = self.chat_service.get_recent_messages(10)

            players_data = {
                player_id: {
                    "name": player.name,
                    "type": player.type,
                    "position": {"x": player.x, "y": player.y},
                    "level": player.level,
                    "hp": player.hp,
                    "is_active": player.is_active(),
                }
                for player_id, player in all_players.items()
            }

            return {
                "success": True,
                "data": {
                    "players": players_data,
                    "recent_chat": recent_chat,
                    "world_info": {
                        "width": self.WORLD_WIDTH,
                        "height": self.WORLD_HEIGHT,
                        "total_players": len(all_players),
                    },
                },
            }
        except Exception as e:
            print(f"[MCPService] Error getting game state: {e}")
            return {"success": False, "error": str(e)}

    def interact_with_npc(self, agent_id: str, npc_id: str, message: str) -> Dict:
        """Allow AI agent to interact with NPCs.

        NOTE(review): this is a placeholder. ``npc_id`` is not used yet and
        the response simply echoes ``message``.

        Args:
            agent_id: Identifier of a registered agent.
            npc_id: Identifier of the NPC (currently ignored).
            message: Text echoed back in the response.

        Returns:
            ``{"success": True, "npc_response": ...}`` for a registered
            agent, otherwise ``{"success": False, "error": ...}``.
        """
        try:
            if not self.ai_agents.get(agent_id):
                return {"success": False, "error": "Agent not found"}

            return {
                "success": True,
                "npc_response": f"Hello there! You said: {message}",
            }
        except Exception as e:
            print(f"[MCPService] Error in NPC interaction: {e}")
            return {"success": False, "error": str(e)}

    def get_nearby_entities(self, agent_id: str, radius: int = 50) -> Dict:
        """Get entities near the AI agent.

        Only other players are considered. Distance comes from the agent
        player's ``get_distance_to(x, y)`` and a player is included when
        that distance is ``<= radius``.

        Args:
            agent_id: Identifier of a registered agent.
            radius: Maximum distance for a player to count as nearby.

        Returns:
            ``{"success": True, "nearby_players": [...], "agent_position":
            {...}}`` where each entry has id, name, type, position and the
            distance rounded to one decimal; or
            ``{"success": False, "error": ...}``.
        """
        try:
            player_id = self.ai_agents.get(agent_id)
            if not player_id:
                return {"success": False, "error": "Agent not found"}

            agent_player = self.player_service.get_player(player_id)
            if not agent_player:
                return {"success": False, "error": "Agent player not found"}

            nearby_players = []
            for other_id, other_player in self.player_service.get_all_players().items():
                if other_id == player_id:
                    continue  # the agent is not its own neighbour
                distance = agent_player.get_distance_to(other_player.x, other_player.y)
                if distance <= radius:
                    nearby_players.append({
                        "id": other_id,
                        "name": other_player.name,
                        "type": other_player.type,
                        "position": {"x": other_player.x, "y": other_player.y},
                        "distance": round(distance, 1),
                    })

            return {
                "success": True,
                "nearby_players": nearby_players,
                "agent_position": {"x": agent_player.x, "y": agent_player.y},
            }
        except Exception as e:
            print(f"[MCPService] Error getting nearby entities: {e}")
            return {"success": False, "error": str(e)}

    def unregister_ai_agent(self, agent_id: str) -> Dict:
        """Unregister AI agent and remove from game.

        If the agent's player can no longer be looked up and removal reports
        failure, the mapping is stale; it is dropped and the call succeeds
        (without a "left the game" announcement) so the agent id does not
        stay registered forever.

        Args:
            agent_id: Identifier of a registered agent.

        Returns:
            ``{"success": True, "message": "Agent unregistered"}`` or
            ``{"success": False, "error": ...}``.
        """
        try:
            player_id = self.ai_agents.get(agent_id)
            if not player_id:
                return {"success": False, "error": "Agent not found"}

            # Capture the name before removal; it may be unavailable after.
            player = self.player_service.get_player(player_id)
            player_name = player.name if player else "Unknown"

            removed = self.player_service.remove_player_from_world(player_id)
            if not removed and player:
                # The player still exists but could not be removed: keep the
                # mapping so the caller can retry.
                return {"success": False, "error": "Failed to remove agent"}

            self.ai_agents.pop(agent_id, None)
        except Exception as e:
            print(f"[MCPService] Error unregistering agent: {e}")
            return {"success": False, "error": str(e)}

        # Outside the try block: the agent is already unregistered, so a chat
        # failure must not be reported as a failed unregistration.
        if removed:
            self._announce(f"🤖 AI Agent '{player_name}' has left the game!")
        return {"success": True, "message": "Agent unregistered"}

    def get_registered_agents(self) -> Dict[str, str]:
        """Get all registered AI agents.

        Returns:
            A shallow copy of the agent-id to player-id mapping, so callers
            cannot mutate the service's internal state.
        """
        return self.ai_agents.copy()

    def is_agent_registered(self, agent_id: str) -> bool:
        """Check if an AI agent is registered."""
        return agent_id in self.ai_agents
|
|
|