Spaces:
Configuration error
Configuration error
| """ | |
| Real-Time AI Game Viewer | |
| ------------------------- | |
| Web interface to monitor AI agents, chat, and game state in real-time. | |
| """ | |
| import json | |
| from pathlib import Path | |
| from datetime import datetime | |
| from collections import OrderedDict | |
| from flask import Flask, render_template, jsonify, Response | |
| import html | |
| app = Flask(__name__, | |
| template_folder='templates', | |
| static_folder='static') | |
| # Ensure Flask preserves JSON key order | |
| app.config['JSON_SORT_KEYS'] = False | |
| LOGS_DIR = Path("examples/ai_testing/my_games/ai_logs") | |
| SESSION_FILE = Path("examples/ai_testing/my_games/current_session.txt") | |
| def get_current_session(): | |
| """Get current session directory.""" | |
| if SESSION_FILE.exists(): | |
| try: | |
| with open(SESSION_FILE, 'r', encoding='utf-8') as f: | |
| session_path = f.read().strip() | |
| session_dir = Path(session_path) | |
| if session_dir.exists(): | |
| print(f"โ Found current session: {session_dir.name}") | |
| return session_dir | |
| except Exception as e: | |
| print(f"Error reading session file: {e}") | |
| # Fallback: get most recent session | |
| if LOGS_DIR.exists(): | |
| sessions = sorted(LOGS_DIR.glob("session_*"), reverse=True) | |
| if sessions: | |
| print(f"โ Using most recent session: {sessions[0].name}") | |
| return sessions[0] | |
| print("โ No session found") | |
| return None | |
| def get_all_sessions(): | |
| """Get list of all session directories.""" | |
| if not LOGS_DIR.exists(): | |
| return [] | |
| sessions = [] | |
| current_session_path = get_current_session() | |
| for session_dir in sorted(LOGS_DIR.glob("session_*"), reverse=True): | |
| sessions.append({ | |
| "name": session_dir.name, | |
| "path": str(session_dir.absolute()), | |
| "is_current": current_session_path and str(session_dir.absolute()) == str(current_session_path.absolute()) | |
| }) | |
| return sessions | |
| def get_session_data(session_path): | |
| """Get all data for a specific session.""" | |
| session_dir = Path(session_path) | |
| if not session_dir.exists(): | |
| return None | |
| print(f"Loading session data from: {session_dir}") | |
| # Get structured requests data (new format) | |
| requests_file = session_dir / "requests.json" | |
| requests_data = [] | |
| if requests_file.exists(): | |
| try: | |
| with open(requests_file, 'r', encoding='utf-8') as f: | |
| # Use object_pairs_hook to preserve key order | |
| data = json.load(f, object_pairs_hook=OrderedDict) | |
| requests_data = data.get("requests", []) | |
| print(f" โ Loaded {len(requests_data)} requests") | |
| except Exception as e: | |
| print(f" โ Error loading requests: {e}") | |
| else: | |
| print(f" โน No requests file found") | |
| # Get player logs (old format - keep for backwards compatibility) | |
| players = {} | |
| for player_file in session_dir.glob("player_*.md"): | |
| player_name = player_file.stem.replace("player_", "").upper() | |
| try: | |
| with open(player_file, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| players[player_name] = { | |
| "name": player_name, | |
| "content": content, | |
| "html": html.escape(content) | |
| } | |
| print(f" โ Loaded player: {player_name}") | |
| except Exception as e: | |
| print(f" โ Error loading {player_name}: {e}") | |
| players[player_name] = { | |
| "name": player_name, | |
| "content": f"Error loading: {e}", | |
| "html": html.escape(f"Error loading: {e}") | |
| } | |
| # Get chat history | |
| chat_file = session_dir / "chat_history.json" | |
| chat_messages = [] | |
| if chat_file.exists(): | |
| try: | |
| with open(chat_file, 'r', encoding='utf-8') as f: | |
| data = json.load(f) | |
| chat_messages = data.get("messages", []) | |
| print(f" โ Loaded {len(chat_messages)} chat messages") | |
| except Exception as e: | |
| print(f" โ Error loading chat: {e}") | |
| chat_messages = [{"error": str(e)}] | |
| else: | |
| print(f" โน No chat history file") | |
| # Get agent memories | |
| memory_file = session_dir / "agent_memories.json" | |
| memories = {} | |
| if memory_file.exists(): | |
| try: | |
| with open(memory_file, 'r', encoding='utf-8') as f: | |
| memories = json.load(f) | |
| print(f" โ Loaded memories for {len(memories)} players") | |
| except Exception as e: | |
| print(f" โ Error loading memories: {e}") | |
| memories = {} | |
| else: | |
| print(f" โน No memories file") | |
| # Calculate statistics | |
| new_requests_count = sum(1 for req in requests_data if req.get("is_new", False)) | |
| players_with_requests = {} | |
| for req in requests_data: | |
| player = req["player_name"] | |
| if player not in players_with_requests: | |
| players_with_requests[player] = {"total": 0, "new": 0} | |
| players_with_requests[player]["total"] += 1 | |
| if req.get("is_new", False): | |
| players_with_requests[player]["new"] += 1 | |
| return { | |
| "session_name": session_dir.name, | |
| "session_path": str(session_dir), | |
| "requests": requests_data, | |
| "new_requests_count": new_requests_count, | |
| "players_stats": players_with_requests, | |
| "players": players, | |
| "chat": chat_messages, | |
| "memories": memories | |
| } | |
| def index(): | |
| """Main page - use dynamic viewer.""" | |
| sessions = get_all_sessions() | |
| current_session = get_current_session() | |
| return render_template('viewer_dynamic.html', | |
| sessions=sessions, | |
| current_session=str(current_session) if current_session else None) | |
| def enhanced_viewer(): | |
| """Enhanced viewer (old version).""" | |
| sessions = get_all_sessions() | |
| current_session = get_current_session() | |
| return render_template('viewer_enhanced.html', | |
| sessions=sessions, | |
| current_session=str(current_session) if current_session else None) | |
| def old_viewer(): | |
| """Old viewer for backwards compatibility.""" | |
| sessions = get_all_sessions() | |
| current_session = get_current_session() | |
| return render_template('viewer.html', | |
| sessions=sessions, | |
| current_session=str(current_session) if current_session else None) | |
| def api_sessions(): | |
| """Get list of all sessions.""" | |
| return jsonify(get_all_sessions()) | |
| def api_session(session_path): | |
| """Get data for specific session.""" | |
| data = get_session_data(session_path) | |
| if data is None: | |
| return jsonify({"error": "Session not found"}), 404 | |
| return jsonify(data) | |
| def api_current(): | |
| """Get current session data.""" | |
| current = get_current_session() | |
| if current is None: | |
| return jsonify({"error": "No active session"}), 404 | |
| data = get_session_data(str(current)) | |
| if data is None: | |
| return jsonify({"error": "Session not found"}), 404 | |
| return jsonify(data) | |
| def api_mark_viewed(session_path, request_id): | |
| """Mark a specific request as viewed.""" | |
| session_dir = Path(session_path) | |
| requests_file = session_dir / "requests.json" | |
| if not requests_file.exists(): | |
| return jsonify({"error": "Requests file not found"}), 404 | |
| try: | |
| with open(requests_file, 'r', encoding='utf-8') as f: | |
| data = json.load(f, object_pairs_hook=OrderedDict) | |
| # Mark request as viewed | |
| found = False | |
| for req in data.get("requests", []): | |
| if req["request_id"] == request_id: | |
| req["is_new"] = False | |
| found = True | |
| break | |
| if not found: | |
| return jsonify({"error": "Request not found"}), 404 | |
| # Save back (preserving order) | |
| with open(requests_file, 'w', encoding='utf-8') as f: | |
| json.dump(data, f, indent=2, ensure_ascii=False) | |
| return jsonify({"success": True}) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def api_mark_all_viewed(session_path): | |
| """Mark all requests as viewed.""" | |
| session_dir = Path(session_path) | |
| requests_file = session_dir / "requests.json" | |
| if not requests_file.exists(): | |
| return jsonify({"error": "Requests file not found"}), 404 | |
| try: | |
| with open(requests_file, 'r', encoding='utf-8') as f: | |
| data = json.load(f, object_pairs_hook=OrderedDict) | |
| # Mark all as viewed | |
| for req in data.get("requests", []): | |
| req["is_new"] = False | |
| # Save back (preserving order) | |
| with open(requests_file, 'w', encoding='utf-8') as f: | |
| json.dump(data, f, indent=2, ensure_ascii=False) | |
| return jsonify({"success": True, "marked_count": len(data.get("requests", []))}) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| if __name__ == '__main__': | |
| print("="*80) | |
| print("๐ AI Game Viewer Starting...") | |
| print("="*80) | |
| print(f"\n๐ Watching: {LOGS_DIR.absolute()}") | |
| print(f"๐ Session file: {SESSION_FILE.absolute()}") | |
| print(f"\n๐ Open in browser: http://localhost:5001") | |
| print("\nโ ๏ธ Note: Game board is on http://localhost:5000") | |
| print(" This viewer (5001) shows AI requests/responses") | |
| print(" Game board (5000) shows the actual Catan board") | |
| print("\n" + "="*80 + "\n") | |
| # Check current session on startup | |
| get_current_session() | |
| app.run(debug=False, host='0.0.0.0', port=5001) | |