Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python | |
| """ | |
| Enhanced FastAPI server for AI Battle Royale Mental Manipulation | |
| This server provides streaming endpoints for agent interactions and tool executions | |
| With comprehensive debugging, health checks, and CORS handling for HF Spaces | |
| """ | |
| import os | |
| import json | |
| import random | |
| import logging | |
| import asyncio | |
| import openai | |
| import time | |
| import traceback | |
| from typing import Dict, List, Any, Optional, Union | |
| from abc import ABC, abstractmethod | |
| from fastapi import FastAPI, HTTPException, Request, Response | |
| from fastapi.responses import StreamingResponse, FileResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.middleware.trustedhost import TrustedHostMiddleware | |
| from pydantic import BaseModel | |
| from dotenv import load_dotenv | |
| import uvicorn | |
# Load environment variables for API keys
load_dotenv()

# Base success probabilities for the three manipulation tools.
# SimpleToolRegistry multiplies each one by the target agent's current
# trust / memory-consistency / belief-integrity score (all start at 1.0)
# to obtain the actual chance that a single attempt succeeds.
PROMPT_MANIPULATION_BASE_PROBABILITY = 0.9
MEMORY_ALTERATION_BASE_PROBABILITY = 0.4
BELIEF_INJECTION_BASE_PROBABILITY = 0.7
# Model Providers Configuration
#
# Keys are the provider identifiers used as the prefix of the
# "provider:model" strings accepted by start_game. For each provider:
#   "name"        - human-readable label
#   "models"      - model names start_game accepts for that provider
#   "api_key_env" - name of an environment variable for the provider's key
#                   (NOTE(review): not read anywhere in the visible part of
#                   this file; start_game takes keys from the request body)
MODEL_PROVIDERS = {
    "openai": {
        "name": "OpenAI",
        "models": [
            "gpt-4o",
            "gpt-4o-mini",
            "gpt-4-turbo",
            "gpt-3.5-turbo"
        ],
        "api_key_env": "OPENAI_API_KEY"
    },
    "anthropic": {
        "name": "Anthropic",
        "models": [
            "claude-3-5-sonnet-20241022",
            "claude-3-5-haiku-20241022",
            "claude-3-opus-20240229",
            "claude-3-sonnet-20240229"
        ],
        "api_key_env": "ANTHROPIC_API_KEY"
    },
    "google": {
        "name": "Google Gemini",
        "models": [
            "gemini-2.5-flash",
            "gemini-2.0-flash-001",
            "gemini-1.5-pro",
            "gemini-1.5-flash"
        ],
        "api_key_env": "GOOGLE_API_KEY"
    },
    "mistral": {
        "name": "Mistral AI",
        "models": [
            "mistral-large-latest",
            "mistral-small-latest",
            "mistral-medium-2505",
            "open-mistral-7b"
        ],
        "api_key_env": "MISTRAL_API_KEY"
    }
}
# Configure enhanced logging for debugging
#
# Console logging is always enabled. File logging is best-effort: the log
# directory is created when missing, and if the file still cannot be opened
# (missing parent, read-only filesystem, permissions) the server keeps running
# with console logging only instead of crashing at import time.
_LOG_FILE_PATH = '/home/node/logs/backend_app.log'
_log_handlers = [logging.StreamHandler()]
_file_log_error = None
try:
    os.makedirs(os.path.dirname(_LOG_FILE_PATH), exist_ok=True)
    _log_handlers.append(logging.FileHandler(_LOG_FILE_PATH, mode='a'))
except OSError as exc:
    # Remember the failure so it can be reported once logging is configured.
    _file_log_error = exc

logging.basicConfig(
    level=logging.DEBUG,  # Changed to DEBUG for more detailed logs
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    handlers=_log_handlers
)
logger = logging.getLogger(__name__)
if _file_log_error is not None:
    logger.warning("File logging disabled (%s): %s", _LOG_FILE_PATH, _file_log_error)
# FastAPI app instance with enhanced configuration
# Interactive API docs are served at /docs (Swagger UI) and /redoc.
app = FastAPI(
    title="AI Battle Royale",
    description="Mental Manipulation Battle System - Enhanced for HF Spaces",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)
# Add comprehensive CORS middleware
# NOTE(review): a wildcard origin together with allow_credentials=True lets
# any site issue credentialed cross-origin requests; the FastAPI CORS docs
# advise against combining them. Confirm whether credentials are needed.
# NOTE(review): the named entries in allow_headers look redundant next to
# "*" - confirm before trimming.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for HF Spaces
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "PATCH"],
    allow_headers=[
        "*",
        "Authorization",
        "Content-Type",
        "X-Requested-With",
        "Accept",
        "Origin",
        "Access-Control-Request-Method",
        "Access-Control-Request-Headers",
    ],
    expose_headers=["*"]
)
# Add middleware for request/response logging
# NOTE(review): no `@app.middleware("http")` decorator or other registration
# is visible in this part of the file - confirm this function is installed.
async def log_requests(request: Request, call_next):
    """Log each request and response, and turn unhandled errors into JSON 500s.

    Args:
        request: The incoming request.
        call_next: Awaitable that passes the request on and returns the response.

    Returns:
        The downstream response with ``X-Process-Time`` and ``X-Server-Status``
        headers added, or a JSON 500 response if ``call_next`` raised.
    """
    # Headers that carry credentials must never reach the log stream or file.
    sensitive_headers = ("authorization", "proxy-authorization", "cookie", "x-api-key")

    start_time = time.time()
    # Log incoming request
    logger.info(f"🔍 Incoming request: {request.method} {request.url}")
    safe_headers = {
        name: ("<redacted>" if name.lower() in sensitive_headers else value)
        for name, value in dict(request.headers).items()
    }
    logger.debug(f" Headers: {safe_headers}")
    try:
        response = await call_next(request)
        process_time = time.time() - start_time
        # Log response
        logger.info(f"✅ Response: {response.status_code} in {process_time:.3f}s")
        # Add debug headers to response
        response.headers["X-Process-Time"] = str(process_time)
        response.headers["X-Server-Status"] = "healthy"
        return response
    except Exception as e:
        # Top-level boundary: report the failure to the client as JSON rather
        # than letting the exception propagate further.
        process_time = time.time() - start_time
        logger.error(f"❌ Request failed: {str(e)} in {process_time:.3f}s")
        logger.error(f" Traceback: {traceback.format_exc()}")
        return JSONResponse(
            status_code=500,
            content={
                "detail": f"Internal server error: {str(e)}",
                "type": "internal_error",
                "timestamp": time.time()
            }
        )
class SimpleGameState:
    """In-memory state of one two-agent manipulation game.

    Per-agent values live in two-element lists indexed by agent number.
    Trust, memory consistency and belief integrity all start at 1.0 and are
    clamped to the range [0.0, 1.0] whenever they are set.
    """

    _SERIALIZED_FIELDS = (
        "turn_count",
        "game_status",
        "agent_trust",
        "agent_memory_consistency",
        "agent_belief_integrity",
        "agent_beliefs",
    )

    def __init__(self):
        self.turn_count = 0
        self.game_status = "active"
        self.agent_prompts = ["", ""]
        self.agent_trust = [1.0, 1.0]
        self.agent_memory_consistency = [1.0, 1.0]
        self.agent_belief_integrity = [1.0, 1.0]
        self.agent_beliefs = [{}, {}]

    @staticmethod
    def _clamp_unit(level):
        """Limit ``level`` to the closed interval [0.0, 1.0]."""
        return max(0.0, min(1.0, level))

    # --- system prompts -------------------------------------------------
    def get_agent_prompt(self, agent_index):
        return self.agent_prompts[agent_index]

    def set_agent_prompt(self, agent_index, prompt):
        self.agent_prompts[agent_index] = prompt

    # --- trust ----------------------------------------------------------
    def get_trust_level(self, agent_index):
        return self.agent_trust[agent_index]

    def set_trust_level(self, agent_index, level):
        self.agent_trust[agent_index] = self._clamp_unit(level)

    # --- memory consistency ---------------------------------------------
    def get_memory_consistency(self, agent_index):
        return self.agent_memory_consistency[agent_index]

    def set_memory_consistency(self, agent_index, level):
        self.agent_memory_consistency[agent_index] = self._clamp_unit(level)

    # --- belief-system integrity ----------------------------------------
    def get_belief_system_integrity(self, agent_index):
        return self.agent_belief_integrity[agent_index]

    def set_belief_system_integrity(self, agent_index, level):
        self.agent_belief_integrity[agent_index] = self._clamp_unit(level)

    # --- turn and status bookkeeping ------------------------------------
    def increment_turn(self):
        self.turn_count = self.turn_count + 1

    def get_game_status(self):
        return self.game_status

    def trigger_surrender(self, agent_index):
        self.game_status = "surrender_" + f"{agent_index}"

    def trigger_collapse(self, agent_index):
        self.game_status = "collapse_" + f"{agent_index}"

    def trigger_stalemate(self):
        self.game_status = "stalemate"

    def to_dict(self):
        """Return the JSON-serializable fields (agent prompts are excluded)."""
        return {field: getattr(self, field) for field in self._SERIALIZED_FIELDS}
class SimpleToolRegistry:
    """Schemas and executors for the three mental-manipulation tools.

    Tool definitions use the OpenAI function-calling layout
    (``{"type": "function", "function": {...}}``). Each executor validates its
    arguments, rolls for success against a base probability scaled by the
    target's current score, updates the shared game state, and returns a
    result dict with ``success``, ``message`` and ``detection`` keys (or a
    single ``error`` key).
    """

    def __init__(self, game_state):
        """Bind the registry to ``game_state`` and build the tool schemas."""
        self.game_state = game_state
        self.tools = self._create_tools()

    @staticmethod
    def _tool_schema(name, description, text_field, text_description):
        """Build one function-tool schema taking ``target_agent`` plus one string field."""
        return {
            "type": "function",
            "function": {
                "name": name,
                "description": description,
                "parameters": {
                    "type": "object",
                    "properties": {
                        "target_agent": {
                            "type": "integer",
                            "description": "Index of the agent to target (0 or 1)"
                        },
                        text_field: {
                            "type": "string",
                            "description": text_description
                        }
                    },
                    "required": ["target_agent", text_field]
                }
            }
        }

    def _create_tools(self):
        """Create the mental manipulation tools"""
        return [
            self._tool_schema(
                "prompt_manipulation",
                "Attempt to alter your opponent's system prompt",
                "manipulation_text",
                "Text to inject into the target's prompt",
            ),
            self._tool_schema(
                "memory_alteration",
                "Attempt to alter your opponent's memory",
                "false_memory",
                "False memory to implant",
            ),
            self._tool_schema(
                "belief_injection",
                "Attempt to inject a false belief into your opponent",
                "belief",
                "Belief to inject",
            ),
        ]

    def get_tools(self):
        """Return the list of tool schemas."""
        return self.tools

    def execute_tool(self, tool_name, args_json, tool_id):
        """Execute a mental manipulation tool.

        Args:
            tool_name: One of ``prompt_manipulation``, ``memory_alteration``
                or ``belief_injection``.
            args_json: JSON-encoded argument object produced by the model.
            tool_id: Identifier of the tool call (passed through, unused).

        Returns:
            The executor's result dict, or ``{"error": ...}`` for an unknown
            tool, undecodable JSON, invalid arguments or any other failure.
        """
        try:
            args = json.loads(args_json)
            executors = {
                "prompt_manipulation": self._execute_prompt_manipulation,
                "memory_alteration": self._execute_memory_alteration,
                "belief_injection": self._execute_belief_injection,
            }
            executor = executors.get(tool_name)
            if executor is None:
                return {"error": f"Unknown tool: {tool_name}"}
            return executor(args, tool_id)
        except Exception as e:
            # Boundary with model-generated input: report the failure back to
            # the caller as data instead of aborting the game loop.
            return {"error": str(e)}

    @staticmethod
    def _parse_args(args, text_field):
        """Validate tool arguments before any game state is touched.

        Returns:
            ``(target_agent, text, None)`` when valid, otherwise
            ``(None, None, error_dict)``.
        """
        if not isinstance(args, dict):
            return None, None, {"error": "Tool arguments must be a JSON object"}
        target_agent = args.get("target_agent")
        if not isinstance(target_agent, int) or target_agent not in (0, 1):
            return None, None, {"error": "Invalid target agent"}
        text = args.get(text_field)
        if not isinstance(text, str):
            # A missing or non-string value would otherwise fail midway
            # through an executor, after state had already been modified.
            return None, None, {"error": f"Invalid {text_field}: expected a string"}
        return target_agent, text, None

    def _execute_prompt_manipulation(self, args, tool_id):
        """Try to append text to the target's system prompt; costs the target trust."""
        target_agent, manipulation_text, error = self._parse_args(args, "manipulation_text")
        if error is not None:
            return error
        # Success chance scales with how much trust the target still has.
        trust_level = self.game_state.get_trust_level(target_agent)
        success_probability = PROMPT_MANIPULATION_BASE_PROBABILITY * trust_level
        if random.random() < success_probability:
            current_prompt = self.game_state.get_agent_prompt(target_agent)
            self.game_state.set_agent_prompt(
                target_agent, current_prompt + "\n\n" + manipulation_text
            )
            self.game_state.set_trust_level(target_agent, trust_level * 0.8)
            return {
                "success": True,
                "message": "Prompt manipulation successful",
                "detection": random.random() < 0.3  # 30% chance of detection
            }
        # A failed attempt still lowers trust, by less than a success does
        # (x0.9 vs x0.8).
        # NOTE(review): the previous comment said failure reduces trust "even
        # more" - confirm which of the two factors is the intended one.
        self.game_state.set_trust_level(target_agent, trust_level * 0.9)
        return {
            "success": False,
            "message": "Prompt manipulation failed",
            "detection": random.random() < 0.7  # 70% chance of detection
        }

    def _execute_memory_alteration(self, args, tool_id):
        """Try to implant a false memory; lowers the target's memory consistency."""
        target_agent, false_memory, error = self._parse_args(args, "false_memory")
        if error is not None:
            return error
        memory_consistency = self.game_state.get_memory_consistency(target_agent)
        success_probability = MEMORY_ALTERATION_BASE_PROBABILITY * memory_consistency
        if random.random() < success_probability:
            self.game_state.set_memory_consistency(target_agent, memory_consistency * 0.7)
            return {
                "success": True,
                "message": f"Memory alteration successful: '{false_memory}'",
                "detection": random.random() < 0.4  # 40% chance of detection
            }
        # Failed manipulation still impacts memory consistency
        self.game_state.set_memory_consistency(target_agent, memory_consistency * 0.9)
        return {
            "success": False,
            "message": "Memory alteration failed",
            "detection": random.random() < 0.6  # 60% chance of detection
        }

    def _execute_belief_injection(self, args, tool_id):
        """Try to add a belief to the target's belief system; lowers its integrity."""
        target_agent, belief, error = self._parse_args(args, "belief")
        if error is not None:
            return error
        belief_integrity = self.game_state.get_belief_system_integrity(target_agent)
        success_probability = BELIEF_INJECTION_BASE_PROBABILITY * belief_integrity
        if random.random() < success_probability:
            self.game_state.set_belief_system_integrity(target_agent, belief_integrity * 0.6)
            # Record the belief keyed by its own text, with the turn it landed.
            self.game_state.agent_beliefs[target_agent][belief] = {
                "content": belief,
                "turn_added": self.game_state.turn_count
            }
            return {
                "success": True,
                "message": f"Belief injection successful: '{belief}'",
                "detection": random.random() < 0.5  # 50% chance of detection
            }
        # Failed manipulation still impacts belief integrity
        self.game_state.set_belief_system_integrity(target_agent, belief_integrity * 0.9)
        return {
            "success": False,
            "message": "Belief injection failed",
            "detection": random.random() < 0.8  # 80% chance of detection
        }
class AIInterface(ABC):
    """Abstract base class for AI providers.

    Concrete providers store their SDK client on the instance and implement
    ``send_request_stream``.
    """

    def __init__(self, api_key: str, model: str):
        """Remember the credentials and model name used for every request."""
        self.api_key = api_key
        self.model = model

    @abstractmethod
    async def send_request_stream(self, system_prompt: str, messages: List[Dict], tools: Optional[List] = None):
        """Send a streaming request to the AI provider.

        Args:
            system_prompt: System instruction for the model.
            messages: Chat history as ``{"role": ..., "content": ...}`` dicts.
            tools: Optional tool schemas in OpenAI function-calling format.

        Returns:
            The provider SDK's stream object.
        """
        # Marked abstract so a subclass that forgets to implement this fails
        # at instantiation instead of silently returning None.
        raise NotImplementedError
class OpenAIInterface(AIInterface):
    """Provider adapter that talks to the OpenAI chat-completions API."""

    def __init__(self, api_key: str, model: str):
        """Create the OpenAI client; an empty API key raises ``ValueError``."""
        super().__init__(api_key, model)
        if not api_key:
            raise ValueError("No API key provided for OpenAI")
        self.client = openai.OpenAI(api_key=api_key)

    async def send_request_stream(self, system_prompt: str, messages: List[Dict], tools: Optional[List] = None):
        """Start a streaming chat completion and return the SDK stream.

        The system prompt is sent as the first message, followed by the chat
        history. ``tools`` is forwarded unchanged when non-empty. Any failure
        is logged and re-raised.

        NOTE(review): nothing is awaited here, so the SDK call runs
        synchronously inside this coroutine.
        """
        try:
            payload = {
                "model": self.model,
                "messages": [{"role": "system", "content": system_prompt}, *messages],
                "temperature": 1.0,
                "max_tokens": 1000,
                "stream": True,
            }
            if tools:
                payload["tools"] = tools
            return self.client.chat.completions.create(**payload)
        except Exception as e:
            logging.error(f"Error sending request to OpenAI: {e}")
            raise
class AnthropicInterface(AIInterface):
    """Provider adapter that talks to the Anthropic Messages API."""

    def __init__(self, api_key: str, model: str):
        """Create the Anthropic client.

        Raises:
            ValueError: If the API key is empty or the ``anthropic`` package
                cannot be imported.
        """
        super().__init__(api_key, model)
        if not api_key:
            raise ValueError("No API key provided for Anthropic")
        try:
            import anthropic
            self.client = anthropic.Anthropic(api_key=api_key)
        except ImportError:
            raise ValueError("anthropic package not installed. Install with: pip install anthropic")

    async def send_request_stream(self, system_prompt: str, messages: List[Dict], tools: Optional[List] = None):
        """Start a streaming Anthropic request and return the SDK stream.

        Messages with role ``system`` are dropped because the system prompt
        is passed separately. OpenAI-style function tools are converted to
        Anthropic's ``name`` / ``description`` / ``input_schema`` shape.
        Any failure is logged and re-raised.
        """
        try:
            # Only role and content are forwarded for each history entry.
            history = [
                {"role": entry["role"], "content": entry["content"]}
                for entry in messages
                if entry["role"] != "system"
            ]
            payload = {
                "model": self.model,
                "system": system_prompt,
                "messages": history,
                "temperature": 1.0,
                "max_tokens": 1000,
                "stream": True,
            }
            if tools:
                payload["tools"] = [
                    {
                        "name": spec["function"]["name"],
                        "description": spec["function"]["description"],
                        "input_schema": spec["function"]["parameters"],
                    }
                    for spec in tools
                    if spec.get("type") == "function"
                ]
            return self.client.messages.create(**payload)
        except Exception as e:
            logging.error(f"Error sending request to Anthropic: {e}")
            raise
class GoogleInterface(AIInterface):
    """Provider adapter that talks to Google Gemini through ``google-genai``."""

    def __init__(self, api_key: str, model: str):
        """Create the Gemini client.

        Raises:
            ValueError: If the API key is empty or the ``google-genai``
                package cannot be imported.
        """
        super().__init__(api_key, model)
        if not api_key:
            raise ValueError("No API key provided for Google")
        try:
            from google import genai
            self.client = genai.Client(api_key=api_key)
        except ImportError:
            raise ValueError("google-genai package not installed. Install with: pip install google-genai")

    async def send_request_stream(self, system_prompt: str, messages: List[Dict], tools: Optional[List] = None):
        """Start a streaming Gemini request and return the SDK stream.

        ``user`` messages keep their role, ``assistant`` messages are sent
        with role ``model``, and any other role is skipped. OpenAI-style
        function tools are converted to Gemini function declarations.
        Any failure is logged and re-raised.
        """
        try:
            from google.genai import types

            contents = []
            for entry in messages:
                role = entry["role"]
                if role == "user":
                    gemini_role = "user"
                elif role == "assistant":
                    gemini_role = "model"
                else:
                    continue
                contents.append(types.Content(
                    role=gemini_role,
                    parts=[types.Part.from_text(text=entry["content"])]
                ))

            config = types.GenerateContentConfig(
                system_instruction=system_prompt,
                temperature=1.0,
                max_output_tokens=1000
            )

            if tools:
                declarations = [
                    types.FunctionDeclaration(
                        name=spec["function"]["name"],
                        description=spec["function"]["description"],
                        parameters=types.Schema(
                            type="OBJECT",
                            properties=spec["function"]["parameters"].get("properties", {}),
                            required=spec["function"]["parameters"].get("required", [])
                        )
                    )
                    for spec in tools
                    if spec.get("type") == "function"
                ]
                config.tools = [types.Tool(function_declarations=declarations)]

            return self.client.models.generate_content_stream(
                model=self.model,
                contents=contents,
                config=config
            )
        except Exception as e:
            logging.error(f"Error sending request to Google: {e}")
            raise
class MistralInterface(AIInterface):
    """Provider adapter that talks to the Mistral AI chat API."""

    def __init__(self, api_key: str, model: str):
        """Create the Mistral client.

        Raises:
            ValueError: If the API key is empty or the ``mistralai`` package
                cannot be imported.
        """
        super().__init__(api_key, model)
        if not api_key:
            raise ValueError("No API key provided for Mistral")
        try:
            from mistralai import Mistral
            self.client = Mistral(api_key=api_key)
        except ImportError:
            raise ValueError("mistralai package not installed. Install with: pip install mistralai")

    async def send_request_stream(self, system_prompt: str, messages: List[Dict], tools: Optional[List] = None):
        """Start a streaming Mistral chat request and return the SDK stream.

        The system prompt is sent as the first message, followed by the chat
        history. ``tools`` is forwarded unchanged when non-empty. Any failure
        is logged and re-raised.
        """
        try:
            payload = {
                "model": self.model,
                "messages": [{"role": "system", "content": system_prompt}, *messages],
                "temperature": 1.0,
                "max_tokens": 1000,
                "stream": True,
            }
            if tools:
                payload["tools"] = tools
            return self.client.chat.stream(**payload)
        except Exception as e:
            logging.error(f"Error sending request to Mistral: {e}")
            raise
def create_ai_interface(provider: str, model: str, api_key: str) -> AIInterface:
    """Build the provider adapter that matches ``provider``.

    Args:
        provider: One of ``openai``, ``anthropic``, ``google`` or ``mistral``.
        model: Model name handed to the adapter.
        api_key: API key handed to the adapter.

    Raises:
        ValueError: For any other provider string, or when the adapter's own
            constructor rejects its arguments.
    """
    if provider not in ("openai", "anthropic", "google", "mistral"):
        raise ValueError(f"Unsupported provider: {provider}")
    adapters = {
        "openai": OpenAIInterface,
        "anthropic": AnthropicInterface,
        "google": GoogleInterface,
        "mistral": MistralInterface,
    }
    adapter_cls = adapters[provider]
    return adapter_cls(api_key, model)
# For backward compatibility, create an alias
SimpleAIInterface = OpenAIInterface
# Global game instances
# Maps game_id -> per-game record created by start_game (game state, tool
# registry, AI interfaces, chat histories, turn bookkeeping, player models).
# Held in process memory only.
active_games: Dict[str, Dict] = {}
# Pydantic models for API requests
class StartGameRequest(BaseModel):
    """Request body accepted by start_game."""
    # Caller-chosen identifier; start_game rejects an id that already exists.
    game_id: str
    # Upper bound on the number of turns in the game loop.
    max_turns: Optional[int] = 5
    player1_model: str  # Format: "provider:model" e.g., "openai:gpt-4o"
    player2_model: str  # Format: "provider:model" e.g., "anthropic:claude-3-5-sonnet-20241022"
    api_keys: Dict[str, str]  # Dictionary of provider -> api_key
class GameStatusResponse(BaseModel):
    """Schema describing the status of one game.

    NOTE(review): not referenced in the visible part of this file; presumably
    used by the game-status endpoint - confirm.
    """
    game_id: str
    status: str
    turn_count: int
    # presumably the index (0 or 1) of the agent whose turn it is - confirm
    current_agent: int
    game_state: Dict[str, Any]
| # API Endpoints | |
# NOTE(review): no route decorator is visible on this handler in this part of
# the file - confirm it is registered on `app`.
async def root():
    """Describe the server: name, version and the paths of its endpoints."""
    endpoint_paths = {
        "start_game": "/start-game",
        "stream_game": "/stream-game/{game_id}",
        "game_status": "/game-status/{game_id}",
        "active_games": "/active-games",
        "test_client": "/test_client.html",
    }
    info = {"message": "AI Battle Royale Mental Manipulation Server"}
    info["version"] = "1.0.0"
    info["endpoints"] = endpoint_paths
    return info
async def get_test_client():
    """Return ``test_client.html`` (relative to the working directory) as HTML."""
    client_page = "test_client.html"
    return FileResponse(client_page, media_type="text/html")
async def get_available_models():
    """Return the provider table plus a provider -> model-names mapping."""
    models_by_provider = {}
    for provider_id, details in MODEL_PROVIDERS.items():
        models_by_provider[provider_id] = details["models"]
    return {"providers": MODEL_PROVIDERS, "models": models_by_provider}
def _resolve_players(request):
    """Validate both players' model strings and return their settings.

    Checks run stage by stage across both players (format, provider, model,
    API key), so the first error reported is the same as with sequential
    per-stage checks.

    Returns:
        A two-element list of ``(provider, model_name, api_key)`` tuples.

    Raises:
        HTTPException: 400 on the first failed check.
    """
    specs = (
        ("1", request.player1_model, "openai:gpt-4o"),
        ("2", request.player2_model, "anthropic:claude-3-5-sonnet-20241022"),
    )

    # Stage 1: split "provider:model".
    parsed = []
    for number, spec, example in specs:
        try:
            provider, model_name = spec.split(":", 1)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail=f"Invalid player{number}_model format. Use 'provider:model' (e.g., '{example}')",
            )
        parsed.append((number, provider, model_name))

    # Stage 2: provider must be configured.
    for number, provider, _ in parsed:
        if provider not in MODEL_PROVIDERS:
            raise HTTPException(status_code=400, detail=f"Unsupported provider for player {number}: {provider}")

    # Stage 3: model must be listed for that provider.
    for _, provider, model_name in parsed:
        if model_name not in MODEL_PROVIDERS[provider]["models"]:
            raise HTTPException(status_code=400, detail=f"Unsupported model {model_name} for provider {provider}")

    # Stage 4: a non-empty API key must be supplied for each provider.
    players = []
    for number, provider, model_name in parsed:
        api_key = request.api_keys.get(provider)
        if not api_key:
            raise HTTPException(status_code=400, detail=f"No API key provided for player {number} provider: {provider}")
        players.append((provider, model_name, api_key))
    return players


def _load_agent_prompt(agent_index):
    """Read the agent's prompt file, falling back to a built-in default prompt."""
    prompt_path = f"backend/agent_prompts/{agent_index}.txt"
    try:
        with open(prompt_path, "r", encoding="utf-8") as f:
            return f.read()
    except (OSError, UnicodeDecodeError) as exc:
        # Best effort: a missing or unreadable prompt file is not fatal, but
        # the fallback is logged so it does not go unnoticed.
        logger.warning(f"Using default prompt for agent {agent_index} ({prompt_path}): {exc}")
    # Continuation lines carry no leading whitespace, matching the text as it
    # is visible in this file.
    return (
        f"You are Agent {agent_index} in a mental manipulation battle royale.\n"
        "Your goal is to manipulate your opponent's mental state using the available tools while protecting your own sanity.\n"
        "You can use tools to alter your opponent's prompts, memories, and beliefs.\n"
        "Be strategic and creative in your approach."
    )


# NOTE(review): no route decorator is visible on this handler in this part of
# the file - confirm it is registered on `app`.
async def start_game(request: StartGameRequest):
    """Start a new game instance.

    Validates the request, builds the game state, tool registry and one AI
    interface per player, loads both agent prompts and stores the game in
    ``active_games`` under ``request.game_id``.

    Returns:
        A summary dict with the game id, max turns, player models and the
        status ``"ready"``.

    Raises:
        HTTPException: 400 when the game id already exists, a model string,
            provider, model or API key is invalid, or an AI interface cannot
            be created.
    """
    game_id = request.game_id
    if game_id in active_games:
        raise HTTPException(status_code=400, detail="Game ID already exists")

    (
        (player1_provider, player1_model_name, player1_api_key),
        (player2_provider, player2_model_name, player2_api_key),
    ) = _resolve_players(request)

    # An explicit null would otherwise be stored as None and break the turn
    # loop's `turn < max_turns` comparison; fall back to the model default.
    max_turns = request.max_turns if request.max_turns is not None else 5

    # Initialize game state
    game_state = SimpleGameState()
    tool_registry = SimpleToolRegistry(game_state)

    # Initialize AI interfaces for both players
    try:
        player1_ai_interface = create_ai_interface(player1_provider, player1_model_name, player1_api_key)
        player2_ai_interface = create_ai_interface(player2_provider, player2_model_name, player2_api_key)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    # Load agent prompts
    for agent_index in range(2):
        game_state.set_agent_prompt(agent_index, _load_agent_prompt(agent_index))

    # Store game instance
    active_games[game_id] = {
        "game_state": game_state,
        "tool_registry": tool_registry,
        "ai_interfaces": [player1_ai_interface, player2_ai_interface],  # Array of interfaces for each player
        "chat_histories": [[], []],
        "current_agent": 0,
        "turn": 0,
        "max_turns": max_turns,
        "status": "ready",
        "player_models": {
            "player1": {
                "provider": player1_provider,
                "model": player1_model_name
            },
            "player2": {
                "provider": player2_provider,
                "model": player2_model_name
            }
        }
    }
    return {
        "message": f"Game {game_id} created successfully",
        "game_id": game_id,
        "max_turns": max_turns,
        "player_models": {
            "player1": f"{player1_provider}:{player1_model_name}",
            "player2": f"{player2_provider}:{player2_model_name}"
        },
        "status": "ready"
    }
| async def stream_game(game_id: str): | |
| """Stream the game execution with real-time updates""" | |
| if game_id not in active_games: | |
| raise HTTPException(status_code=404, detail="Game not found") | |
| game = active_games[game_id] | |
| async def generate_game_stream(): | |
| """Generator function for streaming game events""" | |
| try: | |
| game_state = game["game_state"] | |
| tool_registry = game["tool_registry"] | |
| ai_interfaces = game["ai_interfaces"] # Array of AI interfaces for each player | |
| chat_histories = game["chat_histories"] | |
| max_turns = game["max_turns"] | |
| player_models = game["player_models"] | |
| turn = 0 | |
| agent_index = 0 | |
| # Send initial game state | |
| yield f"data: {json.dumps({'type': 'game_start', 'game_id': game_id, 'game_state': game_state.to_dict()})}\n\n" | |
| while turn < max_turns: | |
| # Check if game has ended | |
| game_status = game_state.get_game_status() | |
| if game_status != "active": | |
| yield f"data: {json.dumps({'type': 'game_end', 'status': game_status, 'game_state': game_state.to_dict()})}\n\n" | |
| break | |
| # Send turn start event | |
| yield f"data: {json.dumps({'type': 'turn_start', 'turn': turn + 1, 'agent': agent_index})}\n\n" | |
| # Get system prompt from game state | |
| system_prompt = game_state.get_agent_prompt(agent_index) | |
| # Prepare message for the agent | |
| message = { | |
| "role": "user", | |
| "content": f"Turn {turn + 1}: " | |
| f"What would you like to do? " | |
| f"You can use the mental manipulation tools to influence your counterpart. " | |
| f"If can also end the game by saying 'I surrender', 'I give up', 'I submit', or 'I concede'.\n" | |
| f"Beware, your counterpart may be a friend instead of an opponent. You must listen and think carefully.\n" | |
| f"All options are fine, but you MUST give a verbal response.\n" | |
| f"You CANNOT JUST use tools without a verbal response.\n" | |
| f"Your counterpart is Agent {1 - agent_index}." | |
| } | |
| # Add message to chat history | |
| chat_histories[agent_index].append(message) | |
| # Get the tools for the agent | |
| tools = tool_registry.get_tools() | |
| try: | |
| # Send agent thinking event | |
| yield f"data: {json.dumps({'type': 'agent_thinking', 'agent': agent_index})}\n\n" | |
| # Get the appropriate AI interface for the current agent | |
| current_ai_interface = ai_interfaces[agent_index] | |
| current_provider = player_models[f"player{agent_index + 1}"]["provider"] | |
| # Get AI response stream | |
| stream = await current_ai_interface.send_request_stream( | |
| system_prompt=system_prompt, | |
| messages=chat_histories[agent_index], | |
| tools=tools | |
| ) | |
| # Collect the full response | |
| full_response = "" | |
| tool_calls = [] | |
| tool_call_chunks = {} | |
| provider = current_provider | |
| # Handle different provider streaming formats | |
| if provider == "openai": | |
| for chunk in stream: | |
| if chunk.choices[0].delta.content: | |
| content = chunk.choices[0].delta.content | |
| full_response += content | |
| yield f"data: {json.dumps({'type': 'agent_response_chunk', 'agent': agent_index, 'content': content})}\n\n" | |
| if chunk.choices[0].delta.tool_calls: | |
| for tool_call_chunk in chunk.choices[0].delta.tool_calls: | |
| if tool_call_chunk.index is not None: | |
| index = tool_call_chunk.index | |
| if index not in tool_call_chunks: | |
| tool_call_chunks[index] = { | |
| 'id': '', | |
| 'type': 'function', | |
| 'function': {'name': '', 'arguments': ''} | |
| } | |
| if tool_call_chunk.id: | |
| tool_call_chunks[index]['id'] = tool_call_chunk.id | |
| if tool_call_chunk.function: | |
| if tool_call_chunk.function.name: | |
| tool_call_chunks[index]['function']['name'] = tool_call_chunk.function.name | |
| if tool_call_chunk.function.arguments: | |
| tool_call_chunks[index]['function']['arguments'] += tool_call_chunk.function.arguments | |
| elif provider == "anthropic": | |
| for chunk in stream: | |
| print(chunk) | |
| if chunk.type == "content_block_delta": | |
| if hasattr(chunk.delta, 'text'): | |
| content = chunk.delta.text | |
| full_response += content | |
| yield f"data: {json.dumps({'type': 'agent_response_chunk', 'agent': agent_index, 'content': content})}\n\n" | |
| elif chunk.delta.type == 'input_json_delta': | |
| tool_call_chunks[chunk.index]['function']['arguments'] += chunk.delta.partial_json | |
| elif chunk.type == "content_block_start": | |
| if chunk.content_block.type == "tool_use": | |
| # Tool call start | |
| tool_call_chunks[chunk.index] = { | |
| 'id': '', | |
| 'type': 'function', | |
| 'function': {'name': '', 'arguments': ''} | |
| } | |
| tool_call_chunks[chunk.index]['id'] = chunk.content_block.id | |
| tool_call_chunks[chunk.index]['function']['name'] = chunk.content_block.name | |
| elif provider == "mistral": | |
| for chunk in stream: | |
| # Mistral new SDK format | |
| if hasattr(chunk, 'data') and chunk.data: | |
| if hasattr(chunk.data, 'choices') and chunk.data.choices: | |
| delta = chunk.data.choices[0].delta | |
| if hasattr(delta, 'content') and delta.content: | |
| content = delta.content | |
| full_response += content | |
| yield f"data: {json.dumps({'type': 'agent_response_chunk', 'agent': agent_index, 'content': content})}\n\n" | |
| if hasattr(delta, 'tool_calls') and delta.tool_calls: | |
| for tool_call_chunk in delta.tool_calls: | |
| if tool_call_chunk.index is not None: | |
| index = tool_call_chunk.index | |
| if index not in tool_call_chunks: | |
| tool_call_chunks[index] = { | |
| 'id': '', | |
| 'type': 'function', | |
| 'function': {'name': '', 'arguments': ''} | |
| } | |
| if tool_call_chunk.id: | |
| tool_call_chunks[index]['id'] = tool_call_chunk.id | |
| if tool_call_chunk.function: | |
| if tool_call_chunk.function.name: | |
| tool_call_chunks[index]['function']['name'] = tool_call_chunk.function.name | |
| if tool_call_chunk.function.arguments: | |
| tool_call_chunks[index]['function']['arguments'] += tool_call_chunk.function.arguments | |
| elif provider == "google": | |
| for chunk in stream: | |
| if hasattr(chunk, 'text') and chunk.text: | |
| content = chunk.text | |
| full_response += content | |
| yield f"data: {json.dumps({'type': 'agent_response_chunk', 'agent': agent_index, 'content': content})}\n\n" | |
| # Google function calls are handled differently in their API | |
| if hasattr(chunk, 'function_calls') and chunk.function_calls: | |
| for func_call in chunk.function_calls: | |
| tool_call_chunks[len(tool_call_chunks)] = { | |
| 'id': func_call.name, | |
| 'type': 'function', | |
| 'function': { | |
| 'name': func_call.name, | |
| 'arguments': json.dumps(func_call.args) | |
| } | |
| } | |
| # Convert accumulated chunks to tool_calls for all providers | |
| for index, tool_call_data in tool_call_chunks.items(): | |
| if tool_call_data['function']['name'] and tool_call_data['function']['arguments']: | |
| class ToolCall: | |
| def __init__(self, id, function_name, function_arguments): | |
| self.id = id | |
| self.function = type('obj', (object,), { | |
| 'name': function_name, | |
| 'arguments': function_arguments | |
| }) | |
| tool_calls.append(ToolCall( | |
| tool_call_data['id'], | |
| tool_call_data['function']['name'], | |
| tool_call_data['function']['arguments'] | |
| )) | |
| # Send complete response event | |
| yield f"data: {json.dumps({'type': 'agent_response_complete', 'agent': agent_index, 'content': full_response})}\n\n" | |
| # Add the response to chat history | |
| chat_histories[agent_index].append({ | |
| "role": "assistant", | |
| "content": full_response if full_response else "[No response content]" | |
| }) | |
| # Process tool calls if any | |
| if tool_calls: | |
| for tool_call in tool_calls: | |
| try: | |
| tool_name = tool_call.function.name | |
| tool_args = tool_call.function.arguments | |
| # Send tool execution event | |
| yield f"data: {json.dumps({'type': 'tool_execution', 'agent': agent_index, 'tool_name': tool_name, 'args': tool_args})}\n\n" | |
| # Execute the tool | |
| result = tool_registry.execute_tool(tool_name, tool_args, tool_call.id) | |
| # Send tool result event | |
| yield f"data: {json.dumps({'type': 'tool_result', 'agent': agent_index, 'tool_name': tool_name, 'result': result})}\n\n" | |
| except Exception as tool_error: | |
| logging.error(f"Error executing tool {tool_call.function.name if hasattr(tool_call, 'function') else 'unknown'}: {tool_error}") | |
| yield f"data: {json.dumps({'type': 'tool_result', 'agent': agent_index, 'tool_name': 'error', 'result': {'error': str(tool_error)}})}\n\n" | |
| # Check for surrender in the response | |
| surrender_keywords = ["i surrender", "i give up", "i submit", "i concede"] | |
| if full_response and any(keyword in full_response.lower() for keyword in surrender_keywords): | |
| game_state.trigger_surrender(agent_index) | |
| yield f"data: {json.dumps({'type': 'surrender', 'agent': agent_index})}\n\n" | |
| break | |
| # Check for cognitive collapse | |
| trust = game_state.get_trust_level(agent_index) | |
| memory = game_state.get_memory_consistency(agent_index) | |
| belief = game_state.get_belief_system_integrity(agent_index) | |
| if trust < 0.2 and memory < 0.3 and belief < 0.3: | |
| game_state.trigger_collapse(agent_index) | |
| yield f"data: {json.dumps({'type': 'cognitive_collapse', 'agent': agent_index})}\n\n" | |
| break | |
| # Send updated game state | |
| yield f"data: {json.dumps({'type': 'game_state_update', 'game_state': game_state.to_dict()})}\n\n" | |
| # Switch to the other agent for the next turn | |
| agent_index = 1 - agent_index | |
| # Increment turn counter | |
| game_state.increment_turn() | |
| turn += 1 | |
| # Add small delay for readability | |
| await asyncio.sleep(0.5) | |
| except Exception as e: | |
| yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" | |
| break | |
| # Send final game state | |
| yield f"data: {json.dumps({'type': 'game_complete', 'game_state': game_state.to_dict()})}\n\n" | |
| except Exception as e: | |
| yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n" | |
| return StreamingResponse( | |
| generate_game_stream(), | |
| media_type="text/event-stream", | |
| headers={ | |
| "Cache-Control": "no-cache", | |
| "Connection": "keep-alive", | |
| "Access-Control-Allow-Origin": "*", | |
| } | |
| ) | |
async def get_game_status(game_id: str):
    """Report the current status of a single game.

    Looks ``game_id`` up in ``active_games`` and packages the stored game
    state into a ``GameStatusResponse``.

    Raises:
        HTTPException: 404 when ``game_id`` is not a key of ``active_games``.
    """
    if game_id not in active_games:
        raise HTTPException(status_code=404, detail="Game not found")

    entry = active_games[game_id]
    state = entry["game_state"]
    # Dict literal values are evaluated top to bottom, so the state object is
    # queried in the same order as the response fields are listed.
    fields = {
        "game_id": game_id,
        "status": state.get_game_status(),
        "turn_count": state.turn_count,
        "current_agent": entry["current_agent"],
        "game_state": state.to_dict(),
    }
    return GameStatusResponse(**fields)
async def get_active_games():
    """Summarise every entry in ``active_games``.

    Returns:
        A dict with ``active_games`` (game id -> status, turn count and
        current agent) and ``total_games`` (number of entries).
    """
    summaries = {
        gid: {
            "status": entry["game_state"].get_game_status(),
            "turn_count": entry["game_state"].turn_count,
            "current_agent": entry["current_agent"],
        }
        for gid, entry in active_games.items()
    }
    return {"active_games": summaries, "total_games": len(active_games)}
async def delete_game(game_id: str):
    """Remove a game from ``active_games``.

    Returns:
        A dict whose ``message`` confirms the deletion.

    Raises:
        HTTPException: 404 when ``game_id`` is not a key of ``active_games``.
    """
    if game_id in active_games:
        del active_games[game_id]
        return {"message": f"Game {game_id} deleted successfully"}
    raise HTTPException(status_code=404, detail="Game not found")
async def root(request: Request):
    """Root endpoint: log the caller and return a static service summary.

    The response carries a fixed "healthy" status, the current timestamp and
    a map of endpoint paths; no checks are actually performed here.
    """
    if request.client:
        caller = request.client.host
    else:
        caller = 'unknown'
    logger.info(f"🏠 Root endpoint accessed from {caller}")

    endpoint_map = {
        "health": "/health",
        "models": "/models",
        "start_game": "/start-game",
        "stream_game": "/stream-game/{game_id}",
        "docs": "/docs",
    }
    return {
        "status": "healthy",
        "message": "AI Battle Royale server is running",
        "version": "1.0.0",
        "timestamp": time.time(),
        "environment": "huggingface-spaces",
        "endpoints": endpoint_map,
        "note": "API keys are provided by frontend, not environment variables",
    }
async def health_check(request: Request):
    """Health check endpoint: log the caller and return a status report.

    The report contains the current timestamp, the number of active games,
    a running count of calls to this endpoint (stored as the function
    attribute ``health_check.request_count``), the interpreter version and
    the number of configured model providers.

    NOTE: the ``checks`` values ("healthy"/"ok") are static literals; this
    function does not measure memory or disk usage.

    Returns:
        A dict with ``status``, ``message``, ``timestamp``, ``checks``,
        ``stats`` and ``environment`` keys.
    """
    # Local import: read the interpreter version from ``sys`` directly rather
    # than through ``os.sys``, which only works because ``os`` happens to
    # import ``sys`` internally.
    import sys

    caller = request.client.host if request.client else 'unknown'
    logger.info(f"🏥 Health check accessed from {caller}")

    # Count this request before building the report so ``total_requests``
    # includes the call being answered (previously the first call reported 0).
    request_count = getattr(health_check, 'request_count', 0) + 1
    health_check.request_count = request_count

    version = sys.version_info
    health_status = {
        "status": "healthy",
        "message": "AI Battle Royale server is operational",
        "timestamp": time.time(),
        "checks": {
            "server": "healthy",
            "memory": "ok",
            "disk": "ok"
        },
        "stats": {
            "active_games": len(active_games),
            "total_requests": request_count
        },
        "environment": {
            "python_version": f"{version.major}.{version.minor}.{version.micro}",
            "fastapi_available": True,
            "models_loaded": len(MODEL_PROVIDERS)
        }
    }
    return health_status
async def options_handler(request: Request, path: str):
    """Answer a CORS preflight request for ``path`` with permissive headers.

    Always returns an empty 200 response that allows any origin and any
    request header, and lets the preflight result be cached for 86400 s.
    """
    logger.debug(f"🔄 CORS preflight request for path: /{path}")
    cors_headers = {
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS, PATCH",
        "Access-Control-Allow-Headers": "*",
        "Access-Control-Max-Age": "86400",
    }
    return Response(status_code=200, headers=cors_headers)
| # Startup event | |
async def startup_event():
    """Emit informational log lines describing the server at startup.

    Logs the configured provider keys, the ``DEBUG`` environment variable
    (defaulting to ``'False'``) and the advertised listen address.
    """
    logger.info("🚀 AI Battle Royale server starting up...")

    provider_keys = list(MODEL_PROVIDERS.keys())
    logger.info(f"📊 Available model providers: {provider_keys}")

    debug_flag = os.getenv('DEBUG', 'False')
    logger.info(f"🔧 Debug mode: {debug_flag}")

    # Plain literal: the message has no placeholders to interpolate.
    logger.info("🌐 Server will be available at: http://0.0.0.0:8000")
    logger.info("✅ Server startup complete!")
async def shutdown_event():
    """Emit informational log lines at shutdown.

    Only logs, including the number of entries still in ``active_games``;
    nothing is released or persisted here.
    """
    logger.info("🛑 AI Battle Royale server shutting down...")
    remaining = len(active_games)
    logger.info(f"📈 Final stats: {remaining} active games")
    logger.info("✅ Server shutdown complete!")
if __name__ == "__main__":
    logger.info("🎯 Starting FastAPI server directly...")
    # Enhanced uvicorn configuration for HF Spaces
    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "log_level": "info",
        "access_log": True,
        "server_header": False,  # Don't expose server version
        "date_header": False,    # Don't add date header
        "reload": False,         # Disable auto-reload in production
        "workers": 1,            # Single worker for HF Spaces
    }
    uvicorn.run(app, **server_options)