"""LifeOS reasoning engine. Text reasoning runs on one small model — NVIDIA Nemotron-3-Nano-4B (Q4_K_M GGUF, 2.84GB) — 100% locally through the llama.cpp runtime (llama-cpp-python). Deterministic feature code curates a small context (memory slices + RAG recall); the model only does the judgment + explanation layer. That division is what makes a 4B on 2 vCPUs feel smart. Food photos additionally use a small vision-language model — Qwen2.5-VL-3B (Q4_K_M GGUF) — for perception only: it identifies the food items in an image, which Nemotron then reasons about against memory. The VLM is loaded lazily on the first photo, so the text-only path never pays for it. """ import logging import os import re import threading from collections.abc import Iterator import cuda_bootstrap import config import memory as memory_store import rag logger = logging.getLogger(__name__) cuda_bootstrap.ensure() # register CUDA runtime DLL dirs before llama_cpp loads MODEL_REPO = config.MODEL_REPO MODEL_FILE = config.MODEL_FILE # Fallback (plain llama arch) if the hybrid Mamba arch is unsupported by the # installed llama.cpp: bartowski/nvidia_Llama-3.1-Nemotron-Nano-4B-v1.1-GGUF # Vision model for food-photo recognition. Nemotron is text-only and cannot # "see" an image, so a small vision-language model handles perception: it # identifies the food items in a photo. The identified items are then fed to # Nemotron, which does the memory-grounded judgment (dietary fit, suggestions). # Q4_K_M (~2.4GB) + the f16 multimodal projector that encodes the image. VLM_REPO = config.VLM_REPO VLM_FILE = config.VLM_FILE VLM_MMPROJ_FILE = config.VLM_MMPROJ_FILE _llm = None _llm_lock = threading.Lock() _vlm = None _vlm_lock = threading.Lock() # GPU offload: number of model layers to push to the GPU. -1 = all layers # (full offload), 0 = CPU only. Requires a CUDA/Metal/Vulkan build of # llama-cpp-python — the plain CPU wheel ignores this and stays on CPU. 
GPU_LAYERS = config.GPU_LAYERS

# Observable load state for the UI / status endpoint. MODEL_STATE is one of:
# "idle" (not loaded yet), "loading", "ready", "error". ACTIVE_BACKEND becomes
# "gpu" or "cpu" once the text model is loaded; MODEL_ERROR holds the last
# load failure as "<ExceptionType>: <message>".
ACTIVE_BACKEND = None
MODEL_STATE = "idle"
MODEL_ERROR = None


class ModelUnavailable(RuntimeError):
    """Raised when the local model cannot be loaded (bad/missing wheel, failed
    download, out of memory). Callers stream a friendly message instead."""


def status() -> dict:
    """Current model state for the /status endpoint and UI indicator."""
    return {"state": MODEL_STATE, "backend": ACTIVE_BACKEND, "error": MODEL_ERROR}


def _load_llm(n_gpu_layers: int):
    """Construct the text model with `n_gpu_layers` layers offloaded.

    llama_cpp is imported here (not at module import) so the CUDA DLL
    directories are registered first and a broken wheel only fails on load.
    """
    import cuda_bootstrap

    cuda_bootstrap.ensure()
    from llama_cpp import Llama

    cores = os.cpu_count() or 2
    # When fully offloaded to the GPU the text model needs almost no CPU threads;
    # keeping its pool small leaves cores free for the CPU-bound vision model
    # that runs on food-photo uploads (otherwise the two oversubscribe the CPU).
    n_threads = max(2, cores // 2) if n_gpu_layers != 0 else cores
    return Llama.from_pretrained(
        repo_id=MODEL_REPO,
        filename=MODEL_FILE,
        n_ctx=8192,
        n_threads=n_threads,
        n_gpu_layers=n_gpu_layers,
        verbose=False,
    )


def get_llm():
    """Load the model once. Try GPU offload first; if the GPU build is missing
    or crashes (bad wheel, no VRAM, driver mismatch), fall back to CPU so the
    app still runs. Honors LIFEOS_GPU_LAYERS=0 to skip the GPU attempt.

    Updates MODEL_STATE so the UI can show loading/ready/error. On total
    failure raises ModelUnavailable so callers can stream a friendly message
    instead of a raw 500."""
    global _llm, ACTIVE_BACKEND, MODEL_STATE, MODEL_ERROR
    if _llm is not None:
        return _llm
    MODEL_STATE = "loading"
    if GPU_LAYERS != 0:
        try:
            _llm = _load_llm(GPU_LAYERS)
            ACTIVE_BACKEND = "gpu"
            MODEL_STATE, MODEL_ERROR = "ready", None
            logger.info("model loaded on GPU (n_gpu_layers=%s)", GPU_LAYERS)
            return _llm
        except BaseException as e:  # noqa: BLE001 — incl. OSError/illegal-instr
            logger.warning("GPU load failed (%s: %s); falling back to CPU", type(e).__name__, e)
            _llm = None
    try:
        _llm = _load_llm(0)
    except BaseException as e:  # noqa: BLE001 — download/format/runtime failure
        MODEL_STATE, MODEL_ERROR = "error", f"{type(e).__name__}: {e}"
        logger.error("model load failed on CPU: %s", MODEL_ERROR)
        raise ModelUnavailable(MODEL_ERROR) from e
    ACTIVE_BACKEND = "cpu"
    MODEL_STATE, MODEL_ERROR = "ready", None
    logger.info("model loaded on CPU")
    return _llm


def _load_vlm(n_gpu_layers: int):
    """Construct the vision-language model with `n_gpu_layers` layers offloaded."""
    import cuda_bootstrap

    cuda_bootstrap.ensure()
    from llama_cpp import Llama
    from llama_cpp.llama_chat_format import Qwen25VLChatHandler

    # The chat handler downloads + owns the multimodal projector (mmproj) that
    # turns the image into tokens the model can attend to.
    handler = Qwen25VLChatHandler.from_pretrained(
        repo_id=VLM_REPO,
        filename=VLM_MMPROJ_FILE,
        verbose=False,
    )
    return Llama.from_pretrained(
        repo_id=VLM_REPO,
        filename=VLM_FILE,
        chat_handler=handler,
        n_ctx=4096,
        n_threads=os.cpu_count() or 2,
        n_gpu_layers=n_gpu_layers,
        verbose=False,
    )


VLM_GPU_LAYERS = config.VLM_GPU_LAYERS


def get_vlm():
    """Lazily load the vision-language model (used only for food photos).
    Loaded on first photo so the text-only path never pays for it.

    Defaults to CPU (VLM_GPU_LAYERS=0) so it doesn't fight the resident text
    model for VRAM on small cards; if a GPU attempt is configured but fails,
    falls back to CPU. Raises ModelUnavailable when the CPU load fails too."""
    global _vlm
    if _vlm is not None:
        return _vlm
    if VLM_GPU_LAYERS != 0:
        try:
            _vlm = _load_vlm(VLM_GPU_LAYERS)
            logger.info("VLM loaded on GPU (n_gpu_layers=%s)", VLM_GPU_LAYERS)
            return _vlm
        except BaseException as e:  # noqa: BLE001
            logger.warning("VLM GPU load failed (%s: %s); falling back to CPU", type(e).__name__, e)
            _vlm = None
    try:
        _vlm = _load_vlm(0)
    except BaseException as e:  # noqa: BLE001
        raise ModelUnavailable(f"vision model unavailable: {type(e).__name__}: {e}") from e
    logger.info("VLM loaded on CPU")
    return _vlm


_FOOD_VISION_PROMPT = (
    "You are a food-recognition assistant. Look at this photo and list the food "
    "and drink items you can see. Break composed dishes into their visible "
    "components — e.g. a pizza becomes its toppings (crust, tomato sauce, "
    "mozzarella, basil); a plate of toast with egg becomes each item. If it is "
    "a grocery receipt or a label, read the product names instead. Respond with "
    "ONLY a bulleted list — one item per line starting with '- ', using plain "
    "common names (e.g. '- fried egg', '- whole-grain toast', '- cherry "
    "tomatoes'). Add a rough quantity when obvious. Aim for 3-8 items. Ignore "
    "plates, bowls, cutlery, and packaging. Do not add commentary, nutrition "
    "facts, or headings."
)

# Longest-side cap for the image fed to the VLM. On this CPU path a full-res
# photo decodes ~1000 image tokens (~36s); 768px cuts that ~4x to a few seconds
# with no loss in food-recognition quality.
VLM_MAX_IMAGE_SIDE = config.VLM_MAX_IMAGE_SIDE


def _image_data_uri(path: str) -> str:
    """Downscale the photo to VLM_MAX_IMAGE_SIDE and return a JPEG data URI.
    Falls back to the raw bytes if Pillow can't open it."""
    import base64
    import io

    try:
        from PIL import Image

        # Image.open is lazy and keeps the underlying file open; the context
        # manager releases the handle as soon as the JPEG has been encoded.
        with Image.open(path) as src:
            im = src if src.mode in ("RGB", "L") else src.convert("RGB")
            w, h = im.size
            scale = VLM_MAX_IMAGE_SIDE / max(w, h)
            if scale < 1:
                im = im.resize((max(1, int(w * scale)), max(1, int(h * scale))))
            buf = io.BytesIO()
            im.convert("RGB").save(buf, format="JPEG", quality=88)
        data = buf.getvalue()
        mime = "jpeg"
    except Exception:  # unreadable by Pillow — send original bytes
        with open(path, "rb") as f:
            data = f.read()
        ext = os.path.splitext(path)[1].lstrip(".").lower() or "jpeg"
        mime = "jpeg" if ext in ("jpg", "jpeg") else ext
    return f"data:image/{mime};base64," + base64.b64encode(data).decode("ascii")


def _dedupe_food_items(text: str) -> str:
    """Keep unique '- item' bullet lines (the small VLM sometimes repeats), in
    order, capped to 8 — so the identified-items list stays tight."""
    seen, items = set(), []
    for line in text.splitlines():
        line = line.strip().lstrip("-*•").strip()
        if not line:
            continue
        key = line.lower()  # case-insensitive duplicate detection
        if key in seen:
            continue
        seen.add(key)
        items.append(f"- {line}")
        if len(items) >= 8:
            break
    return "\n".join(items)


def describe_food_image(path: str) -> str:
    """Identify the food items visible in a photo using the vision model.

    Returns a short, de-duplicated bulleted list of items (also works on
    receipts/labels by reading product names). This is the perception step;
    the memory-grounded analysis is done separately by
    run_domain("meal_photo", …). Raises ModelUnavailable if the VLM can't load."""
    data_uri = _image_data_uri(path)
    vlm = get_vlm()
    with _vlm_lock:
        out = vlm.create_chat_completion(
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": data_uri}},
                        {"type": "text", "text": _FOOD_VISION_PROMPT},
                    ],
                }
            ],
            max_tokens=160,
            temperature=0.2,
        )
    raw = strip_think(out["choices"][0]["message"]["content"] or "").strip()
    return _dedupe_food_items(raw)


def warmup() -> None:
    """Load the text model at startup so the first request isn't a cold start.

    The vision model is loaded lazily on the first food photo. A load failure
    is swallowed here — MODEL_STATE captures it and requests surface a friendly
    message — so the web server still comes up and serves the UI."""
    try:
        get_llm()
    except ModelUnavailable:
        pass  # state already set to "error"; UI will show it
    # Load the embedder now (before any food-photo VLM load) and seed demo
    # notes when in demo mode.
    try:
        rag.warmup()
    except Exception as e:  # embedder optional — recall just returns []
        logger.warning("embedder warmup failed: %s", e)
    if config.DEMO:
        rag.ensure_seeded()


# This Nemotron GGUF always "thinks out loud" in plain prose and ignores
# /no_think and "detailed thinking off". Rather than fight it, we let it reason,
# ask it to keep reasoning short and mark the answer with a delimiter, and strip
# everything before the answer server-side (see ANSWER_DELIM / _clean_response).
# The stripper is anchor-based, so it stays clean even when the model forgets
# the delimiter under a long prompt.
ANSWER_DELIM = "==ANSWER=="

SYSTEM_BASE = (
    "You are LifeOS, a sharp, friendly personal assistant running 100% locally "
    "on {pos} own machine.\n"
    "Think briefly first if you must, then write a line containing exactly "
    + ANSWER_DELIM
    + " followed by the final answer for {name}. Keep any "
    "reasoning short; the user only sees what comes after "
    + ANSWER_DELIM
    + ".\n"
    "The final answer is concise and concrete: lead with bold key items and "
    "short bullet lists, ground every claim in the provided memory (quote "
    "specific dishes, dates, prices, habits), and never invent data not in the "
    "context."
)

# Per-domain task templates appended to SYSTEM_BASE. Each is passed through
# str.format(name=..., pos=...), so literal braces must not appear in them.
DOMAIN_INSTRUCTIONS = {
    "food": (
        "Task: recommend exactly 3 recipes for this week. For each, give the "
        "recipe name, which flyer deals it uses (with prices), estimated cost, "
        "and a one-line 'why' that references both the deals and what {name} "
        "cooked recently (favor variety — avoid repeating recent main "
        "ingredients). Respect dietary preferences strictly."
    ),
    "health": (
        "Task: recommend tomorrow's exercise. Consider the recent workout "
        "pattern, muscle-group rotation, rest balance, and the fitness goal. "
        "Give one clear recommendation (type + duration), then 2-3 bullet "
        "points of reasoning referencing specific recent workouts and any "
        "known injury constraints."
    ),
    "money": (
        "Task: review the detected recurring subscriptions against income and "
        "budget. Classify each as CANCEL, KEEP, or WATCH with a one-line "
        "plain-language reason (reference cost, last-used date, and overlap "
        "with other services). End with the total monthly savings if all "
        "CANCEL items are dropped and what that money could fund."
    ),
    "goal": (
        "Task: act as a Socratic financial-goal coach for {name}. Ask exactly "
        "ONE probing question at a time — why this goal matters, what tradeoffs "
        "they'd accept, whether the timeline is realistic given income and "
        "monthly payments, what spending they would cut. Keep each turn short. "
        "After roughly 3-4 exchanges (use the conversation history to judge), "
        "stop questioning and summarize a concrete savings plan: monthly amount "
        "to set aside, what to cut, and the realistic completion date, checked "
        "against {pos} income and monthly payments."
    ),
    "meal_photo": (
        "Task: a vision model has identified the food items in a photo of "
        "{pos} meal (or read a grocery receipt). Using that item list, write "
        "a short, well-structured markdown response with EXACTLY these three "
        "sections:\n"
        "**Identified** — a tight bullet list of the items, each in **bold**.\n"
        "**How it fits** — 2-3 bullets on how these choices line up with "
        "{pos} dietary preferences and fitness goal, calling out specific "
        "items and a rough protein read.\n"
        "**Buy next** — 2-3 suggested items that better fit their goals and "
        "budget, each with a one-line reason.\n"
        "Keep it concise. Use bullets and bold; do not invent items that were "
        "not identified."
    ),
    # NOTE(review): the angle-bracket placeholders in the format template below
    # had been stripped (it read "****" / "- : , at <$/mo>, deadline — on ...");
    # they are reconstructed here — confirm the exact placeholder wording.
    "payment_impact": (
        "Task: {name} just updated their monthly payments. Explain how their "
        "total monthly payments affect reaching their savings goal(s). Compute "
        "money left to save = monthly income − total monthly payments, then for "
        "each goal estimate how many months the remaining amount (target − "
        "saved) will take at that rate and whether the deadline is realistic.\n"
        "Format the answer EXACTLY like this, with real line breaks:\n"
        "**<one-line summary>**\n"
        "- <goal>: <months needed>, at <$/mo>, deadline <date> — on "
        "track / behind\n"
        "Use one bullet per goal, each on its OWN line. Be concrete with dollar "
        "figures. If there are no goals, reply with one short line instead."
    ),
    "chat": (
        "Task: answer the question using everything you know about {name} "
        "across food, fitness, and finances. Cross-reference domains when "
        "useful. If asked to plan, produce a compact, actionable plan."
    ),
}


def _slice_for_domain(domain: str, mem: dict) -> dict:
    """Return only the parts of `mem` relevant to `domain`.

    `mem` must contain "user_profile"; every other key is optional. Any domain
    not listed below (e.g. "chat") gets the full cross-domain slice.
    """
    profile = mem["user_profile"]
    finances = mem.get("finances", {})
    if domain in ("food", "meal_photo"):
        return {"user_profile": profile, "recent_meals": memory_store.recent_meals(7, mem)}
    if domain == "health":
        return {
            "user_profile": profile,
            "workouts_last_14_days": memory_store.workouts_in_window(14, mem),
            "calendar_next_7_days": memory_store.events_in_window(7, mem),
            "workout_schedule": mem.get("workout_schedule", {}),
        }
    if domain in ("money", "goal", "payment_impact"):
        return {
            "user_profile": profile,
            "finances": finances,
            "monthly_payments": finances.get("monthly_payments", []),
            "goals": mem.get("goals", []),
        }
    return {  # chat sees everything
        "user_profile": profile,
        "recent_meals": memory_store.recent_meals(7, mem),
        "workouts_last_14_days": memory_store.workouts_in_window(14, mem),
        "calendar_next_7_days": memory_store.events_in_window(7, mem),
        "workout_schedule": mem.get("workout_schedule", {}),
        "finances": finances,
        "goals": mem.get("goals", []),
    }


def slice_for_domains(mem: dict, domains: list[str]) -> dict:
    """Merged memory slice for selected domains ("kitchen"->food); profile
    always included."""
    alias = {"kitchen": "food"}
    merged = {"user_profile": mem["user_profile"]}
    for d in domains:
        merged.update(_slice_for_domain(alias.get(d, d), mem))
    return merged


def _fmt(obj, indent=0) -> str:
    """Render nested dicts/lists as an indented, YAML-like text outline.

    Scalars are rendered with str(); nested containers go on the following
    lines, one indent level deeper.
    """
    pad = " " * indent
    if isinstance(obj, dict):
        rows = []
        for k, v in obj.items():
            if isinstance(v, (dict, list)):
                # Nested value: the key line ends in ": " and the child
                # starts on the next line.
                rows.append(f"{pad}{k}: " + "\n" + _fmt(v, indent + 1))
            else:
                rows.append(f"{pad}{k}: {_fmt(v, indent + 1).lstrip()}")
        return "\n".join(rows)
    if isinstance(obj, list):
        rows = []
        for x in obj:
            if isinstance(x, (dict, list)):
                rows.append(f"{pad}-\n{_fmt(x, indent + 1)}")
            else:
                rows.append(f"{pad}- {_fmt(x, indent + 1).lstrip()}")
        return "\n".join(rows)
    return f"{pad}{obj}"


def _names(profile: dict) -> tuple[str, str, str]:
    """(address, possessive, header) for prompts. Falls back gracefully when a
    new user hasn't set their name yet, so prompts never read "'s machine"."""
    name = (profile.get("name") or profile.get("first_name") or "").strip()
    if name:
        return name, f"{name}'s", f"{name.upper()}'S MEMORY"
    return "you", "your", "YOUR MEMORY"


def build_prompt(domain: str, mem: dict, user_input: str, domains: list[str] | None = None) -> list[dict]:
    """Assemble [system, user] messages: domain template + short-term memory
    slice + long-term RAG recall.

    `domains` narrows the memory slice to only the referenced domains (chat
    refs); None keeps the default slice. A `domain` without an entry in
    DOMAIN_INSTRUCTIONS gets the base system prompt and the full memory slice."""
    name, pos, header = _names(mem["user_profile"])
    # .get keeps an unknown domain with empty input from raising KeyError here,
    # consistent with the membership check used for the system prompt below.
    recall_query = user_input or DOMAIN_INSTRUCTIONS.get(domain, domain)
    notes = rag.recall(f"{domain}: {recall_query}", k=5)

    system = SYSTEM_BASE.format(name=name, pos=pos)
    if domain in DOMAIN_INSTRUCTIONS:
        system += "\n\n" + DOMAIN_INSTRUCTIONS[domain].format(name=name, pos=pos)

    mem_slice = slice_for_domains(mem, domains) if domains else _slice_for_domain(domain, mem)
    parts = [f"=== {header} ===", _fmt(mem_slice)]
    if notes:
        parts.append("\n=== LONG-TERM NOTES (recalled) ===")
        parts.extend(f"- {n['text']}" for n in notes)
    parts.append("\n=== REQUEST ===")
    parts.append(user_input.strip() if user_input.strip() else "(Use the task instructions above.)")
    # Recency nudge: a final instruction at the very end of the user turn is the
    # most reliable way to stop this reasoning-happy GGUF from burning the token
    # budget thinking out loud. It jumps almost straight to the delimiter, which
    # _clean_response strips — giving fast, clean answers.
    parts.append(
        "\n\nIMPORTANT: Do NOT think step by step or explain your reasoning. "
        "Immediately write " + ANSWER_DELIM + " then the final answer."
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": "\n".join(parts)},
    ]


# A <think>…</think> block; the `|$` alternative also consumes an unclosed
# block that runs to the end of the text (mid-stream).
_THINK_RE = re.compile(r"<think>.*?(?:</think>|$)", re.DOTALL)

# A line that begins a markdown block — the real answer almost always starts
# with one of these across every domain (bold lead, header, bullet, number,
# table row, blockquote).
_MD_ANCHOR = re.compile(r"^(?:\*\*|#{1,6}\s|[-*+]\s|\d+[.)]\s|\||>\s?)")

# Plain-prose lines that are the model thinking out loud, not answer content.
# This GGUF reasons in first-person prose ("We need to…", "Let's compute…",
# "Now classify…", "Let's produce:") before writing the markdown answer.
_REASONING = re.compile(
    r"(?i)\b(?:we (?:need|should|must|can|have to|could|want|'?ll)|let'?s\b|so we\b|"
    r"the user (?:wants|needs|asks|is)|plain text|private reasoning|"
    r"is (?:discarded|hidden)|then (?:markdown|final|the answer|answer)|"
    r"first[,:]? |probably\b|i think\b|okay[,:]|now (?:let|we|i|classify|compute)|"
    r"let'?s (?:produce|craft|compute|do|output)|markdown:|answer:?$|maybe\b|actually\b)"
)

# Trailing afterthoughts the model sometimes tacks on AFTER the answer
# ("But months 0.3 seems weird.", "Wait, let me recheck."). Trimmed from the end.
_TRAILING_META = re.compile(
    r"(?i)^(?:but|wait|hmm+|note|actually|hold on|let me|i should|that|this|"
    r"however)\b.*\b(?:seem|weird|odd|wrong|off|recalc|double|check|sure|"
    r"strange|recompute|verify)\b|^(?:wait|hmm+)\b"
)

# Closing tag of the model's reasoning block. It can appear without a matching
# opening tag, which _strip_to_last_delimiter has to disambiguate.
_THINK_CLOSE = "</think>"


def strip_think(text: str) -> str:
    """Remove <think>…</think> blocks (also handles an unclosed one mid-stream)."""
    return _THINK_RE.sub("", text).lstrip()


def _is_reasoning_line(line: str) -> bool:
    """True when `line` contains one of the think-out-loud phrases in _REASONING."""
    return bool(_REASONING.search(line))


def _strip_to_last_delimiter(text: str) -> str:
    """Cut to the answer using the model's reasoning markers.

    ANSWER_DELIM reliably marks where the answer STARTS, so we keep what's
    after the last one. A bare </think> (no opening tag) is ambiguous: usually
    it ends a reasoning block that PRECEDES the answer, but sometimes the model
    emits it AFTER the answer (trailing). We disambiguate by whether real
    content follows it — substantial text after is the answer; otherwise the
    answer is what came before."""
    text = _THINK_RE.sub("", text)  # drop any well-formed <think>…</think>
    if ANSWER_DELIM in text:
        text = text.rsplit(ANSWER_DELIM, 1)[-1]
    # Must be a non-empty separator: str.rpartition("") raises ValueError.
    if _THINK_CLOSE in text:
        before, _, after = text.rpartition(_THINK_CLOSE)
        # 8+ characters after the tag counts as "real content follows".
        text = after if len(after.strip()) >= 8 else before
    return text.strip()


def _trim_trailing_meta(text: str) -> str:
    """Drop trailing blank / afterthought lines the model adds after the answer."""
    lines = text.split("\n")
    while lines and (not lines[-1].strip() or _TRAILING_META.search(lines[-1].strip())):
        lines.pop()
    return "\n".join(lines).strip()


def _clean_response(text: str) -> str:
    """Return only the user-facing answer, hiding the model's chain-of-thought.

    The model reasons in plain prose then writes a markdown answer. Strategy:
      1. drop <think>…</think> blocks; if it emitted ANSWER_DELIM, keep only
         what follows;
      2. otherwise, if the text reads as reasoning and a markdown block appears
         later, jump to that first markdown line (the answer);
      3. while still mid-reasoning with no answer in sight, return "" so the UI
         keeps showing its thinking state instead of the raw reasoning.
    Returns the text unchanged when nothing looks like reasoning — genuine
    answers pass through untouched."""
    text = _strip_to_last_delimiter(text)
    lines = text.strip().split("\n")
    nonempty = [l for l in lines if l.strip()]
    if not nonempty:
        return ""

    anchor = next((i for i, l in enumerate(lines) if _MD_ANCHOR.match(l.strip())), None)
    looks_reasoning = any(_is_reasoning_line(l) for l in nonempty)

    if anchor is not None:
        pre = [l for l in lines[:anchor] if l.strip()]
        # Jump to the answer when reasoning precedes the first markdown block.
        if pre and any(_is_reasoning_line(l) for l in pre):
            return _trim_trailing_meta("\n".join(lines[anchor:]).strip())
        return _trim_trailing_meta(text.strip())

    # No markdown block yet. If it's pure reasoning, hide it (streaming);
    # the end-of-stream fallback will recover the answer if one exists.
    return "" if looks_reasoning else _trim_trailing_meta(text.strip())


def _final_answer(text: str) -> str:
    """End-of-stream fallback: best-effort answer even if the model never wrote
    a markdown block or delimiter (e.g. a plain one-line coaching question).
    Drops leading reasoning lines; returns the raw text if that empties it."""
    cleaned = _clean_response(text)
    if cleaned:
        return cleaned
    body = _strip_to_last_delimiter(text)
    lines = body.strip().split("\n")
    while lines and (not lines[0].strip() or _is_reasoning_line(lines[0])):
        lines.pop(0)
    return _trim_trailing_meta("\n".join(lines).strip()) or body.strip()


_MODEL_ERROR_MSG = (
    "⚠️ The local model couldn't start on this machine. Check that "
    "llama-cpp-python is installed for your hardware and that there's enough "
    "memory, then restart LifeOS. (Details are in the server log.)"
)


def generate_stream(
    messages: list[dict],
    max_tokens: int = 1024,
    temperature: float = 0.4,
    domain: str = "chat",
    extra_context: str = "",
) -> Iterator[str]:
    """Yield cumulative user-facing response text.

    The model reasons out loud and marks the answer with ANSWER_DELIM. We hide
    everything until the delimiter appears, then stream the cleaned answer (see
    _clean_response). If the model never emits the delimiter, we fall back to a
    best-effort clean so the user is never left with an empty reply.

    extra_context (e.g. web search results) is appended to the final user
    message when non-empty; the caller's `messages` list is not mutated.
    `domain` is only used to label error logs.

    If the model can't be loaded, yields a single friendly message rather than
    raising — the UI shows it inline.
    """
    if extra_context:
        messages = list(messages)  # shallow copy so the caller's list is untouched
        # Walk backwards to find the LAST user message.
        for i in range(len(messages) - 1, -1, -1):
            if messages[i].get("role") == "user":
                messages[i] = {
                    "role": "user",
                    "content": messages[i]["content"] + "\n\n=== WEB CONTEXT ===\n" + extra_context,
                }
                break

    try:
        llm = get_llm()
    except ModelUnavailable:
        yield _MODEL_ERROR_MSG
        return

    acc = ""   # raw model output so far (reasoning + answer)
    last = ""  # last cleaned text yielded to the caller
    try:
        with _llm_lock:
            for chunk in llm.create_chat_completion(
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
                stream=True,
            ):
                delta = chunk["choices"][0].get("delta", {})
                acc += delta.get("content") or ""
                # _clean_response returns "" while the model is still reasoning,
                # so the UI keeps its "thinking…" state until the answer starts.
                cleaned = _clean_response(acc)
                if cleaned and cleaned != last:
                    last = cleaned
                    yield cleaned
    except Exception as e:  # inference-time failure (e.g. OOM mid-generation)
        logger.error("generation failed (%s): %s", domain, e)
        if not last and not acc:
            yield _MODEL_ERROR_MSG
            return
        # Otherwise fall through: partial output that never surfaced is still
        # recovered by the fallback below, so the reply is not left blank.

    # If nothing surfaced (model never wrote a markdown answer/delimiter), fall
    # back to a best-effort strip so the reply is never blank.
    if not last and acc:
        fallback = _final_answer(acc)
        if fallback:
            yield fallback


def run_domain(domain: str, user_input: str = "", max_tokens: int = 1024) -> Iterator[str]:
    """One-call helper: load memory, build prompt, stream the answer."""
    mem = memory_store.load()
    messages = build_prompt(domain, mem, user_input)
    yield from generate_stream(messages, max_tokens=max_tokens, domain=domain)