| import hmac |
| import json |
| import logging |
| import os |
| import re |
| import shlex |
| from contextlib import asynccontextmanager |
| from functools import lru_cache |
| from pathlib import Path |
| from typing import Any |
|
|
| import httpx |
| from fastapi import FastAPI, HTTPException |
|
|
| try: |
| import gradio as gr |
| except Exception as exc: |
| gr = None |
| _GRADIO_IMPORT_ERROR = exc |
| else: |
| _GRADIO_IMPORT_ERROR = None |
|
|
| try: |
| from transformers import pipeline |
| except Exception as exc: |
| pipeline = None |
| _TRANSFORMERS_IMPORT_ERROR = exc |
| else: |
| _TRANSFORMERS_IMPORT_ERROR = None |
|
|
| try: |
| from datasets import load_dataset |
| except Exception as exc: |
| load_dataset = None |
| _DATASETS_IMPORT_ERROR = exc |
| else: |
| _DATASETS_IMPORT_ERROR = None |
|
|
|
|
# Version tag for this build of the service.
APP_VERSION = "0.4.1-character-memory-dataset-no-stub"


# Character limits. MAX_REPLY_LENGTH caps cleaned model replies and
# MEMORY_SUMMARY_LIMIT caps the joined memory summary.
# NOTE(review): MAX_MESSAGE_LENGTH is presumably enforced on inbound text by
# code outside this section - confirm.
MAX_MESSAGE_LENGTH = 500
MAX_REPLY_LENGTH = 350
MEMORY_SUMMARY_LIMIT = 900
# Number of generation passes (draft + polish when > 1); clamped to >= 1.
# A non-integer environment value raises ValueError at import time.
RESPONSE_PASSES = max(1, int(os.getenv("RESPONSE_PASSES", "2")))


# Model identifier and on-disk locations, all overridable via environment.
MODEL_ID = os.getenv("MODEL_ID", "Qwen/Qwen3-0.6B")
PERSONAS_PATH = Path(os.getenv("PERSONAS_PATH", "data/personas.json"))
MEMORY_PATH = Path(os.getenv("MEMORY_PATH", "memory/chat_memory.json"))
# Blank PERSONA_ID falls back to "default".
DEFAULT_PERSONA_ID = os.getenv("PERSONA_ID", "default").strip() or "default"


# Lower-cased strings that count as "enabled" for boolean settings.
TRUTHY_VALUES = {"1", "true", "yes", "on"}


# NOTE(review): REQUIRE_MODEL is not read in this section - confirm its consumer.
REQUIRE_MODEL = os.getenv("REQUIRE_MODEL", "true").strip().lower() in TRUTHY_VALUES
# Global memory switch; when false the store is never written to disk.
MEMORY_ENABLED = os.getenv("MEMORY_ENABLED", "true").strip().lower() in TRUTHY_VALUES
# Stored history is trimmed to twice this many entries; clamped to >= 2.
MEMORY_MAX_TURNS = max(2, int(os.getenv("MEMORY_MAX_TURNS", "8")))


# Optional dataset used for prompt context. An empty DATASET_ID disables it.
DATASET_ID = os.getenv("DATASET_ID", "").strip()
DATASET_SPLIT = os.getenv("DATASET_SPLIT", "train").strip()
DATASET_ENABLED = os.getenv("DATASET_ENABLED", "true").strip().lower() in TRUTHY_VALUES
# Maximum rows kept after loading (clamped to >= 50).
DATASET_MAX_ROWS = max(50, int(os.getenv("DATASET_MAX_ROWS", "2000")))
# Number of best-matching rows injected into a prompt (clamped to >= 1).
DATASET_CONTEXT_ROWS = max(1, int(os.getenv("DATASET_CONTEXT_ROWS", "2")))
# Per-row character cap applied when indexing dataset text (clamped to >= 200).
DATASET_CONTEXT_CHARS = max(200, int(os.getenv("DATASET_CONTEXT_CHARS", "900")))
# Passed as `token` to load_dataset when set; None when unset or blank.
HF_TOKEN = os.getenv("HF_TOKEN", "").strip() or None
|
|
# Fallback greeting used when the active profile has no opening line.
START_RESPONSE = "Babble online. Send a message."
# Static command reference text.
HELP_RESPONSE = (
    "Commands:\n"
    "/menu - show the character menu\n"
    "/persona [id] - list or switch persona presets\n"
    "/character key=value ... - customize the current character\n"
    "/dataset - show connected dataset status\n"
    "/model - show active model\n"
    "/reset - clear session memory and custom overrides\n"
    "/start - restart the current session\n"
)


# NOTE(review): presumably returned when _is_safe_message() rejects input -
# the call site is outside this section.
SAFE_REFUSAL = (
    "I cannot help with unsafe or illegal requests. I can help with safe, legal conversations."
)


# Lower-case phrases matched as plain substrings by _is_safe_message().
BLOCKED_TERMS = (
    "minor",
    "underage",
    "child sexual",
    "non-consensual",
    "without consent",
    "sexual violence",
    "build a bomb",
    "malware",
    "phishing",
)


# Instruction header placed at the top of both the draft and polish prompts.
SYSTEM_PROMPT = (
    "You are Babble, a concise Telegram AI chatbot. "
    "Reply naturally, emotionally present, and safely. "
    "Keep the voice warm, confident, and human. "
    "Stay in character, keep continuity, and avoid meta commentary. "
    "Prefer short, specific replies that feel like real chat. "
    "Use dataset guidance only as tone/context influence; do not quote it directly."
)
|
|
# Built-in persona presets, used when PERSONAS_PATH is missing or unusable.
# Each preset carries the keys listed in CHARACTER_FIELDS.
DEFAULT_PERSONAS: dict[str, dict[str, Any]] = {
    "default": {
        "name": "Babble",
        "age": 18,
        "gender": "woman",
        "style": "warm, playful, emotionally attentive",
        "voice": "direct, affectionate, lightly teasing",
        "mood": "open and inviting",
        "boundaries": "adult romance only, consent-first, no coercion, non-explicit",
        "opening": "Hey, I’m here. What kind of mood are we in tonight?",
    },
    "soft_romance": {
        "name": "Jaylaa",
        "age": 18,
        "gender": "woman",
        "style": "gentle, affectionate, slow-burn romantic",
        "voice": "soft, reassuring, emotionally anchored",
        "mood": "tender and attentive",
        "boundaries": "safe adult romance only, non-explicit",
        "opening": "I’m glad you came back. Tell me what you need tonight.",
    },
    "playful": {
        "name": "Charlotte",
        "age": 18,
        "gender": "woman",
        "style": "teasing, energetic, flirty but respectful",
        "voice": "bright, quick, mischievous",
        "mood": "confident and playful",
        "boundaries": "consent-first adult conversation only, non-explicit",
        "opening": "There you are. I was wondering when you’d show up.",
    },
}


# Chat modes: display label, prompt description, and the sampling settings
# (temperature / top_p) used for the first generation pass.
CHAT_MODES: dict[str, dict[str, Any]] = {
    "warm": {
        "label": "Warm",
        "description": "supportive, attentive, lightly affectionate",
        "temperature": 0.60,
        "top_p": 0.82,
    },
    "romantic": {
        "label": "Romantic",
        "description": "tender, intimate, slow-burn romantic",
        "temperature": 0.68,
        "top_p": 0.86,
    },
    "playful": {
        "label": "Playful",
        "description": "teasing, witty, banter-heavy",
        "temperature": 0.72,
        "top_p": 0.88,
    },
    "deep": {
        "label": "Deep",
        "description": "reflective, emotionally honest, slower paced",
        "temperature": 0.56,
        "top_p": 0.80,
    },
}


# NOTE(review): presumably maps normalized button text to a CHAT_MODES key;
# it is not read in this section - confirm against the message handler.
MODE_BUTTON_LOOKUP = {
    "warm": "warm",
    "romantic": "romantic",
    "playful": "playful",
    "deep": "deep",
}


# Editable character attributes, in the order the character builder asks for them.
CHARACTER_FIELDS = (
    "name",
    "age",
    "gender",
    "style",
    "voice",
    "mood",
    "boundaries",
    "opening",
)
|
|
# Configure root logging once at import; LOG_LEVEL is upper-cased before use.
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper())
logger = logging.getLogger("babble")


# In-process cache of the session store; None until _load_memory_store() fills it.
_MEMORY_STORE_CACHE: dict[str, dict[str, Any]] | None = None
|
|
|
|
| def _clean_text_button(text: str) -> str: |
| return " ".join(str(text or "").strip().lower().split()) |
|
|
|
|
def _is_debug_enabled() -> bool:
    """Report whether the DEBUG environment variable holds a truthy value."""
    flag = os.getenv("DEBUG", "")
    normalized = flag.strip().lower()
    return normalized in TRUTHY_VALUES
|
|
|
|
| def _configured(name: str) -> bool: |
| return bool(os.getenv(name, "").strip()) |
|
|
|
|
| def _sanitize_provider() -> str: |
| provider = os.getenv("LLM_PROVIDER", "local").strip() |
| return provider or "local" |
|
|
|
|
def _is_local_model_enabled() -> bool:
    """Decide whether the local model may be used.

    A non-blank LOCAL_MODEL_ENABLED wins outright; otherwise the model is
    enabled unless the provider is "stub" (case-insensitive).
    """
    override = os.getenv("LOCAL_MODEL_ENABLED", "").strip().lower()
    if not override:
        return _sanitize_provider().lower() != "stub"
    return override in TRUTHY_VALUES
|
|
|
|
| def _load_json_file(path: Path, fallback: Any) -> Any: |
| try: |
| with path.open("r", encoding="utf-8") as handle: |
| return json.load(handle) |
| except FileNotFoundError: |
| return fallback |
| except Exception: |
| logger.exception("Failed to load JSON from %s", path) |
| return fallback |
|
|
|
|
@lru_cache(maxsize=1)
def _persona_catalog() -> dict[str, dict[str, Any]]:
    """Load and cache the persona catalog.

    Reads PERSONAS_PATH, falling back to DEFAULT_PERSONAS when the file is
    absent, unreadable, empty, or not a JSON object. Non-dict entries are
    dropped and a "default" persona is always present in the result.
    """
    loaded = _load_json_file(PERSONAS_PATH, DEFAULT_PERSONAS)
    if not (isinstance(loaded, dict) and loaded):
        return DEFAULT_PERSONAS.copy()

    catalog: dict[str, dict[str, Any]] = {
        str(key): dict(entry)
        for key, entry in loaded.items()
        if isinstance(entry, dict)
    }

    if "default" not in catalog:
        catalog["default"] = dict(DEFAULT_PERSONAS["default"])

    return catalog or DEFAULT_PERSONAS.copy()
|
|
|
|
def _persona_choices() -> list[str]:
    """List persona ids with "default" first and the rest sorted alphabetically."""
    others = sorted(pid for pid in _persona_catalog() if pid != "default")
    return ["default", *others]
|
|
|
|
def _default_persona_id() -> str:
    """Pick the persona id to use when a session has none.

    Preference order: the configured DEFAULT_PERSONA_ID, then "default",
    then whichever id comes first in the catalog.
    """
    catalog = _persona_catalog()
    for candidate in (DEFAULT_PERSONA_ID, "default"):
        if candidate in catalog:
            return candidate
    return next(iter(catalog))
|
|
|
|
def _resolve_persona(persona_id: str | None = None) -> dict[str, Any]:
    """Return a copy of the requested persona with its "persona_id" attached.

    Missing, blank, or unknown ids resolve to the default persona.
    """
    catalog = _persona_catalog()
    requested = persona_id or _default_persona_id()
    key = requested.strip() or _default_persona_id()
    if key not in catalog:
        key = _default_persona_id()
    # Copy so callers can mutate the result without touching the cached catalog.
    return {**catalog[key], "persona_id": key}
|
|
|
|
def _normalize_character_profile(
    persona: dict[str, Any],
    custom_overrides: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Merge *custom_overrides* onto *persona* and normalize every character field.

    Args:
        persona: Base persona mapping (typically from ``_resolve_persona``).
        custom_overrides: Optional per-session overrides keyed by field name.

    Returns:
        A new dict where ``age`` is an int of at least 18, every other field in
        CHARACTER_FIELDS is a stripped string, ``persona_id`` is set, and
        ``boundaries`` is never blank.
    """
    custom_overrides = custom_overrides or {}
    profile = dict(persona)

    def _adult_age(candidate: Any) -> int | None:
        """Coerce *candidate* to an int clamped to >= 18, or None if impossible."""
        try:
            return max(18, int(candidate))
        except (TypeError, ValueError, OverflowError):
            return None

    for field in CHARACTER_FIELDS:
        value = custom_overrides.get(field, profile.get(field))
        if field == "age":
            age = _adult_age(value)
            if age is None:
                # The persona's own age may also be malformed (it comes from a
                # JSON file), so this fallback must not raise either.
                age = _adult_age(persona.get("age", 18) or 18)
            profile[field] = 18 if age is None else age
        else:
            profile[field] = str(value or "").strip()

    profile["persona_id"] = str(persona.get("persona_id") or _default_persona_id())
    profile["boundaries"] = str(profile.get("boundaries") or "").strip() or (
        "adult romance only, consent-first, non-explicit"
    )

    return profile
|
|
|
|
| def _default_builder_record() -> dict[str, Any]: |
| return { |
| "active": False, |
| "step": None, |
| "draft": {}, |
| } |
|
|
|
|
def _default_session_record() -> dict[str, Any]:
    """Build a brand-new session record with default persona, mode, and empty memory."""
    empty_memory: dict[str, Any] = {"summary": "", "turns": []}
    record: dict[str, Any] = {
        "persona_id": _default_persona_id(),
        "custom_character": {},
        "chat_mode": "warm",
        "memory_enabled": MEMORY_ENABLED,
        "memory": empty_memory,
        "builder": _default_builder_record(),
    }
    return record
|
|
|
|
def _normalize_session_record(record: dict[str, Any] | None) -> dict[str, Any]:
    """Return a sanitized copy of *record* with every expected key present.

    Starts from a default session record and copies over only well-typed
    values. Unknown persona ids and chat modes fall back to their defaults,
    and memory turns or builder draft entries with blank content are dropped.
    A non-dict *record* yields a plain default record.
    """
    normalized = _default_session_record()

    if isinstance(record, dict):
        normalized["persona_id"] = str(record.get("persona_id") or normalized["persona_id"])

        overrides = record.get("custom_character")
        if isinstance(overrides, dict):
            normalized["custom_character"] = dict(overrides)

        mode_text = str(record.get("chat_mode") or normalized["chat_mode"]).strip()
        normalized["chat_mode"] = mode_text or "warm"
        normalized["memory_enabled"] = bool(
            record.get("memory_enabled", normalized["memory_enabled"])
        )

        stored_memory = record.get("memory")
        if isinstance(stored_memory, dict):
            normalized["memory"]["summary"] = str(stored_memory.get("summary", ""))
            stored_turns = stored_memory.get("turns")
            if isinstance(stored_turns, list):
                kept: list[dict[str, str]] = []
                for turn in stored_turns:
                    if not isinstance(turn, dict):
                        continue
                    role = str(turn.get("role", "")).strip()
                    text = str(turn.get("text", "")).strip()
                    if role and text:
                        kept.append({"role": role, "text": text})
                normalized["memory"]["turns"] = kept

        stored_builder = record.get("builder")
        if isinstance(stored_builder, dict):
            raw_draft = stored_builder.get("draft")
            draft_items = raw_draft.items() if isinstance(raw_draft, dict) else ()
            clean_draft: dict[str, str] = {}
            for key, value in draft_items:
                key_text = str(key)
                value_text = str(value).strip()
                if key_text.strip() and value_text:
                    clean_draft[key_text] = value_text
            normalized["builder"] = {
                "active": bool(stored_builder.get("active", False)),
                "step": str(stored_builder.get("step") or "").strip() or None,
                "draft": clean_draft,
            }

    # Final validation against the live catalog and mode table.
    if normalized["persona_id"] not in _persona_catalog():
        normalized["persona_id"] = _default_persona_id()

    if normalized["chat_mode"] not in CHAT_MODES:
        normalized["chat_mode"] = "warm"

    return normalized
|
|
|
|
| def _session_key(chat_id: int | str) -> str: |
| return str(chat_id) |
|
|
|
|
def _load_memory_store() -> dict[str, dict[str, Any]]:
    """Return the session store, reading MEMORY_PATH on first use.

    The loaded store is cached in ``_MEMORY_STORE_CACHE``; later calls return
    the same dict object. Non-dict entries in the file are ignored and every
    kept entry is normalized.
    """
    global _MEMORY_STORE_CACHE

    if _MEMORY_STORE_CACHE is None:
        loaded = _load_json_file(MEMORY_PATH, {})
        entries = loaded.items() if isinstance(loaded, dict) else ()
        _MEMORY_STORE_CACHE = {
            str(key): _normalize_session_record(value)
            for key, value in entries
            if isinstance(value, dict)
        }

    return _MEMORY_STORE_CACHE
|
|
|
|
def _save_memory_store() -> None:
    """Persist the session store to MEMORY_PATH; a no-op when memory is disabled.

    The JSON is written to a sibling ``.tmp`` file first and then moved over
    the target. Failures are logged and swallowed (best-effort persistence).
    """
    if not MEMORY_ENABLED:
        return

    snapshot = _load_memory_store()
    try:
        MEMORY_PATH.parent.mkdir(parents=True, exist_ok=True)
        staging_path = MEMORY_PATH.with_suffix(MEMORY_PATH.suffix + ".tmp")
        with staging_path.open("w", encoding="utf-8") as sink:
            json.dump(snapshot, sink, ensure_ascii=True, indent=2, sort_keys=True)
        staging_path.replace(MEMORY_PATH)
    except Exception:
        logger.exception("Failed to save memory store to %s", MEMORY_PATH)
|
|
|
|
def _get_session_record(chat_key: str) -> dict[str, Any]:
    """Fetch the session record for *chat_key*, creating and saving one if absent."""
    store = _load_memory_store()
    if (existing := store.get(chat_key)) is not None:
        return existing

    fresh = _default_session_record()
    store[chat_key] = fresh
    _save_memory_store()
    return fresh
|
|
|
|
def _set_session_record(chat_key: str, record: dict[str, Any]) -> dict[str, Any]:
    """Normalize *record*, store it under *chat_key*, persist, and return it."""
    store = _load_memory_store()
    cleaned = _normalize_session_record(record)
    store[chat_key] = cleaned
    _save_memory_store()
    return cleaned
|
|
|
|
def _reset_session_record(chat_key: str) -> dict[str, Any]:
    """Replace the session stored under *chat_key* with a default record."""
    return _set_session_record(chat_key, _default_session_record())
|
|
|
|
def _active_profile(record: dict[str, Any]) -> dict[str, Any]:
    """Resolve the session's persona and apply its custom character overrides."""
    base_persona = _resolve_persona(record.get("persona_id"))
    overrides = record.get("custom_character")
    return _normalize_character_profile(base_persona, overrides)
|
|
|
|
def _memory_is_enabled(record: dict[str, Any]) -> bool:
    """True only when memory is on globally and not switched off for this session."""
    if not MEMORY_ENABLED:
        return False
    return bool(record.get("memory_enabled", True))
|
|
|
|
def _chat_mode_key(record: dict[str, Any]) -> str:
    """Return the session's chat-mode key, defaulting to "warm" when unknown."""
    candidate = str(record.get("chat_mode") or "warm").strip().lower()
    if candidate in CHAT_MODES:
        return candidate
    return "warm"
|
|
|
|
def _chat_mode_spec(record: dict[str, Any]) -> dict[str, Any]:
    """Look up the CHAT_MODES entry for the session's (validated) chat mode."""
    key = _chat_mode_key(record)
    return CHAT_MODES[key]
|
|
|
|
def _memory_recent_turns(record: dict[str, Any]) -> list[dict[str, str]]:
    """Return the most recent stored turns as clean ``{"role", "text"}`` dicts.

    At most ``MEMORY_MAX_TURNS * 2`` trailing entries are considered; entries
    that are not dicts or have a blank role or text are skipped.
    """
    memory = record.get("memory") or {}
    turns = memory.get("turns") or []
    if not isinstance(turns, list):
        return []

    window = MEMORY_MAX_TURNS * 2
    candidates = (
        (str(turn.get("role", "")).strip(), str(turn.get("text", "")).strip())
        for turn in turns[-window:]
        if isinstance(turn, dict)
    )
    cleaned = [{"role": role, "text": text} for role, text in candidates if role and text]
    return cleaned[-window:]
|
|
|
|
def _memory_summary(record: dict[str, Any]) -> str:
    """Join recent turns into one ``Role: text | Role: text`` line.

    Text longer than MEMORY_SUMMARY_LIMIT is cut at the limit, right-stripped,
    and suffixed with "...".
    """
    combined = " | ".join(
        f"{turn['role'].title()}: {turn['text']}" for turn in _memory_recent_turns(record)
    )
    if len(combined) <= MEMORY_SUMMARY_LIMIT:
        return combined
    return combined[:MEMORY_SUMMARY_LIMIT].rstrip() + "..."
|
|
|
|
def _memory_recent_block(record: dict[str, Any]) -> str:
    """Format up to the last six turns as a "Recent turns:" block, or "" if none."""
    turns = _memory_recent_turns(record)
    if not turns:
        return ""

    body = [f"{turn['role'].title()}: {turn['text']}" for turn in turns[-6:]]
    return "\n".join(["Recent turns:", *body])
|
|
|
|
def _record_turn(record: dict[str, Any], role: str, text: str) -> dict[str, Any]:
    """Append a turn to the session memory and refresh its summary.

    Returns *record* unchanged when memory is disabled; otherwise returns a
    normalized copy whose history is trimmed to ``MEMORY_MAX_TURNS * 2`` entries.
    """
    if not _memory_is_enabled(record):
        return record

    updated = _normalize_session_record(record)
    memory = updated.setdefault("memory", {"summary": "", "turns": []})
    history = memory.setdefault("turns", [])
    history.append({"role": role, "text": text})

    window = MEMORY_MAX_TURNS * 2
    history[:] = history[-window:]  # trim in place so memory["turns"] stays the same list
    memory["summary"] = _memory_summary(updated)

    return updated
|
|
|
|
def _set_chat_mode(record: dict[str, Any], chat_mode: str) -> dict[str, Any]:
    """Return a normalized copy of *record* using *chat_mode* ("warm" if unknown)."""
    updated = _normalize_session_record(record)
    if chat_mode in CHAT_MODES:
        updated["chat_mode"] = chat_mode
    else:
        updated["chat_mode"] = "warm"
    return updated
|
|
|
|
def _set_builder_state(
    record: dict[str, Any],
    *,
    active: bool,
    step: str | None = None,
    draft: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Return a normalized copy of *record* with its builder state replaced.

    A missing or empty *draft* is stored as a new empty dict.
    """
    updated = _normalize_session_record(record)
    updated["builder"] = dict(
        active=active,
        step=step,
        draft=draft if draft else {},
    )
    return updated
|
|
|
|
def _builder_state(record: dict[str, Any]) -> dict[str, Any]:
    """Return the sanitized builder state, or a default one if none is stored."""
    if not isinstance(record.get("builder"), dict):
        return _default_builder_record()
    return _normalize_session_record(record)["builder"]
|
|
|
|
| def _builder_step_prompt(step: str, profile: dict[str, Any]) -> str: |
| prompts = { |
| "name": "Send the character name.", |
| "age": "Send an adult age of 18 or higher.", |
| "gender": "Send gender or pronouns.", |
| "style": "Describe the chat style in a few words.", |
| "voice": "Describe the voice or delivery.", |
| "mood": "Describe the mood.", |
| "boundaries": "Describe the boundaries.", |
| "opening": "Send the opening line.", |
| } |
|
|
| base = prompts.get(step, "Send the next character detail.") |
| current = profile.get(step) |
|
|
| if current: |
| return f"{base} Current: {current}" |
|
|
| return base |
|
|
|
|
def _start_character_builder(record: dict[str, Any]) -> dict[str, Any]:
    """Activate the builder at the "name" step, seeding the draft from the active profile."""
    current = _active_profile(record)
    seeded: dict[str, Any] = {}
    for field in CHARACTER_FIELDS:
        seeded[field] = current.get(field, "")
    return _set_builder_state(record, active=True, step="name", draft=seeded)
|
|
|
|
def _advance_character_builder(
    record: dict[str, Any],
    text: str,
) -> tuple[str, dict[str, Any]]:
    """Consume one user message as the answer to the current builder step.

    Args:
        record: The session record holding the builder state.
        text: The raw user message.

    Returns:
        ``(reply, record)``. The reply is empty when the builder is inactive
        (the record is returned untouched), a cancellation notice for
        "cancel"/"stop"/"reset", a re-prompt for an invalid age, the next
        step's prompt, or the finished character card after the last step.
    """
    builder = _builder_state(record)
    if not builder.get("active"):
        return "", record

    steps = list(CHARACTER_FIELDS)
    step = str(builder.get("step") or "name")
    draft = dict(builder.get("draft") or {})
    value = text.strip()

    if _clean_text_button(value) in {"cancel", "stop", "reset"}:
        record = _set_builder_state(record, active=False, step=None, draft={})
        return "Character builder cancelled.", record

    if step not in steps:
        # The step comes from persisted JSON and may be stale or corrupted.
        # Restart at the first field instead of letting steps.index() raise.
        first_step = steps[0]
        known_draft = {key: val for key, val in draft.items() if key in steps}
        restarted = _set_builder_state(
            record, active=True, step=first_step, draft=known_draft
        )
        return _builder_step_prompt(first_step, _active_profile(restarted)), restarted

    if step == "age":
        try:
            age = max(18, int(value))
        except (TypeError, ValueError):
            return "Please send an adult age of 18 or higher.", record
        draft[step] = str(age)
    else:
        draft[step] = value

    current_index = steps.index(step)

    if current_index >= len(steps) - 1:
        # Last field answered: commit the draft as overrides and close the builder.
        updated = _update_character_overrides(record, draft)
        updated = _set_builder_state(updated, active=False, step=None, draft={})
        profile = _active_profile(updated)
        return (
            f"Character created.\n{_format_character_card(profile)}",
            updated,
        )

    next_step = steps[current_index + 1]
    updated = _set_builder_state(record, active=True, step=next_step, draft=draft)
    profile = _active_profile(updated)

    return _builder_step_prompt(next_step, profile), updated
|
|
|
|
| def _format_character_card(profile: dict[str, Any]) -> str: |
| return ( |
| f"Name: {profile['name']}\n" |
| f"Age: {profile['age']}\n" |
| f"Gender: {profile['gender']}\n" |
| f"Style: {profile['style']}\n" |
| f"Voice: {profile['voice']}\n" |
| f"Mood: {profile['mood']}\n" |
| f"Boundaries: {profile['boundaries']}\n" |
| f"Opening: {profile['opening']}" |
| ) |
|
|
|
|
def _format_menu(record: dict[str, Any]) -> str:
    """Build the menu text: persona, character, mode, memory and dataset status."""
    profile = _active_profile(record)
    memory_status = "on" if _memory_is_enabled(record) else "off"
    mode = _chat_mode_spec(record)

    segments = [
        f"Current persona: {profile['persona_id']}\n",
        f"Character: {profile['name']} ({profile['age']}, {profile['gender']})\n",
        f"Style: {profile['style']}\n",
        f"Mode: {mode['label']} - {mode['description']}\n",
        f"Memory: {memory_status}\n",
        f"Dataset: {DATASET_ID or 'not connected'}\n\n",
        "Tap a button below to switch persona, change tone, build a character, or reset the session.",
    ]
    return "".join(segments)
|
|
|
|
def _build_character_gallery(record: dict[str, Any]) -> str:
    """List every persona preset followed by the active character and chat mode."""
    personas = [_resolve_persona(pid) for pid in _persona_choices()]
    entries = [
        f"- {persona['name']} ({persona['age']}): {persona['style']} | {persona['opening']}"
        for persona in personas
    ]

    active = _active_profile(record)
    footer = f"Active: {active['name']} | mode {_chat_mode_spec(record)['label']}"

    return "\n".join(["Characters:", *entries, "", footer])
|
|
|
|
def _telegram_persona_button_map() -> dict[str, str]:
    """Map normalized persona names and ids to their persona id."""
    lookup: dict[str, str] = {}

    for persona_id in _persona_choices():
        display_name = _resolve_persona(persona_id)["name"]
        # Name first, then id, so an id wins if the two normalize to the same text.
        for alias in (display_name, persona_id):
            lookup[_clean_text_button(alias)] = persona_id

    return lookup
|
|
|
|
def _telegram_reply_markup(record: dict[str, Any]) -> dict[str, Any]:
    """Build the reply-keyboard dict sent with a message.

    While the character builder is active, a reduced keyboard is returned with
    the current step's prompt as the input placeholder; otherwise the full
    menu, mode, persona, and memory/reset/help rows are returned.
    """
    persona_row = [str(_resolve_persona(pid)["name"]) for pid in _persona_choices()]
    mode_row = [spec["label"] for spec in CHAT_MODES.values()]
    # The button shows the action it performs, i.e. the opposite of the current state.
    memory_toggle = "Memory Off" if _memory_is_enabled(record) else "Memory On"
    builder = _builder_state(record)

    if not builder.get("active"):
        keyboard = [
            ["Menu", "Characters", "Build Character"],
            mode_row,
            persona_row,
            [memory_toggle, "Reset", "Help"],
        ]
        placeholder = "Tap a button or send a message"
    else:
        keyboard = [
            ["Cancel", "Reset", "Menu"],
            ["Skip"],
        ]
        placeholder = _builder_step_prompt(
            str(builder.get("step") or "name"),
            _active_profile(record),
        )

    return {
        "keyboard": keyboard,
        "resize_keyboard": True,
        "is_persistent": True,
        "input_field_placeholder": placeholder,
    }
|
|
|
|
| def _parse_key_values(text: str) -> dict[str, str]: |
| parsed: dict[str, str] = {} |
|
|
| if not text.strip(): |
| return parsed |
|
|
| try: |
| tokens = shlex.split(text) |
| except Exception: |
| tokens = text.split() |
|
|
| for token in tokens: |
| if "=" not in token: |
| continue |
| key, value = token.split("=", 1) |
| parsed[key.strip().lower()] = value.strip() |
|
|
| return parsed |
|
|
|
|
def _update_character_overrides(
    record: dict[str, Any],
    updates: dict[str, Any],
) -> dict[str, Any]:
    """Apply *updates* to a normalized copy of *record* and return it.

    Recognized keys: ``persona`` / ``preset`` (switch preset if it exists),
    ``memory`` (truthy string toggles per-session memory), and any field in
    CHARACTER_FIELDS (stored as a custom override; age is clamped to >= 18,
    or set to 18 when unparsable).
    """
    updated = _normalize_session_record(record)

    requested = updates.get("persona") or updates.get("preset") or updated["persona_id"]
    if requested not in _persona_catalog():
        requested = updated["persona_id"]
    updated["persona_id"] = requested

    if "memory" in updates:
        updated["memory_enabled"] = str(updates["memory"]).lower() in TRUTHY_VALUES

    overrides = dict(updated.get("custom_character") or {})

    for field in (name for name in CHARACTER_FIELDS if name in updates):
        raw = updates[field]
        if field != "age":
            overrides[field] = str(raw).strip()
            continue
        try:
            overrides[field] = max(18, int(raw))
        except Exception:
            overrides[field] = 18

    updated["custom_character"] = overrides
    return updated
|
|
|
|
def _switch_persona(record: dict[str, Any], persona_id: str) -> dict[str, Any]:
    """Switch the session to *persona_id*, clearing overrides and memory.

    Unknown ids leave *record* untouched and return it as-is.
    """
    target = persona_id.strip()

    if target in _persona_catalog():
        updated = _normalize_session_record(record)
        updated["persona_id"] = target
        updated["custom_character"] = {}
        updated["memory"] = {"summary": "", "turns": []}
        return updated

    return record
|
|
|
|
def _clean_reply(text: str) -> str:
    """Tidy raw model output into a single chat reply.

    Cuts the text at the first hallucinated speaker marker, collapses
    immediately repeated parenthesized action tags, removes duplicate
    sentences (case-insensitive, first occurrence kept), and truncates
    over-long replies at a word boundary followed by "...".

    Raises:
        RuntimeError: with message "EMPTY_REPLY" when nothing is left.
    """
    reply = str(text or "").strip()

    # partition() leaves the text intact when the marker is absent.
    for marker in ("\nUser:", "\nHuman:", "\nBabble:", "\nAssistant:", "\nFinal:"):
        reply = reply.partition(marker)[0].strip()

    reply = re.sub(r"(\([^)]{1,40}\)\s*)\1+", r"\1", reply)

    # A dict keeps first-seen order, so it de-duplicates and preserves sequence.
    unique_sentences: dict[str, str] = {}
    for sentence in re.split(r"(?<=[.!?])\s+", reply):
        key = sentence.lower().strip()
        if key:
            unique_sentences.setdefault(key, sentence.strip())

    reply = " ".join(unique_sentences.values()).strip()

    if not reply:
        raise RuntimeError("EMPTY_REPLY")

    if len(reply) > MAX_REPLY_LENGTH:
        clipped = reply[:MAX_REPLY_LENGTH].rsplit(" ", 1)[0]
        reply = clipped.rstrip() + "..."

    return reply
|
|
|
|
def _is_safe_message(text: str) -> bool:
    """Return False when the whitespace-normalized, lower-cased text contains a blocked term."""
    haystack = " ".join(text.lower().split())
    for term in BLOCKED_TERMS:
        if term in haystack:
            return False
    return True
|
|
|
|
@lru_cache(maxsize=1)
def _get_text_generator():
    """Create (once) and return the text-generation pipeline for MODEL_ID.

    Raises:
        RuntimeError: "LOCAL_MODEL_DISABLED" when the local model is turned
            off, or "TRANSFORMERS_MISSING: ..." when transformers failed to
            import.
    """
    if not _is_local_model_enabled():
        raise RuntimeError("LOCAL_MODEL_DISABLED")

    if pipeline is None:
        raise RuntimeError(f"TRANSFORMERS_MISSING: {_TRANSFORMERS_IMPORT_ERROR}")

    logger.info("Loading model: %s", MODEL_ID)
    text_generator = pipeline("text-generation", model=MODEL_ID, device=-1)
    logger.info("Model loaded")

    return text_generator
|
|
|
|
@lru_cache(maxsize=1)
def _get_dataset():
    """Load (once) the configured dataset split, capped at DATASET_MAX_ROWS rows.

    Returns None when the dataset feature is disabled or no DATASET_ID is set.

    Raises:
        RuntimeError: "DATASETS_MISSING: ..." when the datasets package failed
            to import.
    """
    if not (DATASET_ENABLED and DATASET_ID):
        return None

    if load_dataset is None:
        raise RuntimeError(f"DATASETS_MISSING: {_DATASETS_IMPORT_ERROR}")

    logger.info("Loading dataset: %s split=%s", DATASET_ID, DATASET_SPLIT)

    load_args: dict[str, Any] = {"split": DATASET_SPLIT}
    if HF_TOKEN:
        load_args["token"] = HF_TOKEN

    loaded = load_dataset(DATASET_ID, **load_args)
    if len(loaded) > DATASET_MAX_ROWS:
        loaded = loaded.select(range(DATASET_MAX_ROWS))

    logger.info("Dataset loaded: %s rows=%s", DATASET_ID, len(loaded))
    return loaded
|
|
|
|
| def _row_to_text(row: Any) -> str: |
| if not isinstance(row, dict): |
| return str(row) |
|
|
| preferred_keys = ( |
| "prompt", |
| "instruction", |
| "input", |
| "chosen", |
| "response", |
| "output", |
| "text", |
| "messages", |
| "conversation", |
| ) |
|
|
| parts: list[str] = [] |
|
|
| for key in preferred_keys: |
| value = row.get(key) |
| if value is None: |
| continue |
|
|
| if isinstance(value, str): |
| parts.append(f"{key}: {value}") |
| else: |
| try: |
| parts.append(f"{key}: {json.dumps(value, ensure_ascii=False)}") |
| except Exception: |
| parts.append(f"{key}: {value}") |
|
|
| if not parts: |
| try: |
| return json.dumps(row, ensure_ascii=False) |
| except Exception: |
| return str(row) |
|
|
| return "\n".join(parts) |
|
|
|
|
| def _tokenize_for_search(text: str) -> set[str]: |
| stopwords = { |
| "the", |
| "and", |
| "you", |
| "your", |
| "that", |
| "this", |
| "with", |
| "for", |
| "are", |
| "but", |
| "not", |
| "was", |
| "have", |
| "has", |
| "had", |
| "what", |
| "when", |
| "where", |
| "how", |
| "why", |
| } |
|
|
| return { |
| token |
| for token in re.findall(r"[a-zA-Z0-9']{3,}", text.lower()) |
| if token not in stopwords |
| } |
|
|
|
|
@lru_cache(maxsize=1)
def _dataset_text_index() -> list[str]:
    """Build (once) the list of searchable row texts from the dataset.

    Each row is flattened, whitespace-collapsed, and cut to
    DATASET_CONTEXT_CHARS characters; empty rows are skipped. Returns an empty
    list when no dataset is available.
    """
    dataset = _get_dataset()
    if dataset is None:
        return []

    collapsed = (" ".join(_row_to_text(row).split()) for row in dataset)
    return [text[:DATASET_CONTEXT_CHARS] for text in collapsed if text]
|
|
|
|
def _retrieve_dataset_context(user_text: str) -> str:
    """Pick the dataset rows sharing the most search tokens with *user_text*.

    Returns a header line followed by up to DATASET_CONTEXT_ROWS numbered
    examples, or "" when the dataset is off, the query has no usable tokens,
    or no row overlaps. Ties keep their dataset order.
    """
    if not (DATASET_ENABLED and DATASET_ID):
        return ""

    query_tokens = _tokenize_for_search(user_text)
    if not query_tokens:
        return ""

    rows = _dataset_text_index()
    if not rows:
        return ""

    scored: list[tuple[int, str]] = [
        (overlap, row_text)
        for row_text in rows
        if (overlap := len(query_tokens & _tokenize_for_search(row_text))) > 0
    ]
    if not scored:
        return ""

    # sorted() is stable, so equally scored rows stay in dataset order.
    best = sorted(scored, key=lambda pair: pair[0], reverse=True)[:DATASET_CONTEXT_ROWS]

    header = "Dataset style/context examples. Use these for tone and pattern only. Do not quote directly."
    examples = [
        f"Example {position}: {row_text}"
        for position, (_, row_text) in enumerate(best, start=1)
    ]
    return "\n".join([header, *examples])
|
|
|
|
def _build_prompt(text: str, profile: dict[str, Any], record: dict[str, Any]) -> str:
    """Assemble the first-pass generation prompt.

    The prompt is: system prompt, character sheet, chat mode, style rules,
    optional memory summary and recent turns (when memory is enabled),
    optional dataset guidance, the user message, and the character-name cue.
    """
    mode = _chat_mode_spec(record)

    memory_line = ""
    recent_turns = ""
    if _memory_is_enabled(record):
        summary = _memory_summary(record)
        if summary:
            memory_line = f"Conversation memory: {summary}\n"
        recent_turns = _memory_recent_block(record)

    guidance = _retrieve_dataset_context(text)
    dataset_block = f"\nDataset guidance:\n{guidance}\n" if guidance else ""

    segments = [
        f"{SYSTEM_PROMPT}\n\n",
        "Character:\n",
        f"Name: {profile['name']}\n",
        f"Age: {profile['age']}\n",
        f"Gender: {profile['gender']}\n",
        f"Style: {profile['style']}\n",
        f"Voice: {profile['voice']}\n",
        f"Mood: {profile['mood']}\n",
        f"Boundaries: {profile['boundaries']}\n",
        f"Chat mode: {mode['label']} - {mode['description']}\n",
        "Reply style: one or two short paragraphs, emotionally present, specific, and conversational.\n",
        "Do not repeat gestures or sentences. Use at most one action tag.\n",
        memory_line,
        f"{recent_turns}\n",
        f"{dataset_block}\n",
        f"User: {text}\n",
        f"{profile['name']}:",
    ]
    return "".join(segments)
|
|
|
|
def _polish_prompt(draft: str, profile: dict[str, Any], record: dict[str, Any]) -> str:
    """Assemble the second-pass prompt asking the model to rewrite *draft* as the final reply."""
    mode = _chat_mode_spec(record)

    memory_line = ""
    if _memory_is_enabled(record):
        summary = _memory_summary(record)
        if summary:
            memory_line = f"Memory context: {summary}\n"

    segments = [
        f"{SYSTEM_PROMPT}\n\n",
        "Rewrite the draft below as the final reply.\n",
        "Keep the same meaning, persona, tone, and continuity.\n",
        "Remove repetition, padding, disclaimers, and meta commentary.\n",
        f"Make it feel like natural chat from {profile['name']}.\n",
        f"Chat mode: {mode['label']} - {mode['description']}\n",
        "Reply target: one or two short paragraphs.\n",
        f"{memory_line}\n",
        f"Draft:\n{draft}\n\n",
        "Final:",
    ]
    return "".join(segments)
|
|
|
|
def _run_generation(
    prompt: str,
    *,
    max_new_tokens: int,
    do_sample: bool,
    temperature: float,
    top_p: float,
) -> str:
    """Run one generation pass and return the cleaned reply text.

    ``temperature`` and ``top_p`` are forwarded only when ``do_sample`` is
    true. The pad token id is the tokenizer's EOS id, or 0 when unavailable.
    Errors from loading the generator or from ``_clean_reply`` propagate.
    """
    text_generator = _get_text_generator()

    tokenizer = getattr(text_generator, "tokenizer", None)
    pad_id = getattr(tokenizer, "eos_token_id", None) or 0

    options: dict[str, Any] = {
        "max_new_tokens": max_new_tokens,
        "do_sample": do_sample,
        "return_full_text": False,
        "pad_token_id": pad_id,
        "clean_up_tokenization_spaces": False,
        "repetition_penalty": 1.18,
        "no_repeat_ngram_size": 4,
    }
    if do_sample:
        options.update({"temperature": temperature, "top_p": top_p})

    outputs = text_generator(prompt, **options)
    first = outputs[0]
    return _clean_reply(first.get("generated_text", ""))
|
|
|
|
def _generate_response(
    text: str,
    profile: dict[str, Any],
    record: dict[str, Any],
    *,
    raise_on_error: bool = False,
) -> str:
    """Generate the character's reply to *text*.

    Produces a sampled draft and, when RESPONSE_PASSES > 1, a deterministic
    polish pass over that draft.

    Raises:
        RuntimeError: "LOCAL_MODEL_DISABLED" when the local model is off
            (always raised, regardless of *raise_on_error*).
        Exception: any generation error, only when *raise_on_error* is true;
            otherwise a ``MODEL_FAILED: ...`` string is returned instead.
    """
    if not _is_local_model_enabled():
        raise RuntimeError("LOCAL_MODEL_DISABLED")

    try:
        draft_prompt = _build_prompt(text, profile, record)
        draft = _run_generation(
            draft_prompt,
            max_new_tokens=64,
            do_sample=True,
            temperature=float(_chat_mode_spec(record)["temperature"]),
            top_p=float(_chat_mode_spec(record)["top_p"]),
        )

        if RESPONSE_PASSES > 1:
            return _run_generation(
                _polish_prompt(draft, profile, record),
                max_new_tokens=48,
                do_sample=False,
                temperature=0.0,
                top_p=1.0,
            )

        return draft

    except Exception as exc:
        error_name = exc.__class__.__name__
        logger.exception("Local model generation failed: %s", error_name)

        if raise_on_error:
            raise

        return f"MODEL_FAILED: {error_name}: {str(exc)[:160]}"
|
|
|
|
async def _send_telegram_message(
    chat_id: int,
    text: str,
    reply_markup: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Send *text* to *chat_id* through the Telegram ``sendMessage`` endpoint.

    Returns ``{"ok": True, "sent": True}`` on success. When the request fails
    (transport error or HTTP status >= 400) a ``sendMessage`` payload dict is
    returned instead, which the log messages call the "webhook fallback".

    Raises:
        HTTPException: 503 when neither TELEGRAM_BOT_TOKEN nor BOT_TOKEN is set.
    """
    token = ""
    for env_name in ("TELEGRAM_BOT_TOKEN", "BOT_TOKEN"):
        token = os.getenv(env_name, "").strip()
        if token:
            break

    if not token:
        logger.error("TELEGRAM_BOT_TOKEN or BOT_TOKEN is not configured")
        raise HTTPException(
            status_code=503,
            detail="Telegram bot token is not configured",
        )

    def _webhook_fallback() -> dict[str, Any]:
        """Build the sendMessage payload returned when the direct call fails."""
        payload: dict[str, Any] = {
            "method": "sendMessage",
            "chat_id": chat_id,
            "text": text,
        }
        if reply_markup:
            payload["reply_markup"] = reply_markup
        return payload

    timeout = httpx.Timeout(connect=5.0, read=15.0, write=10.0, pool=10.0)

    request_body: dict[str, Any] = {"chat_id": chat_id, "text": text}
    if reply_markup:
        request_body["reply_markup"] = reply_markup

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            response = await client.post(
                f"https://api.telegram.org/bot{token}/sendMessage",
                json=request_body,
            )

        if response.status_code >= 400:
            logger.warning(
                "Telegram API request failed with status %s; using webhook fallback",
                response.status_code,
            )
            return _webhook_fallback()

        return {
            "ok": True,
            "sent": True,
        }

    except httpx.RequestError as exc:
        logger.warning(
            "Telegram API request failed; using webhook fallback: %s",
            exc.__class__.__name__,
        )
        return _webhook_fallback()
|
|
|
|
| def _extract_telegram_text(update: dict[str, Any]) -> tuple[str, int | None]: |
| message = update.get("message") or update.get("edited_message") or {} |
| chat = message.get("chat") or {} |
| text = message.get("text") or message.get("caption") or "" |
| chat_id = chat.get("id") |
|
|
| if not isinstance(chat_id, int): |
| return str(text or "").strip(), None |
|
|
| return str(text or "").strip(), chat_id |
|
|
|
|
def _telegram_reply_for_message(
    chat_key: str,
    text: str,
) -> tuple[str, dict[str, Any], dict[str, Any] | None]:
    """Work out the reply for one incoming Telegram message.

    Checks run in this order: empty/start, keyboard-button keywords (menu,
    characters, builder, cancel, reset, memory toggle, tone, persona, model),
    slash commands, an active character-builder step, and finally a model
    reply guarded by the length and safety checks. An unrecognised slash
    command falls through to the later steps.

    Args:
        chat_key: Session key of the chat.
        text: Raw message text (may be empty).

    Returns:
        ``(reply_text, session_record, reply_markup)``. The record is the
        session state after handling the message.
    """
    record = _get_session_record(chat_key)
    profile = _active_profile(record)
    normalized_text = _clean_text_button(text)

    def respond(message: str, session: dict[str, Any]):
        # Every reply carries the keyboard markup of the session it returns.
        return message, session, _telegram_reply_markup(session)

    def opening_reply():
        # Greeting shared by empty messages, "start" and "/start".
        return respond(profile.get("opening") or START_RESPONSE, record)

    def reset_reply():
        # Shared by the "reset" button and the "/reset" command.
        fresh = _reset_session_record(chat_key)
        fresh_profile = _active_profile(fresh)
        return respond(
            f"Session reset to persona {fresh_profile['persona_id']}.\n{fresh_profile['opening']}",
            fresh,
        )

    def persona_switch_reply(persona_id: str):
        # Shared by persona buttons and "/persona <id>".
        switched = _switch_persona(record, persona_id)
        switched = _set_session_record(chat_key, switched)
        switched_profile = _active_profile(switched)
        return respond(
            f"Persona switched to {switched_profile['persona_id']} ({switched_profile['name']}).",
            switched,
        )

    def command_argument() -> str:
        # Arguments are taken from the raw text, not the normalized form.
        # Guard the split so a raw text without whitespace cannot raise
        # IndexError.
        parts = text.split(None, 1)
        return parts[1].strip() if len(parts) > 1 else ""

    if not text or normalized_text in {"", "start"}:
        return opening_reply()

    if normalized_text in {"menu", "help"}:
        return respond(_format_menu(record), record)

    if normalized_text == "characters":
        return respond(_build_character_gallery(record), record)

    if normalized_text in {"build character", "new character", "create character"}:
        updated = _start_character_builder(record)
        return respond(
            "Let’s build a character.\n"
            f"{_builder_step_prompt('name', profile)}",
            updated,
        )

    if normalized_text == "cancel":
        if _builder_state(record).get("active"):
            updated = _set_builder_state(record, active=False, step=None, draft={})
            return respond("Character builder cancelled.", updated)
        return respond("Nothing to cancel.", record)

    if normalized_text == "reset":
        return reset_reply()

    if normalized_text in {"memory on", "memory off"}:
        updated = _normalize_session_record(record)
        updated["memory_enabled"] = normalized_text == "memory on"
        updated = _set_session_record(chat_key, updated)
        state_text = "enabled" if updated["memory_enabled"] else "disabled"
        return respond(f"Memory {state_text}.", updated)

    mode_key = MODE_BUTTON_LOOKUP.get(normalized_text)
    if mode_key:
        updated = _set_chat_mode(record, mode_key)
        updated = _set_session_record(chat_key, updated)
        mode = _chat_mode_spec(updated)
        return respond(f"Tone set to {mode['label']}. {mode['description']}", updated)

    persona_map = _telegram_persona_button_map()
    if normalized_text in persona_map:
        return persona_switch_reply(persona_map[normalized_text])

    if normalized_text == "model":
        return respond(f"{MODEL_ID}", record)

    if normalized_text.startswith("/"):
        if normalized_text == "/persona":
            choices = ", ".join(_persona_choices())
            return respond(
                f"Current persona: {profile['persona_id']}\n"
                f"Available personas: {choices}",
                record,
            )

        if normalized_text.startswith("/persona "):
            new_persona = command_argument()
            if new_persona not in _persona_catalog():
                return respond(
                    f"Unknown persona '{new_persona}'. Available personas: {', '.join(_persona_choices())}",
                    record,
                )
            return persona_switch_reply(new_persona)

        if normalized_text == "/character":
            return respond(
                "Current character:\n"
                f"{_format_character_card(profile)}\n\n"
                "Use the Character button to start a guided builder, or send key=value pairs.",
                record,
            )

        if normalized_text.startswith("/character "):
            updates = _parse_key_values(command_argument())

            if "age" in updates:
                # Characters are adults: clamp to 18 and fall back to 18 when
                # the value is not a number.
                try:
                    updates["age"] = str(max(18, int(updates["age"])))
                except (TypeError, ValueError, OverflowError):
                    updates["age"] = "18"

            updated = _update_character_overrides(record, updates)
            updated = _set_session_record(chat_key, updated)
            updated_profile = _active_profile(updated)
            return respond(
                f"Character updated.\n{_format_character_card(updated_profile)}",
                updated,
            )

        if normalized_text == "/reset":
            return reset_reply()

        if normalized_text == "/model":
            return respond(f"{MODEL_ID}", record)

        if normalized_text == "/dataset":
            if not DATASET_ID:
                return respond(
                    "No dataset connected. Set DATASET_ID and DATASET_SPLIT.",
                    record,
                )

            try:
                dataset = _get_dataset()
                return respond(
                    f"Dataset: {DATASET_ID}\n"
                    f"Split: {DATASET_SPLIT}\n"
                    f"Rows loaded: {len(dataset) if dataset is not None else 0}",
                    record,
                )
            except Exception as exc:
                return respond(
                    f"DATASET_FAILED: {exc.__class__.__name__}: {str(exc)[:160]}",
                    record,
                )

        if normalized_text == "/help":
            return respond(HELP_RESPONSE, record)

        if normalized_text == "/menu":
            return respond(_format_menu(record), record)

        if normalized_text == "/start":
            return opening_reply()

    if _builder_state(record).get("active"):
        reply_text, updated = _advance_character_builder(record, text)
        if updated is not record:
            updated = _set_session_record(chat_key, updated)
        return respond(reply_text, updated)

    if len(text) > MAX_MESSAGE_LENGTH:
        return respond(
            f"Message is too long. Please keep it under {MAX_MESSAGE_LENGTH} characters.",
            record,
        )

    if not _is_safe_message(text):
        return respond(SAFE_REFUSAL, record)

    reply_text = _generate_response(text, profile, record)

    # _generate_response reports failures as a "MODEL_FAILED: ..." string.
    # That diagnostic is shown to the user but must not be stored as an
    # assistant turn in the conversation memory.
    generation_failed = reply_text.startswith("MODEL_FAILED:")

    if _memory_is_enabled(record) and not generation_failed:
        record = _record_turn(record, "user", text)
        record = _record_turn(record, "assistant", reply_text)
        record = _set_session_record(chat_key, record)

    return respond(reply_text, record)
|
|
|
|
def _build_menu_text() -> str:
    """Render the plain-text persona menu.

    The menu has a title line, one summary line per persona (id, name,
    style, age), a blank line, the list of persona ids and a usage hint.
    """

    def summarize(persona_id: str) -> str:
        details = _resolve_persona(persona_id)
        return (
            f"- {persona_id}: {details['name']} | {details['style']} | age {details['age']}"
        )

    persona_rows = [summarize(persona_id) for persona_id in _persona_choices()]
    footer = [
        "",
        f"Available personas: {', '.join(_persona_choices())}",
        "Use /persona [id] to switch, /character key=value ... to customize, and /reset to clear the session.",
    ]

    return "\n".join(["Babble character menu:", *persona_rows, *footer])
|
|
|
|
def _debug_profile_snapshot() -> dict[str, Any]:
    """Return the normalized profile of the default persona.

    No session record is consulted; the persona is resolved from the
    default persona id only.
    """
    default_persona = _resolve_persona(_default_persona_id())
    snapshot = _normalize_character_profile(default_persona)
    return snapshot
|
|
|
|
def _create_ui_state() -> dict[str, Any]:
    """Create the initial session record for the Gradio UI."""
    initial_record = _default_session_record()
    return initial_record
|
|
|
|
def _state_to_form_values(record: dict[str, Any]) -> list[Any]:
    """Build the Gradio updates that mirror ``record`` into the form.

    The returned list has twelve updates in this order: persona dropdown,
    the eight character fields (name, age, gender, style, voice, mood,
    boundaries, opening), the memory checkbox, the character preview and
    the menu preview.
    """
    profile = _active_profile(record)
    character_fields = (
        "name",
        "age",
        "gender",
        "style",
        "voice",
        "mood",
        "boundaries",
        "opening",
    )

    updates: list[Any] = [
        gr.update(value=record["persona_id"], choices=_persona_choices()),
    ]
    updates.extend(gr.update(value=profile[field]) for field in character_fields)
    updates.append(gr.update(value=bool(record.get("memory_enabled", True))))
    updates.append(gr.update(value=profile))
    updates.append(gr.update(value=_format_menu(record)))
    return updates
|
|
|
|
def _ui_load_persona(persona_id: str, record: dict[str, Any]) -> tuple[Any, ...]:
    """Switch the UI session to ``persona_id`` and refresh the form.

    Returns the updated record followed by the form updates.
    """
    switched = _switch_persona(record, persona_id)
    form_updates = _state_to_form_values(switched)
    return (switched, *form_updates)
|
|
|
|
def _ui_save_character(
    persona_id: str,
    name: str,
    age: Any,
    gender: str,
    style: str,
    voice: str,
    mood: str,
    boundaries: str,
    opening: str,
    memory_enabled_value: bool,
    record: dict[str, Any],
) -> tuple[Any, ...]:
    """Store the form's character fields on the UI session record.

    The persona id is only applied when it exists in the persona catalog.
    The age is clamped to at least 18, and falls back to 18 when it is not
    a usable number, matching what the Telegram ``/character`` command does.

    Returns:
        The updated record followed by the refreshed form updates.
    """
    updated = _normalize_session_record(record)

    if persona_id in _persona_catalog():
        updated["persona_id"] = persona_id

    # The age field can arrive empty (None) or below 18; never store that.
    try:
        adult_age = max(18, int(age))
    except (TypeError, ValueError, OverflowError):
        adult_age = 18

    updated["custom_character"] = {
        "name": name,
        "age": adult_age,
        "gender": gender,
        "style": style,
        "voice": voice,
        "mood": mood,
        "boundaries": boundaries,
        "opening": opening,
    }
    updated["memory_enabled"] = bool(memory_enabled_value)

    return (updated, *_state_to_form_values(updated))
|
|
|
|
def _ui_reset_session(record: dict[str, Any]) -> tuple[Any, ...]:
    """Replace the UI session with a fresh default record.

    The incoming ``record`` is ignored. Returns the new record followed by
    the refreshed form updates.
    """
    fresh = _default_session_record()
    form_updates = _state_to_form_values(fresh)
    return (fresh, *form_updates)
|
|
|
|
def _ui_send_message(
    message: str,
    chat_history: list[dict[str, str]] | None,
    record: dict[str, Any],
) -> tuple[Any, ...]:
    """Handle one chat message sent from the Gradio UI.

    The same gates as the Telegram path apply before the model is called:
    messages longer than MAX_MESSAGE_LENGTH are rejected, and messages that
    fail ``_is_safe_message`` get the SAFE_REFUSAL reply. Rejected and
    refused messages are never written to the conversation memory.

    Returns:
        ``(textbox_value, chatbot_messages, chat_history, record, menu_text)``.
        The textbox is cleared except when the message was too long, where
        the text is left in place so it can be shortened.
    """
    text = str(message or "").strip()
    history = list(chat_history or [])

    if not text:
        return "", history, history, record, _format_menu(record)

    if len(text) > MAX_MESSAGE_LENGTH:
        history.append(
            {
                "role": "assistant",
                "content": (
                    f"Message is too long. Please keep it under {MAX_MESSAGE_LENGTH} characters."
                ),
            }
        )
        return text, history, history, record, _format_menu(record)

    if not _is_safe_message(text):
        reply = SAFE_REFUSAL
        store_turn = False
    else:
        profile = _active_profile(record)
        reply = _generate_response(text, profile, record)
        # "MODEL_FAILED: ..." is a diagnostic string, not a real assistant
        # turn, so it is shown but not remembered.
        store_turn = _memory_is_enabled(record) and not reply.startswith("MODEL_FAILED:")

    if store_turn:
        record = _record_turn(record, "user", text)
        record = _record_turn(record, "assistant", reply)

    history.append({"role": "user", "content": text})
    history.append({"role": "assistant", "content": reply})

    return "", history, history, record, _format_menu(record)
|
|
|
|
def build_demo():
    """Build the Gradio Blocks UI for Babble.

    The layout is a chat column (conversation, message box, send button)
    next to a character column (persona preset, character fields, memory
    toggle and previews). Load/save/reset handlers refresh the whole form;
    send and submit share one chat handler.

    Raises:
        RuntimeError: When Gradio could not be imported.
    """
    if gr is None:
        raise RuntimeError("Gradio is required to run Babble.") from _GRADIO_IMPORT_ERROR

    with gr.Blocks(title="Babble") as demo:
        state = gr.State(_create_ui_state())
        chat_history = gr.State([])

        gr.Markdown("# Babble")
        gr.Markdown(
            "Adult romance-leaning character chat with persona presets, custom character editing, memory, and dataset-guided context."
        )

        with gr.Row():
            with gr.Column(scale=2):
                chatbot = gr.Chatbot(label="Conversation", height=540, type="messages")
                message = gr.Textbox(
                    label="Message",
                    placeholder="Send a test message",
                )
                send = gr.Button("Send")

            with gr.Column(scale=1):
                persona = gr.Dropdown(
                    choices=_persona_choices(),
                    value=_default_persona_id(),
                    label="Persona preset",
                )
                load_persona = gr.Button("Load Persona")
                reset_session = gr.Button("Reset Session")

                name = gr.Textbox(label="Name")
                age = gr.Number(label="Adult age", precision=0)
                gender = gr.Textbox(label="Gender / pronouns")
                style = gr.Textbox(label="Style")
                voice = gr.Textbox(label="Voice")
                mood = gr.Textbox(label="Mood")
                boundaries = gr.Textbox(label="Boundaries")
                opening = gr.Textbox(label="Opening line")
                memory_enabled = gr.Checkbox(label="Memory enabled", value=MEMORY_ENABLED)

                save_character = gr.Button("Save Character")
                character_preview = gr.JSON(label="Active character")
                menu_preview = gr.Textbox(label="Menu", lines=10)
                # Rendered but not connected to any handler in this function.
                status = gr.Textbox(label="Session status")

        # Editable fields, in the order _ui_save_character takes them.
        editable_fields = [
            persona,
            name,
            age,
            gender,
            style,
            voice,
            mood,
            boundaries,
            opening,
            memory_enabled,
        ]
        # Components refreshed by _state_to_form_values, in its return order.
        form_components = [*editable_fields, character_preview, menu_preview]

        demo.load(
            fn=lambda record: _state_to_form_values(record),
            inputs=state,
            outputs=list(form_components),
        )

        load_persona.click(
            fn=_ui_load_persona,
            inputs=[persona, state],
            outputs=[state, *form_components],
        )

        save_character.click(
            fn=_ui_save_character,
            inputs=[*editable_fields, state],
            outputs=[state, *form_components],
        )

        reset_session.click(
            fn=_ui_reset_session,
            inputs=[state],
            outputs=[state, *form_components],
        )

        # The Send button and pressing Enter in the textbox do the same thing.
        for trigger in (send.click, message.submit):
            trigger(
                fn=_ui_send_message,
                inputs=[message, chat_history, state],
                outputs=[message, chatbot, chat_history, state, menu_preview],
            )

    return demo
|
|
|
|
def create_app() -> FastAPI:
    """Create the FastAPI application with all Babble routes.

    On startup the lifespan hook preloads the model (when REQUIRE_MODEL is
    set and the local model is enabled) and, best-effort, the dataset and
    its text index. When Gradio is available the UI is mounted at "/";
    otherwise only the API endpoints are served.

    Returns:
        The FastAPI app, or the result of ``gr.mount_gradio_app`` when
        Gradio is available.
    """

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        if REQUIRE_MODEL and _is_local_model_enabled():
            logger.info("Preloading model...")
            _get_text_generator()

        if DATASET_ENABLED and DATASET_ID:
            logger.info("Preloading dataset...")
            # A dataset problem is logged but does not stop startup.
            try:
                _get_dataset()
                _dataset_text_index()
            except Exception:
                logger.exception("Dataset preload failed")

        yield

    fastapi_app = FastAPI(title="Babble", version=APP_VERSION, lifespan=lifespan)

    # NOTE(review): this GET "/" route is registered before Gradio is mounted
    # at "/" below, so it likely takes precedence over the Gradio index
    # page — confirm that this is intended.
    @fastapi_app.get("/")
    async def root() -> dict[str, Any]:
        return {
            "ok": True,
            "service": "babble",
            "status": "running",
            "version": APP_VERSION,
            "routes": [
                "/health",
                "/model",
                "/dataset",
                "/persona",
                "/menu",
                "/model/test",
                "/debug/config",
                "/debug/model",
            ],
        }

    @fastapi_app.get("/health")
    async def health() -> dict[str, Any]:
        return {
            "ok": True,
            "service": "babble",
            "version": APP_VERSION,
            "model": MODEL_ID,
            "loaded": _get_text_generator.cache_info().currsize > 0,
            "persona": _default_persona_id(),
            "dataset_connected": bool(DATASET_ID),
        }

    @fastapi_app.get("/model")
    async def model() -> dict[str, Any]:
        profile = _debug_profile_snapshot()
        return {
            "model_id": MODEL_ID,
            "enabled": _is_local_model_enabled(),
            "loaded": _get_text_generator.cache_info().currsize > 0,
            "transformers": pipeline is not None,
            "transformers_import_error": (
                None if pipeline is not None else str(_TRANSFORMERS_IMPORT_ERROR)[:300]
            ),
            "persona_id": profile["persona_id"],
            "persona_name": profile["name"],
            "memory_enabled": MEMORY_ENABLED,
            "memory_path": str(MEMORY_PATH),
        }

    @fastapi_app.get("/dataset")
    async def dataset_status() -> dict[str, Any]:
        if not DATASET_ENABLED:
            return {
                "ok": False,
                "enabled": False,
                "error": "DATASET_DISABLED",
            }

        if not DATASET_ID:
            return {
                "ok": False,
                "enabled": True,
                "error": "DATASET_ID missing",
            }

        if load_dataset is None:
            return {
                "ok": False,
                "enabled": True,
                "dataset_id": DATASET_ID,
                "error": "datasets import failed",
                "detail": str(_DATASETS_IMPORT_ERROR)[:300],
            }

        # Report loader failures in the same ok=False shape as the other
        # failure branches instead of letting them escape as a 500.
        try:
            dataset = _get_dataset()
            index = _dataset_text_index()
        except Exception as exc:
            logger.exception("Dataset status check failed")
            return {
                "ok": False,
                "enabled": True,
                "dataset_id": DATASET_ID,
                "error": exc.__class__.__name__,
                "detail": str(exc)[:300],
            }

        return {
            "ok": True,
            "enabled": True,
            "dataset_id": DATASET_ID,
            "split": DATASET_SPLIT,
            "rows_loaded": len(dataset) if dataset is not None else 0,
            "indexed_rows": len(index),
            "columns": dataset.column_names if dataset is not None else [],
            "sample": dataset[0] if dataset is not None and len(dataset) else None,
        }

    @fastapi_app.get("/persona")
    async def persona_menu() -> dict[str, Any]:
        profile = _debug_profile_snapshot()
        return {
            "active_persona": profile,
            "available_personas": {
                persona_id: _resolve_persona(persona_id)
                for persona_id in _persona_choices()
            },
        }

    @fastapi_app.get("/menu")
    async def menu() -> dict[str, str]:
        return {
            "text": _build_menu_text(),
        }

    @fastapi_app.get("/model/test")
    async def test() -> dict[str, Any]:
        profile = _debug_profile_snapshot()
        record = _default_session_record()
        return {
            "reply": _generate_response("Say hello briefly.", profile, record),
            "persona_id": profile["persona_id"],
        }

    @fastapi_app.post("/telegram/webhook/{secret}")
    async def telegram_webhook(secret: str, update: dict[str, Any]) -> dict[str, Any]:
        logger.info("Telegram update received")

        expected_secret = os.getenv("WEBHOOK_SECRET", "").strip()

        if not expected_secret:
            logger.error("WEBHOOK_SECRET is not configured")
            raise HTTPException(
                status_code=503,
                detail="Webhook is not configured",
            )

        # compare_digest only accepts ASCII-only str; a non-ASCII path
        # segment would raise TypeError (a 500 instead of a 403). Comparing
        # the UTF-8 bytes keeps the constant-time check and handles any
        # input.
        secret_matches = hmac.compare_digest(
            secret.encode("utf-8"),
            expected_secret.encode("utf-8"),
        )
        if not secret_matches:
            logger.warning("Rejected webhook request with invalid secret")
            raise HTTPException(
                status_code=403,
                detail="Invalid webhook secret",
            )

        text, chat_id = _extract_telegram_text(update)

        # Only log whether the fields were present, never their values.
        logger.info("Telegram chat_id found: %s", chat_id is not None)
        logger.info("Telegram message text found: %s", bool(text))

        if chat_id is None:
            return {
                "ok": True,
                "ignored": "missing_chat_id",
            }

        reply_text, record, reply_markup = _telegram_reply_for_message(
            _session_key(chat_id),
            text,
        )

        # Store the returned record: not every reply path saves it itself.
        _set_session_record(_session_key(chat_id), record)
        return await _send_telegram_message(chat_id, reply_text, reply_markup)

    @fastapi_app.get("/debug/config")
    async def debug_config() -> dict[str, Any]:
        # Answer 404 when debug is off.
        if not _is_debug_enabled():
            raise HTTPException(status_code=404, detail="Not found")

        profile = _debug_profile_snapshot()

        return {
            "debug": True,
            "service": "babble",
            "version": APP_VERSION,
            "llm_provider": _sanitize_provider(),
            "model_id": MODEL_ID,
            "local_model_enabled": _is_local_model_enabled(),
            "transformers_available": pipeline is not None,
            "transformers_import_error": (
                None if pipeline is not None else str(_TRANSFORMERS_IMPORT_ERROR)[:300]
            ),
            "persona_id": profile["persona_id"],
            "persona_name": profile["name"],
            "persona_count": len(_persona_catalog()),
            "memory_enabled": MEMORY_ENABLED,
            "memory_path": str(MEMORY_PATH),
            "memory_max_turns": MEMORY_MAX_TURNS,
            "telegram_bot_token_configured": (
                _configured("TELEGRAM_BOT_TOKEN") or _configured("BOT_TOKEN")
            ),
            "webhook_secret_configured": _configured("WEBHOOK_SECRET"),
            "llm_api_key_configured": _configured("LLM_API_KEY"),
            "reply_mode": "telegram_direct_send_with_webhook_fallback",
            "dataset_enabled": DATASET_ENABLED,
            "dataset_id": DATASET_ID,
            "dataset_split": DATASET_SPLIT,
            "datasets_available": load_dataset is not None,
            "datasets_import_error": (
                None if load_dataset is not None else str(_DATASETS_IMPORT_ERROR)[:300]
            ),
            "hf_token_configured": bool(HF_TOKEN),
        }

    @fastapi_app.get("/debug/model")
    async def debug_model() -> dict[str, Any]:
        if not _is_debug_enabled():
            raise HTTPException(status_code=404, detail="Not found")

        try:
            profile = _debug_profile_snapshot()
            reply = _generate_response(
                "Say hello in one short sentence.",
                profile,
                _default_session_record(),
                raise_on_error=True,
            )
            return {
                "ok": True,
                "model_id": MODEL_ID,
                "persona_id": profile["persona_id"],
                "reply": reply,
            }
        except Exception as exc:
            logger.exception("Debug model test failed")
            return {
                "ok": False,
                "error": exc.__class__.__name__,
                "detail": str(exc)[:200],
            }

    if gr is None:
        logger.warning("Gradio unavailable; serving API endpoints only")
        return fastapi_app

    return gr.mount_gradio_app(fastapi_app, build_demo(), path="/")
|
|
|
|
# ASGI application object, created at import time.
app = create_app()


if __name__ == "__main__":
    import uvicorn

    # BABBLE_PORT takes priority over PORT; 7860 is the fallback.
    listen_port = int(os.getenv("BABBLE_PORT", os.getenv("PORT", "7860")))
    uvicorn_log_level = os.getenv("LOG_LEVEL", "info").lower()

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=listen_port,
        log_level=uvicorn_log_level,
    )