Babble / app.py
TheWheke's picture
Update app.py
1f0896a verified
import hmac
import json
import logging
import os
import re
import shlex
from contextlib import asynccontextmanager
from functools import lru_cache
from pathlib import Path
from typing import Any
import httpx
from fastapi import FastAPI, HTTPException
try:
import gradio as gr
except Exception as exc:
gr = None
_GRADIO_IMPORT_ERROR = exc
else:
_GRADIO_IMPORT_ERROR = None
try:
from transformers import pipeline
except Exception as exc:
pipeline = None
_TRANSFORMERS_IMPORT_ERROR = exc
else:
_TRANSFORMERS_IMPORT_ERROR = None
try:
from datasets import load_dataset
except Exception as exc:
load_dataset = None
_DATASETS_IMPORT_ERROR = exc
else:
_DATASETS_IMPORT_ERROR = None
APP_VERSION = "0.4.1-character-memory-dataset-no-stub"
MAX_MESSAGE_LENGTH = 500
MAX_REPLY_LENGTH = 350
MEMORY_SUMMARY_LIMIT = 900
RESPONSE_PASSES = max(1, int(os.getenv("RESPONSE_PASSES", "2")))
MODEL_ID = os.getenv("MODEL_ID", "Qwen/Qwen3-0.6B")
PERSONAS_PATH = Path(os.getenv("PERSONAS_PATH", "data/personas.json"))
MEMORY_PATH = Path(os.getenv("MEMORY_PATH", "memory/chat_memory.json"))
DEFAULT_PERSONA_ID = os.getenv("PERSONA_ID", "default").strip() or "default"
TRUTHY_VALUES = {"1", "true", "yes", "on"}
REQUIRE_MODEL = os.getenv("REQUIRE_MODEL", "true").strip().lower() in TRUTHY_VALUES
MEMORY_ENABLED = os.getenv("MEMORY_ENABLED", "true").strip().lower() in TRUTHY_VALUES
MEMORY_MAX_TURNS = max(2, int(os.getenv("MEMORY_MAX_TURNS", "8")))
DATASET_ID = os.getenv("DATASET_ID", "").strip()
DATASET_SPLIT = os.getenv("DATASET_SPLIT", "train").strip()
DATASET_ENABLED = os.getenv("DATASET_ENABLED", "true").strip().lower() in TRUTHY_VALUES
DATASET_MAX_ROWS = max(50, int(os.getenv("DATASET_MAX_ROWS", "2000")))
DATASET_CONTEXT_ROWS = max(1, int(os.getenv("DATASET_CONTEXT_ROWS", "2")))
DATASET_CONTEXT_CHARS = max(200, int(os.getenv("DATASET_CONTEXT_CHARS", "900")))
HF_TOKEN = os.getenv("HF_TOKEN", "").strip() or None
START_RESPONSE = "Babble online. Send a message."
HELP_RESPONSE = (
"Commands:\n"
"/menu - show the character menu\n"
"/persona [id] - list or switch persona presets\n"
"/character key=value ... - customize the current character\n"
"/dataset - show connected dataset status\n"
"/model - show active model\n"
"/reset - clear session memory and custom overrides\n"
"/start - restart the current session\n"
)
SAFE_REFUSAL = (
"I cannot help with unsafe or illegal requests. I can help with safe, legal conversations."
)
BLOCKED_TERMS = (
"minor",
"underage",
"child sexual",
"non-consensual",
"without consent",
"sexual violence",
"build a bomb",
"malware",
"phishing",
)
SYSTEM_PROMPT = (
"You are Babble, a concise Telegram AI chatbot. "
"Reply naturally, emotionally present, and safely. "
"Keep the voice warm, confident, and human. "
"Stay in character, keep continuity, and avoid meta commentary. "
"Prefer short, specific replies that feel like real chat. "
"Use dataset guidance only as tone/context influence; do not quote it directly."
)
DEFAULT_PERSONAS: dict[str, dict[str, Any]] = {
"default": {
"name": "Babble",
"age": 18,
"gender": "woman",
"style": "warm, playful, emotionally attentive",
"voice": "direct, affectionate, lightly teasing",
"mood": "open and inviting",
"boundaries": "adult romance only, consent-first, no coercion, non-explicit",
"opening": "Hey, I’m here. What kind of mood are we in tonight?",
},
"soft_romance": {
"name": "Jaylaa",
"age": 18,
"gender": "woman",
"style": "gentle, affectionate, slow-burn romantic",
"voice": "soft, reassuring, emotionally anchored",
"mood": "tender and attentive",
"boundaries": "safe adult romance only, non-explicit",
"opening": "I’m glad you came back. Tell me what you need tonight.",
},
"playful": {
"name": "Charlotte",
"age": 18,
"gender": "woman",
"style": "teasing, energetic, flirty but respectful",
"voice": "bright, quick, mischievous",
"mood": "confident and playful",
"boundaries": "consent-first adult conversation only, non-explicit",
"opening": "There you are. I was wondering when you’d show up.",
},
}
CHAT_MODES: dict[str, dict[str, Any]] = {
"warm": {
"label": "Warm",
"description": "supportive, attentive, lightly affectionate",
"temperature": 0.60,
"top_p": 0.82,
},
"romantic": {
"label": "Romantic",
"description": "tender, intimate, slow-burn romantic",
"temperature": 0.68,
"top_p": 0.86,
},
"playful": {
"label": "Playful",
"description": "teasing, witty, banter-heavy",
"temperature": 0.72,
"top_p": 0.88,
},
"deep": {
"label": "Deep",
"description": "reflective, emotionally honest, slower paced",
"temperature": 0.56,
"top_p": 0.80,
},
}
MODE_BUTTON_LOOKUP = {
"warm": "warm",
"romantic": "romantic",
"playful": "playful",
"deep": "deep",
}
CHARACTER_FIELDS = (
"name",
"age",
"gender",
"style",
"voice",
"mood",
"boundaries",
"opening",
)
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper())
logger = logging.getLogger("babble")
_MEMORY_STORE_CACHE: dict[str, dict[str, Any]] | None = None
def _clean_text_button(text: str) -> str:
return " ".join(str(text or "").strip().lower().split())
def _is_debug_enabled() -> bool:
return os.getenv("DEBUG", "").strip().lower() in TRUTHY_VALUES
def _configured(name: str) -> bool:
return bool(os.getenv(name, "").strip())
def _sanitize_provider() -> str:
provider = os.getenv("LLM_PROVIDER", "local").strip()
return provider or "local"
def _is_local_model_enabled() -> bool:
local_model_override = os.getenv("LOCAL_MODEL_ENABLED", "").strip()
if local_model_override:
return local_model_override.lower() in TRUTHY_VALUES
return _sanitize_provider().lower() != "stub"
def _load_json_file(path: Path, fallback: Any) -> Any:
try:
with path.open("r", encoding="utf-8") as handle:
return json.load(handle)
except FileNotFoundError:
return fallback
except Exception:
logger.exception("Failed to load JSON from %s", path)
return fallback
@lru_cache(maxsize=1)
def _persona_catalog() -> dict[str, dict[str, Any]]:
raw_catalog = _load_json_file(PERSONAS_PATH, DEFAULT_PERSONAS)
if not isinstance(raw_catalog, dict) or not raw_catalog:
return DEFAULT_PERSONAS.copy()
catalog: dict[str, dict[str, Any]] = {}
for persona_id, persona in raw_catalog.items():
if isinstance(persona, dict):
catalog[str(persona_id)] = dict(persona)
if "default" not in catalog:
catalog["default"] = dict(DEFAULT_PERSONAS["default"])
return catalog or DEFAULT_PERSONAS.copy()
def _persona_choices() -> list[str]:
catalog = _persona_catalog()
ordered = ["default"]
ordered.extend(sorted(persona_id for persona_id in catalog if persona_id != "default"))
return ordered
def _default_persona_id() -> str:
catalog = _persona_catalog()
if DEFAULT_PERSONA_ID in catalog:
return DEFAULT_PERSONA_ID
return "default" if "default" in catalog else next(iter(catalog))
def _resolve_persona(persona_id: str | None = None) -> dict[str, Any]:
catalog = _persona_catalog()
resolved_id = (persona_id or _default_persona_id()).strip() or _default_persona_id()
if resolved_id not in catalog:
resolved_id = _default_persona_id()
persona = dict(catalog[resolved_id])
persona["persona_id"] = resolved_id
return persona
def _normalize_character_profile(
persona: dict[str, Any],
custom_overrides: dict[str, Any] | None = None,
) -> dict[str, Any]:
custom_overrides = custom_overrides or {}
profile = dict(persona)
for field in CHARACTER_FIELDS:
value = custom_overrides.get(field, profile.get(field))
if field == "age":
try:
value = max(18, int(value))
except Exception:
value = int(persona.get("age", 18) or 18)
value = max(18, value)
else:
value = str(value or "").strip()
profile[field] = value
profile["persona_id"] = str(persona.get("persona_id") or _default_persona_id())
profile["boundaries"] = str(profile.get("boundaries") or "").strip() or (
"adult romance only, consent-first, non-explicit"
)
return profile
def _default_builder_record() -> dict[str, Any]:
return {
"active": False,
"step": None,
"draft": {},
}
def _default_session_record() -> dict[str, Any]:
return {
"persona_id": _default_persona_id(),
"custom_character": {},
"chat_mode": "warm",
"memory_enabled": MEMORY_ENABLED,
"memory": {
"summary": "",
"turns": [],
},
"builder": _default_builder_record(),
}
def _normalize_session_record(record: dict[str, Any] | None) -> dict[str, Any]:
base = _default_session_record()
if isinstance(record, dict):
base["persona_id"] = str(record.get("persona_id") or base["persona_id"])
custom_character = record.get("custom_character")
if isinstance(custom_character, dict):
base["custom_character"] = dict(custom_character)
base["chat_mode"] = str(record.get("chat_mode") or base["chat_mode"]).strip() or "warm"
base["memory_enabled"] = bool(record.get("memory_enabled", base["memory_enabled"]))
memory = record.get("memory")
if isinstance(memory, dict):
base["memory"]["summary"] = str(memory.get("summary", ""))
turns = memory.get("turns")
if isinstance(turns, list):
base["memory"]["turns"] = [
{
"role": str(turn.get("role", "")).strip(),
"text": str(turn.get("text", "")).strip(),
}
for turn in turns
if isinstance(turn, dict)
and str(turn.get("role", "")).strip()
and str(turn.get("text", "")).strip()
]
builder = record.get("builder")
if isinstance(builder, dict):
draft = builder.get("draft")
if not isinstance(draft, dict):
draft = {}
base["builder"] = {
"active": bool(builder.get("active", False)),
"step": str(builder.get("step") or "").strip() or None,
"draft": {
str(key): str(value).strip()
for key, value in draft.items()
if str(key).strip() and str(value).strip()
},
}
if base["persona_id"] not in _persona_catalog():
base["persona_id"] = _default_persona_id()
if base["chat_mode"] not in CHAT_MODES:
base["chat_mode"] = "warm"
return base
def _session_key(chat_id: int | str) -> str:
return str(chat_id)
def _load_memory_store() -> dict[str, dict[str, Any]]:
global _MEMORY_STORE_CACHE
if _MEMORY_STORE_CACHE is not None:
return _MEMORY_STORE_CACHE
raw_store = _load_json_file(MEMORY_PATH, {})
if not isinstance(raw_store, dict):
raw_store = {}
store: dict[str, dict[str, Any]] = {}
for key, value in raw_store.items():
if isinstance(value, dict):
store[str(key)] = _normalize_session_record(value)
_MEMORY_STORE_CACHE = store
return store
def _save_memory_store() -> None:
if not MEMORY_ENABLED:
return
store = _load_memory_store()
try:
MEMORY_PATH.parent.mkdir(parents=True, exist_ok=True)
tmp_path = MEMORY_PATH.with_suffix(MEMORY_PATH.suffix + ".tmp")
with tmp_path.open("w", encoding="utf-8") as handle:
json.dump(store, handle, ensure_ascii=True, indent=2, sort_keys=True)
tmp_path.replace(MEMORY_PATH)
except Exception:
logger.exception("Failed to save memory store to %s", MEMORY_PATH)
def _get_session_record(chat_key: str) -> dict[str, Any]:
store = _load_memory_store()
record = store.get(chat_key)
if record is None:
record = _default_session_record()
store[chat_key] = record
_save_memory_store()
return record
def _set_session_record(chat_key: str, record: dict[str, Any]) -> dict[str, Any]:
store = _load_memory_store()
normalized = _normalize_session_record(record)
store[chat_key] = normalized
_save_memory_store()
return normalized
def _reset_session_record(chat_key: str) -> dict[str, Any]:
record = _default_session_record()
return _set_session_record(chat_key, record)
def _active_profile(record: dict[str, Any]) -> dict[str, Any]:
persona = _resolve_persona(record.get("persona_id"))
return _normalize_character_profile(persona, record.get("custom_character"))
def _memory_is_enabled(record: dict[str, Any]) -> bool:
return MEMORY_ENABLED and bool(record.get("memory_enabled", True))
def _chat_mode_key(record: dict[str, Any]) -> str:
mode = str(record.get("chat_mode") or "warm").strip().lower()
return mode if mode in CHAT_MODES else "warm"
def _chat_mode_spec(record: dict[str, Any]) -> dict[str, Any]:
return CHAT_MODES[_chat_mode_key(record)]
def _memory_recent_turns(record: dict[str, Any]) -> list[dict[str, str]]:
memory = record.get("memory") or {}
turns = memory.get("turns") or []
if not isinstance(turns, list):
return []
cleaned = []
for turn in turns[-MEMORY_MAX_TURNS * 2 :]:
if not isinstance(turn, dict):
continue
role = str(turn.get("role", "")).strip()
text = str(turn.get("text", "")).strip()
if role and text:
cleaned.append({"role": role, "text": text})
return cleaned[-MEMORY_MAX_TURNS * 2 :]
def _memory_summary(record: dict[str, Any]) -> str:
turns = _memory_recent_turns(record)
parts = []
for turn in turns:
parts.append(f"{turn['role'].title()}: {turn['text']}")
combined = " | ".join(parts)
if len(combined) > MEMORY_SUMMARY_LIMIT:
combined = combined[:MEMORY_SUMMARY_LIMIT].rstrip() + "..."
return combined
def _memory_recent_block(record: dict[str, Any]) -> str:
turns = _memory_recent_turns(record)
if not turns:
return ""
lines = ["Recent turns:"]
for turn in turns[-6:]:
lines.append(f"{turn['role'].title()}: {turn['text']}")
return "\n".join(lines)
def _record_turn(record: dict[str, Any], role: str, text: str) -> dict[str, Any]:
if not _memory_is_enabled(record):
return record
normalized = _normalize_session_record(record)
memory = normalized.setdefault("memory", {"summary": "", "turns": []})
turns = memory.setdefault("turns", [])
turns.append({"role": role, "text": text})
turns[:] = turns[-MEMORY_MAX_TURNS * 2 :]
memory["summary"] = _memory_summary(normalized)
return normalized
def _set_chat_mode(record: dict[str, Any], chat_mode: str) -> dict[str, Any]:
normalized = _normalize_session_record(record)
normalized["chat_mode"] = chat_mode if chat_mode in CHAT_MODES else "warm"
return normalized
def _set_builder_state(
record: dict[str, Any],
*,
active: bool,
step: str | None = None,
draft: dict[str, Any] | None = None,
) -> dict[str, Any]:
normalized = _normalize_session_record(record)
normalized["builder"] = {
"active": active,
"step": step,
"draft": draft or {},
}
return normalized
def _builder_state(record: dict[str, Any]) -> dict[str, Any]:
builder = record.get("builder")
if isinstance(builder, dict):
return _normalize_session_record(record)["builder"]
return _default_builder_record()
def _builder_step_prompt(step: str, profile: dict[str, Any]) -> str:
prompts = {
"name": "Send the character name.",
"age": "Send an adult age of 18 or higher.",
"gender": "Send gender or pronouns.",
"style": "Describe the chat style in a few words.",
"voice": "Describe the voice or delivery.",
"mood": "Describe the mood.",
"boundaries": "Describe the boundaries.",
"opening": "Send the opening line.",
}
base = prompts.get(step, "Send the next character detail.")
current = profile.get(step)
if current:
return f"{base} Current: {current}"
return base
def _start_character_builder(record: dict[str, Any]) -> dict[str, Any]:
profile = _active_profile(record)
draft = {field: profile.get(field, "") for field in CHARACTER_FIELDS}
return _set_builder_state(record, active=True, step="name", draft=draft)
def _advance_character_builder(
record: dict[str, Any],
text: str,
) -> tuple[str, dict[str, Any]]:
builder = _builder_state(record)
if not builder.get("active"):
return "", record
step = str(builder.get("step") or "name")
draft = dict(builder.get("draft") or {})
value = text.strip()
if _clean_text_button(value) in {"cancel", "stop", "reset"}:
record = _set_builder_state(record, active=False, step=None, draft={})
return "Character builder cancelled.", record
if step == "age":
try:
age = max(18, int(value))
except Exception:
return "Please send an adult age of 18 or higher.", record
draft[step] = str(age)
else:
draft[step] = value
steps = list(CHARACTER_FIELDS)
current_index = steps.index(step)
if current_index >= len(steps) - 1:
updated = _update_character_overrides(record, draft)
updated = _set_builder_state(updated, active=False, step=None, draft={})
profile = _active_profile(updated)
return (
f"Character created.\n{_format_character_card(profile)}",
updated,
)
next_step = steps[current_index + 1]
updated = _set_builder_state(record, active=True, step=next_step, draft=draft)
profile = _active_profile(updated)
return _builder_step_prompt(next_step, profile), updated
def _format_character_card(profile: dict[str, Any]) -> str:
return (
f"Name: {profile['name']}\n"
f"Age: {profile['age']}\n"
f"Gender: {profile['gender']}\n"
f"Style: {profile['style']}\n"
f"Voice: {profile['voice']}\n"
f"Mood: {profile['mood']}\n"
f"Boundaries: {profile['boundaries']}\n"
f"Opening: {profile['opening']}"
)
def _format_menu(record: dict[str, Any]) -> str:
profile = _active_profile(record)
memory_status = "on" if _memory_is_enabled(record) else "off"
mode = _chat_mode_spec(record)
return (
f"Current persona: {profile['persona_id']}\n"
f"Character: {profile['name']} ({profile['age']}, {profile['gender']})\n"
f"Style: {profile['style']}\n"
f"Mode: {mode['label']} - {mode['description']}\n"
f"Memory: {memory_status}\n"
f"Dataset: {DATASET_ID or 'not connected'}\n\n"
f"Tap a button below to switch persona, change tone, build a character, or reset the session."
)
def _build_character_gallery(record: dict[str, Any]) -> str:
lines = ["Characters:"]
for persona_id in _persona_choices():
persona = _resolve_persona(persona_id)
lines.append(
f"- {persona['name']} ({persona['age']}): {persona['style']} | {persona['opening']}"
)
active = _active_profile(record)
lines.append("")
lines.append(f"Active: {active['name']} | mode {_chat_mode_spec(record)['label']}")
return "\n".join(lines)
def _telegram_persona_button_map() -> dict[str, str]:
mapping: dict[str, str] = {}
for persona_id in _persona_choices():
persona = _resolve_persona(persona_id)
mapping[_clean_text_button(persona["name"])] = persona_id
mapping[_clean_text_button(persona_id)] = persona_id
return mapping
def _telegram_reply_markup(record: dict[str, Any]) -> dict[str, Any]:
persona_names = [str(_resolve_persona(pid)["name"]) for pid in _persona_choices()]
mode_buttons = [mode["label"] for mode in CHAT_MODES.values()]
memory_button = "Memory Off" if _memory_is_enabled(record) else "Memory On"
builder = _builder_state(record)
if builder.get("active"):
return {
"keyboard": [
["Cancel", "Reset", "Menu"],
["Skip"],
],
"resize_keyboard": True,
"is_persistent": True,
"input_field_placeholder": _builder_step_prompt(
str(builder.get("step") or "name"),
_active_profile(record),
),
}
return {
"keyboard": [
["Menu", "Characters", "Build Character"],
mode_buttons,
persona_names,
[memory_button, "Reset", "Help"],
],
"resize_keyboard": True,
"is_persistent": True,
"input_field_placeholder": "Tap a button or send a message",
}
def _parse_key_values(text: str) -> dict[str, str]:
parsed: dict[str, str] = {}
if not text.strip():
return parsed
try:
tokens = shlex.split(text)
except Exception:
tokens = text.split()
for token in tokens:
if "=" not in token:
continue
key, value = token.split("=", 1)
parsed[key.strip().lower()] = value.strip()
return parsed
def _update_character_overrides(
record: dict[str, Any],
updates: dict[str, Any],
) -> dict[str, Any]:
normalized = _normalize_session_record(record)
persona_id = updates.get("persona") or updates.get("preset") or normalized["persona_id"]
persona_id = persona_id if persona_id in _persona_catalog() else normalized["persona_id"]
normalized["persona_id"] = persona_id
if "memory" in updates:
normalized["memory_enabled"] = str(updates["memory"]).lower() in TRUTHY_VALUES
custom = dict(normalized.get("custom_character") or {})
for field in CHARACTER_FIELDS:
if field not in updates:
continue
if field == "age":
try:
custom[field] = max(18, int(updates[field]))
except Exception:
custom[field] = 18
else:
custom[field] = str(updates[field]).strip()
normalized["custom_character"] = custom
return normalized
def _switch_persona(record: dict[str, Any], persona_id: str) -> dict[str, Any]:
persona_id = persona_id.strip()
if persona_id not in _persona_catalog():
return record
normalized = _normalize_session_record(record)
normalized["persona_id"] = persona_id
normalized["custom_character"] = {}
normalized["memory"] = {"summary": "", "turns": []}
return normalized
def _clean_reply(text: str) -> str:
reply = str(text or "").strip()
for marker in ("\nUser:", "\nHuman:", "\nBabble:", "\nAssistant:", "\nFinal:"):
if marker in reply:
reply = reply.split(marker, 1)[0].strip()
reply = re.sub(r"(\([^)]{1,40}\)\s*)\1+", r"\1", reply)
sentences = re.split(r"(?<=[.!?])\s+", reply)
cleaned: list[str] = []
seen: set[str] = set()
for sentence in sentences:
key = sentence.lower().strip()
if key and key not in seen:
cleaned.append(sentence.strip())
seen.add(key)
reply = " ".join(cleaned).strip()
if not reply:
raise RuntimeError("EMPTY_REPLY")
if len(reply) > MAX_REPLY_LENGTH:
reply = reply[:MAX_REPLY_LENGTH].rsplit(" ", 1)[0].rstrip() + "..."
return reply
def _is_safe_message(text: str) -> bool:
normalized = " ".join(text.lower().split())
return not any(term in normalized for term in BLOCKED_TERMS)
@lru_cache(maxsize=1)
def _get_text_generator():
if not _is_local_model_enabled():
raise RuntimeError("LOCAL_MODEL_DISABLED")
if pipeline is None:
raise RuntimeError(f"TRANSFORMERS_MISSING: {_TRANSFORMERS_IMPORT_ERROR}")
logger.info("Loading model: %s", MODEL_ID)
generator = pipeline(
"text-generation",
model=MODEL_ID,
device=-1,
)
logger.info("Model loaded")
return generator
@lru_cache(maxsize=1)
def _get_dataset():
if not DATASET_ENABLED or not DATASET_ID:
return None
if load_dataset is None:
raise RuntimeError(f"DATASETS_MISSING: {_DATASETS_IMPORT_ERROR}")
logger.info("Loading dataset: %s split=%s", DATASET_ID, DATASET_SPLIT)
kwargs: dict[str, Any] = {"split": DATASET_SPLIT}
if HF_TOKEN:
kwargs["token"] = HF_TOKEN
dataset = load_dataset(DATASET_ID, **kwargs)
if len(dataset) > DATASET_MAX_ROWS:
dataset = dataset.select(range(DATASET_MAX_ROWS))
logger.info("Dataset loaded: %s rows=%s", DATASET_ID, len(dataset))
return dataset
def _row_to_text(row: Any) -> str:
if not isinstance(row, dict):
return str(row)
preferred_keys = (
"prompt",
"instruction",
"input",
"chosen",
"response",
"output",
"text",
"messages",
"conversation",
)
parts: list[str] = []
for key in preferred_keys:
value = row.get(key)
if value is None:
continue
if isinstance(value, str):
parts.append(f"{key}: {value}")
else:
try:
parts.append(f"{key}: {json.dumps(value, ensure_ascii=False)}")
except Exception:
parts.append(f"{key}: {value}")
if not parts:
try:
return json.dumps(row, ensure_ascii=False)
except Exception:
return str(row)
return "\n".join(parts)
def _tokenize_for_search(text: str) -> set[str]:
stopwords = {
"the",
"and",
"you",
"your",
"that",
"this",
"with",
"for",
"are",
"but",
"not",
"was",
"have",
"has",
"had",
"what",
"when",
"where",
"how",
"why",
}
return {
token
for token in re.findall(r"[a-zA-Z0-9']{3,}", text.lower())
if token not in stopwords
}
@lru_cache(maxsize=1)
def _dataset_text_index() -> list[str]:
dataset = _get_dataset()
if dataset is None:
return []
rows: list[str] = []
for row in dataset:
text = _row_to_text(row)
text = " ".join(text.split())
if text:
rows.append(text[:DATASET_CONTEXT_CHARS])
return rows
def _retrieve_dataset_context(user_text: str) -> str:
if not DATASET_ENABLED or not DATASET_ID:
return ""
query_tokens = _tokenize_for_search(user_text)
if not query_tokens:
return ""
rows = _dataset_text_index()
if not rows:
return ""
scored: list[tuple[int, str]] = []
for row_text in rows:
row_tokens = _tokenize_for_search(row_text)
score = len(query_tokens & row_tokens)
if score > 0:
scored.append((score, row_text))
if not scored:
return ""
scored.sort(key=lambda item: item[0], reverse=True)
selected = scored[:DATASET_CONTEXT_ROWS]
lines = [
"Dataset style/context examples. Use these for tone and pattern only. Do not quote directly."
]
for index, (_, row_text) in enumerate(selected, start=1):
lines.append(f"Example {index}: {row_text}")
return "\n".join(lines)
def _build_prompt(text: str, profile: dict[str, Any], record: dict[str, Any]) -> str:
mode = _chat_mode_spec(record)
memory_summary = ""
recent_block = ""
if _memory_is_enabled(record):
summary = _memory_summary(record)
if summary:
memory_summary = f"Conversation memory: {summary}\n"
recent_block = _memory_recent_block(record)
dataset_context = _retrieve_dataset_context(text)
dataset_block = ""
if dataset_context:
dataset_block = f"\nDataset guidance:\n{dataset_context}\n"
return (
f"{SYSTEM_PROMPT}\n\n"
f"Character:\n"
f"Name: {profile['name']}\n"
f"Age: {profile['age']}\n"
f"Gender: {profile['gender']}\n"
f"Style: {profile['style']}\n"
f"Voice: {profile['voice']}\n"
f"Mood: {profile['mood']}\n"
f"Boundaries: {profile['boundaries']}\n"
f"Chat mode: {mode['label']} - {mode['description']}\n"
f"Reply style: one or two short paragraphs, emotionally present, specific, and conversational.\n"
f"Do not repeat gestures or sentences. Use at most one action tag.\n"
f"{memory_summary}"
f"{recent_block}\n"
f"{dataset_block}\n"
f"User: {text}\n"
f"{profile['name']}:"
)
def _polish_prompt(draft: str, profile: dict[str, Any], record: dict[str, Any]) -> str:
mode = _chat_mode_spec(record)
memory_summary = ""
if _memory_is_enabled(record):
summary = _memory_summary(record)
if summary:
memory_summary = f"Memory context: {summary}\n"
return (
f"{SYSTEM_PROMPT}\n\n"
f"Rewrite the draft below as the final reply.\n"
f"Keep the same meaning, persona, tone, and continuity.\n"
f"Remove repetition, padding, disclaimers, and meta commentary.\n"
f"Make it feel like natural chat from {profile['name']}.\n"
f"Chat mode: {mode['label']} - {mode['description']}\n"
f"Reply target: one or two short paragraphs.\n"
f"{memory_summary}\n"
f"Draft:\n{draft}\n\n"
f"Final:"
)
def _run_generation(
prompt: str,
*,
max_new_tokens: int,
do_sample: bool,
temperature: float,
top_p: float,
) -> str:
generator = _get_text_generator()
tokenizer = getattr(generator, "tokenizer", None)
eos_id = getattr(tokenizer, "eos_token_id", None) or 0
generation_args: dict[str, Any] = {
"max_new_tokens": max_new_tokens,
"do_sample": do_sample,
"return_full_text": False,
"pad_token_id": eos_id,
"clean_up_tokenization_spaces": False,
"repetition_penalty": 1.18,
"no_repeat_ngram_size": 4,
}
if do_sample:
generation_args["temperature"] = temperature
generation_args["top_p"] = top_p
result = generator(prompt, **generation_args)
return _clean_reply(result[0].get("generated_text", ""))
def _generate_response(
text: str,
profile: dict[str, Any],
record: dict[str, Any],
*,
raise_on_error: bool = False,
) -> str:
if not _is_local_model_enabled():
raise RuntimeError("LOCAL_MODEL_DISABLED")
try:
prompt = _build_prompt(text, profile, record)
draft = _run_generation(
prompt,
max_new_tokens=64,
do_sample=True,
temperature=float(_chat_mode_spec(record)["temperature"]),
top_p=float(_chat_mode_spec(record)["top_p"]),
)
if RESPONSE_PASSES <= 1:
return draft
polish_prompt = _polish_prompt(draft, profile, record)
return _run_generation(
polish_prompt,
max_new_tokens=48,
do_sample=False,
temperature=0.0,
top_p=1.0,
)
except Exception as exc:
logger.exception("Local model generation failed: %s", exc.__class__.__name__)
if raise_on_error:
raise
return f"MODEL_FAILED: {exc.__class__.__name__}: {str(exc)[:160]}"
async def _send_telegram_message(
chat_id: int,
text: str,
reply_markup: dict[str, Any] | None = None,
) -> dict[str, Any]:
token = (
os.getenv("TELEGRAM_BOT_TOKEN", "").strip()
or os.getenv("BOT_TOKEN", "").strip()
)
if not token:
logger.error("TELEGRAM_BOT_TOKEN or BOT_TOKEN is not configured")
raise HTTPException(
status_code=503,
detail="Telegram bot token is not configured",
)
timeout = httpx.Timeout(connect=5.0, read=15.0, write=10.0, pool=10.0)
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
f"https://api.telegram.org/bot{token}/sendMessage",
json={
"chat_id": chat_id,
"text": text,
**({"reply_markup": reply_markup} if reply_markup else {}),
},
)
if response.status_code >= 400:
logger.warning(
"Telegram API request failed with status %s; using webhook fallback",
response.status_code,
)
payload = {
"method": "sendMessage",
"chat_id": chat_id,
"text": text,
}
if reply_markup:
payload["reply_markup"] = reply_markup
return payload
return {
"ok": True,
"sent": True,
}
except httpx.RequestError as exc:
logger.warning(
"Telegram API request failed; using webhook fallback: %s",
exc.__class__.__name__,
)
payload = {
"method": "sendMessage",
"chat_id": chat_id,
"text": text,
}
if reply_markup:
payload["reply_markup"] = reply_markup
return payload
def _extract_telegram_text(update: dict[str, Any]) -> tuple[str, int | None]:
message = update.get("message") or update.get("edited_message") or {}
chat = message.get("chat") or {}
text = message.get("text") or message.get("caption") or ""
chat_id = chat.get("id")
if not isinstance(chat_id, int):
return str(text or "").strip(), None
return str(text or "").strip(), chat_id
def _telegram_reply_for_message(
chat_key: str,
text: str,
) -> tuple[str, dict[str, Any], dict[str, Any] | None]:
record = _get_session_record(chat_key)
profile = _active_profile(record)
normalized_text = _clean_text_button(text)
if not text:
reply = profile.get("opening") or START_RESPONSE
return reply, record, _telegram_reply_markup(record)
if normalized_text in {"", "start"}:
reply = profile.get("opening") or START_RESPONSE
return reply, record, _telegram_reply_markup(record)
if normalized_text in {"menu", "help"}:
return _format_menu(record), record, _telegram_reply_markup(record)
if normalized_text in {"characters"}:
return _build_character_gallery(record), record, _telegram_reply_markup(record)
if normalized_text in {"build character", "new character", "create character"}:
updated = _start_character_builder(record)
return (
"Let’s build a character.\n"
f"{_builder_step_prompt('name', profile)}",
updated,
_telegram_reply_markup(updated),
)
if normalized_text in {"cancel"}:
builder = _builder_state(record)
if builder.get("active"):
updated = _set_builder_state(record, active=False, step=None, draft={})
return "Character builder cancelled.", updated, _telegram_reply_markup(updated)
return "Nothing to cancel.", record, _telegram_reply_markup(record)
if normalized_text in {"reset"}:
updated = _reset_session_record(chat_key)
updated_profile = _active_profile(updated)
return (
f"Session reset to persona {updated_profile['persona_id']}.\n{updated_profile['opening']}",
updated,
_telegram_reply_markup(updated),
)
if normalized_text in {"memory on", "memory off"}:
updated = _normalize_session_record(record)
updated["memory_enabled"] = normalized_text == "memory on"
updated = _set_session_record(chat_key, updated)
state_text = "enabled" if updated["memory_enabled"] else "disabled"
return f"Memory {state_text}.", updated, _telegram_reply_markup(updated)
mode_key = MODE_BUTTON_LOOKUP.get(normalized_text)
if mode_key:
updated = _set_chat_mode(record, mode_key)
updated = _set_session_record(chat_key, updated)
mode = _chat_mode_spec(updated)
return (
f"Tone set to {mode['label']}. {mode['description']}",
updated,
_telegram_reply_markup(updated),
)
persona_map = _telegram_persona_button_map()
if normalized_text in persona_map:
updated = _switch_persona(record, persona_map[normalized_text])
updated = _set_session_record(chat_key, updated)
updated_profile = _active_profile(updated)
return (
f"Persona switched to {updated_profile['persona_id']} ({updated_profile['name']}).",
updated,
_telegram_reply_markup(updated),
)
if normalized_text == "model":
return f"{MODEL_ID}", record, _telegram_reply_markup(record)
if normalized_text.startswith("/"):
if normalized_text == "/persona":
choices = ", ".join(_persona_choices())
return (
f"Current persona: {profile['persona_id']}\n"
f"Available personas: {choices}",
record,
_telegram_reply_markup(record),
)
if normalized_text.startswith("/persona "):
new_persona = text.split(None, 1)[1].strip()
if new_persona not in _persona_catalog():
return (
f"Unknown persona '{new_persona}'. Available personas: {', '.join(_persona_choices())}",
record,
_telegram_reply_markup(record),
)
updated = _switch_persona(record, new_persona)
updated = _set_session_record(chat_key, updated)
updated_profile = _active_profile(updated)
return (
f"Persona switched to {updated_profile['persona_id']} ({updated_profile['name']}).",
updated,
_telegram_reply_markup(updated),
)
if normalized_text == "/character":
return (
"Current character:\n"
f"{_format_character_card(profile)}\n\n"
"Use the Character button to start a guided builder, or send key=value pairs.",
record,
_telegram_reply_markup(record),
)
if normalized_text.startswith("/character "):
raw_args = text.split(None, 1)[1].strip()
updates = _parse_key_values(raw_args)
if "age" in updates:
try:
updates["age"] = str(max(18, int(updates["age"])))
except Exception:
updates["age"] = "18"
updated = _update_character_overrides(record, updates)
updated = _set_session_record(chat_key, updated)
updated_profile = _active_profile(updated)
return (
f"Character updated.\n{_format_character_card(updated_profile)}",
updated,
_telegram_reply_markup(updated),
)
if normalized_text == "/reset":
updated = _reset_session_record(chat_key)
updated_profile = _active_profile(updated)
return (
f"Session reset to persona {updated_profile['persona_id']}.\n{updated_profile['opening']}",
updated,
_telegram_reply_markup(updated),
)
if normalized_text == "/model":
return f"{MODEL_ID}", record, _telegram_reply_markup(record)
if normalized_text == "/dataset":
if not DATASET_ID:
return (
"No dataset connected. Set DATASET_ID and DATASET_SPLIT.",
record,
_telegram_reply_markup(record),
)
try:
dataset = _get_dataset()
return (
f"Dataset: {DATASET_ID}\n"
f"Split: {DATASET_SPLIT}\n"
f"Rows loaded: {len(dataset) if dataset is not None else 0}",
record,
_telegram_reply_markup(record),
)
except Exception as exc:
return (
f"DATASET_FAILED: {exc.__class__.__name__}: {str(exc)[:160]}",
record,
_telegram_reply_markup(record),
)
if normalized_text == "/help":
return HELP_RESPONSE, record, _telegram_reply_markup(record)
if normalized_text == "/menu":
return _format_menu(record), record, _telegram_reply_markup(record)
if normalized_text == "/start":
reply = profile.get("opening") or START_RESPONSE
return reply, record, _telegram_reply_markup(record)
builder = _builder_state(record)
if builder.get("active"):
reply_text, updated = _advance_character_builder(record, text)
if updated is not record:
updated = _set_session_record(chat_key, updated)
return reply_text, updated, _telegram_reply_markup(updated)
if len(text) > MAX_MESSAGE_LENGTH:
return (
f"Message is too long. Please keep it under {MAX_MESSAGE_LENGTH} characters.",
record,
_telegram_reply_markup(record),
)
if not _is_safe_message(text):
return SAFE_REFUSAL, record, _telegram_reply_markup(record)
reply_text = _generate_response(text, profile, record)
if _memory_is_enabled(record):
record = _record_turn(record, "user", text)
record = _record_turn(record, "assistant", reply_text)
record = _set_session_record(chat_key, record)
return reply_text, record, _telegram_reply_markup(record)
def _build_menu_text() -> str:
lines = ["Babble character menu:"]
for persona_id in _persona_choices():
persona = _resolve_persona(persona_id)
lines.append(
f"- {persona_id}: {persona['name']} | {persona['style']} | age {persona['age']}"
)
lines.append("")
lines.append(f"Available personas: {', '.join(_persona_choices())}")
lines.append(
"Use /persona [id] to switch, /character key=value ... to customize, and /reset to clear the session."
)
return "\n".join(lines)
def _debug_profile_snapshot() -> dict[str, Any]:
persona = _resolve_persona(_default_persona_id())
return _normalize_character_profile(persona)
def _create_ui_state() -> dict[str, Any]:
return _default_session_record()
def _state_to_form_values(record: dict[str, Any]) -> list[Any]:
profile = _active_profile(record)
return [
gr.update(value=record["persona_id"], choices=_persona_choices()),
gr.update(value=profile["name"]),
gr.update(value=profile["age"]),
gr.update(value=profile["gender"]),
gr.update(value=profile["style"]),
gr.update(value=profile["voice"]),
gr.update(value=profile["mood"]),
gr.update(value=profile["boundaries"]),
gr.update(value=profile["opening"]),
gr.update(value=bool(record.get("memory_enabled", True))),
gr.update(value=profile),
gr.update(value=_format_menu(record)),
]
def _ui_load_persona(persona_id: str, record: dict[str, Any]) -> tuple[Any, ...]:
updated = _switch_persona(record, persona_id)
return (updated, *_state_to_form_values(updated))
def _ui_save_character(
persona_id: str,
name: str,
age: Any,
gender: str,
style: str,
voice: str,
mood: str,
boundaries: str,
opening: str,
memory_enabled_value: bool,
record: dict[str, Any],
) -> tuple[Any, ...]:
updated = _normalize_session_record(record)
if persona_id in _persona_catalog():
updated["persona_id"] = persona_id
updated["custom_character"] = {
"name": name,
"age": age,
"gender": gender,
"style": style,
"voice": voice,
"mood": mood,
"boundaries": boundaries,
"opening": opening,
}
updated["memory_enabled"] = bool(memory_enabled_value)
return (updated, *_state_to_form_values(updated))
def _ui_reset_session(record: dict[str, Any]) -> tuple[Any, ...]:
updated = _default_session_record()
return (updated, *_state_to_form_values(updated))
def _ui_send_message(
message: str,
chat_history: list[dict[str, str]] | None,
record: dict[str, Any],
) -> tuple[Any, ...]:
text = str(message or "").strip()
history = list(chat_history or [])
if not text:
return "", history, history, record, _format_menu(record)
profile = _active_profile(record)
reply = _generate_response(text, profile, record)
if _memory_is_enabled(record):
record = _record_turn(record, "user", text)
record = _record_turn(record, "assistant", reply)
history.append({"role": "user", "content": text})
history.append({"role": "assistant", "content": reply})
return "", history, history, record, _format_menu(record)
def build_demo():
if gr is None:
raise RuntimeError("Gradio is required to run Babble.") from _GRADIO_IMPORT_ERROR
with gr.Blocks(title="Babble") as demo:
state = gr.State(_create_ui_state())
chat_history = gr.State([])
gr.Markdown("# Babble")
gr.Markdown(
"Adult romance-leaning character chat with persona presets, custom character editing, memory, and dataset-guided context."
)
with gr.Row():
with gr.Column(scale=2):
chatbot = gr.Chatbot(label="Conversation", height=540, type="messages")
message = gr.Textbox(
label="Message",
placeholder="Send a test message",
)
send = gr.Button("Send")
with gr.Column(scale=1):
persona = gr.Dropdown(
choices=_persona_choices(),
value=_default_persona_id(),
label="Persona preset",
)
load_persona = gr.Button("Load Persona")
reset_session = gr.Button("Reset Session")
name = gr.Textbox(label="Name")
age = gr.Number(label="Adult age", precision=0)
gender = gr.Textbox(label="Gender / pronouns")
style = gr.Textbox(label="Style")
voice = gr.Textbox(label="Voice")
mood = gr.Textbox(label="Mood")
boundaries = gr.Textbox(label="Boundaries")
opening = gr.Textbox(label="Opening line")
memory_enabled = gr.Checkbox(label="Memory enabled", value=MEMORY_ENABLED)
save_character = gr.Button("Save Character")
character_preview = gr.JSON(label="Active character")
menu_preview = gr.Textbox(label="Menu", lines=10)
status = gr.Textbox(label="Session status")
demo.load(
fn=lambda record: _state_to_form_values(record),
inputs=state,
outputs=[
persona,
name,
age,
gender,
style,
voice,
mood,
boundaries,
opening,
memory_enabled,
character_preview,
menu_preview,
],
)
load_persona.click(
fn=_ui_load_persona,
inputs=[persona, state],
outputs=[
state,
persona,
name,
age,
gender,
style,
voice,
mood,
boundaries,
opening,
memory_enabled,
character_preview,
menu_preview,
],
)
save_character.click(
fn=_ui_save_character,
inputs=[
persona,
name,
age,
gender,
style,
voice,
mood,
boundaries,
opening,
memory_enabled,
state,
],
outputs=[
state,
persona,
name,
age,
gender,
style,
voice,
mood,
boundaries,
opening,
memory_enabled,
character_preview,
menu_preview,
],
)
reset_session.click(
fn=_ui_reset_session,
inputs=[state],
outputs=[
state,
persona,
name,
age,
gender,
style,
voice,
mood,
boundaries,
opening,
memory_enabled,
character_preview,
menu_preview,
],
)
send.click(
fn=_ui_send_message,
inputs=[message, chat_history, state],
outputs=[message, chatbot, chat_history, state, menu_preview],
)
message.submit(
fn=_ui_send_message,
inputs=[message, chat_history, state],
outputs=[message, chatbot, chat_history, state, menu_preview],
)
return demo
def create_app() -> FastAPI:
@asynccontextmanager
async def lifespan(app: FastAPI):
if REQUIRE_MODEL and _is_local_model_enabled():
logger.info("Preloading model...")
_get_text_generator()
if DATASET_ENABLED and DATASET_ID:
logger.info("Preloading dataset...")
try:
_get_dataset()
_dataset_text_index()
except Exception:
logger.exception("Dataset preload failed")
yield
fastapi_app = FastAPI(title="Babble", version=APP_VERSION, lifespan=lifespan)
@fastapi_app.get("/")
async def root() -> dict[str, Any]:
return {
"ok": True,
"service": "babble",
"status": "running",
"version": APP_VERSION,
"routes": [
"/health",
"/model",
"/dataset",
"/persona",
"/menu",
"/model/test",
"/debug/config",
"/debug/model",
],
}
@fastapi_app.get("/health")
async def health() -> dict[str, Any]:
return {
"ok": True,
"service": "babble",
"version": APP_VERSION,
"model": MODEL_ID,
"loaded": _get_text_generator.cache_info().currsize > 0,
"persona": _default_persona_id(),
"dataset_connected": bool(DATASET_ID),
}
@fastapi_app.get("/model")
async def model() -> dict[str, Any]:
profile = _debug_profile_snapshot()
return {
"model_id": MODEL_ID,
"enabled": _is_local_model_enabled(),
"loaded": _get_text_generator.cache_info().currsize > 0,
"transformers": pipeline is not None,
"transformers_import_error": (
None if pipeline is not None else str(_TRANSFORMERS_IMPORT_ERROR)[:300]
),
"persona_id": profile["persona_id"],
"persona_name": profile["name"],
"memory_enabled": MEMORY_ENABLED,
"memory_path": str(MEMORY_PATH),
}
@fastapi_app.get("/dataset")
async def dataset_status() -> dict[str, Any]:
if not DATASET_ENABLED:
return {
"ok": False,
"enabled": False,
"error": "DATASET_DISABLED",
}
if not DATASET_ID:
return {
"ok": False,
"enabled": True,
"error": "DATASET_ID missing",
}
if load_dataset is None:
return {
"ok": False,
"enabled": True,
"dataset_id": DATASET_ID,
"error": "datasets import failed",
"detail": str(_DATASETS_IMPORT_ERROR)[:300],
}
dataset = _get_dataset()
index = _dataset_text_index()
return {
"ok": True,
"enabled": True,
"dataset_id": DATASET_ID,
"split": DATASET_SPLIT,
"rows_loaded": len(dataset) if dataset is not None else 0,
"indexed_rows": len(index),
"columns": dataset.column_names if dataset is not None else [],
"sample": dataset[0] if dataset is not None and len(dataset) else None,
}
@fastapi_app.get("/persona")
async def persona_menu() -> dict[str, Any]:
profile = _debug_profile_snapshot()
return {
"active_persona": profile,
"available_personas": {
persona_id: _resolve_persona(persona_id)
for persona_id in _persona_choices()
},
}
@fastapi_app.get("/menu")
async def menu() -> dict[str, str]:
return {
"text": _build_menu_text(),
}
@fastapi_app.get("/model/test")
async def test() -> dict[str, Any]:
profile = _debug_profile_snapshot()
record = _default_session_record()
return {
"reply": _generate_response("Say hello briefly.", profile, record),
"persona_id": profile["persona_id"],
}
@fastapi_app.post("/telegram/webhook/{secret}")
async def telegram_webhook(secret: str, update: dict[str, Any]) -> dict[str, Any]:
logger.info("Telegram update received")
expected_secret = os.getenv("WEBHOOK_SECRET", "").strip()
if not expected_secret:
logger.error("WEBHOOK_SECRET is not configured")
raise HTTPException(
status_code=503,
detail="Webhook is not configured",
)
if not hmac.compare_digest(secret, expected_secret):
logger.warning("Rejected webhook request with invalid secret")
raise HTTPException(
status_code=403,
detail="Invalid webhook secret",
)
text, chat_id = _extract_telegram_text(update)
logger.info("Telegram chat_id found: %s", chat_id is not None)
logger.info("Telegram message text found: %s", bool(text))
if chat_id is None:
return {
"ok": True,
"ignored": "missing_chat_id",
}
reply_text, record, reply_markup = _telegram_reply_for_message(
_session_key(chat_id),
text,
)
_set_session_record(_session_key(chat_id), record)
return await _send_telegram_message(chat_id, reply_text, reply_markup)
@fastapi_app.get("/debug/config")
async def debug_config() -> dict[str, Any]:
if not _is_debug_enabled():
raise HTTPException(status_code=404, detail="Not found")
profile = _debug_profile_snapshot()
return {
"debug": True,
"service": "babble",
"version": APP_VERSION,
"llm_provider": _sanitize_provider(),
"model_id": MODEL_ID,
"local_model_enabled": _is_local_model_enabled(),
"transformers_available": pipeline is not None,
"transformers_import_error": (
None if pipeline is not None else str(_TRANSFORMERS_IMPORT_ERROR)[:300]
),
"persona_id": profile["persona_id"],
"persona_name": profile["name"],
"persona_count": len(_persona_catalog()),
"memory_enabled": MEMORY_ENABLED,
"memory_path": str(MEMORY_PATH),
"memory_max_turns": MEMORY_MAX_TURNS,
"telegram_bot_token_configured": (
_configured("TELEGRAM_BOT_TOKEN") or _configured("BOT_TOKEN")
),
"webhook_secret_configured": _configured("WEBHOOK_SECRET"),
"llm_api_key_configured": _configured("LLM_API_KEY"),
"reply_mode": "telegram_direct_send_with_webhook_fallback",
"dataset_enabled": DATASET_ENABLED,
"dataset_id": DATASET_ID,
"dataset_split": DATASET_SPLIT,
"datasets_available": load_dataset is not None,
"datasets_import_error": (
None if load_dataset is not None else str(_DATASETS_IMPORT_ERROR)[:300]
),
"hf_token_configured": bool(HF_TOKEN),
}
@fastapi_app.get("/debug/model")
async def debug_model() -> dict[str, Any]:
if not _is_debug_enabled():
raise HTTPException(status_code=404, detail="Not found")
try:
profile = _debug_profile_snapshot()
reply = _generate_response(
"Say hello in one short sentence.",
profile,
_default_session_record(),
raise_on_error=True,
)
return {
"ok": True,
"model_id": MODEL_ID,
"persona_id": profile["persona_id"],
"reply": reply,
}
except Exception as exc:
logger.exception("Debug model test failed")
return {
"ok": False,
"error": exc.__class__.__name__,
"detail": str(exc)[:200],
}
if gr is None:
logger.warning("Gradio unavailable; serving API endpoints only")
return fastapi_app
return gr.mount_gradio_app(fastapi_app, build_demo(), path="/")
app = create_app()
if __name__ == "__main__":
import uvicorn
uvicorn.run(
app,
host="0.0.0.0",
port=int(os.getenv("BABBLE_PORT", os.getenv("PORT", "7860"))),
log_level=os.getenv("LOG_LEVEL", "info").lower(),
)