"""The scheduling agent: thread (+images) -> validated ActionPlan. Replaces the old one-shot extractor. The model reasons over a whole conversation and emits a single constrained ActionPlan: events, conflicts (vs the user's existing calendar), proposed alternative times, a reply draft, and an optional clarification question. Output is grammar-constrained so it always parses. """ from __future__ import annotations import json import os import re from datetime import datetime, timedelta from typing import Optional from dateutil import parser as dtparser from pydantic import ValidationError from . import events, memory from .schema import ActionPlan, Event SYSTEM = ( "You are a scheduling assistant reading a chat conversation (text, and sometimes images " "such as screenshots, invites, or flyers). Decide what calendar action is warranted and " "return ONLY a JSON object matching the ActionPlan schema:\n" "- reasoning: one or two sentences of why.\n" "- events: concrete events with ISO 8601 datetimes; resolve relative dates from the current " "datetime. Empty if there is no real plan. List EVERY distinct event separately — one thread " "often holds several (e.g. a drop-off AND a pickup, or two appointments, are separate events).\n" "- title: a short, self-contained calendar title summarizing the action and subject " "(e.g. \"Pick up Priya — Terminal 4\", \"Mia — dental cleaning\"), not a quote of the " "message.\n" "- location: the venue or address when one is mentioned (join multi-line addresses into one " "string); null otherwise.\n" "- end: when a duration is stated (\"Duration: 30–45 min\", \"for 2 hours\", \"runs 90 " "minutes\"), set end = start + duration, using the LOWER bound of a range; when an end time " "is stated (\"7-9pm\"), use it; otherwise null. Never guess a duration that was not given.\n" "- early arrival: if told to arrive N minutes early (\"please arrive 15 minutes early\"), " "start = the arrival time (stated time minus N); end still counts from the STATED time; put " "the stated time and the reason in notes.\n" "- reminder_minutes: a stated lead time always wins (\"remind me 2 hours before\" -> 120); " "otherwise 60 for doctor/medical visits, 30 for parties, 45 for carpools or school events; " "for anything else use your judgment.\n" "- conflicts: for any event that clashes with the provided existing calendar, the event_index, " "what it clashes with, and severity (overlap|adjacent|tight).\n" "- proposed_times: ISO 8601 alternatives when there is a conflict.\n" "- reply_draft: a short, natural reply the user could send back.\n" "- needs_clarification: a question if the plan is ambiguous, else null. If something should " "be scheduled but its day or time is not yet known (\"TBD\", \"I'll confirm\", \"sometime " "soon\"), leave events empty and ASK via needs_clarification instead of guessing.\n" "Do not invent events that were not discussed." ) def _existing_block(existing: list[Event]) -> str: if not existing: return "Existing calendar: (none provided)" lines = [f"- {e.title}: {e.start}..{e.end or e.start}" for e in existing] return "Existing calendar:\n" + "\n".join(lines) def build_messages( thread: str, now: datetime, existing: list[Event], images: Optional[list[str]] = None, memory_block: Optional[str] = None, ) -> list[dict]: """Build chat messages. ``images`` are base64 data URIs (used from phase 3). ``memory_block`` is the caller's recall block (per-user/localStorage memory); when None, fall back to the server-side global memory.recall().""" mem = memory.recall() if memory_block is None else memory_block mem_block = f"{mem}\n\n" if mem else "" text = ( f"Current datetime: {now.strftime('%A')}, {now.isoformat()}\n" f"{_existing_block(existing)}\n\n" f"{mem_block}" f"Conversation:\n{thread}\n\n" "Return the ActionPlan JSON now." ) if not images: return [ {"role": "system", "content": SYSTEM}, {"role": "user", "content": text}, ] # Multimodal content format understood by llama.cpp vision chat handlers. content = [{"type": "text", "text": text}] for uri in images: content.append({"type": "image_url", "image_url": {"url": uri}}) return [ {"role": "system", "content": SYSTEM}, {"role": "user", "content": content}, ] def run_agent( thread: str, now: Optional[datetime] = None, existing: Optional[list[Event]] = None, images: Optional[list[str]] = None, memory_block: Optional[str] = None, ) -> ActionPlan: now = now or datetime.now() existing = existing or [] with events.run_scope("analyze"): if images: events.emit("vision", f"reading {len(images)} image(s)", images=len(images)) if os.environ.get("USE_STUB_EXTRACTOR") == "1": plan = _stub_plan(thread, now) else: from .model import complete_json # lazy: avoids llama.cpp in stub mode raw = complete_json( build_messages(thread, now, existing, images, memory_block), json_schema=ActionPlan.model_json_schema(), ) plan = apply_text_rules(thread, _polish_titles(thread, _parse_plan(raw))) # Global path only: with client-owned (per-user) memory, the UI merges # learned contacts itself (memory.learn_from_plan) so we don't pollute the # shared server file. if memory_block is None: memory.observe_plan(plan) # grows-with-you: learn recurring contacts events.emit("decision", f"{len(plan.events)} event(s) detected", events=len(plan.events)) return plan def _parse_plan(raw: str) -> ActionPlan: try: return ActionPlan(**json.loads(raw)) except (json.JSONDecodeError, ValidationError): # Grammar should prevent this; degrade to an empty plan rather than 500. return ActionPlan(reasoning="Could not parse model output.") # --------------------------------------------------------------------------- # # Title polish (optional second pass, TITLE_POLISH=1): rewrite each extracted # event's title into a calendar-ready action+subject summary. The extraction # pass already gets a title style instruction; this pass gives the model one # focused job, which helps on echo-prone inputs (flyers, forwarded notices). # --------------------------------------------------------------------------- # TITLE_SYSTEM = ( "You rewrite calendar event titles. Given a conversation and the events extracted from " "it, return ONLY a JSON object {\"titles\": [...]} with exactly one title per event, in " "the same order. Each title is a short, self-contained calendar entry summarizing the " "action and subject (e.g. \"Pick up Priya — Terminal 4\", \"Mia — dental cleaning\"). " "Keep names and places; drop filler, hype and sender wording. Never add facts that are " "not in the conversation." ) TITLES_SCHEMA = { "type": "object", "properties": {"titles": {"type": "array", "items": {"type": "string"}}}, "required": ["titles"], } def build_title_messages(thread: str, events: list[dict]) -> list[dict]: """Messages for the polish pass. ``events`` are Event-shaped dicts.""" lines = [ f"{i + 1}. {e.get('title') or '(untitled)'} @ {e.get('start')}" + (f" ({e['location']})" if e.get("location") else "") for i, e in enumerate(events) ] text = ( f"Conversation:\n{thread}\n\n" "Extracted events:\n" + "\n".join(lines) + "\n\n" "Return the titles JSON now." ) return [ {"role": "system", "content": TITLE_SYSTEM}, {"role": "user", "content": text}, ] def merge_titles(plan: ActionPlan, raw: str) -> ActionPlan: """Apply a polish-pass response onto the plan; on any mismatch keep the original titles (the polish pass must never be able to lose an event).""" try: titles = json.loads(raw).get("titles") except (json.JSONDecodeError, AttributeError): return plan if not isinstance(titles, list) or len(titles) != len(plan.events): return plan for ev, title in zip(plan.events, titles): if isinstance(title, str) and title.strip(): ev.title = title.strip()[:80] return plan def apply_text_rules(thread: str, plan: ActionPlan) -> ActionPlan: """Deterministic guarantees for explicitly-communicated logistics (same philosophy as conflict detection: don't leave must-hold rules to the model). Single-event plans only — multi-event threads keep per-event model judgment. - "arrive N minutes early" -> start = arrival time, but ONLY when the model demonstrably did not shift already (its start equals the stated time). - end = STATED time + stated duration: a self-shifting model often counts the duration from the arrival time (10:15+30=10:45 instead of 11:00). - reminder: an explicit stated lead time always wins; else type defaults (medical 60 / party 30 / carpool-school 45); else the model's judgment. """ if len(plan.events) != 1: return plan ev = plan.events[0] early = _EARLY_RE.search(thread) stated = _find_time(thread) if early and stated: try: start_dt = datetime.fromisoformat(ev.start) except ValueError: start_dt = None if start_dt is not None: mins = int(early.group(1)) appt_dt = start_dt.replace(hour=stated[0], minute=stated[1]) if start_dt == appt_dt: # model did not shift -> start at arrival start_dt = appt_dt - timedelta(minutes=mins) ev.start = start_dt.isoformat() if start_dt == appt_dt - timedelta(minutes=mins): # The event covers arrival (we or the model shifted it): anchor # the END to the stated time + stated duration, and make sure # the official time survives in the notes. duration = _find_duration_minutes(thread) if duration: ev.end = (appt_dt + timedelta(minutes=duration)).isoformat() hhmm = appt_dt.strftime("%H:%M") if hhmm not in (ev.notes or ""): note = f"Appointment at {hhmm}; arrive {mins} min early" ev.notes = f"{ev.notes} — {note}" if ev.notes else note m = _REMIND_EXPLICIT_RE.search(thread) if m: n = int(m.group(1)) ev.reminder_minutes = n * 60 if m.group(2).lower().startswith("h") else n elif _MEDICAL_RE.search(thread): ev.reminder_minutes = 60 elif _PARTY_RE.search(thread): ev.reminder_minutes = 30 elif _CARPOOL_SCHOOL_RE.search(thread): ev.reminder_minutes = 45 return plan def _polish_titles(thread: str, plan: ActionPlan) -> ActionPlan: if not plan.events or os.environ.get("TITLE_POLISH") != "1": return plan from .model import complete_json # lazy: avoids llama.cpp in stub mode try: raw = complete_json( build_title_messages(thread, [e.model_dump() for e in plan.events]), json_schema=TITLES_SCHEMA, max_tokens=256, ) except Exception: # noqa: BLE001 polish is best-effort, never fatal return plan return merge_titles(plan, raw) def run_agent_stream( thread: str, now: Optional[datetime] = None, existing: Optional[list[Event]] = None, images: Optional[list[str]] = None, busy=None, memory_block: Optional[str] = None, ): """Generator for the UI: yields (partial_text, plan_or_None). Streams the model output for a live 'thinking' panel, then yields the final ActionPlan (with deterministic conflicts annotated if ``busy`` intervals are given). ``memory_block`` carries the caller's per-user (localStorage) memory.""" now = now or datetime.now() existing = existing or [] with events.run_scope("analyze"): if images: events.emit("vision", f"reading {len(images)} image(s)", images=len(images)) if os.environ.get("USE_STUB_EXTRACTOR") == "1": plan = _stub_plan(thread, now) text = json.dumps(plan.model_dump(), indent=2) events.emit("model", "stub inference", latency_ms=0) acc = "" for i in range(0, len(text), 24): # simulate token streaming acc += text[i : i + 24] yield acc, None else: from .model import stream_complete_json acc = "" for delta in stream_complete_json( build_messages(thread, now, existing, images, memory_block), ActionPlan.model_json_schema(), ): acc += delta yield acc, None plan = apply_text_rules(thread, _polish_titles(thread, _parse_plan(acc))) # Global path only (see run_agent): client memory is merged by the UI. if memory_block is None: memory.observe_plan(plan) # grows-with-you: learn recurring contacts events.emit("decision", f"{len(plan.events)} event(s) detected", events=len(plan.events)) if busy: from calendar_out.freebusy import annotate_conflicts # lazy: avoid cycle plan = annotate_conflicts(plan, busy) yield (json.dumps(plan.model_dump(), indent=2), plan) _TIME_RE = re.compile(r"\b(\d{1,2})(?::(\d{2}))?\s*(am|pm)?\b", re.IGNORECASE) _TIME_LABEL_RE = re.compile(r"(?im)^\s*time\s*[:\-]\s*(.+)$") _MONTH_DATE_RE = re.compile( r"\b(?:jan(?:uary)?|feb(?:ruary)?|mar(?:ch)?|apr(?:il)?|may|jun(?:e)?|jul(?:y)?|" r"aug(?:ust)?|sep(?:t(?:ember)?)?|oct(?:ober)?|nov(?:ember)?|dec(?:ember)?)\.?\s+" r"\d{1,2}(?:st|nd|rd|th)?(?:,?\s*\d{4})?\b", re.IGNORECASE) _WEEKDAY_RE = re.compile( r"\b(monday|tuesday|wednesday|thursday|friday|saturday|sunday)\b", re.IGNORECASE) _LOCATION_RE = re.compile( r"(?i)^\s*(?:(?:location|where|address)\s*[:\-]|\U0001F4CD)\s*(.*)$") _LABEL_LINE_RE = re.compile(r"^\s*[A-Za-z][A-Za-z ]{0,20}:\s") # "Time: ...", "Notes: ..." _DURATION_RE = re.compile(r"(?im)^\s*duration\s*[:\-]\s*(.*)$") _EARLY_RE = re.compile(r"(?i)arrive\s+(\d{1,3})\s*min(?:ute)?s?\s+early") _REMIND_EXPLICIT_RE = re.compile( r"(?i)\b(?:remind(?:er)?|notify|alert)\s*(?:me\s+)?(?:for\s+)?" r"(\d{1,3})\s*(min(?:ute)?s?|h(?:ou)?rs?)\s*(?:before|ahead|prior|early)") _MEDICAL_RE = re.compile( r"(?i)\b(?:doctor|dr\b\.?|clinic|dentist|dental|pediatric\w*|physician|" r"medical|check-?up|primary\s+care|intake\s+forms?)") _PARTY_RE = re.compile( # "party of 4" is a group size, not a party r"(?i)\b(?:birthday|bday)\b|\bparty\b(?!\s+of\s+\d)") _CARPOOL_SCHOOL_RE = re.compile(r"(?i)\bcarpool\w*\b|\bschool\b|drive\s+the\s+kids") def _find_time(thread: str) -> Optional[tuple[int, int]]: """First plausible clock time, or None. A bare integer ("June 22", "112A") is not a time — a match needs a minute component or an am/pm marker.""" label = _TIME_LABEL_RE.search(thread) scope = label.group(1) if label else thread for m in _TIME_RE.finditer(scope): if not (m.group(2) or m.group(3)): continue hour, minute = int(m.group(1)), int(m.group(2) or 0) if hour > 23 or minute > 59: continue mer = (m.group(3) or "").lower() if mer == "pm" and hour < 12: hour += 12 elif mer == "am" and hour == 12: hour = 0 return hour, minute return None def _find_date(thread: str, now: datetime): """Resolve the event's day: explicit date > today/tomorrow > weekday > tomorrow.""" m = _MONTH_DATE_RE.search(thread) if m: try: return dtparser.parse(m.group(0), default=now).date() except (ValueError, OverflowError): pass if re.search(r"\btomorrow\b", thread, re.IGNORECASE): return (now + timedelta(days=1)).date() if re.search(r"\btoday\b|\btonight\b", thread, re.IGNORECASE): return now.date() m = _WEEKDAY_RE.search(thread) if m: try: return dtparser.parse(m.group(1), default=now).date() # next-or-same day except (ValueError, OverflowError): pass return (now + timedelta(days=1)).date() def _find_location(lines: list[str]) -> tuple[Optional[str], set[int]]: """A "Location:" line plus continuation lines (a wrapped address) until a blank line or the next "Label:" line. Returns (joined location, line idxs).""" for i, line in enumerate(lines): m = _LOCATION_RE.match(line) if not m: continue parts, used = [m.group(1).strip()], {i} for j in range(i + 1, len(lines)): nxt = lines[j].strip() if not nxt or nxt.startswith("(") or _LABEL_LINE_RE.match(lines[j]): break parts.append(nxt) used.add(j) loc = ", ".join(p for p in parts if p) return (loc or None), used return None, set() def _find_duration_minutes(thread: str) -> Optional[int]: m = _DURATION_RE.search(thread) if m: num = re.search(r"\d+", m.group(1)) if num: return int(num.group(0)) return None def _reminder_minutes(thread: str) -> int: """Notification lead time: an explicit ask wins, else event-type defaults (medical 60, party 30, carpool/school 45 — checked in that order), else 30.""" m = _REMIND_EXPLICIT_RE.search(thread) if m: n = int(m.group(1)) return n * 60 if m.group(2).lower().startswith("h") else n if _MEDICAL_RE.search(thread): return 60 if _PARTY_RE.search(thread): return 30 if _CARPOOL_SCHOOL_RE.search(thread): return 45 return 30 def _is_date_line(line: str, now: datetime) -> bool: try: dtparser.parse(line, default=now) # non-fuzzy: chatter raises ParserError return True except (ValueError, OverflowError): return False def _pick_title(lines: list[str], now: datetime, location_idx: set[int]) -> str: nonempty = [(i, ln.strip()) for i, ln in enumerate(lines) if ln.strip()] if not nonempty: return "Event" first_i, first = nonempty[0] if not _is_date_line(first, now): return first[:60] # First line is just the date — find a more descriptive line instead. for i, ln in nonempty[1:]: if i in location_idx or _LABEL_LINE_RE.match(ln) or ln.startswith("("): continue if _is_date_line(ln, now): continue return ln[:60] return "Appointment" def _stub_plan(thread: str, now: datetime) -> ActionPlan: """Heuristic ActionPlan so phases without a model still demo end to end.""" time_found = _find_time(thread) if not time_found: return ActionPlan(reasoning="No time found.", reply_draft="Got it, thanks!") hour, minute = time_found lines = thread.strip().splitlines() location, loc_idx = _find_location(lines) day = _find_date(thread, now) appt = now.replace(year=day.year, month=day.month, day=day.day, hour=hour, minute=minute, second=0, microsecond=0) duration = _find_duration_minutes(thread) or 60 # "Arrive N minutes early" -> start at the ARRIVAL time; the end (and the # notes) stay anchored to the stated appointment time. early = _EARLY_RE.search(thread) start = appt - timedelta(minutes=int(early.group(1))) if early else appt notes = (f"Appointment at {appt.strftime('%H:%M')}; arrive {early.group(1)} min early" if early else "(stub agent — wire the model to replace this)") return ActionPlan( reasoning="(stub) parsed time/date/location heuristically.", events=[ Event( title=_pick_title(lines, now, loc_idx), start=start.isoformat(), end=(appt + timedelta(minutes=duration)).isoformat(), location=location, reminder_minutes=_reminder_minutes(thread), notes=notes, ) ], reply_draft="Sounds good, see you then!", )