lifeos / engine.py
awaisaziz's picture
Add config, model status, and VLM support
0c4cd3b
Raw
History Blame Contribute Delete
27.8 kB
"""LifeOS reasoning engine.
Text reasoning runs on one small model — NVIDIA Nemotron-3-Nano-4B (Q4_K_M
GGUF, 2.84GB) — 100% locally through the llama.cpp runtime (llama-cpp-python).
Deterministic feature code curates a small context (memory slices + RAG
recall); the model only does the judgment + explanation layer. That division
is what makes a 4B on 2 vCPUs feel smart.
Food photos additionally use a small vision-language model — Qwen2.5-VL-3B
(Q4_K_M GGUF) — for perception only: it identifies the food items in an image,
which Nemotron then reasons about against memory. The VLM is loaded lazily on
the first photo, so the text-only path never pays for it.
"""
import logging
import os
import re
import threading
from collections.abc import Iterator
import cuda_bootstrap
import config
import memory as memory_store
import rag
# Module-level logger used for model-load and generation diagnostics.
logger = logging.getLogger(__name__)
cuda_bootstrap.ensure() # register CUDA runtime DLL dirs before llama_cpp loads
# Text model: repo id + GGUF filename handed to Llama.from_pretrained in
# _load_llm.
MODEL_REPO = config.MODEL_REPO
MODEL_FILE = config.MODEL_FILE
# Fallback (plain llama arch) if the hybrid Mamba arch is unsupported by the
# installed llama.cpp: bartowski/nvidia_Llama-3.1-Nemotron-Nano-4B-v1.1-GGUF
# Vision model for food-photo recognition. Nemotron is text-only and cannot
# "see" an image, so a small vision-language model handles perception: it
# identifies the food items in a photo. The identified items are then fed to
# Nemotron, which does the memory-grounded judgment (dietary fit, suggestions).
# Q4_K_M (~2.4GB) + the f16 multimodal projector that encodes the image.
VLM_REPO = config.VLM_REPO
VLM_FILE = config.VLM_FILE
VLM_MMPROJ_FILE = config.VLM_MMPROJ_FILE
# Lazily created singletons (see get_llm / get_vlm). Each lock is held around
# the corresponding create_chat_completion call (generate_stream /
# describe_food_image), so one model instance serves one request at a time.
_llm = None
_llm_lock = threading.Lock()
_vlm = None
_vlm_lock = threading.Lock()
# GPU offload: number of model layers to push to the GPU. -1 = all layers
# (full offload), 0 = CPU only. Requires a CUDA/Metal/Vulkan build of
# llama-cpp-python — the plain CPU wheel ignores this and stays on CPU.
GPU_LAYERS = config.GPU_LAYERS
# Observable load state for the UI / status endpoint, written by get_llm and
# read by status().
# ACTIVE_BACKEND: None until a load succeeds, then "gpu" or "cpu".
ACTIVE_BACKEND = None
# MODEL_STATE is one of: "idle" (not loaded yet), "loading", "ready", "error".
MODEL_STATE = "idle"
# MODEL_ERROR: "<ExceptionType>: <message>" of the last failed load, else None.
MODEL_ERROR = None
class ModelUnavailable(RuntimeError):
    """The local model could not be loaded.

    Typical causes are a bad or missing llama-cpp-python wheel, a failed
    download, or running out of memory. Callers are expected to catch this
    and stream a friendly message instead of failing the request.
    """
def status() -> dict:
    """Snapshot of the text model's load state.

    Returns a dict with the keys ``state``, ``backend`` and ``error``, read
    from the module-level MODEL_STATE, ACTIVE_BACKEND and MODEL_ERROR. Meant
    for the /status endpoint and the UI indicator.
    """
    return dict(state=MODEL_STATE, backend=ACTIVE_BACKEND, error=MODEL_ERROR)
def _load_llm(n_gpu_layers: int):
    """Create the text-model ``Llama`` instance with the given GPU offload.

    Args:
        n_gpu_layers: layers to offload to the GPU; 0 means CPU only.

    Returns:
        The object returned by ``Llama.from_pretrained`` for
        MODEL_REPO / MODEL_FILE with an 8192-token context.
    """
    import cuda_bootstrap

    cuda_bootstrap.ensure()
    from llama_cpp import Llama

    cpu_count = os.cpu_count() or 2
    if n_gpu_layers == 0:
        # CPU-only inference: use every core.
        thread_count = cpu_count
    else:
        # With GPU offload the text model needs few CPU threads; a smaller
        # pool leaves cores free for the CPU-bound vision model that runs on
        # food-photo uploads (otherwise the two oversubscribe the CPU).
        thread_count = max(2, cpu_count // 2)

    return Llama.from_pretrained(
        repo_id=MODEL_REPO,
        filename=MODEL_FILE,
        n_ctx=8192,
        n_threads=thread_count,
        n_gpu_layers=n_gpu_layers,
        verbose=False,
    )
def get_llm():
    """Return the text model, loading it on first use.

    Load order: when GPU_LAYERS is non-zero, try a GPU-offloaded load first;
    if that raises (bad wheel, no VRAM, driver mismatch, ...), log a warning
    and retry on CPU so the app still runs. GPU_LAYERS == 0 skips the GPU
    attempt entirely.

    Side effects: updates MODEL_STATE ("loading" -> "ready" / "error"),
    ACTIVE_BACKEND ("gpu" / "cpu") and MODEL_ERROR so the UI can show
    progress.

    Raises:
        ModelUnavailable: when the CPU load also fails, so callers can stream
            a friendly message instead of a raw 500.

    Only ``Exception`` is caught. ``KeyboardInterrupt`` and ``SystemExit``
    propagate, so interrupting a long download/load actually stops it rather
    than being mistaken for a load failure and triggering another attempt.
    """
    global _llm, ACTIVE_BACKEND, MODEL_STATE, MODEL_ERROR
    if _llm is not None:
        return _llm

    MODEL_STATE = "loading"
    if GPU_LAYERS != 0:
        try:
            _llm = _load_llm(GPU_LAYERS)
        except Exception as e:  # noqa: BLE001 — any load failure falls back to CPU
            logger.warning("GPU load failed (%s: %s); falling back to CPU", type(e).__name__, e)
            _llm = None
        else:
            ACTIVE_BACKEND = "gpu"
            MODEL_STATE, MODEL_ERROR = "ready", None
            logger.info("model loaded on GPU (n_gpu_layers=%s)", GPU_LAYERS)
            return _llm

    try:
        _llm = _load_llm(0)
    except Exception as e:  # noqa: BLE001 — download/format/runtime failure
        MODEL_STATE, MODEL_ERROR = "error", f"{type(e).__name__}: {e}"
        # exc_info keeps the traceback in the server log, which is where the
        # user-facing error message sends people for details.
        logger.error("model load failed on CPU: %s", MODEL_ERROR, exc_info=True)
        raise ModelUnavailable(MODEL_ERROR) from e

    ACTIVE_BACKEND = "cpu"
    MODEL_STATE, MODEL_ERROR = "ready", None
    logger.info("model loaded on CPU")
    return _llm
def _load_vlm(n_gpu_layers: int):
    """Create the vision-language ``Llama`` instance used for food photos.

    Args:
        n_gpu_layers: layers to offload to the GPU; 0 means CPU only.

    Returns:
        The object returned by ``Llama.from_pretrained`` for
        VLM_REPO / VLM_FILE, wired to a Qwen2.5-VL chat handler, with a
        4096-token context.
    """
    import cuda_bootstrap

    cuda_bootstrap.ensure()
    from llama_cpp import Llama
    from llama_cpp.llama_chat_format import Qwen25VLChatHandler

    # The chat handler downloads + owns the multimodal projector (mmproj) that
    # turns the image into tokens the model can attend to.
    mmproj_handler = Qwen25VLChatHandler.from_pretrained(
        repo_id=VLM_REPO,
        filename=VLM_MMPROJ_FILE,
        verbose=False,
    )
    thread_count = os.cpu_count() or 2
    return Llama.from_pretrained(
        repo_id=VLM_REPO,
        filename=VLM_FILE,
        chat_handler=mmproj_handler,
        n_ctx=4096,
        n_threads=thread_count,
        n_gpu_layers=n_gpu_layers,
        verbose=False,
    )
# GPU layers for the vision model; 0 keeps it on CPU (see get_vlm).
VLM_GPU_LAYERS = config.VLM_GPU_LAYERS


def get_vlm():
    """Return the vision-language model, loading it on first use.

    The VLM is only needed for food photos, so it is loaded lazily and the
    text-only path never pays for it. With VLM_GPU_LAYERS == 0 it is loaded
    straight onto the CPU, which avoids competing with the resident text
    model for VRAM; with a non-zero value a GPU load is tried first and any
    failure falls back to CPU.

    Raises:
        ModelUnavailable: when the CPU load fails as well.

    Only ``Exception`` is caught. ``KeyboardInterrupt`` and ``SystemExit``
    propagate, so interrupting a long download/load is not treated as a load
    failure that triggers another attempt.
    """
    global _vlm
    if _vlm is not None:
        return _vlm

    if VLM_GPU_LAYERS != 0:
        try:
            _vlm = _load_vlm(VLM_GPU_LAYERS)
        except Exception as e:  # noqa: BLE001 — any load failure falls back to CPU
            logger.warning("VLM GPU load failed (%s: %s); falling back to CPU", type(e).__name__, e)
            _vlm = None
        else:
            logger.info("VLM loaded on GPU (n_gpu_layers=%s)", VLM_GPU_LAYERS)
            return _vlm

    try:
        _vlm = _load_vlm(0)
    except Exception as e:  # noqa: BLE001
        raise ModelUnavailable(f"vision model unavailable: {type(e).__name__}: {e}") from e
    logger.info("VLM loaded on CPU")
    return _vlm
# Text prompt sent alongside the photo in describe_food_image. It asks for a
# '- item' bulleted list, which _dedupe_food_items then de-duplicates and caps.
_FOOD_VISION_PROMPT = (
    "You are a food-recognition assistant. Look at this photo and list the food "
    "and drink items you can see. Break composed dishes into their visible "
    "components — e.g. a pizza becomes its toppings (crust, tomato sauce, "
    "mozzarella, basil); a plate of toast with egg becomes each item. If it is "
    "a grocery receipt or a label, read the product names instead. Respond with "
    "ONLY a bulleted list — one item per line starting with '- ', using plain "
    "common names (e.g. '- fried egg', '- whole-grain toast', '- cherry "
    "tomatoes'). Add a rough quantity when obvious. Aim for 3-8 items. Ignore "
    "plates, bowls, cutlery, and packaging. Do not add commentary, nutrition "
    "facts, or headings."
)
# Longest-side cap (in pixels) for the image fed to the VLM; applied in
# _image_data_uri. On this CPU path a full-res photo decodes ~1000 image
# tokens (~36s); 768px cuts that ~4x to a few seconds with no loss in
# food-recognition quality.
VLM_MAX_IMAGE_SIDE = config.VLM_MAX_IMAGE_SIDE
def _image_data_uri(path: str) -> str:
"""Downscale the photo to VLM_MAX_IMAGE_SIDE and return a JPEG data URI.
Falls back to the raw bytes if Pillow can't open it."""
import base64
import io
try:
from PIL import Image
im = Image.open(path)
if im.mode not in ("RGB", "L"):
im = im.convert("RGB")
w, h = im.size
scale = VLM_MAX_IMAGE_SIDE / max(w, h)
if scale < 1:
im = im.resize((max(1, int(w * scale)), max(1, int(h * scale))))
buf = io.BytesIO()
im.convert("RGB").save(buf, format="JPEG", quality=88)
data = buf.getvalue()
mime = "jpeg"
except Exception: # unreadable by Pillow — send original bytes
with open(path, "rb") as f:
data = f.read()
ext = os.path.splitext(path)[1].lstrip(".").lower() or "jpeg"
mime = "jpeg" if ext in ("jpg", "jpeg") else ext
return f"data:image/{mime};base64," + base64.b64encode(data).decode("ascii")
def _dedupe_food_items(text: str) -> str:
"""Keep unique '- item' bullet lines (the small VLM sometimes repeats), in
order, capped to 8 — so the identified-items list stays tight."""
seen, items = set(), []
for line in text.splitlines():
line = line.strip().lstrip("-*•").strip()
if not line:
continue
key = line.lower()
if key in seen:
continue
seen.add(key)
items.append(f"- {line}")
if len(items) >= 8:
break
return "\n".join(items)
def describe_food_image(path: str) -> str:
    """Perception step: list the food items the vision model sees in a photo.

    The image is converted to a data URI, sent to the VLM together with
    _FOOD_VISION_PROMPT, and the reply is cleaned into a short de-duplicated
    bullet list (the prompt also covers receipts/labels by asking for product
    names). The memory-grounded analysis happens separately via
    run_domain("meal_photo", …).

    Args:
        path: filesystem path of the uploaded photo.

    Returns:
        A newline-joined '- item' list (possibly empty).

    Raises:
        ModelUnavailable: propagated from get_vlm when the VLM cannot load.
    """
    data_uri = _image_data_uri(path)
    vlm = get_vlm()

    user_turn = {
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": data_uri}},
            {"type": "text", "text": _FOOD_VISION_PROMPT},
        ],
    }
    # The lock is held for the duration of the completion call.
    with _vlm_lock:
        out = vlm.create_chat_completion(
            messages=[user_turn],
            max_tokens=160,
            temperature=0.2,
        )

    reply = out["choices"][0]["message"]["content"] or ""
    return _dedupe_food_items(strip_think(reply).strip())
def warmup() -> None:
    """Pre-load the text model and the embedder at startup.

    Loading here avoids a cold start on the first request. The vision model
    is not touched; it loads lazily on the first food photo. A text-model
    load failure is deliberately swallowed: get_llm has already recorded it
    in MODEL_STATE / MODEL_ERROR and requests surface a friendly message, so
    the web server can still come up and serve the UI.
    """
    try:
        get_llm()
    except ModelUnavailable:
        # State is already "error"; the UI will show it.
        pass

    # Load the embedder now (before any food-photo VLM load) and seed demo
    # notes when in demo mode.
    try:
        rag.warmup()
    except Exception as exc:  # embedder optional — recall just returns []
        logger.warning("embedder warmup failed: %s", exc)

    if config.DEMO:
        rag.ensure_seeded()
# This Nemotron GGUF always "thinks out loud" in plain prose and ignores
# /no_think and "detailed thinking off". Rather than fight it, we let it reason,
# ask it to keep reasoning short and mark the answer with a delimiter, and strip
# everything before the answer server-side (see ANSWER_DELIM / _clean_response).
# The stripper is anchor-based, so it stays clean even when the model forgets
# the delimiter under a long prompt.
ANSWER_DELIM = "==ANSWER=="
# Base system prompt shared by every domain. It is a str.format template:
# build_prompt fills {name} (how to address the user) and {pos} (possessive
# form) from _names() before appending the domain-specific instructions.
SYSTEM_BASE = (
    "You are LifeOS, a sharp, friendly personal assistant running 100% locally "
    "on {pos} own machine.\n"
    "Think briefly first if you must, then write a line containing exactly "
    + ANSWER_DELIM + " followed by the final answer for {name}. Keep any "
    "reasoning short; the user only sees what comes after " + ANSWER_DELIM + ".\n"
    "The final answer is concise and concrete: lead with bold key items and "
    "short bullet lists, ground every claim in the provided memory (quote "
    "specific dishes, dates, prices, habits), and never invent data not in the "
    "context."
)
# Per-domain task instructions, keyed by domain name. build_prompt appends the
# matching entry to SYSTEM_BASE (after str.format with {name} / {pos}) and
# also uses the raw template as the RAG recall query when the user typed
# nothing.
DOMAIN_INSTRUCTIONS = {
    "food": (
        "Task: recommend exactly 3 recipes for this week. For each, give the "
        "recipe name, which flyer deals it uses (with prices), estimated cost, "
        "and a one-line 'why' that references both the deals and what {name} "
        "cooked recently (favor variety — avoid repeating recent main "
        "ingredients). Respect dietary preferences strictly."
    ),
    "health": (
        "Task: recommend tomorrow's exercise. Consider the recent workout "
        "pattern, muscle-group rotation, rest balance, and the fitness goal. "
        "Give one clear recommendation (type + duration), then 2-3 bullet "
        "points of reasoning referencing specific recent workouts and any "
        "known injury constraints."
    ),
    "money": (
        "Task: review the detected recurring subscriptions against income and "
        "budget. Classify each as CANCEL, KEEP, or WATCH with a one-line "
        "plain-language reason (reference cost, last-used date, and overlap "
        "with other services). End with the total monthly savings if all "
        "CANCEL items are dropped and what that money could fund."
    ),
    "goal": (
        "Task: act as a Socratic financial-goal coach for {name}. Ask exactly "
        "ONE probing question at a time — why this goal matters, what tradeoffs "
        "they'd accept, whether the timeline is realistic given income and "
        "monthly payments, what spending they would cut. Keep each turn short. "
        "After roughly 3-4 exchanges (use the conversation history to judge), "
        "stop questioning and summarize a concrete savings plan: monthly amount "
        "to set aside, what to cut, and the realistic completion date, checked "
        "against {pos} income and monthly payments."
    ),
    "meal_photo": (
        "Task: a vision model has identified the food items in a photo of "
        "{pos} meal (or read a grocery receipt). Using that item list, write "
        "a short, well-structured markdown response with EXACTLY these three "
        "sections:\n"
        "**Identified** — a tight bullet list of the items, each in **bold**.\n"
        "**How it fits** — 2-3 bullets on how these choices line up with "
        "{pos} dietary preferences and fitness goal, calling out specific "
        "items and a rough protein read.\n"
        "**Buy next** — 2-3 suggested items that better fit their goals and "
        "budget, each with a one-line reason.\n"
        "Keep it concise. Use bullets and bold; do not invent items that were "
        "not identified."
    ),
    "payment_impact": (
        "Task: {name} just updated their monthly payments. Explain how their "
        "total monthly payments affect reaching their savings goal(s). Compute "
        "money left to save = monthly income − total monthly payments, then for "
        "each goal estimate how many months the remaining amount (target − "
        "saved) will take at that rate and whether the deadline is realistic.\n"
        "Format the answer EXACTLY like this, with real line breaks:\n"
        "**<one-line headline with the key number>**\n"
        "- <goal name>: <remaining $>, <months> at <$/mo>, deadline <date> — on "
        "track / behind\n"
        "Use one bullet per goal, each on its OWN line. Be concrete with dollar "
        "figures. If there are no goals, reply with one short line instead."
    ),
    "chat": (
        "Task: answer the question using everything you know about {name} "
        "across food, fitness, and finances. Cross-reference domains when "
        "useful. If asked to plan, produce a compact, actionable plan."
    ),
}
def _slice_for_domain(domain: str, mem: dict) -> dict:
profile = mem["user_profile"]
finances = mem.get("finances", {})
if domain == "food":
return {"user_profile": profile, "recent_meals": memory_store.recent_meals(7, mem)}
if domain == "meal_photo":
return {"user_profile": profile, "recent_meals": memory_store.recent_meals(7, mem)}
if domain == "health":
return {
"user_profile": profile,
"workouts_last_14_days": memory_store.workouts_in_window(14, mem),
"calendar_next_7_days": memory_store.events_in_window(7, mem),
"workout_schedule": mem.get("workout_schedule", {}),
}
if domain in ("money", "goal", "payment_impact"):
return {
"user_profile": profile,
"finances": finances,
"monthly_payments": finances.get("monthly_payments", []),
"goals": mem.get("goals", []),
}
return { # chat sees everything
"user_profile": profile,
"recent_meals": memory_store.recent_meals(7, mem),
"workouts_last_14_days": memory_store.workouts_in_window(14, mem),
"calendar_next_7_days": memory_store.events_in_window(7, mem),
"workout_schedule": mem.get("workout_schedule", {}),
"finances": finances,
"goals": mem.get("goals", []),
}
def slice_for_domains(mem: dict, domains: list[str]) -> dict:
    """Union of the memory slices for several domains.

    The UI name "kitchen" is treated as "food". The user profile is always
    present, even for an empty ``domains`` list; when two slices share a key,
    the later domain's value wins.
    """
    renamed = {"kitchen": "food"}
    combined = {"user_profile": mem["user_profile"]}
    for requested in domains:
        combined |= _slice_for_domain(renamed.get(requested, requested), mem)
    return combined
def _fmt(obj, indent=0) -> str:
pad = " " * indent
if isinstance(obj, dict):
return "\n".join(f"{pad}{k}: {_fmt(v, indent + 1).lstrip() if not isinstance(v, (dict, list)) else chr(10) + _fmt(v, indent + 1)}" for k, v in obj.items())
if isinstance(obj, list):
return "\n".join(f"{pad}- {_fmt(x, indent + 1).lstrip()}" if not isinstance(x, (dict, list)) else f"{pad}-\n{_fmt(x, indent + 1)}" for x in obj)
return f"{pad}{obj}"
def _names(profile: dict) -> tuple[str, str, str]:
"""(address, possessive, header) for prompts. Falls back gracefully when a
new user hasn't set their name yet, so prompts never read "'s machine"."""
name = (profile.get("name") or profile.get("first_name") or "").strip()
if name:
return name, f"{name}'s", f"{name.upper()}'S MEMORY"
return "you", "your", "YOUR MEMORY"
def build_prompt(domain: str, mem: dict, user_input: str, domains: list[str] | None = None) -> list[dict]:
    """Assemble the ``[system, user]`` chat messages for one request.

    The system message is SYSTEM_BASE plus, when the domain is known, its
    DOMAIN_INSTRUCTIONS entry. The user message contains the short-term
    memory slice, any long-term notes recalled via RAG, the request text and
    a final "answer immediately" nudge.

    Args:
        domain: domain key; unknown domains get SYSTEM_BASE only and the
            default (chat) memory slice.
        mem: loaded memory; must contain "user_profile".
        user_input: the user's request. ``None``, empty or whitespace-only
            input is treated as "no request": the domain instruction is then
            used as the recall query and a placeholder is put in the REQUEST
            section.
        domains: when given and non-empty, narrows the memory slice to these
            domains (chat refs); otherwise the default slice for ``domain``
            is used.

    Returns:
        A two-element list of ``{"role", "content"}`` dicts.
    """
    name, pos, header = _names(mem["user_profile"])

    # Normalize once so the recall query and the REQUEST section agree on
    # what counts as "empty" (this also tolerates None).
    request = (user_input or "").strip()

    # .get() instead of indexing: an unknown domain with no user input used
    # to raise KeyError here even though the system-prompt step below
    # explicitly tolerates unknown domains.
    instruction = DOMAIN_INSTRUCTIONS.get(domain)
    recall_query = request or instruction or ""
    notes = rag.recall(f"{domain}: {recall_query}", k=5)

    system = SYSTEM_BASE.format(name=name, pos=pos)
    if instruction is not None:
        system += "\n\n" + instruction.format(name=name, pos=pos)

    mem_slice = slice_for_domains(mem, domains) if domains else _slice_for_domain(domain, mem)
    parts = [f"=== {header} ===", _fmt(mem_slice)]
    if notes:
        parts.append("\n=== LONG-TERM NOTES (recalled) ===")
        parts.extend(f"- {n['text']}" for n in notes)
    parts.append("\n=== REQUEST ===")
    parts.append(request or "(Use the task instructions above.)")
    # Recency nudge: a final instruction at the very end of the user turn is the
    # most reliable way to stop this reasoning-happy GGUF from burning the token
    # budget thinking out loud. It jumps almost straight to the delimiter, which
    # _clean_response strips — giving fast, clean answers.
    parts.append(
        "\n\nIMPORTANT: Do NOT think step by step or explain your reasoning. "
        "Immediately write " + ANSWER_DELIM + " then the final answer."
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": "\n".join(parts)},
    ]
# A <think> block: from the opening tag to the closing tag, or to the end of
# the text when the block is still unclosed (mid-stream).
_THINK_RE = re.compile(r"<think>.*?(?:</think>|$)", re.DOTALL)
# A line that begins a markdown block — the real answer almost always starts
# with one of these across every domain (bold lead, header, bullet, number,
# table row, blockquote).
_MD_ANCHOR = re.compile(r"^(?:\*\*|#{1,6}\s|[-*+]\s|\d+[.)]\s|\||>\s?)")
# Plain-prose lines that are the model thinking out loud, not answer content.
# This GGUF reasons in first-person prose ("We need to…", "Let's compute…",
# "Now classify…", "Let's produce:") before writing the markdown answer.
# Case-insensitive; used with .search(), so it matches anywhere in a line.
_REASONING = re.compile(
    r"(?i)\b(?:we (?:need|should|must|can|have to|could|want|'?ll)|let'?s\b|so we\b|"
    r"the user (?:wants|needs|asks|is)|plain text|private reasoning|"
    r"is (?:discarded|hidden)|then (?:markdown|final|the answer|answer)|"
    r"first[,:]? |probably\b|i think\b|okay[,:]|now (?:let|we|i|classify|compute)|"
    r"let'?s (?:produce|craft|compute|do|output)|markdown:|answer:?$|maybe\b|actually\b)"
)
# Trailing afterthoughts the model sometimes tacks on AFTER the answer
# ("But months 0.3 seems weird.", "Wait, let me recheck."). Trimmed from the end.
# Matches a line that opens with a hedge word and also contains a doubt word,
# or any line that opens with "wait" / "hmm".
_TRAILING_META = re.compile(
    r"(?i)^(?:but|wait|hmm+|note|actually|hold on|let me|i should|that|this|"
    r"however)\b.*\b(?:seem|weird|odd|wrong|off|recalc|double|check|sure|"
    r"strange|recompute|verify)\b|^(?:wait|hmm+)\b"
)
def strip_think(text: str) -> str:
    """Delete every ``<think>…</think>`` block and left-strip the result.

    An opening tag without a closing one (mid-stream) removes everything from
    the tag to the end of the text.
    """
    without_blocks = re.sub(_THINK_RE, "", text)
    return without_blocks.lstrip()
def _is_reasoning_line(line: str) -> bool:
    """True when ``line`` contains one of the _REASONING "thinking out loud" cues."""
    return _REASONING.search(line) is not None
def _strip_to_last_delimiter(text: str) -> str:
    """Cut the raw model output down to the answer using its own markers.

    Steps, in order:
      1. Remove well-formed (or unclosed) ``<think>`` blocks.
      2. If ANSWER_DELIM occurs, keep only what follows its LAST occurrence —
         the delimiter reliably marks where the answer starts.
      3. Handle a bare ``</think>`` (no opening tag), which is ambiguous: it
         usually closes reasoning that PRECEDES the answer, but the model
         sometimes emits it AFTER the answer. If at least 8 non-whitespace-
         trimmed characters follow it, that trailing text is the answer;
         otherwise the text before it is.

    Returns the result with surrounding whitespace stripped.
    """
    body = _THINK_RE.sub("", text)  # drop any well-formed <think>…</think>

    _, delimiter, after_delim = body.rpartition(ANSWER_DELIM)
    if delimiter:
        body = after_delim

    head, closing_tag, tail = body.rpartition("</think>")
    if closing_tag:
        has_real_content_after = len(tail.strip()) >= 8
        body = tail if has_real_content_after else head

    return body.strip()
def _trim_trailing_meta(text: str) -> str:
    """Remove blank lines and afterthought lines from the END of ``text``.

    Walks backwards from the last line and stops at the first line that is
    neither empty nor matched by _TRAILING_META; the kept text is stripped.
    """
    rows = text.split("\n")
    keep = len(rows)
    while keep:
        candidate = rows[keep - 1].strip()
        if candidate and not _TRAILING_META.search(candidate):
            break
        keep -= 1
    return "\n".join(rows[:keep]).strip()
def _clean_response(text: str) -> str:
    """Return only the user-facing answer, hiding the model's chain-of-thought.

    The model reasons in plain prose and then writes a markdown answer, so:
      1. ``<think>`` blocks are dropped and, if ANSWER_DELIM was emitted, only
         the text after it is considered (see _strip_to_last_delimiter).
      2. If a markdown line (see _MD_ANCHOR) exists and reasoning-looking
         prose precedes it, the answer starts at that markdown line.
      3. If there is no markdown line yet and the text looks like reasoning,
         "" is returned so the UI keeps its thinking state while streaming.
    Text that shows no reasoning cues is returned as-is, apart from trailing
    afterthought lines being trimmed.
    """
    body = _strip_to_last_delimiter(text).strip()
    rows = body.split("\n")
    if not any(row.strip() for row in rows):
        return ""

    # Index of the first line that opens a markdown block, if any.
    anchor_at = None
    for idx, row in enumerate(rows):
        if _MD_ANCHOR.match(row.strip()):
            anchor_at = idx
            break

    if anchor_at is None:
        # No markdown block yet. If it's pure reasoning, hide it (streaming);
        # the end-of-stream fallback will recover the answer if one exists.
        if any(_is_reasoning_line(row) for row in rows if row.strip()):
            return ""
        return _trim_trailing_meta(body)

    # Jump to the answer when reasoning precedes the first markdown block.
    preamble = (row for row in rows[:anchor_at] if row.strip())
    if any(_is_reasoning_line(row) for row in preamble):
        return _trim_trailing_meta("\n".join(rows[anchor_at:]).strip())
    return _trim_trailing_meta(body)
def _final_answer(text: str) -> str:
    """End-of-stream fallback that always tries to produce some answer.

    Used when the model never wrote a markdown block or delimiter (e.g. a
    plain one-line coaching question). If _clean_response yields text, that
    is returned. Otherwise leading blank/reasoning lines are dropped and
    trailing afterthoughts trimmed; if that leaves nothing, the
    delimiter-stripped text is returned unchanged.
    """
    cleaned = _clean_response(text)
    if cleaned:
        return cleaned

    body = _strip_to_last_delimiter(text)
    rows = body.strip().split("\n")

    # Skip the leading run of empty or reasoning-looking lines.
    start = 0
    while start < len(rows) and (not rows[start].strip() or _is_reasoning_line(rows[start])):
        start += 1

    remainder = _trim_trailing_meta("\n".join(rows[start:]).strip())
    return remainder or body.strip()
# User-facing text that generate_stream yields in place of an answer when the
# model cannot be loaded, or when generation fails before producing any text.
_MODEL_ERROR_MSG = (
    "⚠️ The local model couldn't start on this machine. Check that "
    "llama-cpp-python is installed for your hardware and that there's enough "
    "memory, then restart LifeOS. (Details are in the server log.)"
)
def generate_stream(
    messages: list[dict],
    max_tokens: int = 1024,
    temperature: float = 0.4,
    domain: str = "chat",
    extra_context: str = "",
) -> Iterator[str]:
    """Yield cumulative user-facing response text.

    The model reasons out loud and marks the answer with ANSWER_DELIM. Each
    streamed chunk is appended to an accumulator and the whole accumulator is
    re-cleaned (see _clean_response); a value is yielded only when the
    cleaned text is non-empty and changed, so consumers always receive the
    full answer-so-far rather than a delta.

    Args:
        messages: chat messages, normally from build_prompt. Not mutated.
        max_tokens: generation budget passed to the model.
        temperature: sampling temperature passed to the model.
        domain: used only to label log messages.
        extra_context: optional extra text (e.g. web search results) that is
            appended to the last user message under a WEB CONTEXT header.

    Yields:
        The cleaned answer so far. Never raises for model problems:
        - model cannot be loaded -> a single friendly error message;
        - generation fails before any text arrived -> the same message;
        - generation ends (or fails) with text accumulated but nothing
          surfaced -> a best-effort answer from _final_answer.
    """
    if extra_context:
        messages = list(messages)  # shallow copy: never mutate the caller's list
        for i in reversed(range(len(messages))):
            if messages[i].get("role") == "user":
                messages[i] = {
                    "role": "user",
                    "content": messages[i]["content"] + "\n\n=== WEB CONTEXT ===\n" + extra_context,
                }
                break

    try:
        llm = get_llm()
    except ModelUnavailable:
        yield _MODEL_ERROR_MSG
        return

    acc = ""   # everything the model has emitted so far (reasoning included)
    last = ""  # last cleaned text that was yielded
    try:
        # The lock is held for the whole generation, including while the
        # generator is suspended at a yield.
        with _llm_lock:
            for chunk in llm.create_chat_completion(
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
                stream=True,
            ):
                delta = chunk["choices"][0].get("delta", {})
                acc += delta.get("content") or ""
                # _clean_response returns "" while the model is still reasoning,
                # so the UI keeps its "thinking…" state until the answer starts.
                cleaned = _clean_response(acc)
                if cleaned and cleaned != last:
                    last = cleaned
                    yield cleaned
    except Exception as e:  # inference-time failure (e.g. OOM mid-generation)
        # logger.exception records the traceback: the user-facing message
        # points people at the server log for details.
        logger.exception("generation failed (%s): %s", domain, e)
        if not last and not acc:
            yield _MODEL_ERROR_MSG
            return
        # Otherwise fall through: partial text may still yield an answer.

    # If nothing surfaced (model never wrote a markdown answer/delimiter), fall
    # back to a best-effort strip so the reply is never blank.
    if not last and acc:
        fallback = _final_answer(acc)
        if fallback:
            yield fallback
def run_domain(domain: str, user_input: str = "", max_tokens: int = 1024) -> Iterator[str]:
    """Convenience wrapper: load memory, build the prompt, stream the answer.

    Yields the same cumulative answer text as generate_stream.
    """
    prompt_messages = build_prompt(domain, memory_store.load(), user_input)
    yield from generate_stream(prompt_messages, max_tokens=max_tokens, domain=domain)