from flask import Flask, render_template_string, request, redirect, url_for, session, jsonify
import random
import string
import json
import os
from flask_socketio import SocketIO, join_room, leave_room, emit
import hashlib
import time
import threading
from datetime import datetime
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import RepositoryNotFoundError
from urllib.parse import urlparse, parse_qs
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-very-secret-key-here'
socketio = SocketIO(app, async_mode='threading')
REPO_ID = "flpolprojects/Clients"
HF_TOKEN_WRITE = os.getenv("HF_TOKEN")
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ")
ROOMS_DB = os.path.join(app.root_path, 'rooms.json')
USERS_DB = os.path.join(app.root_path, 'users.json')
GAMES_DB = os.path.join(app.root_path, 'games.json')
def load_json(file_path, default={}):
try:
download_db_from_hf()
if os.path.exists(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
return default
except (FileNotFoundError, json.JSONDecodeError) as e:
print(f"Ошибка загрузки JSON из {file_path}: {e}")
return default
except Exception as e:
print(f"Непредвиденная ошибка при загрузке: {e}")
return default
def save_json(file_path, data):
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=4, ensure_ascii=False)
upload_db_to_hf()
except OSError as e:
print(f"Ошибка сохранения JSON в {file_path}: {e}")
except Exception as e:
print(f"Непредвиденная ошибка при сохранении: {e}")
def upload_db_to_hf():
try:
api = HfApi()
for file_path, repo_path in [(ROOMS_DB, "rooms.json"), (USERS_DB, "users.json"), (GAMES_DB, "games.json")]:
if os.path.exists(file_path):
api.upload_file(
path_or_fileobj=file_path,
path_in_repo=repo_path,
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN_WRITE,
commit_message=f"Backup: {repo_path} ({datetime.now().strftime('%Y-%m-%d %H:%M:%S')})"
)
except Exception as e:
print(f"Ошибка при загрузке файлов на Hugging Face Hub: {e}")
def download_db_from_hf():
try:
api = HfApi()
for file_path, repo_path in [(ROOMS_DB, "rooms.json"), (USERS_DB, "users.json"), (GAMES_DB, "games.json")]:
try:
hf_hub_download(
repo_id=REPO_ID,
filename=repo_path,
repo_type="dataset",
token=HF_TOKEN_READ,
local_dir=".",
local_dir_use_symlinks=False
)
except RepositoryNotFoundError:
if not os.path.exists(file_path):
with open(file_path, 'w') as f:
json.dump({}, f)
except Exception as e:
print(f"Ошибка при скачивании файла {repo_path}: {e}")
except Exception as e:
print(f"Ошибка при скачивании файлов с Hugging Face Hub: {e}")
def periodic_backup():
while True:
upload_db_to_hf()
time.sleep(15)
rooms = load_json(ROOMS_DB)
users = load_json(USERS_DB)
games_data = load_json(GAMES_DB, default={
"crocodile": {
"name": "Крокодил",
"description": "Один игрок показывает слово жестами.",
"min_players": 2,
"max_players": 10,
"state": {}
},
"alias": {
"name": "Alias",
"description": "Один игрок объясняет слово.",
"min_players": 2,
"max_players": 10,
"state": {}
},
"mafia": {
"name": "Мафия",
"description": "Мафия против мирных жителей.",
"min_players": 4,
"max_players": 10,
"state": {}
},
"durak": {
"name": "Дурак",
"description": "Карточная игра.",
"min_players": 2,
"max_players": 6,
"state": {}
}
})
save_json(GAMES_DB, games_data)
def generate_token():
return ''.join(random.choices(string.ascii_letters + string.digits, k=15))
def hash_password(password):
return hashlib.sha256(password.encode('utf-8')).hexdigest()
def get_youtube_id(url):
if not url:
return None
parsed_url = urlparse(url)
if parsed_url.hostname in ('www.youtube.com', 'youtube.com'):
if parsed_url.path == '/watch':
query = parse_qs(parsed_url.query)
return query.get('v', [None])[0]
elif parsed_url.path.startswith('/embed/'):
return parsed_url.path.split('/embed/')[1].split('?')[0]
elif parsed_url.path.startswith('/v/'):
return parsed_url.path.split('/v/')[1].split('?')[0]
elif parsed_url.hostname in ('youtu.be', 'www.youtu.be'):
return parsed_url.path[1:].split('?')[0]
return None
@app.route('/', methods=['GET', 'POST'])
def index():
if 'username' in session:
return redirect(url_for('dashboard'))
if request.method == 'POST':
action = request.form.get('action')
username = request.form.get('username')
password = request.form.get('password')
if action == 'register':
if username in users:
return "Пользователь уже существует", 400
users[username] = {'password': hash_password(password), 'rooms': []}
save_json(USERS_DB, users)
session['username'] = username
return redirect(url_for('dashboard'))
elif action == 'login':
if username in users and users[username]['password'] == hash_password(password):
session['username'] = username
return redirect(url_for('dashboard'))
return "Неверный логин или пароль", 401
return render_template_string('''
Room
Room
''')
@app.route('/dashboard', methods=['GET', 'POST'])
def dashboard():
if 'username' not in session:
return redirect(url_for('index'))
user = users.get(session['username'])
if not user:
session.pop('username', None)
return redirect(url_for('index'))
if request.method == 'POST':
action = request.form.get('action')
if action == 'create':
token = generate_token()
rooms[token] = {'users': [session['username']], 'max_users': 10, 'admin': session['username'], 'current_game': None, 'guests': []}
users[session['username']]['rooms'].append(token)
save_json(ROOMS_DB, rooms)
save_json(USERS_DB, users)
return redirect(url_for('room', token=token))
elif action == 'join':
token = request.form.get('token')
if token in rooms and len(rooms[token]['users']) + len(rooms[token]['guests']) < rooms[token]['max_users']:
if session['username'] not in rooms[token]['users']:
rooms[token]['users'].append(session['username'])
users[session['username']]['rooms'].append(token)
save_json(ROOMS_DB, rooms)
save_json(USERS_DB, users)
return redirect(url_for('room', token=token))
return "Комната не найдена или переполнена", 404
return render_template_string('''
Панель управления
Добро пожаловать, {{ session['username'] }}
''', session=session)
@app.route('/logout', methods=['POST'])
def logout():
session.pop('username', None)
return redirect(url_for('index'))
@app.route('/room/')
def room(token):
if 'username' not in session and 'guest_id' not in session:
return redirect(url_for('index'))
if token not in rooms:
return redirect(url_for('dashboard'))
is_admin = rooms[token]['admin'] == session.get('username')
if 'username' in session:
username = session['username']
is_guest = False
elif 'guest_id' in session:
username = session['guest_id']
is_guest = True
if username not in rooms[token]['guests']:
rooms[token]['guests'].append(username)
save_json(ROOMS_DB, rooms)
else:
return redirect(url_for('guest_login', token=token))
return render_template_string('''
Метавселенная - Комната {{ token }}
Нажмите, чтобы войти в метавселенную
Движение: W, A, S, D
Осмотр: Мышь
Взаимодействие: E
На мобильных устройствах используйте джойстик и правую часть экрана
+
Нажмите [E] для взаимодействия
''', token=token, session=session, is_admin=is_admin, games_data=games_data, username=username, is_guest=is_guest)
@app.route('/join_as_guest/', methods=['GET'])
def join_as_guest(token):
if token not in rooms:
return "Комната не найдена", 404
guest_id = 'Гость_' + ''.join(random.choices(string.digits, k=4))
session['guest_id'] = guest_id
if 'username' in session:
session.pop('username')
return redirect(url_for('room', token=token))
@app.route('/guest_login/')
def guest_login(token):
if token not in rooms:
return "Комната не найдена", 404
return render_template_string('''
Вход для гостей
''', token=token)
@socketio.on('connect')
def handle_connect():
print(f"Client connected: {request.sid}")
@socketio.on('disconnect')
def handle_disconnect_custom():
print(f"Client disconnected: {request.sid}")
@socketio.on('join')
def handle_join(data):
token = data['token']
username = data['username']
join_room(token)
if token not in rooms:
return
all_users = rooms[token]['users'] + rooms[token]['guests']
emit('init_users', {'users': rooms[token]['users'], 'guests': rooms[token]['guests']}, to=request.sid)
emit('user_joined', {
'username': username,
'users': rooms[token]['users'],
'guests': rooms[token]['guests']
}, room=token, include_self=False)
if rooms[token]['current_game']:
game_id = rooms[token]['current_game']
emit('game_started', {'game_id': game_id}, to=request.sid)
if token in games_data.get(game_id, {}).get('state', {}):
emit('update_game_state', {'game_id': game_id, 'state': games_data[game_id]['state'][token]}, to=request.sid)
@socketio.on('leave')
def handle_leave(data):
token = data['token']
username = data['username']
is_guest = data.get('is_guest', False)
leave_room(token)
if token in rooms:
if is_guest:
if username in rooms[token]['guests']:
rooms[token]['guests'].remove(username)
else:
if username in rooms[token]['users']:
rooms[token]['users'].remove(username)
if not rooms[token]['users'] and not rooms[token]['guests']:
del rooms[token]
else:
emit('user_left', {'username': username}, room=token)
save_json(ROOMS_DB, rooms)
@socketio.on('player_move')
def handle_player_move(data):
token = data['token']
if token in rooms:
emit('player_moved', {
'username': data['username'],
'pos': data['pos'],
'quat': data['quat']
}, room=token, include_self=False)
@socketio.on('signal')
def handle_signal(data):
if data['token'] in rooms:
emit('signal', {
'from': data['from'],
'signal': data['signal']
}, room=data['to'])
@socketio.on('start_game')
def handle_start_game(data):
token = data['token']
game_id = data['game_id']
username = session.get('username')
if token in rooms and rooms[token].get('admin') == username:
all_users_in_room = rooms[token]['users'] + rooms[token]['guests']
min_p = games_data[game_id]['min_players']
max_p = games_data[game_id]['max_players']
if not (min_p <= len(all_users_in_room) <= max_p):
return
rooms[token]['current_game'] = game_id
if game_id not in games_data: games_data[game_id] = {'state': {}}
elif 'state' not in games_data[game_id]: games_data[game_id]['state'] = {}
games_data[game_id]['state'][token] = {}
save_json(ROOMS_DB, rooms)
save_json(GAMES_DB, games_data)
emit('game_started', {'game_id': game_id}, room=token)
@socketio.on('set_game_state')
def handle_set_game_state(data):
token = data['token']
game_id = data['game_id']
state = data['state']
username = session.get('username')
if token in rooms and rooms[token].get('admin') == username:
if rooms[token]['current_game'] == game_id:
all_users = rooms[token]['users'] + rooms[token]['guests']
state['players'] = all_users
if (game_id == 'crocodile' or game_id == 'alias') and 'presenter' not in state:
state['presenter'] = random.choice(all_users) if all_users else None
if game_id == 'mafia' and not state.get('roles'):
state['roles'] = assign_mafia_roles(all_users)
games_data[game_id]['state'][token] = state
save_json(GAMES_DB, games_data)
emit('update_game_state', {'game_id': game_id, 'state': state}, room=token)
if state.get('isRunning') and (game_id == 'crocodile' or game_id == 'alias'):
start_timer(token, game_id)
def assign_mafia_roles(users):
num_players = len(users)
num_mafia = 1 if num_players < 6 else 2
roles_list = ['mafia'] * num_mafia + ['civilian'] * (num_players - num_mafia)
random.shuffle(roles_list)
return {user: role for user, role in zip(users, roles_list)}
@socketio.on('game_action')
def handle_game_action(data):
token = data['token']
game_id = data['game_id']
action = data['action']
user = data['user']
value = data.get('value')
if token not in rooms or game_id != rooms[token].get('current_game'): return
current_state = games_data[game_id]['state'].get(token, {})
if not current_state.get('isRunning'): return
if game_id in ['crocodile', 'alias'] and action == 'guess':
if user == current_state.get('presenter'): return
result = "Угадано!" if value.lower() == current_state.get('word', '').lower() else "Не угадано"
current_state.setdefault('guesses', []).append({'user': user, 'value': value, 'result': result})
if result == "Угадано!": current_state['isRunning'] = False
elif game_id == 'mafia' and action == 'vote':
if current_state['phase'] == 'day':
current_state.setdefault('votes', {})[user] = value
all_players = current_state.get('players', [])
if len(current_state['votes']) == len(all_players):
process_mafia_votes(current_state)
games_data[game_id]['state'][token] = current_state
save_json(GAMES_DB, games_data)
emit('update_game_state', {'game_id': game_id, 'state': current_state}, room=token)
def process_mafia_votes(state):
votes = state.get('votes', {})
counts = {}
for vote in votes.values(): counts[vote] = counts.get(vote, 0) + 1
max_votes = 0
voted_out = []
for player, num_votes in counts.items():
if num_votes > max_votes:
max_votes = num_votes
voted_out = [player]
elif num_votes == max_votes:
voted_out.append(player)
if len(voted_out) == 1:
player_to_remove = voted_out[0]
state['players'].remove(player_to_remove)
state['killed'] = player_to_remove
remaining_mafia = sum(1 for p in state['players'] if state['roles'].get(p) == 'mafia')
remaining_civilians = len(state['players']) - remaining_mafia
if remaining_mafia == 0:
state['winner'] = 'Мирные'
state['isRunning'] = False
elif remaining_mafia >= remaining_civilians:
state['winner'] = 'Мафия'
state['isRunning'] = False
state['phase'] = 'night'
state['votes'] = {}
def start_timer(token, game_id):
def timer_loop():
with app.app_context():
while True:
state = games_data[game_id]['state'].get(token)
if not state or not state.get('isRunning') or state.get('timer', 0) <= 0:
if state:
state['isRunning'] = False
games_data[game_id]['state'][token] = state
save_json(GAMES_DB, games_data)
socketio.emit('update_game_state', {'game_id': game_id, 'state': state}, room=token)
break
state['timer'] -= 1
games_data[game_id]['state'][token] = state
save_json(GAMES_DB, games_data)
socketio.emit('update_game_state', {'game_id': game_id, 'state': state}, room=token)
socketio.sleep(1)
socketio.start_background_task(timer_loop)
if __name__ == '__main__':
if not os.path.exists(os.path.join(app.root_path, 'rooms.json')):
with open(os.path.join(app.root_path, 'rooms.json'), 'w') as f: json.dump({}, f)
if not os.path.exists(os.path.join(app.root_path, 'users.json')):
with open(os.path.join(app.root_path, 'users.json'), 'w') as f: json.dump({}, f)
if not os.path.exists(os.path.join(app.root_path, 'games.json')):
save_json(GAMES_DB, games_data)
backup_thread = threading.Thread(target=periodic_backup, daemon=True)
#backup_thread.start()
# ВАЖНО: для работы WebRTC нужен HTTPS. Werkzeug создает самоподписанный сертификат.
# Вам нужно будет разрешить его в браузере (на странице "Дополнительно" -> "Перейти на сайт (небезопасно)").
# Для продакшена используйте полноценный веб-сервер (Nginx) с настоящими SSL-сертификатами.
try:
socketio.run(app, host='0.0.0.0', port=7860, debug=True, allow_unsafe_werkzeug=True, ssl_context='adhoc')
except TypeError:
socketio.run(app, host='0.0.0.0', port=7860, debug=True, allow_unsafe_werkzeug=True)