"""LifeOS reasoning engine.

Text reasoning runs on one small model — NVIDIA Nemotron-3-Nano-4B (Q4_K_M
GGUF, 2.84GB) — 100% locally through the llama.cpp runtime (llama-cpp-python).
Deterministic feature code curates a small context (memory slices + RAG
recall); the model only does the judgment + explanation layer. That division
is what makes a 4B on 2 vCPUs feel smart.
Food photos additionally use a small vision-language model — Qwen2.5-VL-3B
(Q4_K_M GGUF) — for perception only: it identifies the food items in an image,
which Nemotron then reasons about against memory. The VLM is loaded lazily on
the first photo, so the text-only path never pays for it.
"""
import logging
import os
import re
import threading
from collections.abc import Iterator
import cuda_bootstrap
import config
import memory as memory_store
import rag
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
cuda_bootstrap.ensure()  # register CUDA runtime DLL dirs before llama_cpp loads

# Text model: repo id and GGUF filename, both read from config and handed to
# Llama.from_pretrained in _load_llm.
MODEL_REPO = config.MODEL_REPO
MODEL_FILE = config.MODEL_FILE
# Fallback (plain llama arch) if the hybrid Mamba arch is unsupported by the
# installed llama.cpp: bartowski/nvidia_Llama-3.1-Nemotron-Nano-4B-v1.1-GGUF

# Vision model for food-photo recognition. Nemotron is text-only and cannot
# "see" an image, so a small vision-language model handles perception: it
# identifies the food items in a photo. The identified items are then fed to
# Nemotron, which does the memory-grounded judgment (dietary fit, suggestions).
# Q4_K_M (~2.4GB) + the f16 multimodal projector that encodes the image.
VLM_REPO = config.VLM_REPO
VLM_FILE = config.VLM_FILE
VLM_MMPROJ_FILE = config.VLM_MMPROJ_FILE

# Lazily created model singletons: None until get_llm() / get_vlm() loads them.
# Each lock is held around that model's create_chat_completion call, so only
# one inference per model runs at a time.
_llm = None
_llm_lock = threading.Lock()
_vlm = None
_vlm_lock = threading.Lock()

# GPU offload: number of model layers to push to the GPU. -1 = all layers
# (full offload), 0 = CPU only. Requires a CUDA/Metal/Vulkan build of
# llama-cpp-python — the plain CPU wheel ignores this and stays on CPU.
GPU_LAYERS = config.GPU_LAYERS

# Observable load state for the UI / status endpoint (see status()).
# ACTIVE_BACKEND: None until the text model is loaded, then "gpu" or "cpu".
# MODEL_STATE: one of "idle" (not loaded yet), "loading", "ready", "error".
# MODEL_ERROR: "<ExceptionType>: <message>" of the last failed load, else None.
ACTIVE_BACKEND = None
MODEL_STATE = "idle"
MODEL_ERROR = None
class ModelUnavailable(RuntimeError):
    """Raised when a local model cannot be loaded.

    Typical causes are a bad or missing llama-cpp-python wheel, a failed
    download, or running out of memory. Callers catch this and stream a
    friendly message instead of surfacing the raw error.
    """
def status() -> dict:
    """Snapshot the text model's load state for the /status endpoint and UI.

    Returns:
        A dict with the keys ``state`` (MODEL_STATE), ``backend``
        (ACTIVE_BACKEND) and ``error`` (MODEL_ERROR), read at call time.
    """
    snapshot = {
        "state": MODEL_STATE,
        "backend": ACTIVE_BACKEND,
        "error": MODEL_ERROR,
    }
    return snapshot
def _load_llm(n_gpu_layers: int):
    """Build the text-model ``Llama`` instance with the given GPU offload.

    Args:
        n_gpu_layers: Layers to offload to the GPU; 0 means CPU only.

    Returns:
        The object returned by ``Llama.from_pretrained`` for
        MODEL_REPO / MODEL_FILE with an 8192-token context.
    """
    import cuda_bootstrap

    # Make sure the CUDA runtime DLL dirs are registered before llama_cpp is
    # imported.
    cuda_bootstrap.ensure()
    from llama_cpp import Llama

    cpu_count = os.cpu_count() or 2
    if n_gpu_layers == 0:
        # CPU-only inference: use every core.
        thread_count = cpu_count
    else:
        # With GPU offload the text model needs few CPU threads; a small pool
        # leaves cores free for the CPU-bound vision model that runs on
        # food-photo uploads (otherwise the two oversubscribe the CPU).
        thread_count = max(2, cpu_count // 2)

    load_options = {
        "repo_id": MODEL_REPO,
        "filename": MODEL_FILE,
        "n_ctx": 8192,
        "n_threads": thread_count,
        "n_gpu_layers": n_gpu_layers,
        "verbose": False,
    }
    return Llama.from_pretrained(**load_options)
# Serializes first-time loading so concurrent callers cannot each load their
# own copy of the multi-gigabyte model. Separate from _llm_lock, which guards
# inference.
_llm_load_lock = threading.Lock()


def get_llm():
    """Return the shared text model, loading it on first use.

    GPU offload is tried first. If that load raises (bad wheel, no VRAM,
    driver mismatch), the model is loaded on CPU instead so the app still
    runs. GPU_LAYERS == 0 skips the GPU attempt entirely.

    MODEL_STATE / MODEL_ERROR / ACTIVE_BACKEND are updated so the UI can show
    loading / ready / error.

    Returns:
        The loaded model instance (cached for all later calls).

    Raises:
        ModelUnavailable: when the CPU load fails too, so callers can stream a
            friendly message instead of a raw 500.

    Only ``Exception`` is caught: KeyboardInterrupt and SystemExit propagate
    instead of being mistaken for a load failure.
    """
    global _llm, ACTIVE_BACKEND, MODEL_STATE, MODEL_ERROR
    if _llm is not None:
        return _llm

    with _llm_load_lock:
        # Re-check: another thread may have finished loading while we waited.
        if _llm is not None:
            return _llm

        MODEL_STATE = "loading"

        if GPU_LAYERS != 0:
            try:
                loaded = _load_llm(GPU_LAYERS)
            except Exception as e:  # noqa: BLE001 — any load failure falls back to CPU
                logger.warning(
                    "GPU load failed (%s: %s); falling back to CPU", type(e).__name__, e
                )
            else:
                ACTIVE_BACKEND = "gpu"
                MODEL_STATE, MODEL_ERROR = "ready", None
                _llm = loaded
                logger.info("model loaded on GPU (n_gpu_layers=%s)", GPU_LAYERS)
                return _llm

        try:
            loaded = _load_llm(0)
        except Exception as e:  # noqa: BLE001 — download/format/runtime failure
            MODEL_STATE, MODEL_ERROR = "error", f"{type(e).__name__}: {e}"
            logger.error("model load failed on CPU: %s", MODEL_ERROR)
            raise ModelUnavailable(MODEL_ERROR) from e

        ACTIVE_BACKEND = "cpu"
        MODEL_STATE, MODEL_ERROR = "ready", None
        _llm = loaded
        logger.info("model loaded on CPU")
        return _llm
def _load_vlm(n_gpu_layers: int):
    """Build the vision-language ``Llama`` instance with the given GPU offload.

    Args:
        n_gpu_layers: Layers to offload to the GPU; 0 means CPU only.

    Returns:
        The object returned by ``Llama.from_pretrained`` for
        VLM_REPO / VLM_FILE, wired to a Qwen2.5-VL chat handler, with a
        4096-token context.
    """
    import cuda_bootstrap

    cuda_bootstrap.ensure()
    from llama_cpp import Llama
    from llama_cpp.llama_chat_format import Qwen25VLChatHandler

    # The chat handler downloads + owns the multimodal projector (mmproj) that
    # turns the image into tokens the model can attend to.
    projector_handler = Qwen25VLChatHandler.from_pretrained(
        repo_id=VLM_REPO,
        filename=VLM_MMPROJ_FILE,
        verbose=False,
    )

    thread_count = os.cpu_count() or 2
    load_options = {
        "repo_id": VLM_REPO,
        "filename": VLM_FILE,
        "chat_handler": projector_handler,
        "n_ctx": 4096,
        "n_threads": thread_count,
        "n_gpu_layers": n_gpu_layers,
        "verbose": False,
    }
    return Llama.from_pretrained(**load_options)
# Layers of the vision model to offload to the GPU; 0 keeps it on CPU.
VLM_GPU_LAYERS = config.VLM_GPU_LAYERS

# Serializes first-time loading of the vision model so concurrent photo
# uploads cannot each load their own copy. Separate from _vlm_lock, which
# guards inference.
_vlm_load_lock = threading.Lock()


def get_vlm():
    """Return the shared vision-language model, loading it on first use.

    The model is only needed for food photos, so it is loaded on the first
    photo and the text-only path never pays for it. With VLM_GPU_LAYERS == 0
    it is loaded on CPU (so it doesn't fight the resident text model for VRAM
    on small cards). If a GPU attempt is configured but raises, the load falls
    back to CPU.

    Returns:
        The loaded vision model instance (cached for all later calls).

    Raises:
        ModelUnavailable: when the CPU load fails.

    Only ``Exception`` is caught: KeyboardInterrupt and SystemExit propagate
    instead of being mistaken for a load failure.
    """
    global _vlm
    if _vlm is not None:
        return _vlm

    with _vlm_load_lock:
        # Re-check: another thread may have finished loading while we waited.
        if _vlm is not None:
            return _vlm

        if VLM_GPU_LAYERS != 0:
            try:
                loaded = _load_vlm(VLM_GPU_LAYERS)
            except Exception as e:  # noqa: BLE001 — any load failure falls back to CPU
                logger.warning(
                    "VLM GPU load failed (%s: %s); falling back to CPU",
                    type(e).__name__,
                    e,
                )
            else:
                _vlm = loaded
                logger.info("VLM loaded on GPU (n_gpu_layers=%s)", VLM_GPU_LAYERS)
                return _vlm

        try:
            loaded = _load_vlm(0)
        except Exception as e:  # noqa: BLE001 — download/format/runtime failure
            raise ModelUnavailable(
                f"vision model unavailable: {type(e).__name__}: {e}"
            ) from e

        _vlm = loaded
        logger.info("VLM loaded on CPU")
        return _vlm
# Text prompt sent to the vision model alongside the photo (see
# describe_food_image). It asks for a plain '- item' bulleted list, which
# _dedupe_food_items then de-duplicates and caps.
_FOOD_VISION_PROMPT = (
    "You are a food-recognition assistant. Look at this photo and list the food "
    "and drink items you can see. Break composed dishes into their visible "
    "components — e.g. a pizza becomes its toppings (crust, tomato sauce, "
    "mozzarella, basil); a plate of toast with egg becomes each item. If it is "
    "a grocery receipt or a label, read the product names instead. Respond with "
    "ONLY a bulleted list — one item per line starting with '- ', using plain "
    "common names (e.g. '- fried egg', '- whole-grain toast', '- cherry "
    "tomatoes'). Add a rough quantity when obvious. Aim for 3-8 items. Ignore "
    "plates, bowls, cutlery, and packaging. Do not add commentary, nutrition "
    "facts, or headings."
)

# Longest-side cap (in pixels) for the image fed to the VLM. On this CPU path a
# full-res photo decodes ~1000 image tokens (~36s); 768px cuts that ~4x to a
# few seconds with no loss in food-recognition quality.
VLM_MAX_IMAGE_SIDE = config.VLM_MAX_IMAGE_SIDE
def _image_data_uri(path: str) -> str:
    """Return the photo at *path* as a base64 ``data:image/...`` URI.

    When Pillow can read the file, the image is converted to RGB, downscaled
    so its longest side is at most VLM_MAX_IMAGE_SIDE (never upscaled), and
    re-encoded as JPEG (quality 88). If Pillow is missing or cannot process
    the file, the original bytes are sent unchanged, with the MIME subtype
    taken from the file extension ("jpg"/"jpeg"/none map to "jpeg").

    Args:
        path: Filesystem path of the image.

    Returns:
        ``"data:image/<subtype>;base64,<payload>"``.

    Raises:
        OSError: if the file cannot be read at all on the fallback path.
    """
    import base64
    import io

    try:
        from PIL import Image

        # The context manager closes the underlying file handle even when a
        # later step raises.
        with Image.open(path) as source:
            im = source if source.mode == "RGB" else source.convert("RGB")
            w, h = im.size
            scale = VLM_MAX_IMAGE_SIDE / max(w, h)
            if scale < 1:
                im = im.resize((max(1, int(w * scale)), max(1, int(h * scale))))
            buf = io.BytesIO()
            im.save(buf, format="JPEG", quality=88)
        data = buf.getvalue()
        mime = "jpeg"
    except Exception:  # unreadable by Pillow — send original bytes
        with open(path, "rb") as f:
            data = f.read()
        ext = os.path.splitext(path)[1].lstrip(".").lower() or "jpeg"
        mime = "jpeg" if ext in ("jpg", "jpeg") else ext

    return f"data:image/{mime};base64," + base64.b64encode(data).decode("ascii")
def _dedupe_food_items(text: str) -> str:
    """Normalize the VLM's item list into at most 8 unique '- item' lines.

    Each input line is stripped of surrounding whitespace and of leading
    bullet characters (``-``, ``*``, ``•``). Blank lines are dropped and
    duplicates are detected case-insensitively (the small VLM sometimes
    repeats itself). The first spelling seen wins and input order is kept.

    Args:
        text: Raw multi-line model output.

    Returns:
        The cleaned bullets joined with newlines ("" when nothing remains).
    """
    max_items = 8
    # Dicts keep insertion order, so this doubles as the "seen" set and the
    # ordered result.
    bullets: dict[str, str] = {}
    for raw_line in text.splitlines():
        item = raw_line.strip().lstrip("-*•").strip()
        if not item:
            continue
        bullets.setdefault(item.lower(), f"- {item}")
        if len(bullets) >= max_items:
            break
    return "\n".join(bullets.values())
def describe_food_image(path: str) -> str:
    """Identify the food items visible in a photo using the vision model.

    This is the perception step only; the memory-grounded analysis is done
    separately by run_domain("meal_photo", …). The prompt also covers
    receipts/labels by asking for product names.

    Args:
        path: Filesystem path of the photo.

    Returns:
        A short, de-duplicated bulleted list of items ('- item' per line).

    Raises:
        ModelUnavailable: if the vision model cannot be loaded.
    """
    # Encode the image before touching the model, matching the original order.
    image_uri = _image_data_uri(path)
    vision_model = get_vlm()

    user_turn = {
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": image_uri}},
            {"type": "text", "text": _FOOD_VISION_PROMPT},
        ],
    }
    # One vision inference at a time.
    with _vlm_lock:
        completion = vision_model.create_chat_completion(
            messages=[user_turn],
            max_tokens=160,
            temperature=0.2,
        )

    reply = completion["choices"][0]["message"]["content"] or ""
    return _dedupe_food_items(strip_think(reply).strip())
def warmup() -> None:
    """Pre-load what can be pre-loaded at startup, without ever raising.

    Loads the text model so the first request isn't a cold start (the vision
    model stays lazy until the first food photo), warms the embedder, and in
    demo mode seeds the demo notes. Every step is best-effort so the web
    server still comes up and serves the UI:

    * a text-model failure is already recorded in MODEL_STATE by get_llm and
      surfaces as a friendly message on requests;
    * embedder or seeding failures are logged as warnings.
    """
    try:
        get_llm()
    except ModelUnavailable:
        pass  # state already set to "error"; UI will show it

    # Load the embedder now (before any food-photo VLM load).
    try:
        rag.warmup()
    except Exception as e:  # embedder optional — recall just returns []
        logger.warning("embedder warmup failed: %s", e)

    if config.DEMO:
        # Seeding is as optional as the embedder: a failure here must not
        # abort startup either.
        try:
            rag.ensure_seeded()
        except Exception as e:
            logger.warning("demo note seeding failed: %s", e)
# This Nemotron GGUF always "thinks out loud" in plain prose and ignores
# /no_think and "detailed thinking off". Rather than fight it, we let it reason,
# ask it to keep reasoning short and mark the answer with a delimiter, and strip
# everything before the answer server-side (see ANSWER_DELIM / _clean_response).
# The stripper is anchor-based, so it stays clean even when the model forgets
# the delimiter under a long prompt.
ANSWER_DELIM = "==ANSWER=="

# Base system prompt shared by every domain. build_prompt fills the {name} and
# {pos} (possessive) placeholders via str.format and appends the per-domain
# task from DOMAIN_INSTRUCTIONS.
SYSTEM_BASE = (
    "You are LifeOS, a sharp, friendly personal assistant running 100% locally "
    "on {pos} own machine.\n"
    "Think briefly first if you must, then write a line containing exactly "
    + ANSWER_DELIM + " followed by the final answer for {name}. Keep any "
    "reasoning short; the user only sees what comes after " + ANSWER_DELIM + ".\n"
    "The final answer is concise and concrete: lead with bold key items and "
    "short bullet lists, ground every claim in the provided memory (quote "
    "specific dishes, dates, prices, habits), and never invent data not in the "
    "context."
)
# Per-domain task instructions, keyed by domain name. build_prompt appends the
# matching entry to SYSTEM_BASE after filling the {name} / {pos} placeholders
# via str.format, so any literal braces added here would need escaping.
DOMAIN_INSTRUCTIONS = {
    # Weekly recipe recommendations.
    "food": (
        "Task: recommend exactly 3 recipes for this week. For each, give the "
        "recipe name, which flyer deals it uses (with prices), estimated cost, "
        "and a one-line 'why' that references both the deals and what {name} "
        "cooked recently (favor variety — avoid repeating recent main "
        "ingredients). Respect dietary preferences strictly."
    ),
    # Next-day exercise recommendation.
    "health": (
        "Task: recommend tomorrow's exercise. Consider the recent workout "
        "pattern, muscle-group rotation, rest balance, and the fitness goal. "
        "Give one clear recommendation (type + duration), then 2-3 bullet "
        "points of reasoning referencing specific recent workouts and any "
        "known injury constraints."
    ),
    # Subscription review.
    "money": (
        "Task: review the detected recurring subscriptions against income and "
        "budget. Classify each as CANCEL, KEEP, or WATCH with a one-line "
        "plain-language reason (reference cost, last-used date, and overlap "
        "with other services). End with the total monthly savings if all "
        "CANCEL items are dropped and what that money could fund."
    ),
    # Multi-turn Socratic savings-goal coaching.
    "goal": (
        "Task: act as a Socratic financial-goal coach for {name}. Ask exactly "
        "ONE probing question at a time — why this goal matters, what tradeoffs "
        "they'd accept, whether the timeline is realistic given income and "
        "monthly payments, what spending they would cut. Keep each turn short. "
        "After roughly 3-4 exchanges (use the conversation history to judge), "
        "stop questioning and summarize a concrete savings plan: monthly amount "
        "to set aside, what to cut, and the realistic completion date, checked "
        "against {pos} income and monthly payments."
    ),
    # Analysis of the item list produced by the vision model.
    "meal_photo": (
        "Task: a vision model has identified the food items in a photo of "
        "{pos} meal (or read a grocery receipt). Using that item list, write "
        "a short, well-structured markdown response with EXACTLY these three "
        "sections:\n"
        "**Identified** — a tight bullet list of the items, each in **bold**.\n"
        "**How it fits** — 2-3 bullets on how these choices line up with "
        "{pos} dietary preferences and fitness goal, calling out specific "
        "items and a rough protein read.\n"
        "**Buy next** — 2-3 suggested items that better fit their goals and "
        "budget, each with a one-line reason.\n"
        "Keep it concise. Use bullets and bold; do not invent items that were "
        "not identified."
    ),
    # Effect of monthly payments on savings goals.
    "payment_impact": (
        "Task: {name} just updated their monthly payments. Explain how their "
        "total monthly payments affect reaching their savings goal(s). Compute "
        "money left to save = monthly income − total monthly payments, then for "
        "each goal estimate how many months the remaining amount (target − "
        "saved) will take at that rate and whether the deadline is realistic.\n"
        "Format the answer EXACTLY like this, with real line breaks:\n"
        "**<one-line headline with the key number>**\n"
        "- <goal name>: <remaining $>, <months> at <$/mo>, deadline <date> — on "
        "track / behind\n"
        "Use one bullet per goal, each on its OWN line. Be concrete with dollar "
        "figures. If there are no goals, reply with one short line instead."
    ),
    # Free-form chat across all domains.
    "chat": (
        "Task: answer the question using everything you know about {name} "
        "across food, fitness, and finances. Cross-reference domains when "
        "useful. If asked to plan, produce a compact, actionable plan."
    ),
}
def _slice_for_domain(domain: str, mem: dict) -> dict:
    """Pick the part of memory that is relevant to one domain.

    Args:
        domain: Domain name. "food"/"meal_photo", "health", and
            "money"/"goal"/"payment_impact" get focused slices; anything else
            (e.g. "chat") gets the broad cross-domain slice.
        mem: The loaded memory dict; must contain "user_profile".

    Returns:
        A new dict that always starts with "user_profile", followed by the
        domain's keys in a fixed order.

    Raises:
        KeyError: if ``mem`` has no "user_profile".
    """
    selected = {"user_profile": mem["user_profile"]}
    money = mem.get("finances", {})

    if domain in ("food", "meal_photo"):
        selected["recent_meals"] = memory_store.recent_meals(7, mem)
    elif domain == "health":
        selected["workouts_last_14_days"] = memory_store.workouts_in_window(14, mem)
        selected["calendar_next_7_days"] = memory_store.events_in_window(7, mem)
        selected["workout_schedule"] = mem.get("workout_schedule", {})
    elif domain in ("money", "goal", "payment_impact"):
        selected["finances"] = money
        selected["monthly_payments"] = money.get("monthly_payments", [])
        selected["goals"] = mem.get("goals", [])
    else:
        # chat sees everything
        selected["recent_meals"] = memory_store.recent_meals(7, mem)
        selected["workouts_last_14_days"] = memory_store.workouts_in_window(14, mem)
        selected["calendar_next_7_days"] = memory_store.events_in_window(7, mem)
        selected["workout_schedule"] = mem.get("workout_schedule", {})
        selected["finances"] = money
        selected["goals"] = mem.get("goals", [])
    return selected
def slice_for_domains(mem: dict, domains: list[str]) -> dict:
    """Merge the memory slices of several domains into one dict.

    The user profile is always included. The name "kitchen" is treated as an
    alias for "food". Slices are merged in the order given, so on a key
    collision the later domain's value wins.

    Args:
        mem: The loaded memory dict; must contain "user_profile".
        domains: Domain names to include.

    Returns:
        The merged slice.
    """
    combined = {"user_profile": mem["user_profile"]}
    for requested in domains:
        canonical = "food" if requested == "kitchen" else requested
        combined.update(_slice_for_domain(canonical, mem))
    return combined
def _fmt(obj, indent=0) -> str:
    """Render nested dicts/lists as indented plain text for the prompt.

    Dict entries become ``key: value`` lines and list entries become
    ``- value`` lines. A nested dict or list is rendered on the following
    lines, one indent level (one space) deeper. Scalars are rendered with
    ``str`` and lose leading whitespace when placed after a key or bullet.

    Args:
        obj: A dict, list, or scalar.
        indent: Current nesting depth (number of leading spaces).

    Returns:
        The formatted text without a trailing newline.
    """
    pad = " " * indent
    is_container = lambda value: isinstance(value, (dict, list))  # noqa: E731

    if isinstance(obj, dict):
        rows = []
        for key, value in obj.items():
            rendered = _fmt(value, indent + 1)
            if is_container(value):
                # Nested structure starts on its own line below the key.
                rows.append(f"{pad}{key}: \n{rendered}")
            else:
                rows.append(f"{pad}{key}: {rendered.lstrip()}")
        return "\n".join(rows)

    if isinstance(obj, list):
        rows = []
        for element in obj:
            rendered = _fmt(element, indent + 1)
            if is_container(element):
                rows.append(f"{pad}-\n{rendered}")
            else:
                rows.append(f"{pad}- {rendered.lstrip()}")
        return "\n".join(rows)

    return f"{pad}{obj}"
def _names(profile: dict) -> tuple[str, str, str]:
    """Derive how prompts should refer to the user.

    Uses ``profile["name"]``, else ``profile["first_name"]``. When neither
    yields a non-blank name (e.g. a new user), second-person wording is
    returned so prompts never read "'s machine".

    Args:
        profile: The user-profile dict.

    Returns:
        ``(address, possessive, header)``, e.g.
        ``("Ada", "Ada's", "ADA'S MEMORY")`` or
        ``("you", "your", "YOUR MEMORY")``.
    """
    raw = profile.get("name") or profile.get("first_name") or ""
    display = raw.strip()
    if not display:
        return "you", "your", "YOUR MEMORY"
    possessive = f"{display}'s"
    memory_header = f"{display.upper()}'S MEMORY"
    return display, possessive, memory_header
def build_prompt(domain: str, mem: dict, user_input: str, domains: list[str] | None = None) -> list[dict]:
    """Assemble the ``[system, user]`` chat messages for one request.

    The system message is SYSTEM_BASE plus the domain's task instructions
    (when the domain has any). The user message contains the short-term
    memory slice, any long-term notes recalled via RAG, the request text, and
    a final nudge to answer immediately after ANSWER_DELIM.

    Args:
        domain: Domain name. Unknown domains get no task instructions and the
            default (chat-style) memory slice.
        mem: The loaded memory dict; must contain "user_profile".
        user_input: The user's request. Blank or ``None`` means "use the task
            instructions".
        domains: Narrows the memory slice to only the referenced domains
            (chat refs); ``None`` or empty keeps the domain's default slice.

    Returns:
        Two message dicts with roles "system" and "user".
    """
    name, pos, header = _names(mem["user_profile"])
    request = (user_input or "").strip()
    # .get() so an unknown domain with blank input cannot raise KeyError.
    task_template = DOMAIN_INSTRUCTIONS.get(domain)

    # Recall with the real request when there is one; otherwise fall back to
    # the task text (or just the domain name) so the query is never blank.
    recall_query = request or task_template or domain
    notes = rag.recall(f"{domain}: {recall_query}", k=5)

    system = SYSTEM_BASE.format(name=name, pos=pos)
    if task_template is not None:
        system += "\n\n" + task_template.format(name=name, pos=pos)

    mem_slice = slice_for_domains(mem, domains) if domains else _slice_for_domain(domain, mem)
    parts = [f"=== {header} ===", _fmt(mem_slice)]
    if notes:
        parts.append("\n=== LONG-TERM NOTES (recalled) ===")
        parts.extend(f"- {n['text']}" for n in notes)
    parts.append("\n=== REQUEST ===")
    parts.append(request or "(Use the task instructions above.)")
    # Recency nudge: a final instruction at the very end of the user turn is the
    # most reliable way to stop this reasoning-happy GGUF from burning the token
    # budget thinking out loud. It jumps almost straight to the delimiter, which
    # _clean_response strips — giving fast, clean answers.
    parts.append(
        "\n\nIMPORTANT: Do NOT think step by step or explain your reasoning. "
        "Immediately write " + ANSWER_DELIM + " then the final answer."
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": "\n".join(parts)},
    ]
# Matches a <think> block: from "<think>" up to the first "</think>", or to the
# end of the text when the block is still unclosed. DOTALL lets it span lines.
_THINK_RE = re.compile(r"<think>.*?(?:</think>|$)", re.DOTALL)
# A line that begins a markdown block — the real answer almost always starts
# with one of these across every domain (bold lead, header, bullet, number,
# table row, blockquote). Applied with .match() to an already-stripped line.
_MD_ANCHOR = re.compile(r"^(?:\*\*|#{1,6}\s|[-*+]\s|\d+[.)]\s|\||>\s?)")
# Plain-prose lines that are the model thinking out loud, not answer content.
# This GGUF reasons in first-person prose ("We need to…", "Let's compute…",
# "Now classify…", "Let's produce:") before writing the markdown answer.
# Case-insensitive; applied with .search() to one line at a time.
_REASONING = re.compile(
    r"(?i)\b(?:we (?:need|should|must|can|have to|could|want|'?ll)|let'?s\b|so we\b|"
    r"the user (?:wants|needs|asks|is)|plain text|private reasoning|"
    r"is (?:discarded|hidden)|then (?:markdown|final|the answer|answer)|"
    r"first[,:]? |probably\b|i think\b|okay[,:]|now (?:let|we|i|classify|compute)|"
    r"let'?s (?:produce|craft|compute|do|output)|markdown:|answer:?$|maybe\b|actually\b)"
)
# Trailing afterthoughts the model sometimes tacks on AFTER the answer
# ("But months 0.3 seems weird.", "Wait, let me recheck."). Trimmed from the end.
# Matches a line that opens with a hedging word and later contains a doubt
# word, or any line that opens with "wait" / "hmm".
_TRAILING_META = re.compile(
    r"(?i)^(?:but|wait|hmm+|note|actually|hold on|let me|i should|that|this|"
    r"however)\b.*\b(?:seem|weird|odd|wrong|off|recalc|double|check|sure|"
    r"strange|recompute|verify)\b|^(?:wait|hmm+)\b"
)
def strip_think(text: str) -> str:
    """Drop every <think>…</think> block and any leading whitespace.

    An unclosed ``<think>`` (e.g. mid-stream) is removed through the end of
    the text.

    Args:
        text: Raw model output.

    Returns:
        The text without think blocks, left-stripped.
    """
    without_blocks = _THINK_RE.sub("", text)
    return without_blocks.lstrip()
def _is_reasoning_line(line: str) -> bool:
    """Return True when *line* contains one of the _REASONING phrases."""
    return _REASONING.search(line) is not None
def _strip_to_last_delimiter(text: str) -> str:
    """Cut the text down to the answer using the model's reasoning markers.

    Steps, in order:

    1. Remove well-formed ``<think>…</think>`` blocks.
    2. ANSWER_DELIM reliably marks where the answer STARTS, so keep only what
       follows the last one.
    3. A bare ``</think>`` (no opening tag) is ambiguous: usually it ends a
       reasoning block that PRECEDES the answer, but sometimes the model
       emits it AFTER the answer. Keep what follows the last one when that
       is substantial (at least 8 non-whitespace-trimmed characters);
       otherwise keep what came before it.

    Args:
        text: Raw accumulated model output.

    Returns:
        The remaining text, stripped.
    """
    remainder = _THINK_RE.sub("", text)

    _, delim_found, after_delim = remainder.rpartition(ANSWER_DELIM)
    if delim_found:
        remainder = after_delim

    head, close_found, tail = remainder.rpartition("</think>")
    if close_found:
        remainder = tail if len(tail.strip()) >= 8 else head

    return remainder.strip()
def _trim_trailing_meta(text: str) -> str:
    """Remove blank and afterthought lines from the END of the answer.

    Lines are dropped from the bottom for as long as they are blank or match
    _TRAILING_META; the first real line encountered stops the trimming.

    Args:
        text: Candidate answer text.

    Returns:
        The trimmed text, stripped.
    """
    rows = text.split("\n")
    keep = len(rows)
    while keep > 0:
        candidate = rows[keep - 1].strip()
        if candidate and not _TRAILING_META.search(candidate):
            break
        keep -= 1
    return "\n".join(rows[:keep]).strip()
def _clean_response(text: str) -> str:
    """Return only the user-facing answer, hiding the model's chain-of-thought.

    The model reasons in plain prose, then writes a markdown answer.

    1. Think blocks and everything up to the last ANSWER_DELIM are removed
       first (see _strip_to_last_delimiter).
    2. If a markdown line exists and reasoning precedes it, the result starts
       at that first markdown line. If a markdown line exists with no
       reasoning before it, the whole text is kept.
    3. If no markdown line exists and any line reads as reasoning, "" is
       returned so the UI keeps its thinking state while streaming.
       Otherwise the text is kept.

    Trailing afterthought lines are trimmed from whatever is returned.

    Args:
        text: Raw accumulated model output.

    Returns:
        The cleaned answer, or "" when there is nothing to show yet.
    """
    body = _strip_to_last_delimiter(text)
    lines = body.strip().split("\n")
    content_lines = [ln for ln in lines if ln.strip()]
    if not content_lines:
        return ""

    # Index of the first line that opens a markdown block, if any.
    anchor = None
    for idx, ln in enumerate(lines):
        if _MD_ANCHOR.match(ln.strip()):
            anchor = idx
            break

    if anchor is None:
        # No markdown block yet. If it's pure reasoning, hide it (streaming);
        # the end-of-stream fallback will recover the answer if one exists.
        if any(_is_reasoning_line(ln) for ln in content_lines):
            return ""
        return _trim_trailing_meta(body.strip())

    reasoning_before_anchor = any(
        _is_reasoning_line(ln) for ln in lines[:anchor] if ln.strip()
    )
    if reasoning_before_anchor:
        # Jump to the answer when reasoning precedes the first markdown block.
        visible = "\n".join(lines[anchor:]).strip()
    else:
        visible = body.strip()
    return _trim_trailing_meta(visible)
def _final_answer(text: str) -> str:
    """End-of-stream fallback: produce a best-effort answer.

    Used when the model never wrote a markdown block or delimiter (e.g. a
    plain one-line coaching question). If _clean_response finds an answer,
    that is returned. Otherwise leading blank/reasoning lines are dropped
    and trailing afterthoughts trimmed; if that leaves nothing, the
    delimiter-stripped text is returned as-is.

    Args:
        text: The complete raw model output.

    Returns:
        The best available answer text.
    """
    visible = _clean_response(text)
    if visible:
        return visible

    body = _strip_to_last_delimiter(text)
    rows = body.strip().split("\n")
    start = 0
    while start < len(rows) and (not rows[start].strip() or _is_reasoning_line(rows[start])):
        start += 1

    salvaged = _trim_trailing_meta("\n".join(rows[start:]).strip())
    return salvaged or body.strip()
# User-facing text that generate_stream yields in place of an answer when the
# model cannot be loaded or inference fails before producing any output.
_MODEL_ERROR_MSG = (
    "⚠️ The local model couldn't start on this machine. Check that "
    "llama-cpp-python is installed for your hardware and that there's enough "
    "memory, then restart LifeOS. (Details are in the server log.)"
)
def generate_stream(
    messages: list[dict],
    max_tokens: int = 1024,
    temperature: float = 0.4,
    domain: str = "chat",
    extra_context: str = "",
) -> Iterator[str]:
    """Yield cumulative user-facing response text.

    The model reasons out loud and marks the answer with ANSWER_DELIM. Each
    streamed chunk is appended to the raw output, which is passed through
    _clean_response. A new value is yielded whenever the cleaned text is
    non-empty and has changed, so every yielded string is the full answer so
    far, not a delta.

    The reply is never left blank after a failure:

    * if the model can't be loaded, a single friendly message is yielded;
    * if inference fails mid-stream before anything was shown, the raw
      output is salvaged with _final_answer, and the friendly message is
      used when there is nothing to salvage;
    * if the stream ends normally without a visible answer, _final_answer
      provides a best-effort one.

    Args:
        messages: Chat messages (see build_prompt). The list is not mutated.
        max_tokens: Generation budget passed to the model.
        temperature: Sampling temperature passed to the model.
        domain: Domain name, used only for logging.
        extra_context: Extra text (e.g. web search results) appended to the
            last user message when non-empty.

    Yields:
        The cleaned answer text accumulated so far.
    """
    if extra_context:
        messages = list(messages)  # copy so the caller's list stays untouched
        for i in range(len(messages) - 1, -1, -1):
            if messages[i].get("role") == "user":
                messages[i] = {
                    "role": "user",
                    "content": messages[i]["content"] + "\n\n=== WEB CONTEXT ===\n" + extra_context,
                }
                break

    try:
        llm = get_llm()
    except ModelUnavailable:
        yield _MODEL_ERROR_MSG
        return

    acc = ""  # raw model output so far
    last = ""  # last cleaned text that was yielded
    failed = False
    try:
        # One text inference at a time.
        with _llm_lock:
            for chunk in llm.create_chat_completion(
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
                stream=True,
            ):
                delta = chunk["choices"][0].get("delta", {})
                acc += delta.get("content") or ""
                # _clean_response returns "" while the model is still reasoning,
                # so the UI keeps its "thinking…" state until the answer starts.
                cleaned = _clean_response(acc)
                if cleaned and cleaned != last:
                    last = cleaned
                    yield cleaned
    except Exception as e:  # inference-time failure (e.g. OOM mid-generation)
        logger.exception("generation failed (%s): %s", domain, e)
        failed = True

    if last:
        # Something was already shown; keep whatever the user has.
        return

    # Nothing surfaced (model never wrote a markdown answer/delimiter, or the
    # run failed early): fall back to a best-effort strip of the raw output.
    fallback = _final_answer(acc) if acc else ""
    if fallback:
        yield fallback
    elif failed:
        # Failed with nothing to salvage: never leave the reply blank.
        yield _MODEL_ERROR_MSG
def run_domain(domain: str, user_input: str = "", max_tokens: int = 1024) -> Iterator[str]:
    """One-call helper: load memory, build the prompt, stream the answer.

    Because this is a generator, nothing runs until the first item is
    requested.

    Args:
        domain: Domain name passed to build_prompt and generate_stream.
        user_input: Optional request text.
        max_tokens: Generation budget.

    Yields:
        Cumulative answer text, exactly as produced by generate_stream.
    """
    snapshot = memory_store.load()
    prompt_messages = build_prompt(domain, snapshot, user_input)
    answer_stream = generate_stream(prompt_messages, max_tokens=max_tokens, domain=domain)
    yield from answer_stream