| from fastapi import FastAPI, HTTPException, Request, Response |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.responses import JSONResponse, FileResponse |
| import requests |
| import os |
| import random |
| from typing import List, Dict, Any, Optional |
| from functools import lru_cache |
| from .game import Game |
| from ..models.pokemon import Pokemon |
| from ..utils.data_loader import data_loader |
| from ..models.moveset import get_strategic_moveset, get_all_pokemon_sets, get_random_battle_ready_pokemon, get_battle_ready_pokemon_list, BATTLE_ONLY_FORM_SUFFIXES |
| from ..utils.pokemon_utils import POKEAPI_NAME_MAP, get_mandatory_item |
| import json |
| import re |
| from dotenv import load_dotenv |
| from supabase import create_client, Client |
|
|
| load_dotenv() |
|
|
| app = FastAPI() |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| supabase_url = os.getenv("SUPABASE_URL") |
| supabase_key = os.getenv("SUPABASE_SERVICE_ROLE_KEY") |
| bucket_name = os.getenv("SUPABASE_BUCKET_NAME", "pokemon-music") |
|
|
| supabase: Client = None |
| if supabase_url and supabase_key and "PASTE_YOUR" not in supabase_key: |
| try: |
| supabase = create_client(supabase_url, supabase_key) |
| print("✓ Supabase client initialized") |
| except Exception as e: |
| print(f"Error initializing Supabase client: {e}") |
|
|
| game_instance = None |
|
|
| |
| BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| DATA_DIR = os.path.join(BASE_DIR, 'data') |
|
|
| |
| POKEMON_ID_MAP = {} |
| try: |
| id_map_path = os.path.join(DATA_DIR, 'pokemon_ids.json') |
| with open(id_map_path, 'r') as f: |
| POKEMON_ID_MAP = json.load(f) |
| print(f"✓ Loaded {len(POKEMON_ID_MAP)} Pokémon ID mappings") |
| except Exception as e: |
| print(f"⚠️ Warning: Could not load pokemon_ids.json at {id_map_path}: {e}") |
|
|
| music_path = os.path.join(os.path.dirname(__file__), 'music') |
| if os.path.exists(music_path): |
| app.mount("/api/music/local", StaticFiles(directory=music_path), name="music") |
|
|
|
|
| def get_public_base_url(request: Request) -> str: |
| configured = os.getenv("PUBLIC_BASE_URL") or os.getenv("NEXT_PUBLIC_API_URL") |
| if configured: |
| return configured.rstrip('/') |
|
|
| forwarded_proto = request.headers.get("x-forwarded-proto") |
| forwarded_host = request.headers.get("x-forwarded-host") |
| host = forwarded_host or request.headers.get("host") |
|
|
| if host: |
| scheme = forwarded_proto or request.url.scheme |
| if scheme == "http" and host not in {"localhost", "127.0.0.1"} and not host.startswith("localhost:") and not host.startswith("127.0.0.1:"): |
| scheme = "https" |
| return f"{scheme}://{host}" |
|
|
| return str(request.base_url).rstrip('/') |
|
|
| @app.get("/api/audio/signed-url/{filename:path}") |
| async def get_signed_url(filename: str, request: Request): |
| if not supabase: |
| print(f"[AUDIO] ⚠️ Supabase not configured. Falling back to local: {filename}") |
| base_url = get_public_base_url(request) |
| return {"url": f"{base_url}/api/music/local/{filename}", "source": "local"} |
| |
| try: |
| response = supabase.storage.from_(bucket_name).create_signed_url(filename, 3600) |
| if "error" in response: |
| print(f"[AUDIO] ❌ Supabase error: {response['error']}") |
| base_url = get_public_base_url(request) |
| return {"url": f"{base_url}/api/music/local/{filename}", "source": "local"} |
| |
| print(f"[AUDIO] ✅ Serving {filename} from Supabase") |
| return {"url": response["signedURL"], "source": "supabase"} |
| except Exception as e: |
| print(f"[AUDIO] ❌ Exception: {e}") |
| base_url = get_public_base_url(request) |
| return {"url": f"{base_url}/api/music/local/{filename}", "source": "local"} |
|
|
| @app.get("/api/audio/battle-tracks") |
| async def get_battle_tracks(): |
| if not supabase: |
| return {"success": False, "tracks": []} |
|
|
| try: |
| entries = supabase.storage.from_(bucket_name).list("battle") |
| tracks = [] |
|
|
| for entry in entries or []: |
| name = entry.get("name") if isinstance(entry, dict) else None |
| if not name: |
| continue |
|
|
| lower_name = name.lower() |
| if not lower_name.endswith((".mp3", ".ogg", ".wav", ".m4a", ".flac")): |
| continue |
|
|
| tracks.append(f"battle/{name}") |
|
|
| return {"success": True, "tracks": sorted(tracks)} |
| except Exception as e: |
| print(f"[AUDIO] ❌ Failed to list battle tracks: {e}") |
| return {"success": False, "tracks": []} |
|
|
| @app.get("/api/pokemon/cry/{pokemon_name}") |
| async def pokemon_cry(pokemon_name: str): |
| try: |
| |
| normalized = pokemon_name.lower().replace(' ', '').replace('-', '') |
| api_name = POKEAPI_NAME_MAP.get(normalized, pokemon_name.lower()) |
| pokemon_id = POKEMON_ID_MAP.get(api_name) or POKEMON_ID_MAP.get(normalized) |
| |
| |
| identifier = str(pokemon_id) if pokemon_id else api_name |
| |
| url = f'https://pokeapi.co/api/v2/pokemon/{identifier}' |
| response = requests.get(url, timeout=5) |
| if response.status_code == 200: |
| pokemon_data = response.json() |
| cry_url = None |
| |
| if 'cries' in pokemon_data and 'latest' in pokemon_data['cries']: |
| cry_url = pokemon_data['cries']['latest'] |
| |
| if not cry_url: |
| pokemon_id = pokemon_data.get('id') |
| if pokemon_id: |
| cry_url = f"https://raw.githubusercontent.com/PokeAPI/cries/main/cries/pokemon/latest/{pokemon_id}.ogg" |
| |
| if cry_url: |
| cry_response = requests.get(cry_url, timeout=5) |
| if cry_response.status_code == 200: |
| return Response(content=cry_response.content, media_type='audio/ogg') |
| |
| fallback_path = os.path.join(music_path, 'nidorino.ogg') |
| if os.path.exists(fallback_path): |
| return FileResponse(fallback_path) |
| return Response(status_code=404, content="Cry not found") |
| except Exception as e: |
| print(f"Error fetching Pokémon cry: {e}") |
| fallback_path = os.path.join(music_path, 'nidorino.ogg') |
| if os.path.exists(fallback_path): |
| return FileResponse(fallback_path) |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| from ..utils.pokemon_api import get_best_sprite, get_pokemon_data, to_display_name |
|
|
| def get_pokemon_moves(pokemon_data): |
| pokemon_name = pokemon_data['name'].lower() |
| strategic_moves = get_strategic_moveset(pokemon_name, debug=False) |
| |
| if strategic_moves: |
| return [{'name': move_name} for move_name in strategic_moves[:4]] |
| return [{'name': 'tackle'}, {'name': 'growl'}] |
|
|
| @lru_cache(maxsize=1) |
| def get_comprehensive_pokemon_list() -> List[str]: |
| |
| try: |
| names = set(get_battle_ready_pokemon_list()) |
| except: |
| names = set() |
| |
| |
| if 'POKEMON_ID_MAP' in globals() and POKEMON_ID_MAP: |
| for n in POKEMON_ID_MAP.keys(): |
| names.add(n) |
| |
| |
| try: |
| names_path = os.path.join(DATA_DIR, 'all_pokemon_names.json') |
| if os.path.exists(names_path): |
| with open(names_path, 'r') as f: |
| api_names = json.load(f) |
| for n in api_names: |
| names.add(n) |
| except Exception as e: |
| print(f"Error loading PokeAPI names: {e}") |
| |
| |
| filtered_names = [] |
| lower_suffixes = [s.lower() for s in BATTLE_ONLY_FORM_SUFFIXES] |
| for n in names: |
| if any(n.lower().endswith(s) for s in lower_suffixes): |
| continue |
| filtered_names.append(n) |
| |
| return sorted(list(set(filtered_names))) |
|
|
| @app.get("/api/search-pokemon") |
| async def search_pokemon(q: str = "", sets_only: bool = False): |
| query = q.lower().strip() |
| if not query or len(query) < 2: |
| return {"success": False, "results": []} |
| |
| try: |
| |
| all_names = get_comprehensive_pokemon_list() |
| |
| |
| matches = [p for p in all_names if query in p.lower()][:20] |
| |
| results = [] |
| for pokemon_name in matches: |
| |
| moveset = get_strategic_moveset(pokemon_name, debug=False) |
| has_sets = bool(moveset) |
| |
| if sets_only and not has_sets: |
| continue |
| |
| normalized = pokemon_name.lower().replace(' ', '').replace('-', '').replace('.', '') |
| api_name = POKEAPI_NAME_MAP.get(normalized, pokemon_name.lower().replace(' ', '-').replace('.', '')) |
| |
| |
| if any(r['name'] == api_name for r in results): |
| continue |
| |
| |
| |
| results.append({ |
| 'name': api_name, |
| 'display_name': to_display_name(pokemon_name), |
| 'item': get_mandatory_item(pokemon_name) or '', |
| 'moveset': moveset[:4] if moveset else ['Tackle'], |
| 'has_sets': has_sets |
| }) |
| |
| if len(results) >= 15: |
| break |
| |
| return {"success": True, "results": results} |
| except Exception as e: |
| print(f"Error searching Pokémon: {e}") |
| return {"success": False, "results": []} |
|
|
| @app.get("/api/get_moveset/{pokemon_name}") |
| async def get_moveset(pokemon_name: str): |
| try: |
| moveset = get_strategic_moveset(pokemon_name, debug=False) |
| if moveset: |
| return {'success': True, 'moves': moveset[:4]} |
| return {'success': True, 'moves': ['tackle', 'growl', 'scratch', 'leer']} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.get("/api/get_all_sets/{pokemon_name}") |
| async def get_all_sets(pokemon_name: str): |
| try: |
| all_sets = get_all_pokemon_sets(pokemon_name, debug=False) |
| if all_sets: |
| flattened_sets = [] |
| for format_name, sets in all_sets.items(): |
| for set_name, set_data in sets.items(): |
| moves = set_data.get('moves', [])[:4] |
| if moves: |
| flattened_sets.append({ |
| 'format': format_name, |
| 'set_name': set_name, |
| 'moves': moves, |
| 'item': set_data.get('item', ''), |
| 'ability': set_data.get('ability', ''), |
| 'nature': set_data.get('nature', ''), |
| 'evs': set_data.get('evs', {}) |
| }) |
| return {'success': True, 'sets': flattened_sets} |
| return {'success': False, 'error': 'No sets found'} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.get("/api/search-pokemon-optimized") |
| async def search_pokemon_optimized(q: str = ""): |
| return await search_pokemon(q) |
|
|
| @app.get("/api/battle-ready-pokemon") |
| async def get_battle_ready_list(): |
| try: |
| return {"success": True, "pokemon": get_battle_ready_pokemon_list()} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| @app.get("/api/pokemon/{name}/moves") |
| async def get_pokemon_learnset_api(name: str): |
| moves = data_loader.get_pokemon_moves(name) |
| return moves |
|
|
| @app.get("/api/all-moves") |
| async def get_all_moves_list(): |
| try: |
| from ..utils.data_loader import data_loader |
| moves = sorted(list(set([m['name'] for m in data_loader.moves_data.values() if 'name' in m]))) |
| return {"success": True, "moves": moves} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| @app.get("/api/items") |
| async def get_all_items_list(): |
| try: |
| from ..utils.data_loader import data_loader |
| |
| items = [] |
| for item_id, item_data in data_loader.items_data.items(): |
| name = item_data.get('name') |
| if name: |
| items.append(name) |
| return {"success": True, "items": sorted(list(set(items)))} |
| except Exception as e: |
| return {"success": False, "error": str(e)} |
|
|
| @app.post("/api/start") |
| async def start_game(request: Request): |
| global game_instance |
| try: |
| data = await request.json() |
| |
| |
| player_team_raw = data.get('team') |
| if isinstance(player_team_raw, list): |
| |
| pass |
| elif player_team_raw == 'random': |
| player_team_raw = [] |
| while len(player_team_raw) < 6: |
| name = get_random_battle_ready_pokemon() |
| if not any(p['name'].lower() == name.lower() for p in player_team_raw): |
| player_team_raw.append({'name': name}) |
| elif not player_team_raw: |
| |
| p_name = data.get('pokemon', 'pikachu').lower() |
| p_set = data.get('selected_set', {}) |
| player_team_raw = [{ |
| 'name': p_name, |
| 'ability': p_set.get('ability'), |
| 'item': p_set.get('item'), |
| 'moves': p_set.get('moves', []), |
| 'shiny': p_set.get('shiny', False) |
| }] |
| |
| opponent_choice = data.get('opponent', 'charizard').lower() |
| opponent_team_raw = [] |
| battle_mode = data.get('mode', '1v1') |
| if opponent_choice == 'random': |
| team_size = 6 if battle_mode == '6v6' else 1 |
| for _ in range(team_size): |
| name = get_random_battle_ready_pokemon() |
| while any(p['name'].lower() == name.lower() for p in opponent_team_raw): |
| name = get_random_battle_ready_pokemon() |
| opponent_team_raw.append({'name': name}) |
| elif battle_mode == '6v6': |
| opponent_team_raw = [{'name': opponent_choice}] |
| while len(opponent_team_raw) < 6: |
| name = get_random_battle_ready_pokemon() |
| if name.lower() not in [p['name'].lower() for p in opponent_team_raw]: |
| opponent_team_raw.append({'name': name}) |
| else: |
| opponent_team_raw = [{'name': opponent_choice}] |
|
|
| player_team_processed = [] |
| for p in player_team_raw: |
| p_data = get_pokemon_data(p['name']) |
| if not p_data: |
| continue |
| moves = [{'name': m} for m in p.get('moves', [])] |
| if not moves: |
| moves = get_pokemon_moves(p_data) |
| |
| mandatory = get_mandatory_item(p['name']) |
| config = { |
| 'name': p_data['name'], |
| 'types': [t['type']['name'] for t in p_data['types']], |
| 'sprite_url': get_best_sprite(p_data, side='back', shiny=p.get('shiny', False)), |
| 'stats': p_data['stats'], |
| 'moves': moves, |
| 'cry_url': p_data.get('cries', {}).get('latest', ''), |
| 'ability': p.get('ability') or p_data.get('abilities', [{}])[0].get('ability', {}).get('name', 'noability'), |
| 'item': p.get('item') or mandatory or '' |
| } |
| player_team_processed.append(config) |
| |
| opponent_team_processed = [] |
| for o in opponent_team_raw: |
| o_data = get_pokemon_data(o['name']) |
| if not o_data: |
| continue |
| o_name = o['name'].lower() |
| |
| o_moves = [] |
| o_config = {} |
| o_sets = get_all_pokemon_sets(o_name) |
| if o_sets: |
| all_sets = [] |
| for fmt in o_sets: |
| for set_name, s in o_sets[fmt].items(): |
| all_sets.append(s) |
| |
| if all_sets: |
| best_set = random.choice(all_sets) |
| mandatory = get_mandatory_item(o_name) |
| o_config = { |
| 'evs': best_set.get('evs', {}), |
| 'ivs': best_set.get('ivs', {}), |
| 'nature': best_set.get('nature', 'Hardy'), |
| 'ability': best_set.get('ability', 'noability'), |
| 'item': best_set.get('item', mandatory or '') |
| } |
| if best_set.get('moves'): |
| o_moves = [{'name': m} for m in best_set['moves']] |
| |
| if not o_moves: |
| o_moves = get_pokemon_moves(o_data) |
| |
| config = { |
| 'name': o_data['name'], |
| 'types': [t['type']['name'] for t in o_data['types']], |
| 'sprite_url': get_best_sprite(o_data, side='front', shiny=False), |
| 'stats': o_data['stats'], |
| 'moves': o_moves, |
| 'cry_url': o_data.get('cries', {}).get('latest', ''), |
| **o_config |
| } |
| opponent_team_processed.append(config) |
|
|
| game_instance = Game() |
| initial_events = game_instance.start_battle(player_team_processed, opponent_team_processed) |
| |
| |
| return { |
| "success": True, |
| "weather": game_instance.weather, |
| "player_pokemon": { |
| "name": game_instance.player_pokemon.name, |
| "current_hp": game_instance.player_pokemon.current_hp, |
| "max_hp": game_instance.player_pokemon.max_hp, |
| "sprite": game_instance.player_pokemon.sprite_url, |
| "cry_url": game_instance.player_pokemon.cry_url, |
| "types": game_instance.player_pokemon.types, |
| "level": game_instance.player_pokemon.level, |
| "status_effects": game_instance.player_pokemon.get_status_display(), |
| "substitute_hp": game_instance.player_pokemon.substitute_hp, |
| "can_mega_evolve": game_instance.can_mega_evolve(True) |
| }, |
| "opponent_pokemon": { |
| "name": game_instance.opponent_pokemon.name, |
| "current_hp": game_instance.opponent_pokemon.current_hp, |
| "max_hp": game_instance.opponent_pokemon.max_hp, |
| "sprite": game_instance.opponent_pokemon.sprite_url, |
| "cry_url": game_instance.opponent_pokemon.cry_url, |
| "types": game_instance.opponent_pokemon.types, |
| "level": game_instance.opponent_pokemon.level, |
| "status_effects": game_instance.opponent_pokemon.get_status_display(), |
| "substitute_hp": game_instance.opponent_pokemon.substitute_hp |
| }, |
| "player_moves": [m.to_dict() for m in game_instance.player_pokemon.moves.values()], |
| "player_team": [p.to_dict() for p in game_instance.player_team], |
| "opponent_team": [p.to_dict() for p in game_instance.opponent_team], |
| "start_events": initial_events |
| } |
| except Exception as e: |
| import traceback |
| traceback.print_exc() |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.get("/api/random-team") |
| async def get_random_team(): |
| try: |
| team = [] |
| max_attempts = 20 |
| attempts = 0 |
| while len(team) < 6 and attempts < max_attempts: |
| attempts += 1 |
| name = get_random_battle_ready_pokemon() |
| |
| if any(p['name'].lower() == name.lower() for p in team): |
| continue |
| |
| p_data = get_pokemon_data(name) |
| |
| p_sets = get_all_pokemon_sets(name) |
| best_set = None |
| if p_sets: |
| all_sets = [] |
| for fmt in p_sets: |
| for set_name, s in p_sets[fmt].items(): |
| all_sets.append(s) |
| if all_sets: |
| best_set = random.choice(all_sets) |
| |
| config = { |
| 'name': p_data['name'], |
| 'types': [t['type']['name'] for t in p_data['types']], |
| 'sprite_url': get_best_sprite(p_data, side='front', shiny=False), |
| 'ability': best_set.get('ability', 'Unknown') if best_set else 'Unknown', |
| 'item': best_set.get('item', 'None') if best_set else 'None' |
| } |
| team.append(config) |
| return {"success": True, "team": team} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| @app.get("/api/team") |
| async def get_team(): |
| global game_instance |
| if not game_instance: |
| return {"player_team": []} |
| return { |
| "player_team": [ |
| { |
| "index": i, |
| "name": p.name, |
| "hp": p.current_hp, |
| "max_hp": p.max_hp, |
| "is_fainted": p.is_fainted(), |
| "sprite": p.sprite_url, |
| "status": p.get_status_display() |
| } for i, p in enumerate(game_instance.player_team) |
| ] |
| } |
|
|
| @app.post("/api/move") |
| async def move(request: Request): |
| global game_instance |
| if game_instance is None: |
| raise HTTPException(status_code=400, detail="Battle not initialized") |
|
|
| data = await request.json() |
| move_name = data.get('move') |
| switch_index = data.get('switch_index') |
| mega = data.get('mega', False) |
| |
| turn_info = game_instance.process_turn(move_name=move_name, switch_index=switch_index, mega=mega) |
| |
| if 'action_order' in turn_info: |
| del turn_info['action_order'] |
| |
| response_data = { |
| "success": True, |
| "player_pokemon": { |
| "name": game_instance.player_pokemon.name, |
| "current_hp": game_instance.player_pokemon.current_hp, |
| "max_hp": game_instance.player_pokemon.max_hp, |
| "sprite": game_instance.player_pokemon.sprite_url, |
| "cry_url": game_instance.player_pokemon.cry_url, |
| "types": game_instance.player_pokemon.types, |
| "level": game_instance.player_pokemon.level, |
| "status_effects": game_instance.player_pokemon.get_status_display(), |
| "substitute_hp": game_instance.player_pokemon.substitute_hp, |
| "can_mega_evolve": game_instance.can_mega_evolve(True) |
| }, |
| "opponent_pokemon": { |
| "name": game_instance.opponent_pokemon.name, |
| "current_hp": game_instance.opponent_pokemon.current_hp, |
| "max_hp": game_instance.opponent_pokemon.max_hp, |
| "sprite": game_instance.opponent_pokemon.sprite_url, |
| "cry_url": game_instance.opponent_pokemon.cry_url, |
| "types": game_instance.opponent_pokemon.types, |
| "level": game_instance.opponent_pokemon.level, |
| "status_effects": game_instance.opponent_pokemon.get_status_display(), |
| "substitute_hp": game_instance.opponent_pokemon.substitute_hp |
| }, |
| "player_moves": [m.to_dict() for m in game_instance.player_pokemon.moves.values()], |
| "player_team": [p.to_dict() for p in game_instance.player_team], |
| "opponent_team": [p.to_dict() for p in game_instance.opponent_team], |
| "turn_info": turn_info, |
| "pending_player_switch": game_instance.pending_player_self_switch, |
| "is_game_over": game_instance.battle_over, |
| "battle_result": game_instance.get_battle_result() if game_instance.battle_over else None |
| } |
| |
| print(f"DEBUG: Move {move_name} result: {list(response_data.keys())}") |
| if response_data["turn_info"]: |
| print(f"DEBUG: Turn Info Events: {len(response_data['turn_info'].get('battle_events', []))}") |
| |
| return response_data |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| port = int(os.getenv("PORT", 7860)) |
| uvicorn.run(app, host="0.0.0.0", port=port) |
|
|