import json
import os
import random
import string
import time
from urllib.parse import quote

import gradio as gr
import requests
from filelock import FileLock

# --- CONFIGURATION ---
TOTAL_ROUNDS = 3
POLLINATIONS_URL = "https://text.pollinations.ai/"
DB_FILE = "game_db.json"
LOCK_FILE = f"{DB_FILE}.lock"
MAX_CHARS = 150
# Upper bound (seconds) for a single AI request, so a stalled connection
# cannot block a handler forever.
AI_TIMEOUT_SECONDS = 60


# --- DATABASE (SELF-HEALING) ---
def load_db():
    """Read the whole game database from DB_FILE.

    Returns:
        The decoded dict of rooms keyed by room code. An empty dict is
        returned when the file is missing, empty, unreadable, not valid
        JSON, or does not hold a JSON object.
    """
    # No file yet: treat it as an empty database.
    if not os.path.exists(DB_FILE):
        return {}

    lock = FileLock(LOCK_FILE)
    try:
        with lock:
            if os.path.getsize(DB_FILE) == 0:
                return {}  # Empty file
            with open(DB_FILE, "r", encoding="utf-8") as f:
                content = f.read()
        if not content.strip():
            return {}
        data = json.loads(content)
        if not isinstance(data, dict):
            # Callers index the result by room code; any other JSON root
            # is treated the same way as a corrupted file.
            raise ValueError("database root is not a JSON object")
        return data
    except Exception as e:
        print(f"⚠️ DB Error: {e}. Resetting DB.")
        # A broken database is reset so that the application keeps running.
        return {}


def save_db(data):
    """Write ``data`` to DB_FILE as pretty-printed UTF-8 JSON.

    The JSON is first written to a temporary sibling file and then moved
    over DB_FILE, so a failure while serialising does not truncate the
    existing database. Errors are printed, not raised.
    """
    lock = FileLock(LOCK_FILE)
    tmp_path = f"{DB_FILE}.tmp"
    try:
        with lock:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, DB_FILE)
    except Exception as e:
        print(f"⚠️ Save Error: {e}")


# --- AI ---
def ask_ai_judge(event, player_action):
    """Ask the text AI to narrate the outcome of a player's action.

    Args:
        event: The catastrophe text set by the host.
        player_action: The action text submitted by the player.

    Returns:
        The AI response body on HTTP 200; otherwise a fallback story that
        keeps the '---' separators and ends with "[Survived]".
    """
    prompt = (
        "Roleplay: Ты рассказчик.\n"
        f"КАТАСТРОФА: {event}\nДЕЙСТВИЕ: {player_action}\n"
        "Напиши историю из 3 частей (разделитель '---').\n"
        "В конце добавь [Survived] или [Not_Survived]."
    )
    try:
        r = requests.post(
            POLLINATIONS_URL,
            data=prompt.encode("utf-8"),
            timeout=AI_TIMEOUT_SECONDS,
        )
    except requests.RequestException:
        return "Связь потеряна.\n---\n---\n[Survived]"
    return r.text if r.status_code == 200 else "Ошибка ИИ.\n---\n---\n[Survived]"


def ask_ai_congrats(name, score):
    """Ask the text AI for a congratulation message for the winner.

    Args:
        name: Winner's display name.
        score: Winner's final score.

    Returns:
        The AI response body on HTTP 200; otherwise a fixed fallback
        congratulation containing ``name``.
    """
    fallback = f"Поздравляем {name}!"
    prompt = f"Поздравь {name} с {score} очками."
    # The prompt travels in the URL path, so characters such as '/', '?'
    # and '#' coming from the player name must be percent-encoded.
    url = f"{POLLINATIONS_URL}{quote(prompt, safe='')}"
    try:
        r = requests.get(url, timeout=AI_TIMEOUT_SECONDS)
    except requests.RequestException:
        return fallback
    return r.text if r.status_code == 200 else fallback
# --- LOGIC ---
def generate_code():
    """Return a random 4-character room code (uppercase letters and digits)."""
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=4))


def add_log(room, msg):
    """Append ``msg``, prefixed with the current HH:MM:SS time, to the room log."""
    room["logs"].append(f"[{time.strftime('%H:%M:%S')}] {msg}")


def format_lore(text):
    """Render an AI story as Markdown.

    The text is split on '---'. With at least three parts, the first three
    are shown under act headings; otherwise the raw text is shown under a
    single heading. Empty input yields an empty string.
    """
    if not text:
        return ""
    parts = text.split("---")
    if len(parts) < 3:
        return f"**История:**\n{text}"
    return f"### 🎬 АКТ 1\n{parts[0]}\n\n### ⚡ АКТ 2\n{parts[1]}\n\n### 🏁 ФИНАЛ\n{parts[2]}"


def _clean_name(name):
    """Return the player name with surrounding whitespace removed ('' for None)."""
    return (name or "").strip()


# --- HANDLERS ---
def create_room(name):
    """Create a new room hosted by ``name`` and store it in the database.

    Returns:
        A 4-tuple ``(code, error_text, login_group_update, game_group_update)``.
        ``code`` is None and the groups are left untouched when no name
        was given.
    """
    name = _clean_name(name)
    if not name:
        return None, "Введите имя!", gr.update(), gr.update()

    db = load_db()
    code = generate_code()
    # Codes are short, so make sure an existing room is never overwritten.
    while code in db:
        code = generate_code()

    db[code] = {
        "host": name,
        "phase": "LOBBY",
        "round": 1,
        "event": "",
        "logs": [],
        "winner_msg": "",
        "players": {
            name: {"name": name, "score": 0, "action": None, "result": "", "is_host": True}
        },
    }
    save_db(db)
    return code, "", gr.update(visible=False), gr.update(visible=True)


def join_room(name, code):
    """Add ``name`` to the room ``code`` unless already a member.

    Returns:
        A 4-tuple ``(code, error_text, login_group_update, game_group_update)``.
        ``code`` is None when the input is incomplete or the room is unknown.
    """
    name = _clean_name(name)
    if not name or not code:
        return None, "Данные?", gr.update(), gr.update()

    code = code.upper().strip()
    db = load_db()
    if code not in db:
        return None, "Нет комнаты!", gr.update(), gr.update()

    room = db[code]
    if name not in room["players"]:
        room["players"][name] = {
            "name": name, "score": 0, "action": None, "result": "", "is_host": False
        }
        save_db(db)
    return code, "", gr.update(visible=False), gr.update(visible=True)


def act(code, name, txt):
    """Handle a press of the single action button for player ``name``.

    What the press means depends on the room phase:
      * LOBBY    - the host starts the game.
      * INPUT    - the host sets the event, or a player submits an action;
                   once every player has an action the AI judging runs.
      * RESULTS  - the host advances to the next round or ends the game.
      * GAMEOVER - the host resets the room back to the lobby.

    Unknown rooms/players and presses that do not apply are ignored.
    """
    if not code:
        return
    db = load_db()
    if code not in db:
        return
    room = db[code]
    p = room["players"].get(name)
    if not p:
        return

    # Whitespace-only input counts as no input.
    txt = (txt or "").strip()
    phase = room["phase"]
    save = False

    if phase == "LOBBY":
        if p["is_host"]:
            room["phase"] = "INPUT"
            save = True

    elif phase == "INPUT":
        if not room["event"] and p["is_host"] and txt:
            room["event"] = txt[:MAX_CHARS]
            save = True
            # A new event starts a new round of answers.
            for pl in room["players"].values():
                pl["action"] = None
        elif room["event"] and txt and not p["action"]:
            p["action"] = txt[:MAX_CHARS]
            save = True
            if all(pl["action"] for pl in room["players"].values()):
                room["phase"] = "PROCESSING"
                save_db(db)
                process_ai(code)
                return

    elif phase == "RESULTS":
        if p["is_host"]:
            if room["round"] < TOTAL_ROUNDS:
                room["round"] += 1
                room["event"] = ""
                room["phase"] = "INPUT"
            else:
                room["phase"] = "GAMEOVER"
                w = max(room["players"].values(), key=lambda x: x["score"])
                room["winner_msg"] = ask_ai_congrats(w["name"], w["score"])
            save = True

    elif phase == "GAMEOVER":
        if p["is_host"]:
            room["phase"] = "LOBBY"
            room["round"] = 1
            room["event"] = ""
            for pl in room["players"].values():
                pl["score"] = 0
                pl["action"] = None
                pl["result"] = ""
            save = True

    if save:
        save_db(db)


def process_ai(code):
    """Judge every player's action in room ``code`` and move it to RESULTS.

    Each story is stored as the player's ``result``; a story containing
    "[Survived]" adds 2 to the player's score. Does nothing when the room
    no longer exists.
    """
    db = load_db()
    room = db.get(code)
    if room is None:
        return

    event = room["event"]
    stories = {
        pname: ask_ai_judge(event, pl["action"])
        for pname, pl in room["players"].items()
    }

    # The AI calls above are slow. Reload before writing so that rooms
    # created or changed in the meantime are not overwritten by a stale copy.
    db = load_db()
    room = db.get(code)
    if room is None:
        return
    for pname, lore in stories.items():
        pl = room["players"].get(pname)
        if pl is None:
            continue
        pl["result"] = lore
        if "[Survived]" in lore:
            pl["score"] += 2
    room["phase"] = "RESULTS"
    save_db(db)


# --- REFRESH (PROTECTED) ---
def refresh(code, name):
    """Build the UI state for player ``name`` in room ``code``.

    Returns:
        An 8-tuple matching the timer outputs: login group update, game
        group update, log text, players table (Markdown), status header,
        text input update, action button update, formatted story.
        When the room or player cannot be resolved, or on any error, the
        login screen state is returned instead.
    """
    # Safe fallback when there is no data to show.
    empty_ui = (
        gr.update(visible=True), gr.update(visible=False),
        "", "", "", gr.update(), gr.update(), "",
    )
    try:
        if not code:
            return empty_ui
        db = load_db()
        if code not in db:
            return empty_ui
        room = db[code]
        me = room["players"].get(name)
        if not me:
            return empty_ui

        # Players table
        pl_md = "| Stat | Name | Score |\n|---|---|---|\n"
        for p in room["players"].values():
            if "[Survived]" in p["result"]:
                s = "✅"
            elif "[Not_Survived]" in p["result"]:
                s = "💀"
            else:
                s = "⏳"
            pl_md += f"| {s} | {p['name']} | {p['score']} |\n"

        ph = room["phase"]
        inp, btn, hdr = gr.update(visible=False), gr.update(visible=False), ""

        if ph == "LOBBY":
            hdr = "⏳ ОЖИДАНИЕ"
            if me["is_host"]:
                btn = gr.update(visible=True, value="СТАРТ", variant="primary")
        elif ph == "INPUT":
            if not room["event"]:
                hdr = "🎬 ХОСТ ВЫБИРАЕТ"
                if me["is_host"]:
                    inp = gr.update(visible=True, label="Угроза")
                    btn = gr.update(visible=True, value="ЗАДАТЬ")
            else:
                hdr = f"⚠️ {room['event']}"
                if not me["action"]:
                    inp = gr.update(visible=True, label="Действие")
                    btn = gr.update(visible=True, value="ОТПРАВИТЬ")
        elif ph == "PROCESSING":
            hdr = "🤖 ИИ ДУМАЕТ"
        elif ph == "RESULTS":
            hdr = "🏁 ИТОГИ"
            if me["is_host"]:
                btn = gr.update(visible=True, value="ДАЛЕЕ")
        elif ph == "GAMEOVER":
            hdr = f"🏆 ПОБЕДА\n{room['winner_msg']}"
            if me["is_host"]:
                btn = gr.update(visible=True, value="РЕСТАРТ")

        return (
            gr.update(visible=False), gr.update(visible=True),
            "\n".join(room["logs"][-10:]), pl_md, hdr, inp, btn,
            format_lore(me["result"]),
        )
    except Exception as e:
        print(f"REFRESH ERROR: {e}")
        # Return something usable so the UI does not crash.
        return empty_ui


def _room_label(code):
    """Return the room header text, or the placeholder when there is no room."""
    return f"🏠 {code}" if code else "--"


# --- LAUNCH ---
with gr.Blocks(title="AI Survival") as demo:
    # Per-session state: room code and player name.
    s_c, s_n = gr.State(""), gr.State("")
    gr.Markdown("# 💀 AI SURVIVAL")

    with gr.Group() as g_log:
        with gr.Row():
            n_in = gr.Textbox(label="Name")
            b_c = gr.Button("Create")
            c_in = gr.Textbox(label="Code")
            b_j = gr.Button("Join")
        err = gr.Markdown("")

    with gr.Group(visible=False) as g_game:
        with gr.Row():
            with gr.Column():
                m_rm = gr.Markdown("--")
                m_pl = gr.Markdown("--")
                log = gr.TextArea(lines=10, interactive=False)
            with gr.Column():
                m_st = gr.Markdown("--")
                m_lr = gr.Markdown("--")
                t_in = gr.Textbox(visible=False)
                b_ac = gr.Button(visible=False)

    # 2-second timer (keeps the polling load low)
    tmr = gr.Timer(2)
    tmr.tick(refresh, [s_c, s_n], [g_log, g_game, log, m_pl, m_st, t_in, b_ac, m_lr])

    # The player name is stored in s_n first: refresh() and act() look the
    # player up by it, and it must be set before the room code is.
    b_c.click(_clean_name, [n_in], [s_n]).then(
        create_room, [s_n], [s_c, err, g_log, g_game]
    ).then(_room_label, [s_c], [m_rm])
    b_j.click(_clean_name, [n_in], [s_n]).then(
        join_room, [s_n, c_in], [s_c, err, g_log, g_game]
    ).then(_room_label, [s_c], [m_rm])
    b_ac.click(act, [s_c, s_n, t_in], None).then(lambda: "", None, [t_in])

if __name__ == "__main__":
    # Make sure the database file exists before the server starts.
    if not os.path.exists(DB_FILE):
        with open(DB_FILE, "w", encoding="utf-8") as f:
            f.write("{}")
    demo.launch(server_name="0.0.0.0", server_port=7860)