Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import random | |
| import string | |
| import time | |
| import json | |
| import os | |
| from filelock import FileLock | |
| # --- КОНФИГУРАЦИЯ --- | |
| TOTAL_ROUNDS = 3 | |
| POLLINATIONS_URL = "https://text.pollinations.ai/" | |
| DB_FILE = "game_db.json" | |
| LOCK_FILE = f"{DB_FILE}.lock" | |
| MAX_CHARS = 150 | |
| # --- БАЗА ДАННЫХ (SELF-HEALING) --- | |
| def load_db(): | |
| # Если файла нет - создаем | |
| if not os.path.exists(DB_FILE): | |
| return {} | |
| lock = FileLock(LOCK_FILE) | |
| try: | |
| with lock: | |
| if os.path.getsize(DB_FILE) == 0: return {} # Пустой файл | |
| with open(DB_FILE, "r", encoding="utf-8") as f: | |
| content = f.read() | |
| if not content.strip(): return {} | |
| return json.loads(content) | |
| except Exception as e: | |
| print(f"⚠️ DB Error: {e}. Resetting DB.") | |
| # Если база сломана - сбрасываем её, чтобы приложение не падало | |
| return {} | |
| def save_db(data): | |
| lock = FileLock(LOCK_FILE) | |
| try: | |
| with lock: | |
| with open(DB_FILE, "w", encoding="utf-8") as f: | |
| json.dump(data, f, ensure_ascii=False, indent=2) | |
| except Exception as e: | |
| print(f"⚠️ Save Error: {e}") | |
| # --- AI --- | |
| def ask_ai_judge(event, player_action): | |
| prompt = ( | |
| "Roleplay: Ты рассказчик.\n" | |
| f"КАТАСТРОФА: {event}\nДЕЙСТВИЕ: {player_action}\n" | |
| "Напиши историю из 3 частей (разделитель '---').\n" | |
| "В конце добавь [Survived] или [Not_Survived]." | |
| ) | |
| try: | |
| r = requests.post(POLLINATIONS_URL, data=prompt.encode('utf-8')) | |
| return r.text if r.status_code == 200 else "Ошибка ИИ.\n---\n---\n[Survived]" | |
| except: return "Связь потеряна.\n---\n---\n[Survived]" | |
| def ask_ai_congrats(name, score): | |
| try: return requests.get(f"{POLLINATIONS_URL}Поздравь {name} с {score} очками.").text | |
| except: return f"Поздравляем {name}!" | |
| # --- LOGIC --- | |
| def generate_code(): | |
| return ''.join(random.choices(string.ascii_uppercase + string.digits, k=4)) | |
| def add_log(room, msg): | |
| room["logs"].append(f"[{time.strftime('%H:%M:%S')}] {msg}") | |
| def format_lore(text): | |
| if not text: return "" | |
| parts = text.split("---") | |
| if len(parts) < 3: return f"**История:**\n{text}" | |
| return f"### 🎬 АКТ 1\n{parts[0]}\n\n### ⚡ АКТ 2\n{parts[1]}\n\n### 🏁 ФИНАЛ\n{parts[2]}" | |
| # --- HANDLERS --- | |
| def create_room(name): | |
| if not name: return None, "Введите имя!", gr.update(), gr.update() | |
| db, code = load_db(), generate_code() | |
| db[code] = { | |
| "host": name, "phase": "LOBBY", "round": 1, "event": "", "logs": [], "winner_msg": "", | |
| "players": {name: {"name": name, "score": 0, "action": None, "result": "", "is_host": True}} | |
| } | |
| save_db(db) | |
| return code, "", gr.update(visible=False), gr.update(visible=True) | |
| def join_room(name, code): | |
| if not name or not code: return None, "Данные?", gr.update(), gr.update() | |
| code = code.upper().strip() | |
| db = load_db() | |
| if code not in db: return None, "Нет комнаты!", gr.update(), gr.update() | |
| room = db[code] | |
| if name not in room["players"]: | |
| room["players"][name] = {"name": name, "score": 0, "action": None, "result": "", "is_host": False} | |
| save_db(db) | |
| return code, "", gr.update(visible=False), gr.update(visible=True) | |
| def act(code, name, txt): | |
| if not code: return | |
| db = load_db() | |
| if code not in db: return | |
| room = db[code] | |
| p = room["players"].get(name) | |
| if not p: return | |
| save = False | |
| if room["phase"] == "LOBBY" and p["is_host"]: | |
| room["phase"] = "INPUT"; save=True | |
| elif room["phase"] == "INPUT": | |
| if not room["event"] and p["is_host"] and txt: | |
| room["event"] = txt[:150]; save=True | |
| for pl in room["players"].values(): pl["action"]=None | |
| elif room["event"] and txt and not p["action"]: | |
| p["action"] = txt[:150]; save=True | |
| if all(pl["action"] for pl in room["players"].values()): | |
| room["phase"] = "PROCESSING"; save_db(db); process_ai(code); return | |
| elif room["phase"] == "RESULTS" and p["is_host"]: | |
| if room["round"] < TOTAL_ROUNDS: | |
| room["round"]+=1; room["event"]=""; room["phase"]="INPUT" | |
| else: | |
| room["phase"]="GAMEOVER"; w=max(room["players"].values(), key=lambda x:x["score"]) | |
| room["winner_msg"]=ask_ai_congrats(w["name"], w["score"]) | |
| save=True | |
| elif room["phase"] == "GAMEOVER" and p["is_host"]: | |
| room["phase"]="LOBBY"; room["round"]=1; room["event"]="" | |
| for pl in room["players"].values(): pl["score"]=0; pl["action"]=None; pl["result"]="" | |
| save=True | |
| if save: save_db(db) | |
| def process_ai(code): | |
| db = load_db() | |
| room = db[code] | |
| for p in room["players"].values(): | |
| lore = ask_ai_judge(room["event"], p["action"]) | |
| p["result"] = lore | |
| if "[Survived]" in lore: p["score"] += 2 | |
| room["phase"] = "RESULTS" | |
| save_db(db) | |
| # --- REFRESH (PROTECTED) --- | |
| def refresh(code, name): | |
| # Безопасный возврат, если данных нет | |
| empty_ui = (gr.update(visible=True), gr.update(visible=False), "", "", "", gr.update(), gr.update(), "") | |
| try: | |
| if not code: return empty_ui | |
| db = load_db() | |
| if code not in db: return empty_ui | |
| room = db[code] | |
| me = room["players"].get(name) | |
| if not me: return empty_ui | |
| # Generate UI | |
| pl_md = "| Stat | Name | Score |\n|---|---|---|\n" | |
| for p in room["players"].values(): | |
| s = "✅" if "[Survived]" in p["result"] else "💀" if "[Not_Survived]" in p["result"] else "⏳" | |
| pl_md += f"| {s} | {p['name']} | {p['score']} |\n" | |
| ph = room["phase"] | |
| inp, btn, hdr = gr.update(visible=False), gr.update(visible=False), "" | |
| if ph == "LOBBY": | |
| hdr = "⏳ ОЖИДАНИЕ" | |
| if me["is_host"]: btn = gr.update(visible=True, value="СТАРТ", variant="primary") | |
| elif ph == "INPUT": | |
| if not room["event"]: | |
| hdr = "🎬 ХОСТ ВЫБИРАЕТ" | |
| if me["is_host"]: | |
| inp = gr.update(visible=True, label="Угроза") | |
| btn = gr.update(visible=True, value="ЗАДАТЬ") | |
| else: | |
| hdr = f"⚠️ {room['event']}" | |
| if not me["action"]: | |
| inp = gr.update(visible=True, label="Действие") | |
| btn = gr.update(visible=True, value="ОТПРАВИТЬ") | |
| elif ph == "PROCESSING": hdr = "🤖 ИИ ДУМАЕТ" | |
| elif ph == "RESULTS": | |
| hdr = "🏁 ИТОГИ" | |
| if me["is_host"]: btn = gr.update(visible=True, value="ДАЛЕЕ") | |
| elif ph == "GAMEOVER": | |
| hdr = f"🏆 ПОБЕДА\n{room['winner_msg']}" | |
| if me["is_host"]: btn = gr.update(visible=True, value="РЕСТАРТ") | |
| return ( | |
| gr.update(visible=False), | |
| gr.update(visible=True), | |
| "\n".join(room["logs"][-10:]), | |
| pl_md, | |
| hdr, | |
| inp, | |
| btn, | |
| format_lore(me["result"]) | |
| ) | |
| except Exception as e: | |
| print(f"REFRESH ERROR: {e}") | |
| # Возвращаем хоть что-то, чтобы не крашить UI | |
| return empty_ui | |
| # --- LAUNCH --- | |
| with gr.Blocks(title="AI Survival") as demo: | |
| s_c, s_n = gr.State(""), gr.State("") | |
| gr.Markdown("# 💀 AI SURVIVAL") | |
| with gr.Group() as g_log: | |
| with gr.Row(): | |
| n_in = gr.Textbox(label="Name"); b_c = gr.Button("Create"); c_in = gr.Textbox(label="Code"); b_j = gr.Button("Join") | |
| err = gr.Markdown("") | |
| with gr.Group(visible=False) as g_game: | |
| with gr.Row(): | |
| with gr.Column(): m_rm = gr.Markdown("--"); m_pl = gr.Markdown("--"); log = gr.TextArea(lines=10, interactive=False) | |
| with gr.Column(): m_st = gr.Markdown("--"); m_lr = gr.Markdown("--"); t_in = gr.Textbox(visible=False); b_ac = gr.Button(visible=False) | |
| # 2 секунды таймер (снижаем нагрузку) | |
| tmr = gr.Timer(2) | |
| tmr.tick(refresh, [s_c, s_n], [g_log, g_game, log, m_pl, m_st, t_in, b_ac, m_lr]) | |
| b_c.click(create_room, [n_in], [s_c, err, g_log, g_game]).then(lambda c: f"🏠 {c}", [s_c], [m_rm]) | |
| b_j.click(join_room, [n_in, c_in], [s_c, err, g_log, g_game]).then(lambda c: f"🏠 {c}", [s_c], [m_rm]) | |
| b_ac.click(act, [s_c, s_n, t_in], None).then(lambda: "", None, [t_in]) | |
| if __name__ == "__main__": | |
| if not os.path.exists(DB_FILE): | |
| with open(DB_FILE, "w") as f: f.write("{}") | |
| demo.launch(server_name="0.0.0.0", server_port=7860) |