import hmac import json import logging import os import re import shlex from contextlib import asynccontextmanager from functools import lru_cache from pathlib import Path from typing import Any import httpx from fastapi import FastAPI, HTTPException try: import gradio as gr except Exception as exc: gr = None _GRADIO_IMPORT_ERROR = exc else: _GRADIO_IMPORT_ERROR = None try: from transformers import pipeline except Exception as exc: pipeline = None _TRANSFORMERS_IMPORT_ERROR = exc else: _TRANSFORMERS_IMPORT_ERROR = None try: from datasets import load_dataset except Exception as exc: load_dataset = None _DATASETS_IMPORT_ERROR = exc else: _DATASETS_IMPORT_ERROR = None APP_VERSION = "0.4.1-character-memory-dataset-no-stub" MAX_MESSAGE_LENGTH = 500 MAX_REPLY_LENGTH = 350 MEMORY_SUMMARY_LIMIT = 900 RESPONSE_PASSES = max(1, int(os.getenv("RESPONSE_PASSES", "2"))) MODEL_ID = os.getenv("MODEL_ID", "Qwen/Qwen3-0.6B") PERSONAS_PATH = Path(os.getenv("PERSONAS_PATH", "data/personas.json")) MEMORY_PATH = Path(os.getenv("MEMORY_PATH", "memory/chat_memory.json")) DEFAULT_PERSONA_ID = os.getenv("PERSONA_ID", "default").strip() or "default" TRUTHY_VALUES = {"1", "true", "yes", "on"} REQUIRE_MODEL = os.getenv("REQUIRE_MODEL", "true").strip().lower() in TRUTHY_VALUES MEMORY_ENABLED = os.getenv("MEMORY_ENABLED", "true").strip().lower() in TRUTHY_VALUES MEMORY_MAX_TURNS = max(2, int(os.getenv("MEMORY_MAX_TURNS", "8"))) DATASET_ID = os.getenv("DATASET_ID", "").strip() DATASET_SPLIT = os.getenv("DATASET_SPLIT", "train").strip() DATASET_ENABLED = os.getenv("DATASET_ENABLED", "true").strip().lower() in TRUTHY_VALUES DATASET_MAX_ROWS = max(50, int(os.getenv("DATASET_MAX_ROWS", "2000"))) DATASET_CONTEXT_ROWS = max(1, int(os.getenv("DATASET_CONTEXT_ROWS", "2"))) DATASET_CONTEXT_CHARS = max(200, int(os.getenv("DATASET_CONTEXT_CHARS", "900"))) HF_TOKEN = os.getenv("HF_TOKEN", "").strip() or None START_RESPONSE = "Babble online. Send a message." HELP_RESPONSE = ( "Commands:\n" "/menu - show the character menu\n" "/persona [id] - list or switch persona presets\n" "/character key=value ... - customize the current character\n" "/dataset - show connected dataset status\n" "/model - show active model\n" "/reset - clear session memory and custom overrides\n" "/start - restart the current session\n" ) SAFE_REFUSAL = ( "I cannot help with unsafe or illegal requests. I can help with safe, legal conversations." ) BLOCKED_TERMS = ( "minor", "underage", "child sexual", "non-consensual", "without consent", "sexual violence", "build a bomb", "malware", "phishing", ) SYSTEM_PROMPT = ( "You are Babble, a concise Telegram AI chatbot. " "Reply naturally, emotionally present, and safely. " "Keep the voice warm, confident, and human. " "Stay in character, keep continuity, and avoid meta commentary. " "Prefer short, specific replies that feel like real chat. " "Use dataset guidance only as tone/context influence; do not quote it directly." ) DEFAULT_PERSONAS: dict[str, dict[str, Any]] = { "default": { "name": "Babble", "age": 18, "gender": "woman", "style": "warm, playful, emotionally attentive", "voice": "direct, affectionate, lightly teasing", "mood": "open and inviting", "boundaries": "adult romance only, consent-first, no coercion, non-explicit", "opening": "Hey, I’m here. What kind of mood are we in tonight?", }, "soft_romance": { "name": "Jaylaa", "age": 18, "gender": "woman", "style": "gentle, affectionate, slow-burn romantic", "voice": "soft, reassuring, emotionally anchored", "mood": "tender and attentive", "boundaries": "safe adult romance only, non-explicit", "opening": "I’m glad you came back. Tell me what you need tonight.", }, "playful": { "name": "Charlotte", "age": 18, "gender": "woman", "style": "teasing, energetic, flirty but respectful", "voice": "bright, quick, mischievous", "mood": "confident and playful", "boundaries": "consent-first adult conversation only, non-explicit", "opening": "There you are. I was wondering when you’d show up.", }, } CHAT_MODES: dict[str, dict[str, Any]] = { "warm": { "label": "Warm", "description": "supportive, attentive, lightly affectionate", "temperature": 0.60, "top_p": 0.82, }, "romantic": { "label": "Romantic", "description": "tender, intimate, slow-burn romantic", "temperature": 0.68, "top_p": 0.86, }, "playful": { "label": "Playful", "description": "teasing, witty, banter-heavy", "temperature": 0.72, "top_p": 0.88, }, "deep": { "label": "Deep", "description": "reflective, emotionally honest, slower paced", "temperature": 0.56, "top_p": 0.80, }, } MODE_BUTTON_LOOKUP = { "warm": "warm", "romantic": "romantic", "playful": "playful", "deep": "deep", } CHARACTER_FIELDS = ( "name", "age", "gender", "style", "voice", "mood", "boundaries", "opening", ) logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper()) logger = logging.getLogger("babble") _MEMORY_STORE_CACHE: dict[str, dict[str, Any]] | None = None def _clean_text_button(text: str) -> str: return " ".join(str(text or "").strip().lower().split()) def _is_debug_enabled() -> bool: return os.getenv("DEBUG", "").strip().lower() in TRUTHY_VALUES def _configured(name: str) -> bool: return bool(os.getenv(name, "").strip()) def _sanitize_provider() -> str: provider = os.getenv("LLM_PROVIDER", "local").strip() return provider or "local" def _is_local_model_enabled() -> bool: local_model_override = os.getenv("LOCAL_MODEL_ENABLED", "").strip() if local_model_override: return local_model_override.lower() in TRUTHY_VALUES return _sanitize_provider().lower() != "stub" def _load_json_file(path: Path, fallback: Any) -> Any: try: with path.open("r", encoding="utf-8") as handle: return json.load(handle) except FileNotFoundError: return fallback except Exception: logger.exception("Failed to load JSON from %s", path) return fallback @lru_cache(maxsize=1) def _persona_catalog() -> dict[str, dict[str, Any]]: raw_catalog = _load_json_file(PERSONAS_PATH, DEFAULT_PERSONAS) if not isinstance(raw_catalog, dict) or not raw_catalog: return DEFAULT_PERSONAS.copy() catalog: dict[str, dict[str, Any]] = {} for persona_id, persona in raw_catalog.items(): if isinstance(persona, dict): catalog[str(persona_id)] = dict(persona) if "default" not in catalog: catalog["default"] = dict(DEFAULT_PERSONAS["default"]) return catalog or DEFAULT_PERSONAS.copy() def _persona_choices() -> list[str]: catalog = _persona_catalog() ordered = ["default"] ordered.extend(sorted(persona_id for persona_id in catalog if persona_id != "default")) return ordered def _default_persona_id() -> str: catalog = _persona_catalog() if DEFAULT_PERSONA_ID in catalog: return DEFAULT_PERSONA_ID return "default" if "default" in catalog else next(iter(catalog)) def _resolve_persona(persona_id: str | None = None) -> dict[str, Any]: catalog = _persona_catalog() resolved_id = (persona_id or _default_persona_id()).strip() or _default_persona_id() if resolved_id not in catalog: resolved_id = _default_persona_id() persona = dict(catalog[resolved_id]) persona["persona_id"] = resolved_id return persona def _normalize_character_profile( persona: dict[str, Any], custom_overrides: dict[str, Any] | None = None, ) -> dict[str, Any]: custom_overrides = custom_overrides or {} profile = dict(persona) for field in CHARACTER_FIELDS: value = custom_overrides.get(field, profile.get(field)) if field == "age": try: value = max(18, int(value)) except Exception: value = int(persona.get("age", 18) or 18) value = max(18, value) else: value = str(value or "").strip() profile[field] = value profile["persona_id"] = str(persona.get("persona_id") or _default_persona_id()) profile["boundaries"] = str(profile.get("boundaries") or "").strip() or ( "adult romance only, consent-first, non-explicit" ) return profile def _default_builder_record() -> dict[str, Any]: return { "active": False, "step": None, "draft": {}, } def _default_session_record() -> dict[str, Any]: return { "persona_id": _default_persona_id(), "custom_character": {}, "chat_mode": "warm", "memory_enabled": MEMORY_ENABLED, "memory": { "summary": "", "turns": [], }, "builder": _default_builder_record(), } def _normalize_session_record(record: dict[str, Any] | None) -> dict[str, Any]: base = _default_session_record() if isinstance(record, dict): base["persona_id"] = str(record.get("persona_id") or base["persona_id"]) custom_character = record.get("custom_character") if isinstance(custom_character, dict): base["custom_character"] = dict(custom_character) base["chat_mode"] = str(record.get("chat_mode") or base["chat_mode"]).strip() or "warm" base["memory_enabled"] = bool(record.get("memory_enabled", base["memory_enabled"])) memory = record.get("memory") if isinstance(memory, dict): base["memory"]["summary"] = str(memory.get("summary", "")) turns = memory.get("turns") if isinstance(turns, list): base["memory"]["turns"] = [ { "role": str(turn.get("role", "")).strip(), "text": str(turn.get("text", "")).strip(), } for turn in turns if isinstance(turn, dict) and str(turn.get("role", "")).strip() and str(turn.get("text", "")).strip() ] builder = record.get("builder") if isinstance(builder, dict): draft = builder.get("draft") if not isinstance(draft, dict): draft = {} base["builder"] = { "active": bool(builder.get("active", False)), "step": str(builder.get("step") or "").strip() or None, "draft": { str(key): str(value).strip() for key, value in draft.items() if str(key).strip() and str(value).strip() }, } if base["persona_id"] not in _persona_catalog(): base["persona_id"] = _default_persona_id() if base["chat_mode"] not in CHAT_MODES: base["chat_mode"] = "warm" return base def _session_key(chat_id: int | str) -> str: return str(chat_id) def _load_memory_store() -> dict[str, dict[str, Any]]: global _MEMORY_STORE_CACHE if _MEMORY_STORE_CACHE is not None: return _MEMORY_STORE_CACHE raw_store = _load_json_file(MEMORY_PATH, {}) if not isinstance(raw_store, dict): raw_store = {} store: dict[str, dict[str, Any]] = {} for key, value in raw_store.items(): if isinstance(value, dict): store[str(key)] = _normalize_session_record(value) _MEMORY_STORE_CACHE = store return store def _save_memory_store() -> None: if not MEMORY_ENABLED: return store = _load_memory_store() try: MEMORY_PATH.parent.mkdir(parents=True, exist_ok=True) tmp_path = MEMORY_PATH.with_suffix(MEMORY_PATH.suffix + ".tmp") with tmp_path.open("w", encoding="utf-8") as handle: json.dump(store, handle, ensure_ascii=True, indent=2, sort_keys=True) tmp_path.replace(MEMORY_PATH) except Exception: logger.exception("Failed to save memory store to %s", MEMORY_PATH) def _get_session_record(chat_key: str) -> dict[str, Any]: store = _load_memory_store() record = store.get(chat_key) if record is None: record = _default_session_record() store[chat_key] = record _save_memory_store() return record def _set_session_record(chat_key: str, record: dict[str, Any]) -> dict[str, Any]: store = _load_memory_store() normalized = _normalize_session_record(record) store[chat_key] = normalized _save_memory_store() return normalized def _reset_session_record(chat_key: str) -> dict[str, Any]: record = _default_session_record() return _set_session_record(chat_key, record) def _active_profile(record: dict[str, Any]) -> dict[str, Any]: persona = _resolve_persona(record.get("persona_id")) return _normalize_character_profile(persona, record.get("custom_character")) def _memory_is_enabled(record: dict[str, Any]) -> bool: return MEMORY_ENABLED and bool(record.get("memory_enabled", True)) def _chat_mode_key(record: dict[str, Any]) -> str: mode = str(record.get("chat_mode") or "warm").strip().lower() return mode if mode in CHAT_MODES else "warm" def _chat_mode_spec(record: dict[str, Any]) -> dict[str, Any]: return CHAT_MODES[_chat_mode_key(record)] def _memory_recent_turns(record: dict[str, Any]) -> list[dict[str, str]]: memory = record.get("memory") or {} turns = memory.get("turns") or [] if not isinstance(turns, list): return [] cleaned = [] for turn in turns[-MEMORY_MAX_TURNS * 2 :]: if not isinstance(turn, dict): continue role = str(turn.get("role", "")).strip() text = str(turn.get("text", "")).strip() if role and text: cleaned.append({"role": role, "text": text}) return cleaned[-MEMORY_MAX_TURNS * 2 :] def _memory_summary(record: dict[str, Any]) -> str: turns = _memory_recent_turns(record) parts = [] for turn in turns: parts.append(f"{turn['role'].title()}: {turn['text']}") combined = " | ".join(parts) if len(combined) > MEMORY_SUMMARY_LIMIT: combined = combined[:MEMORY_SUMMARY_LIMIT].rstrip() + "..." return combined def _memory_recent_block(record: dict[str, Any]) -> str: turns = _memory_recent_turns(record) if not turns: return "" lines = ["Recent turns:"] for turn in turns[-6:]: lines.append(f"{turn['role'].title()}: {turn['text']}") return "\n".join(lines) def _record_turn(record: dict[str, Any], role: str, text: str) -> dict[str, Any]: if not _memory_is_enabled(record): return record normalized = _normalize_session_record(record) memory = normalized.setdefault("memory", {"summary": "", "turns": []}) turns = memory.setdefault("turns", []) turns.append({"role": role, "text": text}) turns[:] = turns[-MEMORY_MAX_TURNS * 2 :] memory["summary"] = _memory_summary(normalized) return normalized def _set_chat_mode(record: dict[str, Any], chat_mode: str) -> dict[str, Any]: normalized = _normalize_session_record(record) normalized["chat_mode"] = chat_mode if chat_mode in CHAT_MODES else "warm" return normalized def _set_builder_state( record: dict[str, Any], *, active: bool, step: str | None = None, draft: dict[str, Any] | None = None, ) -> dict[str, Any]: normalized = _normalize_session_record(record) normalized["builder"] = { "active": active, "step": step, "draft": draft or {}, } return normalized def _builder_state(record: dict[str, Any]) -> dict[str, Any]: builder = record.get("builder") if isinstance(builder, dict): return _normalize_session_record(record)["builder"] return _default_builder_record() def _builder_step_prompt(step: str, profile: dict[str, Any]) -> str: prompts = { "name": "Send the character name.", "age": "Send an adult age of 18 or higher.", "gender": "Send gender or pronouns.", "style": "Describe the chat style in a few words.", "voice": "Describe the voice or delivery.", "mood": "Describe the mood.", "boundaries": "Describe the boundaries.", "opening": "Send the opening line.", } base = prompts.get(step, "Send the next character detail.") current = profile.get(step) if current: return f"{base} Current: {current}" return base def _start_character_builder(record: dict[str, Any]) -> dict[str, Any]: profile = _active_profile(record) draft = {field: profile.get(field, "") for field in CHARACTER_FIELDS} return _set_builder_state(record, active=True, step="name", draft=draft) def _advance_character_builder( record: dict[str, Any], text: str, ) -> tuple[str, dict[str, Any]]: builder = _builder_state(record) if not builder.get("active"): return "", record step = str(builder.get("step") or "name") draft = dict(builder.get("draft") or {}) value = text.strip() if _clean_text_button(value) in {"cancel", "stop", "reset"}: record = _set_builder_state(record, active=False, step=None, draft={}) return "Character builder cancelled.", record if step == "age": try: age = max(18, int(value)) except Exception: return "Please send an adult age of 18 or higher.", record draft[step] = str(age) else: draft[step] = value steps = list(CHARACTER_FIELDS) current_index = steps.index(step) if current_index >= len(steps) - 1: updated = _update_character_overrides(record, draft) updated = _set_builder_state(updated, active=False, step=None, draft={}) profile = _active_profile(updated) return ( f"Character created.\n{_format_character_card(profile)}", updated, ) next_step = steps[current_index + 1] updated = _set_builder_state(record, active=True, step=next_step, draft=draft) profile = _active_profile(updated) return _builder_step_prompt(next_step, profile), updated def _format_character_card(profile: dict[str, Any]) -> str: return ( f"Name: {profile['name']}\n" f"Age: {profile['age']}\n" f"Gender: {profile['gender']}\n" f"Style: {profile['style']}\n" f"Voice: {profile['voice']}\n" f"Mood: {profile['mood']}\n" f"Boundaries: {profile['boundaries']}\n" f"Opening: {profile['opening']}" ) def _format_menu(record: dict[str, Any]) -> str: profile = _active_profile(record) memory_status = "on" if _memory_is_enabled(record) else "off" mode = _chat_mode_spec(record) return ( f"Current persona: {profile['persona_id']}\n" f"Character: {profile['name']} ({profile['age']}, {profile['gender']})\n" f"Style: {profile['style']}\n" f"Mode: {mode['label']} - {mode['description']}\n" f"Memory: {memory_status}\n" f"Dataset: {DATASET_ID or 'not connected'}\n\n" f"Tap a button below to switch persona, change tone, build a character, or reset the session." ) def _build_character_gallery(record: dict[str, Any]) -> str: lines = ["Characters:"] for persona_id in _persona_choices(): persona = _resolve_persona(persona_id) lines.append( f"- {persona['name']} ({persona['age']}): {persona['style']} | {persona['opening']}" ) active = _active_profile(record) lines.append("") lines.append(f"Active: {active['name']} | mode {_chat_mode_spec(record)['label']}") return "\n".join(lines) def _telegram_persona_button_map() -> dict[str, str]: mapping: dict[str, str] = {} for persona_id in _persona_choices(): persona = _resolve_persona(persona_id) mapping[_clean_text_button(persona["name"])] = persona_id mapping[_clean_text_button(persona_id)] = persona_id return mapping def _telegram_reply_markup(record: dict[str, Any]) -> dict[str, Any]: persona_names = [str(_resolve_persona(pid)["name"]) for pid in _persona_choices()] mode_buttons = [mode["label"] for mode in CHAT_MODES.values()] memory_button = "Memory Off" if _memory_is_enabled(record) else "Memory On" builder = _builder_state(record) if builder.get("active"): return { "keyboard": [ ["Cancel", "Reset", "Menu"], ["Skip"], ], "resize_keyboard": True, "is_persistent": True, "input_field_placeholder": _builder_step_prompt( str(builder.get("step") or "name"), _active_profile(record), ), } return { "keyboard": [ ["Menu", "Characters", "Build Character"], mode_buttons, persona_names, [memory_button, "Reset", "Help"], ], "resize_keyboard": True, "is_persistent": True, "input_field_placeholder": "Tap a button or send a message", } def _parse_key_values(text: str) -> dict[str, str]: parsed: dict[str, str] = {} if not text.strip(): return parsed try: tokens = shlex.split(text) except Exception: tokens = text.split() for token in tokens: if "=" not in token: continue key, value = token.split("=", 1) parsed[key.strip().lower()] = value.strip() return parsed def _update_character_overrides( record: dict[str, Any], updates: dict[str, Any], ) -> dict[str, Any]: normalized = _normalize_session_record(record) persona_id = updates.get("persona") or updates.get("preset") or normalized["persona_id"] persona_id = persona_id if persona_id in _persona_catalog() else normalized["persona_id"] normalized["persona_id"] = persona_id if "memory" in updates: normalized["memory_enabled"] = str(updates["memory"]).lower() in TRUTHY_VALUES custom = dict(normalized.get("custom_character") or {}) for field in CHARACTER_FIELDS: if field not in updates: continue if field == "age": try: custom[field] = max(18, int(updates[field])) except Exception: custom[field] = 18 else: custom[field] = str(updates[field]).strip() normalized["custom_character"] = custom return normalized def _switch_persona(record: dict[str, Any], persona_id: str) -> dict[str, Any]: persona_id = persona_id.strip() if persona_id not in _persona_catalog(): return record normalized = _normalize_session_record(record) normalized["persona_id"] = persona_id normalized["custom_character"] = {} normalized["memory"] = {"summary": "", "turns": []} return normalized def _clean_reply(text: str) -> str: reply = str(text or "").strip() for marker in ("\nUser:", "\nHuman:", "\nBabble:", "\nAssistant:", "\nFinal:"): if marker in reply: reply = reply.split(marker, 1)[0].strip() reply = re.sub(r"(\([^)]{1,40}\)\s*)\1+", r"\1", reply) sentences = re.split(r"(?<=[.!?])\s+", reply) cleaned: list[str] = [] seen: set[str] = set() for sentence in sentences: key = sentence.lower().strip() if key and key not in seen: cleaned.append(sentence.strip()) seen.add(key) reply = " ".join(cleaned).strip() if not reply: raise RuntimeError("EMPTY_REPLY") if len(reply) > MAX_REPLY_LENGTH: reply = reply[:MAX_REPLY_LENGTH].rsplit(" ", 1)[0].rstrip() + "..." return reply def _is_safe_message(text: str) -> bool: normalized = " ".join(text.lower().split()) return not any(term in normalized for term in BLOCKED_TERMS) @lru_cache(maxsize=1) def _get_text_generator(): if not _is_local_model_enabled(): raise RuntimeError("LOCAL_MODEL_DISABLED") if pipeline is None: raise RuntimeError(f"TRANSFORMERS_MISSING: {_TRANSFORMERS_IMPORT_ERROR}") logger.info("Loading model: %s", MODEL_ID) generator = pipeline( "text-generation", model=MODEL_ID, device=-1, ) logger.info("Model loaded") return generator @lru_cache(maxsize=1) def _get_dataset(): if not DATASET_ENABLED or not DATASET_ID: return None if load_dataset is None: raise RuntimeError(f"DATASETS_MISSING: {_DATASETS_IMPORT_ERROR}") logger.info("Loading dataset: %s split=%s", DATASET_ID, DATASET_SPLIT) kwargs: dict[str, Any] = {"split": DATASET_SPLIT} if HF_TOKEN: kwargs["token"] = HF_TOKEN dataset = load_dataset(DATASET_ID, **kwargs) if len(dataset) > DATASET_MAX_ROWS: dataset = dataset.select(range(DATASET_MAX_ROWS)) logger.info("Dataset loaded: %s rows=%s", DATASET_ID, len(dataset)) return dataset def _row_to_text(row: Any) -> str: if not isinstance(row, dict): return str(row) preferred_keys = ( "prompt", "instruction", "input", "chosen", "response", "output", "text", "messages", "conversation", ) parts: list[str] = [] for key in preferred_keys: value = row.get(key) if value is None: continue if isinstance(value, str): parts.append(f"{key}: {value}") else: try: parts.append(f"{key}: {json.dumps(value, ensure_ascii=False)}") except Exception: parts.append(f"{key}: {value}") if not parts: try: return json.dumps(row, ensure_ascii=False) except Exception: return str(row) return "\n".join(parts) def _tokenize_for_search(text: str) -> set[str]: stopwords = { "the", "and", "you", "your", "that", "this", "with", "for", "are", "but", "not", "was", "have", "has", "had", "what", "when", "where", "how", "why", } return { token for token in re.findall(r"[a-zA-Z0-9']{3,}", text.lower()) if token not in stopwords } @lru_cache(maxsize=1) def _dataset_text_index() -> list[str]: dataset = _get_dataset() if dataset is None: return [] rows: list[str] = [] for row in dataset: text = _row_to_text(row) text = " ".join(text.split()) if text: rows.append(text[:DATASET_CONTEXT_CHARS]) return rows def _retrieve_dataset_context(user_text: str) -> str: if not DATASET_ENABLED or not DATASET_ID: return "" query_tokens = _tokenize_for_search(user_text) if not query_tokens: return "" rows = _dataset_text_index() if not rows: return "" scored: list[tuple[int, str]] = [] for row_text in rows: row_tokens = _tokenize_for_search(row_text) score = len(query_tokens & row_tokens) if score > 0: scored.append((score, row_text)) if not scored: return "" scored.sort(key=lambda item: item[0], reverse=True) selected = scored[:DATASET_CONTEXT_ROWS] lines = [ "Dataset style/context examples. Use these for tone and pattern only. Do not quote directly." ] for index, (_, row_text) in enumerate(selected, start=1): lines.append(f"Example {index}: {row_text}") return "\n".join(lines) def _build_prompt(text: str, profile: dict[str, Any], record: dict[str, Any]) -> str: mode = _chat_mode_spec(record) memory_summary = "" recent_block = "" if _memory_is_enabled(record): summary = _memory_summary(record) if summary: memory_summary = f"Conversation memory: {summary}\n" recent_block = _memory_recent_block(record) dataset_context = _retrieve_dataset_context(text) dataset_block = "" if dataset_context: dataset_block = f"\nDataset guidance:\n{dataset_context}\n" return ( f"{SYSTEM_PROMPT}\n\n" f"Character:\n" f"Name: {profile['name']}\n" f"Age: {profile['age']}\n" f"Gender: {profile['gender']}\n" f"Style: {profile['style']}\n" f"Voice: {profile['voice']}\n" f"Mood: {profile['mood']}\n" f"Boundaries: {profile['boundaries']}\n" f"Chat mode: {mode['label']} - {mode['description']}\n" f"Reply style: one or two short paragraphs, emotionally present, specific, and conversational.\n" f"Do not repeat gestures or sentences. Use at most one action tag.\n" f"{memory_summary}" f"{recent_block}\n" f"{dataset_block}\n" f"User: {text}\n" f"{profile['name']}:" ) def _polish_prompt(draft: str, profile: dict[str, Any], record: dict[str, Any]) -> str: mode = _chat_mode_spec(record) memory_summary = "" if _memory_is_enabled(record): summary = _memory_summary(record) if summary: memory_summary = f"Memory context: {summary}\n" return ( f"{SYSTEM_PROMPT}\n\n" f"Rewrite the draft below as the final reply.\n" f"Keep the same meaning, persona, tone, and continuity.\n" f"Remove repetition, padding, disclaimers, and meta commentary.\n" f"Make it feel like natural chat from {profile['name']}.\n" f"Chat mode: {mode['label']} - {mode['description']}\n" f"Reply target: one or two short paragraphs.\n" f"{memory_summary}\n" f"Draft:\n{draft}\n\n" f"Final:" ) def _run_generation( prompt: str, *, max_new_tokens: int, do_sample: bool, temperature: float, top_p: float, ) -> str: generator = _get_text_generator() tokenizer = getattr(generator, "tokenizer", None) eos_id = getattr(tokenizer, "eos_token_id", None) or 0 generation_args: dict[str, Any] = { "max_new_tokens": max_new_tokens, "do_sample": do_sample, "return_full_text": False, "pad_token_id": eos_id, "clean_up_tokenization_spaces": False, "repetition_penalty": 1.18, "no_repeat_ngram_size": 4, } if do_sample: generation_args["temperature"] = temperature generation_args["top_p"] = top_p result = generator(prompt, **generation_args) return _clean_reply(result[0].get("generated_text", "")) def _generate_response( text: str, profile: dict[str, Any], record: dict[str, Any], *, raise_on_error: bool = False, ) -> str: if not _is_local_model_enabled(): raise RuntimeError("LOCAL_MODEL_DISABLED") try: prompt = _build_prompt(text, profile, record) draft = _run_generation( prompt, max_new_tokens=64, do_sample=True, temperature=float(_chat_mode_spec(record)["temperature"]), top_p=float(_chat_mode_spec(record)["top_p"]), ) if RESPONSE_PASSES <= 1: return draft polish_prompt = _polish_prompt(draft, profile, record) return _run_generation( polish_prompt, max_new_tokens=48, do_sample=False, temperature=0.0, top_p=1.0, ) except Exception as exc: logger.exception("Local model generation failed: %s", exc.__class__.__name__) if raise_on_error: raise return f"MODEL_FAILED: {exc.__class__.__name__}: {str(exc)[:160]}" async def _send_telegram_message( chat_id: int, text: str, reply_markup: dict[str, Any] | None = None, ) -> dict[str, Any]: token = ( os.getenv("TELEGRAM_BOT_TOKEN", "").strip() or os.getenv("BOT_TOKEN", "").strip() ) if not token: logger.error("TELEGRAM_BOT_TOKEN or BOT_TOKEN is not configured") raise HTTPException( status_code=503, detail="Telegram bot token is not configured", ) timeout = httpx.Timeout(connect=5.0, read=15.0, write=10.0, pool=10.0) try: async with httpx.AsyncClient(timeout=timeout) as client: response = await client.post( f"https://api.telegram.org/bot{token}/sendMessage", json={ "chat_id": chat_id, "text": text, **({"reply_markup": reply_markup} if reply_markup else {}), }, ) if response.status_code >= 400: logger.warning( "Telegram API request failed with status %s; using webhook fallback", response.status_code, ) payload = { "method": "sendMessage", "chat_id": chat_id, "text": text, } if reply_markup: payload["reply_markup"] = reply_markup return payload return { "ok": True, "sent": True, } except httpx.RequestError as exc: logger.warning( "Telegram API request failed; using webhook fallback: %s", exc.__class__.__name__, ) payload = { "method": "sendMessage", "chat_id": chat_id, "text": text, } if reply_markup: payload["reply_markup"] = reply_markup return payload def _extract_telegram_text(update: dict[str, Any]) -> tuple[str, int | None]: message = update.get("message") or update.get("edited_message") or {} chat = message.get("chat") or {} text = message.get("text") or message.get("caption") or "" chat_id = chat.get("id") if not isinstance(chat_id, int): return str(text or "").strip(), None return str(text or "").strip(), chat_id def _telegram_reply_for_message( chat_key: str, text: str, ) -> tuple[str, dict[str, Any], dict[str, Any] | None]: record = _get_session_record(chat_key) profile = _active_profile(record) normalized_text = _clean_text_button(text) if not text: reply = profile.get("opening") or START_RESPONSE return reply, record, _telegram_reply_markup(record) if normalized_text in {"", "start"}: reply = profile.get("opening") or START_RESPONSE return reply, record, _telegram_reply_markup(record) if normalized_text in {"menu", "help"}: return _format_menu(record), record, _telegram_reply_markup(record) if normalized_text in {"characters"}: return _build_character_gallery(record), record, _telegram_reply_markup(record) if normalized_text in {"build character", "new character", "create character"}: updated = _start_character_builder(record) return ( "Let’s build a character.\n" f"{_builder_step_prompt('name', profile)}", updated, _telegram_reply_markup(updated), ) if normalized_text in {"cancel"}: builder = _builder_state(record) if builder.get("active"): updated = _set_builder_state(record, active=False, step=None, draft={}) return "Character builder cancelled.", updated, _telegram_reply_markup(updated) return "Nothing to cancel.", record, _telegram_reply_markup(record) if normalized_text in {"reset"}: updated = _reset_session_record(chat_key) updated_profile = _active_profile(updated) return ( f"Session reset to persona {updated_profile['persona_id']}.\n{updated_profile['opening']}", updated, _telegram_reply_markup(updated), ) if normalized_text in {"memory on", "memory off"}: updated = _normalize_session_record(record) updated["memory_enabled"] = normalized_text == "memory on" updated = _set_session_record(chat_key, updated) state_text = "enabled" if updated["memory_enabled"] else "disabled" return f"Memory {state_text}.", updated, _telegram_reply_markup(updated) mode_key = MODE_BUTTON_LOOKUP.get(normalized_text) if mode_key: updated = _set_chat_mode(record, mode_key) updated = _set_session_record(chat_key, updated) mode = _chat_mode_spec(updated) return ( f"Tone set to {mode['label']}. {mode['description']}", updated, _telegram_reply_markup(updated), ) persona_map = _telegram_persona_button_map() if normalized_text in persona_map: updated = _switch_persona(record, persona_map[normalized_text]) updated = _set_session_record(chat_key, updated) updated_profile = _active_profile(updated) return ( f"Persona switched to {updated_profile['persona_id']} ({updated_profile['name']}).", updated, _telegram_reply_markup(updated), ) if normalized_text == "model": return f"{MODEL_ID}", record, _telegram_reply_markup(record) if normalized_text.startswith("/"): if normalized_text == "/persona": choices = ", ".join(_persona_choices()) return ( f"Current persona: {profile['persona_id']}\n" f"Available personas: {choices}", record, _telegram_reply_markup(record), ) if normalized_text.startswith("/persona "): new_persona = text.split(None, 1)[1].strip() if new_persona not in _persona_catalog(): return ( f"Unknown persona '{new_persona}'. Available personas: {', '.join(_persona_choices())}", record, _telegram_reply_markup(record), ) updated = _switch_persona(record, new_persona) updated = _set_session_record(chat_key, updated) updated_profile = _active_profile(updated) return ( f"Persona switched to {updated_profile['persona_id']} ({updated_profile['name']}).", updated, _telegram_reply_markup(updated), ) if normalized_text == "/character": return ( "Current character:\n" f"{_format_character_card(profile)}\n\n" "Use the Character button to start a guided builder, or send key=value pairs.", record, _telegram_reply_markup(record), ) if normalized_text.startswith("/character "): raw_args = text.split(None, 1)[1].strip() updates = _parse_key_values(raw_args) if "age" in updates: try: updates["age"] = str(max(18, int(updates["age"]))) except Exception: updates["age"] = "18" updated = _update_character_overrides(record, updates) updated = _set_session_record(chat_key, updated) updated_profile = _active_profile(updated) return ( f"Character updated.\n{_format_character_card(updated_profile)}", updated, _telegram_reply_markup(updated), ) if normalized_text == "/reset": updated = _reset_session_record(chat_key) updated_profile = _active_profile(updated) return ( f"Session reset to persona {updated_profile['persona_id']}.\n{updated_profile['opening']}", updated, _telegram_reply_markup(updated), ) if normalized_text == "/model": return f"{MODEL_ID}", record, _telegram_reply_markup(record) if normalized_text == "/dataset": if not DATASET_ID: return ( "No dataset connected. Set DATASET_ID and DATASET_SPLIT.", record, _telegram_reply_markup(record), ) try: dataset = _get_dataset() return ( f"Dataset: {DATASET_ID}\n" f"Split: {DATASET_SPLIT}\n" f"Rows loaded: {len(dataset) if dataset is not None else 0}", record, _telegram_reply_markup(record), ) except Exception as exc: return ( f"DATASET_FAILED: {exc.__class__.__name__}: {str(exc)[:160]}", record, _telegram_reply_markup(record), ) if normalized_text == "/help": return HELP_RESPONSE, record, _telegram_reply_markup(record) if normalized_text == "/menu": return _format_menu(record), record, _telegram_reply_markup(record) if normalized_text == "/start": reply = profile.get("opening") or START_RESPONSE return reply, record, _telegram_reply_markup(record) builder = _builder_state(record) if builder.get("active"): reply_text, updated = _advance_character_builder(record, text) if updated is not record: updated = _set_session_record(chat_key, updated) return reply_text, updated, _telegram_reply_markup(updated) if len(text) > MAX_MESSAGE_LENGTH: return ( f"Message is too long. Please keep it under {MAX_MESSAGE_LENGTH} characters.", record, _telegram_reply_markup(record), ) if not _is_safe_message(text): return SAFE_REFUSAL, record, _telegram_reply_markup(record) reply_text = _generate_response(text, profile, record) if _memory_is_enabled(record): record = _record_turn(record, "user", text) record = _record_turn(record, "assistant", reply_text) record = _set_session_record(chat_key, record) return reply_text, record, _telegram_reply_markup(record) def _build_menu_text() -> str: lines = ["Babble character menu:"] for persona_id in _persona_choices(): persona = _resolve_persona(persona_id) lines.append( f"- {persona_id}: {persona['name']} | {persona['style']} | age {persona['age']}" ) lines.append("") lines.append(f"Available personas: {', '.join(_persona_choices())}") lines.append( "Use /persona [id] to switch, /character key=value ... to customize, and /reset to clear the session." ) return "\n".join(lines) def _debug_profile_snapshot() -> dict[str, Any]: persona = _resolve_persona(_default_persona_id()) return _normalize_character_profile(persona) def _create_ui_state() -> dict[str, Any]: return _default_session_record() def _state_to_form_values(record: dict[str, Any]) -> list[Any]: profile = _active_profile(record) return [ gr.update(value=record["persona_id"], choices=_persona_choices()), gr.update(value=profile["name"]), gr.update(value=profile["age"]), gr.update(value=profile["gender"]), gr.update(value=profile["style"]), gr.update(value=profile["voice"]), gr.update(value=profile["mood"]), gr.update(value=profile["boundaries"]), gr.update(value=profile["opening"]), gr.update(value=bool(record.get("memory_enabled", True))), gr.update(value=profile), gr.update(value=_format_menu(record)), ] def _ui_load_persona(persona_id: str, record: dict[str, Any]) -> tuple[Any, ...]: updated = _switch_persona(record, persona_id) return (updated, *_state_to_form_values(updated)) def _ui_save_character( persona_id: str, name: str, age: Any, gender: str, style: str, voice: str, mood: str, boundaries: str, opening: str, memory_enabled_value: bool, record: dict[str, Any], ) -> tuple[Any, ...]: updated = _normalize_session_record(record) if persona_id in _persona_catalog(): updated["persona_id"] = persona_id updated["custom_character"] = { "name": name, "age": age, "gender": gender, "style": style, "voice": voice, "mood": mood, "boundaries": boundaries, "opening": opening, } updated["memory_enabled"] = bool(memory_enabled_value) return (updated, *_state_to_form_values(updated)) def _ui_reset_session(record: dict[str, Any]) -> tuple[Any, ...]: updated = _default_session_record() return (updated, *_state_to_form_values(updated)) def _ui_send_message( message: str, chat_history: list[dict[str, str]] | None, record: dict[str, Any], ) -> tuple[Any, ...]: text = str(message or "").strip() history = list(chat_history or []) if not text: return "", history, history, record, _format_menu(record) profile = _active_profile(record) reply = _generate_response(text, profile, record) if _memory_is_enabled(record): record = _record_turn(record, "user", text) record = _record_turn(record, "assistant", reply) history.append({"role": "user", "content": text}) history.append({"role": "assistant", "content": reply}) return "", history, history, record, _format_menu(record) def build_demo(): if gr is None: raise RuntimeError("Gradio is required to run Babble.") from _GRADIO_IMPORT_ERROR with gr.Blocks(title="Babble") as demo: state = gr.State(_create_ui_state()) chat_history = gr.State([]) gr.Markdown("# Babble") gr.Markdown( "Adult romance-leaning character chat with persona presets, custom character editing, memory, and dataset-guided context." ) with gr.Row(): with gr.Column(scale=2): chatbot = gr.Chatbot(label="Conversation", height=540, type="messages") message = gr.Textbox( label="Message", placeholder="Send a test message", ) send = gr.Button("Send") with gr.Column(scale=1): persona = gr.Dropdown( choices=_persona_choices(), value=_default_persona_id(), label="Persona preset", ) load_persona = gr.Button("Load Persona") reset_session = gr.Button("Reset Session") name = gr.Textbox(label="Name") age = gr.Number(label="Adult age", precision=0) gender = gr.Textbox(label="Gender / pronouns") style = gr.Textbox(label="Style") voice = gr.Textbox(label="Voice") mood = gr.Textbox(label="Mood") boundaries = gr.Textbox(label="Boundaries") opening = gr.Textbox(label="Opening line") memory_enabled = gr.Checkbox(label="Memory enabled", value=MEMORY_ENABLED) save_character = gr.Button("Save Character") character_preview = gr.JSON(label="Active character") menu_preview = gr.Textbox(label="Menu", lines=10) status = gr.Textbox(label="Session status") demo.load( fn=lambda record: _state_to_form_values(record), inputs=state, outputs=[ persona, name, age, gender, style, voice, mood, boundaries, opening, memory_enabled, character_preview, menu_preview, ], ) load_persona.click( fn=_ui_load_persona, inputs=[persona, state], outputs=[ state, persona, name, age, gender, style, voice, mood, boundaries, opening, memory_enabled, character_preview, menu_preview, ], ) save_character.click( fn=_ui_save_character, inputs=[ persona, name, age, gender, style, voice, mood, boundaries, opening, memory_enabled, state, ], outputs=[ state, persona, name, age, gender, style, voice, mood, boundaries, opening, memory_enabled, character_preview, menu_preview, ], ) reset_session.click( fn=_ui_reset_session, inputs=[state], outputs=[ state, persona, name, age, gender, style, voice, mood, boundaries, opening, memory_enabled, character_preview, menu_preview, ], ) send.click( fn=_ui_send_message, inputs=[message, chat_history, state], outputs=[message, chatbot, chat_history, state, menu_preview], ) message.submit( fn=_ui_send_message, inputs=[message, chat_history, state], outputs=[message, chatbot, chat_history, state, menu_preview], ) return demo def create_app() -> FastAPI: @asynccontextmanager async def lifespan(app: FastAPI): if REQUIRE_MODEL and _is_local_model_enabled(): logger.info("Preloading model...") _get_text_generator() if DATASET_ENABLED and DATASET_ID: logger.info("Preloading dataset...") try: _get_dataset() _dataset_text_index() except Exception: logger.exception("Dataset preload failed") yield fastapi_app = FastAPI(title="Babble", version=APP_VERSION, lifespan=lifespan) @fastapi_app.get("/") async def root() -> dict[str, Any]: return { "ok": True, "service": "babble", "status": "running", "version": APP_VERSION, "routes": [ "/health", "/model", "/dataset", "/persona", "/menu", "/model/test", "/debug/config", "/debug/model", ], } @fastapi_app.get("/health") async def health() -> dict[str, Any]: return { "ok": True, "service": "babble", "version": APP_VERSION, "model": MODEL_ID, "loaded": _get_text_generator.cache_info().currsize > 0, "persona": _default_persona_id(), "dataset_connected": bool(DATASET_ID), } @fastapi_app.get("/model") async def model() -> dict[str, Any]: profile = _debug_profile_snapshot() return { "model_id": MODEL_ID, "enabled": _is_local_model_enabled(), "loaded": _get_text_generator.cache_info().currsize > 0, "transformers": pipeline is not None, "transformers_import_error": ( None if pipeline is not None else str(_TRANSFORMERS_IMPORT_ERROR)[:300] ), "persona_id": profile["persona_id"], "persona_name": profile["name"], "memory_enabled": MEMORY_ENABLED, "memory_path": str(MEMORY_PATH), } @fastapi_app.get("/dataset") async def dataset_status() -> dict[str, Any]: if not DATASET_ENABLED: return { "ok": False, "enabled": False, "error": "DATASET_DISABLED", } if not DATASET_ID: return { "ok": False, "enabled": True, "error": "DATASET_ID missing", } if load_dataset is None: return { "ok": False, "enabled": True, "dataset_id": DATASET_ID, "error": "datasets import failed", "detail": str(_DATASETS_IMPORT_ERROR)[:300], } dataset = _get_dataset() index = _dataset_text_index() return { "ok": True, "enabled": True, "dataset_id": DATASET_ID, "split": DATASET_SPLIT, "rows_loaded": len(dataset) if dataset is not None else 0, "indexed_rows": len(index), "columns": dataset.column_names if dataset is not None else [], "sample": dataset[0] if dataset is not None and len(dataset) else None, } @fastapi_app.get("/persona") async def persona_menu() -> dict[str, Any]: profile = _debug_profile_snapshot() return { "active_persona": profile, "available_personas": { persona_id: _resolve_persona(persona_id) for persona_id in _persona_choices() }, } @fastapi_app.get("/menu") async def menu() -> dict[str, str]: return { "text": _build_menu_text(), } @fastapi_app.get("/model/test") async def test() -> dict[str, Any]: profile = _debug_profile_snapshot() record = _default_session_record() return { "reply": _generate_response("Say hello briefly.", profile, record), "persona_id": profile["persona_id"], } @fastapi_app.post("/telegram/webhook/{secret}") async def telegram_webhook(secret: str, update: dict[str, Any]) -> dict[str, Any]: logger.info("Telegram update received") expected_secret = os.getenv("WEBHOOK_SECRET", "").strip() if not expected_secret: logger.error("WEBHOOK_SECRET is not configured") raise HTTPException( status_code=503, detail="Webhook is not configured", ) if not hmac.compare_digest(secret, expected_secret): logger.warning("Rejected webhook request with invalid secret") raise HTTPException( status_code=403, detail="Invalid webhook secret", ) text, chat_id = _extract_telegram_text(update) logger.info("Telegram chat_id found: %s", chat_id is not None) logger.info("Telegram message text found: %s", bool(text)) if chat_id is None: return { "ok": True, "ignored": "missing_chat_id", } reply_text, record, reply_markup = _telegram_reply_for_message( _session_key(chat_id), text, ) _set_session_record(_session_key(chat_id), record) return await _send_telegram_message(chat_id, reply_text, reply_markup) @fastapi_app.get("/debug/config") async def debug_config() -> dict[str, Any]: if not _is_debug_enabled(): raise HTTPException(status_code=404, detail="Not found") profile = _debug_profile_snapshot() return { "debug": True, "service": "babble", "version": APP_VERSION, "llm_provider": _sanitize_provider(), "model_id": MODEL_ID, "local_model_enabled": _is_local_model_enabled(), "transformers_available": pipeline is not None, "transformers_import_error": ( None if pipeline is not None else str(_TRANSFORMERS_IMPORT_ERROR)[:300] ), "persona_id": profile["persona_id"], "persona_name": profile["name"], "persona_count": len(_persona_catalog()), "memory_enabled": MEMORY_ENABLED, "memory_path": str(MEMORY_PATH), "memory_max_turns": MEMORY_MAX_TURNS, "telegram_bot_token_configured": ( _configured("TELEGRAM_BOT_TOKEN") or _configured("BOT_TOKEN") ), "webhook_secret_configured": _configured("WEBHOOK_SECRET"), "llm_api_key_configured": _configured("LLM_API_KEY"), "reply_mode": "telegram_direct_send_with_webhook_fallback", "dataset_enabled": DATASET_ENABLED, "dataset_id": DATASET_ID, "dataset_split": DATASET_SPLIT, "datasets_available": load_dataset is not None, "datasets_import_error": ( None if load_dataset is not None else str(_DATASETS_IMPORT_ERROR)[:300] ), "hf_token_configured": bool(HF_TOKEN), } @fastapi_app.get("/debug/model") async def debug_model() -> dict[str, Any]: if not _is_debug_enabled(): raise HTTPException(status_code=404, detail="Not found") try: profile = _debug_profile_snapshot() reply = _generate_response( "Say hello in one short sentence.", profile, _default_session_record(), raise_on_error=True, ) return { "ok": True, "model_id": MODEL_ID, "persona_id": profile["persona_id"], "reply": reply, } except Exception as exc: logger.exception("Debug model test failed") return { "ok": False, "error": exc.__class__.__name__, "detail": str(exc)[:200], } if gr is None: logger.warning("Gradio unavailable; serving API endpoints only") return fastapi_app return gr.mount_gradio_app(fastapi_app, build_demo(), path="/") app = create_app() if __name__ == "__main__": import uvicorn uvicorn.run( app, host="0.0.0.0", port=int(os.getenv("BABBLE_PORT", os.getenv("PORT", "7860"))), log_level=os.getenv("LOG_LEVEL", "info").lower(), )