Spaces:
Sleeping
Sleeping
| import pickle | |
| import subprocess | |
| import threading | |
| from filelock import FileLock | |
| import gradio as gr | |
| import multiprocessing | |
| import time | |
| import os | |
| from PIL import Image | |
| import base64 | |
| from io import BytesIO | |
| import numpy as np | |
| from datetime import datetime | |
| from threading import Timer | |
| # Assuming this is the runner for your game pipeline (replace with actual import if different) | |
| from runner.playground_runner import PlaygroundPipelineRunner | |
| # 游戏与关卡的映射字典 | |
| Game_to_Levels = { | |
| "RaceGame": list(range(1, 10)), | |
| "SuperMario": list(range(0, 10)), | |
| "FlappyBird": list(range(1, 8)), | |
| "TempestRun": list(range(0, 5)), | |
| "PongGame": list(range(0, 4)) | |
| } | |
| # 每个游戏的有效动作字典 | |
| valid_actions_dict = { | |
| "RaceGame": ["LEFT", "RIGHT", "UP", "DOWN", "FORWARD", "BACKWARD"], | |
| "PongGame": ["LEFTUP", "LEFTDOWN", "RIGHTUP", "RIGHTDOWN", "NONE"], | |
| "FlappyBird": ["UP", "DOWN", "KEEP", "NONE"], | |
| "SuperMario": ["UP", "LEFT", "RIGHT", "UP+LEFT", "UP+RIGHT", "NONE"], | |
| "TempestRun": ["JUMP", "LEFT", "RIGHT", "SLIDE", "RISE", "NONE"] | |
| } | |
| all_actions = [ | |
| "LEFT", "LEFTUP", "UP+LEFT", "LEFTDOWN", "RIGHT", "RIGHTUP", "UP+RIGHT", "RIGHTDOWN", | |
| "UP", "RISE", "JUMP", "SLIDE", "DOWN", "KEEP", "NONE", "FORWARD", "BACKWARD" | |
| ] | |
| game_pids = {} | |
| alive_game_ids = {} | |
| def remove_old_game_dirs(): | |
| output_dirs = [d for d in os.listdir("./runs") if os.path.isdir(os.path.join("./runs", d))] | |
| for game_id in output_dirs: | |
| if game_id not in alive_game_ids: | |
| run_lock = FileLock(os.path.join(".", "runs", "run.lock")) | |
| with run_lock: | |
| os.system(f"rm -rf {os.path.join('.', 'runs', game_id)}") | |
| elif (datetime.now() - datetime.strptime(alive_game_ids[game_id], '%Y-%m-%d-%H:%M:%S')).days > 1: | |
| # If the game directory is older than 1 day, remove it | |
| run_lock = FileLock(os.path.join(".", "runs", "run.lock")) | |
| with run_lock: | |
| os.system(f"rm -rf {os.path.join('.', 'runs', game_id)}") | |
| alive_game_ids.pop(game_id) | |
| for game_id in list(alive_game_ids): | |
| if not os.path.exists(os.path.join(".", "runs", game_id)): | |
| alive_game_ids.pop(game_id) | |
| print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), " - Cleaned up old game directories.") | |
| print("Current alive game IDs:", alive_game_ids) | |
| def start_game(game, level, state, req: gr.Request): | |
| """ | |
| Start the game process and yield screenshots for real-time display in Gradio. | |
| Args: | |
| game (str): The selected game name. | |
| level (str): The selected level for the game. | |
| Yields: | |
| PIL.Image: Game screenshots for display. | |
| """ | |
| print(f"Starting {game} at {level}") | |
| # game_id = time.strftime('%Y-%m-%d-%H:%M:%S',time.localtime(time.time())) | |
| game_id = req.session_hash | |
| alive_game_ids[game_id] = datetime.now().strftime('%Y-%m-%d-%H:%M:%S') | |
| # print("start game: game_id: ", game_id) | |
| if game_pids.get(game_id): | |
| # kill the previous game process if it exists | |
| try: | |
| os.kill(game_pids[game_id], 9) | |
| print(f"Killed previous game process with PID: {game_pids[game_id]}") | |
| except Exception as e: | |
| print(f"Error killing previous game process: {e}") | |
| output_dir = os.path.join(".", "runs", game_id) | |
| if not os.path.exists(output_dir): | |
| os.makedirs(output_dir) | |
| action_file = os.path.join(output_dir, f"action_{game_id}.txt") | |
| with open(action_file, "w") as f: | |
| f.write("") | |
| state["action_file"] = action_file | |
| command = f"nohup python3 -u run_game.py --game {game} --level {level} --action_file {action_file} --game_id {game_id} > run.log 2>&1 & echo $!" | |
| with os.popen(command) as f: | |
| # 读取命令的输出,这应该就是 PID | |
| pid_str = f.read().strip() | |
| try: | |
| pid = int(pid_str) | |
| print(f"Game started with PID: {pid}") | |
| game_pids[game_id] = pid | |
| except ValueError: | |
| print(f"Failed to parse PID from command output: {pid_str}") | |
| def write_action(action, state): | |
| action_file = state.get("action_file") | |
| if not action_file: | |
| print("Action file not found in state. Cannot write action.") | |
| return | |
| if not os.path.exists(action_file): | |
| print(f"Action file {action_file} does not exist. Skipping write.") | |
| return | |
| print(f"Writing action: {action} to {action_file}") | |
| with open(action_file, "w") as f: | |
| f.write(action) | |
| def cleanup(req: gr.Request): | |
| game_id = req.session_hash | |
| if game_id in alive_game_ids: | |
| alive_game_ids.pop(game_id) | |
| pid = game_pids.get(game_id) | |
| if pid: | |
| try: | |
| os.kill(pid, 9) | |
| print(f"Killed game process with PID: {pid}") | |
| except Exception as e: | |
| print(f"Error killing game process: {e}") | |
| finally: | |
| remove_old_game_dirs() | |
| def update_image(req: gr.Request=None): | |
| if req is None: | |
| return None | |
| game_id = req.session_hash | |
| output_dir = os.path.join(".", "runs", game_id) | |
| run_lock = FileLock(os.path.join(".", "runs", "run.lock")) | |
| with run_lock: | |
| if not os.path.exists(output_dir): | |
| game_over_image = os.path.join("gameover.jpg") | |
| img_array = np.array(Image.open(game_over_image).convert('RGB')) | |
| return img_array | |
| pkl = os.path.join(output_dir, f"game_{game_id}.pkl") | |
| lock = FileLock(pkl + ".lock") | |
| while(True): | |
| try: | |
| with lock: | |
| info = pickle.load(open(pkl, "rb")) | |
| break | |
| except Exception as e: | |
| print(f"Error reading pickle file: {e}") | |
| time.sleep(0.1) | |
| current_image = info["current_image"] | |
| image_data = base64.b64decode(current_image) | |
| image = Image.open(BytesIO(image_data)).convert('RGB') | |
| image = image.resize((int(image.width * 800 / image.height), 800)) | |
| img_array = np.array(image) | |
| return img_array | |
| with gr.Blocks(title="Game Control Interface") as demo: | |
| state = gr.State(value={}) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| game_dropdown = gr.Dropdown( | |
| choices=["RaceGame", "SuperMario", "FlappyBird", "TempestRun", "PongGame"], | |
| label="Select Game", | |
| value="RaceGame" | |
| ) | |
| level_dropdown = gr.Dropdown( | |
| choices=Game_to_Levels["RaceGame"], | |
| label="Select Level" | |
| ) | |
| start_button = gr.Button("Start") | |
| with gr.Column(scale=8): | |
| screenshot_display = gr.Image(update_image, label="Game Screen", height=800, interactive=False, every=0.5) | |
| game_dropdown.change( | |
| fn=lambda game: gr.update(choices=Game_to_Levels.get(game, [])), | |
| inputs=game_dropdown, | |
| outputs=level_dropdown | |
| ) | |
| start_button.click( | |
| fn=start_game, | |
| inputs=[game_dropdown, level_dropdown, state], | |
| outputs=None | |
| ) | |
| with gr.Row(): | |
| action_buttons = {action: gr.Button(action, visible=True if action in valid_actions_dict.get("RaceGame", []) else False) for action in all_actions} | |
| def update_action_buttons(game): | |
| valid_actions = valid_actions_dict.get(game, []) | |
| return {btn: gr.update(visible=(action in valid_actions)) for action, btn in action_buttons.items()} | |
| game_dropdown.change( | |
| fn=update_action_buttons, | |
| inputs=game_dropdown, | |
| outputs=list(action_buttons.values()) | |
| ) | |
| for action, btn in action_buttons.items(): | |
| btn.click( | |
| fn=write_action, | |
| inputs=[btn, state], | |
| outputs=None, | |
| ) | |
| demo.unload(cleanup) | |
| demo.launch() | |