Spaces:
Running
Running
| """LifeOS reasoning engine. | |
| Text reasoning runs on one small model — NVIDIA Nemotron-3-Nano-4B (Q4_K_M | |
| GGUF, 2.84GB) — 100% locally through the llama.cpp runtime (llama-cpp-python). | |
| Deterministic feature code curates a small context (memory slices + RAG | |
| recall); the model only does the judgment + explanation layer. That division | |
| is what makes a 4B on 2 vCPUs feel smart. | |
| Food photos additionally use a small vision-language model — Qwen2.5-VL-3B | |
| (Q4_K_M GGUF) — for perception only: it identifies the food items in an image, | |
| which Nemotron then reasons about against memory. The VLM is loaded lazily on | |
| the first photo, so the text-only path never pays for it. | |
| """ | |
| import logging | |
| import os | |
| import re | |
| import threading | |
| from collections.abc import Iterator | |
| import cuda_bootstrap | |
| import config | |
| import memory as memory_store | |
| import rag | |
logger = logging.getLogger(__name__)
cuda_bootstrap.ensure()  # register CUDA runtime DLL dirs before llama_cpp loads

# Text model: repository id and GGUF filename, both read from config.
MODEL_REPO = config.MODEL_REPO
MODEL_FILE = config.MODEL_FILE
# Fallback (plain llama arch) if the hybrid Mamba arch is unsupported by the
# installed llama.cpp: bartowski/nvidia_Llama-3.1-Nemotron-Nano-4B-v1.1-GGUF

# Vision model for food-photo recognition. Nemotron is text-only and cannot
# "see" an image, so a small vision-language model handles perception: it
# identifies the food items in a photo. The identified items are then fed to
# Nemotron, which does the memory-grounded judgment (dietary fit, suggestions).
# Q4_K_M (~2.4GB) + the f16 multimodal projector that encodes the image.
VLM_REPO = config.VLM_REPO
VLM_FILE = config.VLM_FILE
VLM_MMPROJ_FILE = config.VLM_MMPROJ_FILE

# Lazily created model singletons (populated by get_llm / get_vlm). The locks
# are held around inference calls (generate_stream, describe_food_image); the
# loaders themselves do not take them.
_llm = None
_llm_lock = threading.Lock()
_vlm = None
_vlm_lock = threading.Lock()

# GPU offload: number of model layers to push to the GPU. -1 = all layers
# (full offload), 0 = CPU only. Requires a CUDA/Metal/Vulkan build of
# llama-cpp-python — the plain CPU wheel ignores this and stays on CPU.
GPU_LAYERS = config.GPU_LAYERS

# Observable load state for the UI / status endpoint (returned by status()).
# ACTIVE_BACKEND: "gpu" or "cpu" once the text model has loaded, else None.
# MODEL_STATE: one of "idle" (not loaded yet), "loading", "ready", "error".
# MODEL_ERROR: "<ExceptionType>: <message>" of the last failed load, else None.
ACTIVE_BACKEND = None
MODEL_STATE = "idle"
MODEL_ERROR = None
class ModelUnavailable(RuntimeError):
    """Signal that the local model could not be loaded.

    Typical causes are a bad or missing wheel, a failed download, or running
    out of memory. Callers catch this and stream a friendly message instead.
    """
def status() -> dict:
    """Snapshot the model load state for the /status endpoint and UI indicator.

    Returns:
        A dict with the keys "state", "backend" and "error", mirroring the
        module-level MODEL_STATE, ACTIVE_BACKEND and MODEL_ERROR values.
    """
    return dict(state=MODEL_STATE, backend=ACTIVE_BACKEND, error=MODEL_ERROR)
def _load_llm(n_gpu_layers: int):
    """Build the text model with the requested number of GPU-offloaded layers.

    Args:
        n_gpu_layers: Layers to offload to the GPU; 0 keeps the model on CPU.

    Returns:
        The `Llama` instance created by `Llama.from_pretrained`.
    """
    import cuda_bootstrap

    cuda_bootstrap.ensure()
    from llama_cpp import Llama

    available = os.cpu_count() or 2
    if n_gpu_layers == 0:
        # CPU-only inference: give the model every core.
        thread_count = available
    else:
        # With GPU offload the text model needs few CPU threads; a small pool
        # leaves cores free for the CPU-bound vision model that runs on
        # food-photo uploads (otherwise the two oversubscribe the CPU).
        thread_count = max(2, available // 2)

    options = {
        "repo_id": MODEL_REPO,
        "filename": MODEL_FILE,
        "n_ctx": 8192,
        "n_threads": thread_count,
        "n_gpu_layers": n_gpu_layers,
        "verbose": False,
    }
    return Llama.from_pretrained(**options)
def get_llm():
    """Load the text model once and return the cached instance.

    Tries GPU offload first; if the GPU build is missing or fails (bad wheel,
    no VRAM, driver mismatch), falls back to CPU so the app still runs. The GPU
    attempt is skipped entirely when GPU_LAYERS is 0.

    Updates MODEL_STATE / MODEL_ERROR / ACTIVE_BACKEND so the UI can show
    loading/ready/error.

    Returns:
        The loaded model instance.

    Raises:
        ModelUnavailable: when the model cannot be loaded on GPU or CPU, so
            callers can stream a friendly message instead of a raw 500.
    """
    global _llm, ACTIVE_BACKEND, MODEL_STATE, MODEL_ERROR
    if _llm is not None:
        return _llm

    MODEL_STATE = "loading"
    try:
        if GPU_LAYERS != 0:
            try:
                _llm = _load_llm(GPU_LAYERS)
            # Exception (not BaseException): OSError and friends are still
            # covered, but KeyboardInterrupt/SystemExit must propagate rather
            # than trigger a second multi-GB load on the CPU.
            except Exception as e:  # noqa: BLE001 — any load failure falls back
                logger.warning("GPU load failed (%s: %s); falling back to CPU", type(e).__name__, e)
                _llm = None
            else:
                ACTIVE_BACKEND = "gpu"
                MODEL_STATE, MODEL_ERROR = "ready", None
                logger.info("model loaded on GPU (n_gpu_layers=%s)", GPU_LAYERS)
                return _llm

        try:
            _llm = _load_llm(0)
        except Exception as e:  # noqa: BLE001 — download/format/runtime failure
            MODEL_STATE, MODEL_ERROR = "error", f"{type(e).__name__}: {e}"
            logger.error("model load failed on CPU: %s", MODEL_ERROR)
            raise ModelUnavailable(MODEL_ERROR) from e

        ACTIVE_BACKEND = "cpu"
        MODEL_STATE, MODEL_ERROR = "ready", None
        logger.info("model loaded on CPU")
        return _llm
    finally:
        # Only reachable in the "loading" state when the load was interrupted
        # (e.g. Ctrl-C); don't leave the status indicator stuck on "loading".
        if MODEL_STATE == "loading":
            MODEL_STATE = "idle"
def _load_vlm(n_gpu_layers: int):
    """Build the vision-language model together with its multimodal projector.

    Args:
        n_gpu_layers: Layers to offload to the GPU; 0 keeps the model on CPU.

    Returns:
        The `Llama` instance created by `Llama.from_pretrained`, wired to a
        Qwen2.5-VL chat handler.
    """
    import cuda_bootstrap

    cuda_bootstrap.ensure()
    from llama_cpp import Llama
    from llama_cpp.llama_chat_format import Qwen25VLChatHandler

    # The chat handler downloads + owns the multimodal projector (mmproj) that
    # turns the image into tokens the model can attend to.
    projector_options = {
        "repo_id": VLM_REPO,
        "filename": VLM_MMPROJ_FILE,
        "verbose": False,
    }
    vision_handler = Qwen25VLChatHandler.from_pretrained(**projector_options)

    model_options = {
        "repo_id": VLM_REPO,
        "filename": VLM_FILE,
        "chat_handler": vision_handler,
        "n_ctx": 4096,
        "n_threads": os.cpu_count() or 2,
        "n_gpu_layers": n_gpu_layers,
        "verbose": False,
    }
    return Llama.from_pretrained(**model_options)
# Number of vision-model layers to offload to the GPU, read from config.
# 0 means CPU only: get_vlm() then skips the GPU attempt entirely.
VLM_GPU_LAYERS = config.VLM_GPU_LAYERS
def get_vlm():
    """Lazily load the vision-language model (used only for food photos).

    Loaded on the first photo so the text-only path never pays for it. When
    VLM_GPU_LAYERS is 0 the model goes straight to CPU, so it doesn't fight the
    resident text model for VRAM on small cards; if a GPU attempt is configured
    but fails, falls back to CPU.

    Returns:
        The cached vision model instance.

    Raises:
        ModelUnavailable: when the model cannot be loaded on GPU or CPU.
    """
    global _vlm
    if _vlm is not None:
        return _vlm

    if VLM_GPU_LAYERS != 0:
        try:
            _vlm = _load_vlm(VLM_GPU_LAYERS)
        # Exception (not BaseException) so KeyboardInterrupt/SystemExit
        # propagate instead of kicking off a second load on the CPU.
        except Exception as e:  # noqa: BLE001 — any load failure falls back
            logger.warning("VLM GPU load failed (%s: %s); falling back to CPU", type(e).__name__, e)
            _vlm = None
        else:
            logger.info("VLM loaded on GPU (n_gpu_layers=%s)", VLM_GPU_LAYERS)
            return _vlm

    try:
        _vlm = _load_vlm(0)
    except Exception as e:  # noqa: BLE001 — download/format/runtime failure
        raise ModelUnavailable(f"vision model unavailable: {type(e).__name__}: {e}") from e
    logger.info("VLM loaded on CPU")
    return _vlm
# Instruction sent to the vision model alongside the photo (see
# describe_food_image). It asks for a bare '- item' bullet list, which is the
# shape _dedupe_food_items post-processes.
_FOOD_VISION_PROMPT = (
    "You are a food-recognition assistant. Look at this photo and list the food "
    "and drink items you can see. Break composed dishes into their visible "
    "components — e.g. a pizza becomes its toppings (crust, tomato sauce, "
    "mozzarella, basil); a plate of toast with egg becomes each item. If it is "
    "a grocery receipt or a label, read the product names instead. Respond with "
    "ONLY a bulleted list — one item per line starting with '- ', using plain "
    "common names (e.g. '- fried egg', '- whole-grain toast', '- cherry "
    "tomatoes'). Add a rough quantity when obvious. Aim for 3-8 items. Ignore "
    "plates, bowls, cutlery, and packaging. Do not add commentary, nutrition "
    "facts, or headings."
)
# Longest-side cap (in pixels) for the image fed to the VLM; _image_data_uri
# downscales anything larger. On this CPU path a full-res photo decodes ~1000
# image tokens (~36s); 768px cuts that ~4x to a few seconds with no loss in
# food-recognition quality. The actual value comes from config.
VLM_MAX_IMAGE_SIDE = config.VLM_MAX_IMAGE_SIDE
| def _image_data_uri(path: str) -> str: | |
| """Downscale the photo to VLM_MAX_IMAGE_SIDE and return a JPEG data URI. | |
| Falls back to the raw bytes if Pillow can't open it.""" | |
| import base64 | |
| import io | |
| try: | |
| from PIL import Image | |
| im = Image.open(path) | |
| if im.mode not in ("RGB", "L"): | |
| im = im.convert("RGB") | |
| w, h = im.size | |
| scale = VLM_MAX_IMAGE_SIDE / max(w, h) | |
| if scale < 1: | |
| im = im.resize((max(1, int(w * scale)), max(1, int(h * scale)))) | |
| buf = io.BytesIO() | |
| im.convert("RGB").save(buf, format="JPEG", quality=88) | |
| data = buf.getvalue() | |
| mime = "jpeg" | |
| except Exception: # unreadable by Pillow — send original bytes | |
| with open(path, "rb") as f: | |
| data = f.read() | |
| ext = os.path.splitext(path)[1].lstrip(".").lower() or "jpeg" | |
| mime = "jpeg" if ext in ("jpg", "jpeg") else ext | |
| return f"data:image/{mime};base64," + base64.b64encode(data).decode("ascii") | |
| def _dedupe_food_items(text: str) -> str: | |
| """Keep unique '- item' bullet lines (the small VLM sometimes repeats), in | |
| order, capped to 8 — so the identified-items list stays tight.""" | |
| seen, items = set(), [] | |
| for line in text.splitlines(): | |
| line = line.strip().lstrip("-*•").strip() | |
| if not line: | |
| continue | |
| key = line.lower() | |
| if key in seen: | |
| continue | |
| seen.add(key) | |
| items.append(f"- {line}") | |
| if len(items) >= 8: | |
| break | |
| return "\n".join(items) | |
def describe_food_image(path: str) -> str:
    """Identify the food items visible in a photo using the vision model.

    This is the perception step only; the memory-grounded analysis is done
    separately by run_domain("meal_photo", …). The prompt also covers
    receipts/labels, where product names are read instead.

    Args:
        path: Filesystem path of the photo.

    Returns:
        A short, de-duplicated bulleted list of items.
    """
    image_uri = _image_data_uri(path)
    model = get_vlm()

    user_turn = {
        "role": "user",
        "content": [
            {"type": "image_url", "image_url": {"url": image_uri}},
            {"type": "text", "text": _FOOD_VISION_PROMPT},
        ],
    }
    # Hold the lock for the whole inference call.
    with _vlm_lock:
        completion = model.create_chat_completion(
            messages=[user_turn],
            max_tokens=160,
            temperature=0.2,
        )

    reply = completion["choices"][0]["message"]["content"] or ""
    return _dedupe_food_items(strip_think(reply).strip())
def warmup() -> None:
    """Load the text model and embedder at startup to avoid a cold first request.

    The vision model is not loaded here; it is loaded lazily on the first food
    photo. Every step is best-effort so the web server still comes up and
    serves the UI:

    * a text-model load failure is swallowed — MODEL_STATE captures it and
      requests surface a friendly message;
    * an embedder warmup failure is logged (recall then just returns []);
    * a demo-seeding failure is logged instead of aborting startup.
    """
    try:
        get_llm()
    except ModelUnavailable:
        pass  # state already set to "error"; UI will show it

    # Load the embedder now (before any food-photo VLM load) and seed demo
    # notes when in demo mode.
    try:
        rag.warmup()
    except Exception as e:  # embedder optional — recall just returns []
        logger.warning("embedder warmup failed: %s", e)

    if config.DEMO:
        try:
            rag.ensure_seeded()
        except Exception as e:  # seeding is a demo nicety, never fatal
            logger.warning("demo note seeding failed: %s", e)
# This Nemotron GGUF always "thinks out loud" in plain prose and ignores
# /no_think and "detailed thinking off". Rather than fight it, we let it reason,
# ask it to keep reasoning short and mark the answer with a delimiter, and strip
# everything before the answer server-side (see _strip_to_last_delimiter and
# _clean_response). The stripper is anchor-based, so it stays clean even when
# the model forgets the delimiter under a long prompt.
ANSWER_DELIM = "==ANSWER=="
# Base system prompt shared by every domain. build_prompt fills the {name}
# and {pos} (possessive) placeholders via str.format; ANSWER_DELIM is spliced
# in at import time.
SYSTEM_BASE = (
    "You are LifeOS, a sharp, friendly personal assistant running 100% locally "
    "on {pos} own machine.\n"
    "Think briefly first if you must, then write a line containing exactly "
    + ANSWER_DELIM + " followed by the final answer for {name}. Keep any "
    "reasoning short; the user only sees what comes after " + ANSWER_DELIM + ".\n"
    "The final answer is concise and concrete: lead with bold key items and "
    "short bullet lists, ground every claim in the provided memory (quote "
    "specific dishes, dates, prices, habits), and never invent data not in the "
    "context."
)
# Per-domain task templates, keyed by domain name. build_prompt appends the
# matching entry to SYSTEM_BASE after filling {name} / {pos} via str.format.
DOMAIN_INSTRUCTIONS = {
    "food": (
        "Task: recommend exactly 3 recipes for this week. For each, give the "
        "recipe name, which flyer deals it uses (with prices), estimated cost, "
        "and a one-line 'why' that references both the deals and what {name} "
        "cooked recently (favor variety — avoid repeating recent main "
        "ingredients). Respect dietary preferences strictly."
    ),
    "health": (
        "Task: recommend tomorrow's exercise. Consider the recent workout "
        "pattern, muscle-group rotation, rest balance, and the fitness goal. "
        "Give one clear recommendation (type + duration), then 2-3 bullet "
        "points of reasoning referencing specific recent workouts and any "
        "known injury constraints."
    ),
    "money": (
        "Task: review the detected recurring subscriptions against income and "
        "budget. Classify each as CANCEL, KEEP, or WATCH with a one-line "
        "plain-language reason (reference cost, last-used date, and overlap "
        "with other services). End with the total monthly savings if all "
        "CANCEL items are dropped and what that money could fund."
    ),
    "goal": (
        "Task: act as a Socratic financial-goal coach for {name}. Ask exactly "
        "ONE probing question at a time — why this goal matters, what tradeoffs "
        "they'd accept, whether the timeline is realistic given income and "
        "monthly payments, what spending they would cut. Keep each turn short. "
        "After roughly 3-4 exchanges (use the conversation history to judge), "
        "stop questioning and summarize a concrete savings plan: monthly amount "
        "to set aside, what to cut, and the realistic completion date, checked "
        "against {pos} income and monthly payments."
    ),
    "meal_photo": (
        "Task: a vision model has identified the food items in a photo of "
        "{pos} meal (or read a grocery receipt). Using that item list, write "
        "a short, well-structured markdown response with EXACTLY these three "
        "sections:\n"
        "**Identified** — a tight bullet list of the items, each in **bold**.\n"
        "**How it fits** — 2-3 bullets on how these choices line up with "
        "{pos} dietary preferences and fitness goal, calling out specific "
        "items and a rough protein read.\n"
        "**Buy next** — 2-3 suggested items that better fit their goals and "
        "budget, each with a one-line reason.\n"
        "Keep it concise. Use bullets and bold; do not invent items that were "
        "not identified."
    ),
    "payment_impact": (
        "Task: {name} just updated their monthly payments. Explain how their "
        "total monthly payments affect reaching their savings goal(s). Compute "
        "money left to save = monthly income − total monthly payments, then for "
        "each goal estimate how many months the remaining amount (target − "
        "saved) will take at that rate and whether the deadline is realistic.\n"
        "Format the answer EXACTLY like this, with real line breaks:\n"
        "**<one-line headline with the key number>**\n"
        "- <goal name>: <remaining $>, <months> at <$/mo>, deadline <date> — on "
        "track / behind\n"
        "Use one bullet per goal, each on its OWN line. Be concrete with dollar "
        "figures. If there are no goals, reply with one short line instead."
    ),
    "chat": (
        "Task: answer the question using everything you know about {name} "
        "across food, fitness, and finances. Cross-reference domains when "
        "useful. If asked to plan, produce a compact, actionable plan."
    ),
}
| def _slice_for_domain(domain: str, mem: dict) -> dict: | |
| profile = mem["user_profile"] | |
| finances = mem.get("finances", {}) | |
| if domain == "food": | |
| return {"user_profile": profile, "recent_meals": memory_store.recent_meals(7, mem)} | |
| if domain == "meal_photo": | |
| return {"user_profile": profile, "recent_meals": memory_store.recent_meals(7, mem)} | |
| if domain == "health": | |
| return { | |
| "user_profile": profile, | |
| "workouts_last_14_days": memory_store.workouts_in_window(14, mem), | |
| "calendar_next_7_days": memory_store.events_in_window(7, mem), | |
| "workout_schedule": mem.get("workout_schedule", {}), | |
| } | |
| if domain in ("money", "goal", "payment_impact"): | |
| return { | |
| "user_profile": profile, | |
| "finances": finances, | |
| "monthly_payments": finances.get("monthly_payments", []), | |
| "goals": mem.get("goals", []), | |
| } | |
| return { # chat sees everything | |
| "user_profile": profile, | |
| "recent_meals": memory_store.recent_meals(7, mem), | |
| "workouts_last_14_days": memory_store.workouts_in_window(14, mem), | |
| "calendar_next_7_days": memory_store.events_in_window(7, mem), | |
| "workout_schedule": mem.get("workout_schedule", {}), | |
| "finances": finances, | |
| "goals": mem.get("goals", []), | |
| } | |
def slice_for_domains(mem: dict, domains: list[str]) -> dict:
    """Merge the memory slices of several domains into one dict.

    The user profile is always present, even for an empty `domains` list. The
    alias "kitchen" is treated as "food". Later domains overwrite keys set by
    earlier ones.
    """
    aliases = {"kitchen": "food"}
    combined = {"user_profile": mem["user_profile"]}
    for requested in domains:
        canonical = aliases[requested] if requested in aliases else requested
        combined.update(_slice_for_domain(canonical, mem))
    return combined
| def _fmt(obj, indent=0) -> str: | |
| pad = " " * indent | |
| if isinstance(obj, dict): | |
| return "\n".join(f"{pad}{k}: {_fmt(v, indent + 1).lstrip() if not isinstance(v, (dict, list)) else chr(10) + _fmt(v, indent + 1)}" for k, v in obj.items()) | |
| if isinstance(obj, list): | |
| return "\n".join(f"{pad}- {_fmt(x, indent + 1).lstrip()}" if not isinstance(x, (dict, list)) else f"{pad}-\n{_fmt(x, indent + 1)}" for x in obj) | |
| return f"{pad}{obj}" | |
| def _names(profile: dict) -> tuple[str, str, str]: | |
| """(address, possessive, header) for prompts. Falls back gracefully when a | |
| new user hasn't set their name yet, so prompts never read "'s machine".""" | |
| name = (profile.get("name") or profile.get("first_name") or "").strip() | |
| if name: | |
| return name, f"{name}'s", f"{name.upper()}'S MEMORY" | |
| return "you", "your", "YOUR MEMORY" | |
def build_prompt(domain: str, mem: dict, user_input: str, domains: list[str] | None = None) -> list[dict]:
    """Assemble the [system, user] messages for one request.

    The system message is SYSTEM_BASE plus the domain's task template (when the
    domain has one). The user message contains the short-term memory slice,
    any long-term notes recalled via RAG, the request text, and a closing
    nudge to jump straight to the answer delimiter.

    Args:
        domain: Domain key; unknown domains get no task template.
        mem: Full memory dict; must contain "user_profile".
        user_input: The user's request. None, empty or whitespace-only input
            is treated as "no request" and the task template drives the turn.
        domains: Narrows the memory slice to only the referenced domains (chat
            refs); None or empty keeps the default slice for `domain`.

    Returns:
        A two-element list of chat messages: system, then user.
    """
    name, pos, header = _names(mem["user_profile"])
    request = (user_input or "").strip()

    # Recall on the request when there is one, otherwise on the task template.
    # .get() keeps an unknown domain from raising before the guard below.
    recall_query = request or DOMAIN_INSTRUCTIONS.get(domain, "")
    notes = rag.recall(f"{domain}: {recall_query}", k=5)

    system = SYSTEM_BASE.format(name=name, pos=pos)
    if domain in DOMAIN_INSTRUCTIONS:
        system += "\n\n" + DOMAIN_INSTRUCTIONS[domain].format(name=name, pos=pos)

    mem_slice = slice_for_domains(mem, domains) if domains else _slice_for_domain(domain, mem)
    parts = [f"=== {header} ===", _fmt(mem_slice)]
    if notes:
        parts.append("\n=== LONG-TERM NOTES (recalled) ===")
        parts.extend(f"- {n['text']}" for n in notes)
    parts.append("\n=== REQUEST ===")
    parts.append(request or "(Use the task instructions above.)")
    # Recency nudge: a final instruction at the very end of the user turn is the
    # most reliable way to stop this reasoning-happy GGUF from burning the token
    # budget thinking out loud. It jumps almost straight to the delimiter, which
    # _clean_response strips — giving fast, clean answers.
    parts.append(
        "\n\nIMPORTANT: Do NOT think step by step or explain your reasoning. "
        "Immediately write " + ANSWER_DELIM + " then the final answer."
    )
    return [
        {"role": "system", "content": system},
        {"role": "user", "content": "\n".join(parts)},
    ]
# A <think>…</think> block; an unclosed <think> matches through to the end of
# the text. Used by strip_think and _strip_to_last_delimiter.
_THINK_RE = re.compile(r"<think>.*?(?:</think>|$)", re.DOTALL)
# A line that begins a markdown block — the real answer almost always starts
# with one of these across every domain (bold lead, header, bullet, number,
# table row, blockquote). Matched against stripped lines in _clean_response.
_MD_ANCHOR = re.compile(r"^(?:\*\*|#{1,6}\s|[-*+]\s|\d+[.)]\s|\||>\s?)")
# Plain-prose lines that are the model thinking out loud, not answer content.
# This GGUF reasons in first-person prose ("We need to…", "Let's compute…",
# "Now classify…", "Let's produce:") before writing the markdown answer.
# Searched one line at a time by _is_reasoning_line.
_REASONING = re.compile(
    r"(?i)\b(?:we (?:need|should|must|can|have to|could|want|'?ll)|let'?s\b|so we\b|"
    r"the user (?:wants|needs|asks|is)|plain text|private reasoning|"
    r"is (?:discarded|hidden)|then (?:markdown|final|the answer|answer)|"
    r"first[,:]? |probably\b|i think\b|okay[,:]|now (?:let|we|i|classify|compute)|"
    r"let'?s (?:produce|craft|compute|do|output)|markdown:|answer:?$|maybe\b|actually\b)"
)
# Trailing afterthoughts the model sometimes tacks on AFTER the answer
# ("But months 0.3 seems weird.", "Wait, let me recheck."). Trimmed from the end
# by _trim_trailing_meta, which searches each stripped trailing line.
_TRAILING_META = re.compile(
    r"(?i)^(?:but|wait|hmm+|note|actually|hold on|let me|i should|that|this|"
    r"however)\b.*\b(?:seem|weird|odd|wrong|off|recalc|double|check|sure|"
    r"strange|recompute|verify)\b|^(?:wait|hmm+)\b"
)
def strip_think(text: str) -> str:
    """Drop every <think>…</think> block and any leading whitespace left behind.

    An unclosed <think> (e.g. mid-stream) is removed through to the end of the
    text.
    """
    without_blocks = _THINK_RE.sub("", text)
    return without_blocks.lstrip()
def _is_reasoning_line(line: str) -> bool:
    """Tell whether `line` contains one of the model's thinking-out-loud phrases."""
    return _REASONING.search(line) is not None
def _strip_to_last_delimiter(text: str) -> str:
    """Cut the raw model output down to the answer using its reasoning markers.

    Well-formed <think>…</think> blocks are removed first. ANSWER_DELIM marks
    where the answer STARTS, so only the text after the last one is kept.

    A bare </think> (no opening tag) is ambiguous: usually it closes reasoning
    that PRECEDES the answer, but sometimes the model emits it AFTER the
    answer. The tie-break is whether real content follows it: at least 8
    non-whitespace-trimmed characters after the last </think> count as the
    answer; otherwise the answer is what came before it.
    """
    remainder = _THINK_RE.sub("", text)  # drop any well-formed <think>…</think>

    # rpartition returns the whole string as the tail when the delimiter is
    # absent, so no membership check is needed.
    remainder = remainder.rpartition(ANSWER_DELIM)[2]

    head, closing_tag, tail = remainder.rpartition("</think>")
    if closing_tag:
        remainder = tail if len(tail.strip()) >= 8 else head
    return remainder.strip()
def _trim_trailing_meta(text: str) -> str:
    """Remove blank and afterthought lines from the END of the answer.

    Scans backwards and stops at the first line that is neither empty nor a
    _TRAILING_META match; everything after that line is discarded.
    """
    rows = text.split("\n")
    keep = len(rows)
    while keep > 0:
        tail = rows[keep - 1].strip()
        if tail and not _TRAILING_META.search(tail):
            break
        keep -= 1
    return "\n".join(rows[:keep]).strip()
def _clean_response(text: str) -> str:
    """Return only the user-facing answer, hiding the model's chain-of-thought.

    The model reasons in plain prose and then writes a markdown answer:

    1. <think> blocks are dropped and, if ANSWER_DELIM was emitted, only what
       follows it is kept;
    2. if reasoning lines precede the first markdown line, the result starts
       at that markdown line (the answer);
    3. if there is no markdown line yet and the text reads as reasoning, ""
       is returned so the UI keeps showing its thinking state.

    Text with nothing that looks like reasoning is returned as-is, apart from
    trailing afterthought lines being trimmed.
    """
    body = _strip_to_last_delimiter(text)
    rows = body.strip().split("\n")
    content_rows = [row for row in rows if row.strip()]
    if not content_rows:
        return ""

    # Index of the first line that opens a markdown block, if any.
    first_md = None
    for idx, row in enumerate(rows):
        if _MD_ANCHOR.match(row.strip()):
            first_md = idx
            break

    if first_md is None:
        # No markdown block yet. If it's pure reasoning, hide it (streaming);
        # the end-of-stream fallback will recover the answer if one exists.
        if any(_is_reasoning_line(row) for row in content_rows):
            return ""
        return _trim_trailing_meta(body.strip())

    # Jump to the answer when reasoning precedes the first markdown block.
    preamble = [row for row in rows[:first_md] if row.strip()]
    if any(_is_reasoning_line(row) for row in preamble):
        return _trim_trailing_meta("\n".join(rows[first_md:]).strip())
    return _trim_trailing_meta(body.strip())
def _final_answer(text: str) -> str:
    """End-of-stream fallback that always tries to produce some answer.

    Used when the model never wrote a markdown block or delimiter (e.g. a plain
    one-line coaching question). If _clean_response yields text, that wins.
    Otherwise leading blank/reasoning lines are dropped and trailing
    afterthoughts trimmed; if that leaves nothing, the delimiter-stripped text
    is returned.
    """
    visible = _clean_response(text)
    if visible:
        return visible

    body = _strip_to_last_delimiter(text)
    rows = body.strip().split("\n")
    start = 0
    while start < len(rows) and (not rows[start].strip() or _is_reasoning_line(rows[start])):
        start += 1

    remainder = _trim_trailing_meta("\n".join(rows[start:]).strip())
    return remainder or body.strip()
# User-facing message that generate_stream yields when the model cannot be
# loaded or generation fails before any text was produced.
_MODEL_ERROR_MSG = (
    "⚠️ The local model couldn't start on this machine. Check that "
    "llama-cpp-python is installed for your hardware and that there's enough "
    "memory, then restart LifeOS. (Details are in the server log.)"
)
def generate_stream(
    messages: list[dict],
    max_tokens: int = 1024,
    temperature: float = 0.4,
    domain: str = "chat",
    extra_context: str = "",
) -> Iterator[str]:
    """Yield cumulative user-facing response text.

    The model reasons out loud and marks the answer with ANSWER_DELIM. Nothing
    is yielded while _clean_response still sees only reasoning; after that each
    yield is the full cleaned answer so far.

    If nothing was surfaced by the end of the stream — including when
    generation fails part-way — a best-effort clean of the buffered text is
    yielded so the user is not left with an empty reply. If there is nothing
    to salvage after a failure, a friendly error message is yielded instead.

    Args:
        messages: Chat messages to send; the list is not mutated.
        max_tokens: Generation budget passed to the model.
        temperature: Sampling temperature passed to the model.
        domain: Used only to label log messages.
        extra_context: Extra text (e.g. web search results) appended to the
            last user message when non-empty.

    Yields:
        The cleaned answer accumulated so far, or a single friendly message
        when the model can't be loaded (this function does not raise
        ModelUnavailable — the UI shows the message inline).
    """
    if extra_context:
        messages = list(messages)  # shallow copy so the caller's list is untouched
        for i in range(len(messages) - 1, -1, -1):
            if messages[i].get("role") == "user":
                messages[i] = {
                    "role": "user",
                    "content": messages[i]["content"] + "\n\n=== WEB CONTEXT ===\n" + extra_context,
                }
                break

    try:
        llm = get_llm()
    except ModelUnavailable:
        yield _MODEL_ERROR_MSG
        return

    acc = ""
    last = ""
    failed = False
    try:
        with _llm_lock:
            for chunk in llm.create_chat_completion(
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
                stream=True,
            ):
                delta = chunk["choices"][0].get("delta", {})
                piece = delta.get("content") or ""
                if not piece:
                    # Nothing new (e.g. a role-only delta): re-cleaning the
                    # same text cannot change the output.
                    continue
                acc += piece
                # _clean_response returns "" while the model is still reasoning,
                # so the UI keeps its "thinking…" state until the answer starts.
                cleaned = _clean_response(acc)
                if cleaned and cleaned != last:
                    last = cleaned
                    yield cleaned
    except Exception as e:  # inference-time failure (e.g. OOM mid-generation)
        logger.error("generation failed (%s): %s", domain, e, exc_info=True)
        failed = True

    if last:
        return  # the user already has (at least part of) the answer

    # Nothing surfaced (model never wrote a markdown answer/delimiter, or it
    # failed first): fall back to a best-effort strip so the reply isn't blank.
    fallback = _final_answer(acc) if acc else ""
    if fallback:
        yield fallback
    elif failed:
        yield _MODEL_ERROR_MSG
def run_domain(domain: str, user_input: str = "", max_tokens: int = 1024) -> Iterator[str]:
    """Convenience wrapper: load memory, build the prompt, stream the answer.

    Args:
        domain: Domain key used for both the prompt and log labelling.
        user_input: Optional request text.
        max_tokens: Generation budget forwarded to generate_stream.

    Yields:
        Whatever generate_stream yields for the assembled prompt.
    """
    prompt = build_prompt(domain, memory_store.load(), user_input)
    yield from generate_stream(prompt, max_tokens=max_tokens, domain=domain)