| """The scheduling agent: thread (+images) -> validated ActionPlan. |
| |
| Replaces the old one-shot extractor. The model reasons over a whole conversation |
| and emits a single constrained ActionPlan: events, conflicts (vs the user's |
| existing calendar), proposed alternative times, a reply draft, and an optional |
| clarification question. Output is grammar-constrained so it always parses. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import os |
| import re |
| from datetime import datetime, timedelta |
| from typing import Optional |
|
|
| from dateutil import parser as dtparser |
| from pydantic import ValidationError |
|
|
| from . import events, memory |
| from .schema import ActionPlan, Event |
|
|
# System prompt for the main extraction call: build_messages() sends it as the
# "system" message. It spells out, field by field, how the model should fill
# the ActionPlan JSON (events, titles, locations, end times, early arrival,
# reminders, conflicts, proposed times, reply draft, clarification).
# This text is runtime behavior — edit the wording deliberately, not cosmetically.
SYSTEM = (
    "You are a scheduling assistant reading a chat conversation (text, and sometimes images "
    "such as screenshots, invites, or flyers). Decide what calendar action is warranted and "
    "return ONLY a JSON object matching the ActionPlan schema:\n"
    "- reasoning: one or two sentences of why.\n"
    "- events: concrete events with ISO 8601 datetimes; resolve relative dates from the current "
    "datetime. Empty if there is no real plan. List EVERY distinct event separately — one thread "
    "often holds several (e.g. a drop-off AND a pickup, or two appointments, are separate events).\n"
    "- title: a short, self-contained calendar title summarizing the action and subject "
    "(e.g. \"Pick up Priya — Terminal 4\", \"Mia — dental cleaning\"), not a quote of the "
    "message.\n"
    "- location: the venue or address when one is mentioned (join multi-line addresses into one "
    "string); null otherwise.\n"
    "- end: when a duration is stated (\"Duration: 30–45 min\", \"for 2 hours\", \"runs 90 "
    "minutes\"), set end = start + duration, using the LOWER bound of a range; when an end time "
    "is stated (\"7-9pm\"), use it; otherwise null. Never guess a duration that was not given.\n"
    "- early arrival: if told to arrive N minutes early (\"please arrive 15 minutes early\"), "
    "start = the arrival time (stated time minus N); end still counts from the STATED time; put "
    "the stated time and the reason in notes.\n"
    "- reminder_minutes: a stated lead time always wins (\"remind me 2 hours before\" -> 120); "
    "otherwise 60 for doctor/medical visits, 30 for parties, 45 for carpools or school events; "
    "for anything else use your judgment.\n"
    "- conflicts: for any event that clashes with the provided existing calendar, the event_index, "
    "what it clashes with, and severity (overlap|adjacent|tight).\n"
    "- proposed_times: ISO 8601 alternatives when there is a conflict.\n"
    "- reply_draft: a short, natural reply the user could send back.\n"
    "- needs_clarification: a question if the plan is ambiguous, else null. If something should "
    "be scheduled but its day or time is not yet known (\"TBD\", \"I'll confirm\", \"sometime "
    "soon\"), leave events empty and ASK via needs_clarification instead of guessing.\n"
    "Do not invent events that were not discussed."
)
|
|
|
|
| def _existing_block(existing: list[Event]) -> str: |
| if not existing: |
| return "Existing calendar: (none provided)" |
| lines = [f"- {e.title}: {e.start}..{e.end or e.start}" for e in existing] |
| return "Existing calendar:\n" + "\n".join(lines) |
|
|
|
|
def build_messages(
    thread: str,
    now: datetime,
    existing: list[Event],
    images: Optional[list[str]] = None,
    memory_block: Optional[str] = None,
) -> list[dict]:
    """Assemble the system + user chat messages for the extraction call.

    The user message carries the current datetime (with weekday name), the
    existing-calendar section, an optional memory section, and the
    conversation itself. ``images`` are base64 data URIs; when any are given
    the user content becomes a list of parts (one text part followed by one
    ``image_url`` part per URI) instead of a plain string. ``memory_block``
    is the caller's own recall text; passing None falls back to the
    server-side ``memory.recall()``.
    """
    recalled = memory_block if memory_block is not None else memory.recall()

    pieces = [
        f"Current datetime: {now.strftime('%A')}, {now.isoformat()}\n",
        f"{_existing_block(existing)}\n\n",
    ]
    if recalled:
        # An empty recall contributes nothing, not even the blank separator.
        pieces.append(f"{recalled}\n\n")
    pieces.append(f"Conversation:\n{thread}\n\n")
    pieces.append("Return the ActionPlan JSON now.")
    prompt = "".join(pieces)

    if images:
        user_content = [{"type": "text", "text": prompt}]
        user_content.extend(
            {"type": "image_url", "image_url": {"url": uri}} for uri in images
        )
    else:
        user_content = prompt

    return [
        {"role": "system", "content": SYSTEM},
        {"role": "user", "content": user_content},
    ]
|
|
|
|
def run_agent(
    thread: str,
    now: Optional[datetime] = None,
    existing: Optional[list[Event]] = None,
    images: Optional[list[str]] = None,
    memory_block: Optional[str] = None,
) -> ActionPlan:
    """Analyze a conversation and return one ActionPlan (non-streaming).

    Args:
        thread: The conversation text.
        now: Reference datetime for the prompt; defaults to ``datetime.now()``.
        existing: Events rendered into the prompt as the existing calendar;
            defaults to an empty list.
        images: Optional image URIs forwarded to ``build_messages``.
        memory_block: Caller-supplied recall text. When None, the prompt falls
            back to ``memory.recall()`` (inside ``build_messages``) and the
            resulting plan is recorded via ``memory.observe_plan``.

    Returns:
        The plan from the stub heuristics when ``USE_STUB_EXTRACTOR`` is "1",
        otherwise the parsed model output after the title-polish pass and the
        deterministic text rules.
    """
    now = now or datetime.now()
    existing = existing or []

    # NOTE(review): nesting below is reconstructed from a source whose
    # indentation was lost — confirm everything belongs inside the run scope.
    with events.run_scope("analyze"):
        if images:
            events.emit("vision", f"reading {len(images)} image(s)", images=len(images))

        if os.environ.get("USE_STUB_EXTRACTOR") == "1":
            # Stub path: no model call; polish and text rules are not applied here.
            plan = _stub_plan(thread, now)
        else:
            # Imported lazily so the stub path never needs the model module.
            from .model import complete_json

            raw = complete_json(
                build_messages(thread, now, existing, images, memory_block),
                json_schema=ActionPlan.model_json_schema(),
            )
            # Parse first, then polish titles, then enforce the text rules.
            plan = apply_text_rules(thread, _polish_titles(thread, _parse_plan(raw)))

        # Only feed server-side memory when the caller did not bring its own.
        if memory_block is None:
            memory.observe_plan(plan)
        events.emit("decision", f"{len(plan.events)} event(s) detected", events=len(plan.events))
        return plan
|
|
|
|
def _parse_plan(raw: str) -> ActionPlan:
    """Turn the model's raw JSON text into an ActionPlan without ever raising.

    Args:
        raw: The model output, expected to be a JSON object.

    Returns:
        The validated plan, or an empty placeholder plan (reasoning
        "Could not parse model output.") when ``raw`` is not valid JSON, is
        not text at all, decodes to something other than a JSON object, or
        fails ActionPlan validation.
    """
    try:
        data = json.loads(raw)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers a raw value
        # that is not str/bytes (e.g. None from a failed completion).
        data = None

    # Only a JSON object can be splatted into the model: ``**`` on a list,
    # string or number raises TypeError, which must not escape the fallback.
    if isinstance(data, dict):
        try:
            return ActionPlan(**data)
        except ValidationError:
            pass

    return ActionPlan(reasoning="Could not parse model output.")
|
|
|
|
| |
| |
| |
| |
| |
| |
# System prompt for the optional title-polish pass (see build_title_messages):
# the model is asked for exactly one rewritten title per extracted event, in order.
TITLE_SYSTEM = (
    "You rewrite calendar event titles. Given a conversation and the events extracted from "
    "it, return ONLY a JSON object {\"titles\": [...]} with exactly one title per event, in "
    "the same order. Each title is a short, self-contained calendar entry summarizing the "
    "action and subject (e.g. \"Pick up Priya — Terminal 4\", \"Mia — dental cleaning\"). "
    "Keep names and places; drop filler, hype and sender wording. Never add facts that are "
    "not in the conversation."
)


# JSON schema passed as ``json_schema`` for the polish call: an object with a
# required "titles" array of strings. The array length is not constrained here;
# merge_titles() checks it against the number of events.
TITLES_SCHEMA = {
    "type": "object",
    "properties": {"titles": {"type": "array", "items": {"type": "string"}}},
    "required": ["titles"],
}
|
|
|
|
def build_title_messages(thread: str, events: list[dict]) -> list[dict]:
    """Build the chat messages for the title-polish pass.

    ``events`` are Event-shaped dicts. Each one is listed as a numbered line
    ``N. title @ start`` with the location appended in parentheses when
    present; a missing or empty title is shown as ``(untitled)``.
    """
    listing = []
    for number, event in enumerate(events, start=1):
        title = event.get("title") or "(untitled)"
        entry = f"{number}. {title} @ {event.get('start')}"
        place = event.get("location")
        if place:
            entry += f" ({place})"
        listing.append(entry)

    prompt = "".join(
        (
            f"Conversation:\n{thread}\n\n",
            "Extracted events:\n",
            "\n".join(listing),
            "\n\n",
            "Return the titles JSON now.",
        )
    )
    system_msg = {"role": "system", "content": TITLE_SYSTEM}
    user_msg = {"role": "user", "content": prompt}
    return [system_msg, user_msg]
|
|
|
|
def merge_titles(plan: ActionPlan, raw: str) -> ActionPlan:
    """Apply a polish-pass response onto the plan's event titles.

    The polish pass must never be able to lose an event, so any mismatch keeps
    the original titles: ``raw`` that is not text or not valid JSON, a payload
    that is not an object with a ``titles`` list, or a list whose length
    differs from the number of events. Within a well-formed response, entries
    that are not non-blank strings are skipped individually.

    Args:
        plan: The plan whose events are retitled in place.
        raw: The polish model's JSON text, expected as ``{"titles": [...]}``.

    Returns:
        The same ``plan`` object, with titles stripped and capped at 80
        characters where a replacement was applied.
    """
    try:
        payload = json.loads(raw)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers a raw value
        # that is not str/bytes (e.g. None), which must not crash the pass.
        return plan

    titles = payload.get("titles") if isinstance(payload, dict) else None
    if not isinstance(titles, list) or len(titles) != len(plan.events):
        return plan

    for ev, title in zip(plan.events, titles):
        if isinstance(title, str) and title.strip():
            # rstrip again: cutting at 80 characters can land just after a space.
            ev.title = title.strip()[:80].rstrip()
    return plan
|
|
|
|
def apply_text_rules(thread: str, plan: ActionPlan) -> ActionPlan:
    """Deterministic guarantees for explicitly-communicated logistics (same
    philosophy as conflict detection: don't leave must-hold rules to the model).
    Single-event plans only — multi-event threads keep per-event model judgment.

    - "arrive N minutes early" -> start = arrival time, but ONLY when the model
      demonstrably did not shift already (its start equals the stated time).
    - end = STATED time + stated duration: a self-shifting model often counts
      the duration from the arrival time (10:15+30=10:45 instead of 11:00).
    - reminder: an explicit stated lead time always wins; else type defaults
      (medical 60 / party 30 / carpool-school 45); else the model's judgment.

    The single event is modified in place and the same ``plan`` is returned.
    """
    if len(plan.events) != 1:
        return plan
    ev = plan.events[0]
    early = _EARLY_RE.search(thread)
    stated = _find_time(thread)
    if early and stated:
        try:
            start_dt = datetime.fromisoformat(ev.start)
        except ValueError:
            # Unparseable start: skip the early-arrival rule; reminders below still apply.
            start_dt = None
        if start_dt is not None:
            mins = int(early.group(1))
            # The stated clock time placed on the same calendar day as the model's start.
            appt_dt = start_dt.replace(hour=stated[0], minute=stated[1])
            if start_dt == appt_dt:
                # The model left start at the stated time: shift it to the arrival time.
                start_dt = appt_dt - timedelta(minutes=mins)
                ev.start = start_dt.isoformat()
            if start_dt == appt_dt - timedelta(minutes=mins):
                # Start is arrival-shifted, either just above or by the model itself;
                # any other start is left untouched.
                # NOTE(review): nesting reconstructed from a source whose indentation
                # was lost — confirm the end/notes updates belong under this branch.
                duration = _find_duration_minutes(thread)
                if duration:
                    # The duration counts from the STATED time, not the arrival time.
                    ev.end = (appt_dt + timedelta(minutes=duration)).isoformat()
                hhmm = appt_dt.strftime("%H:%M")
                # Append the stated time once; skip if the notes already mention it.
                if hhmm not in (ev.notes or ""):
                    note = f"Appointment at {hhmm}; arrive {mins} min early"
                    ev.notes = f"{ev.notes} — {note}" if ev.notes else note
    # Reminder rules run regardless of the early-arrival handling above.
    m = _REMIND_EXPLICIT_RE.search(thread)
    if m:
        n = int(m.group(1))
        # A unit starting with "h" means hours; otherwise the number is minutes.
        ev.reminder_minutes = n * 60 if m.group(2).lower().startswith("h") else n
    elif _MEDICAL_RE.search(thread):
        ev.reminder_minutes = 60
    elif _PARTY_RE.search(thread):
        ev.reminder_minutes = 30
    elif _CARPOOL_SCHOOL_RE.search(thread):
        ev.reminder_minutes = 45
    # No match at all: the model's own reminder_minutes is kept.
    return plan
|
|
|
|
| def _polish_titles(thread: str, plan: ActionPlan) -> ActionPlan: |
| if not plan.events or os.environ.get("TITLE_POLISH") != "1": |
| return plan |
| from .model import complete_json |
|
|
| try: |
| raw = complete_json( |
| build_title_messages(thread, [e.model_dump() for e in plan.events]), |
| json_schema=TITLES_SCHEMA, |
| max_tokens=256, |
| ) |
| except Exception: |
| return plan |
| return merge_titles(plan, raw) |
|
|
|
|
def run_agent_stream(
    thread: str,
    now: Optional[datetime] = None,
    existing: Optional[list[Event]] = None,
    images: Optional[list[str]] = None,
    busy=None,
    memory_block: Optional[str] = None,
):
    """Generator for the UI: yields (partial_text, plan_or_None). Streams the
    model output for a live 'thinking' panel, then yields the final ActionPlan
    (with deterministic conflicts annotated if ``busy`` intervals are given).
    ``memory_block`` carries the caller's per-user (localStorage) memory.

    Every intermediate item is ``(accumulated_text_so_far, None)``; the last
    item is ``(pretty-printed plan JSON, plan)``. ``now`` defaults to
    ``datetime.now()`` and ``existing`` to an empty list. When ``memory_block``
    is None the final plan is also recorded via ``memory.observe_plan``.
    """
    now = now or datetime.now()
    existing = existing or []

    # NOTE(review): nesting below is reconstructed from a source whose
    # indentation was lost — confirm that the yields (including the final one)
    # are meant to happen while the run scope is still open.
    with events.run_scope("analyze"):
        if images:
            events.emit("vision", f"reading {len(images)} image(s)", images=len(images))

        if os.environ.get("USE_STUB_EXTRACTOR") == "1":
            plan = _stub_plan(thread, now)
            text = json.dumps(plan.model_dump(), indent=2)
            events.emit("model", "stub inference", latency_ms=0)
            # Replay the stub's JSON in 24-character slices to imitate streaming.
            acc = ""
            for i in range(0, len(text), 24):
                acc += text[i : i + 24]
                yield acc, None
        else:
            # Imported lazily so the stub path never needs the model module.
            from .model import stream_complete_json

            acc = ""
            for delta in stream_complete_json(
                build_messages(thread, now, existing, images, memory_block),
                ActionPlan.model_json_schema(),
            ):
                acc += delta
                yield acc, None
            # Stream finished: parse the full text, polish titles, enforce text rules.
            plan = apply_text_rules(thread, _polish_titles(thread, _parse_plan(acc)))

        # Only feed server-side memory when the caller did not bring its own.
        if memory_block is None:
            memory.observe_plan(plan)
        events.emit("decision", f"{len(plan.events)} event(s) detected", events=len(plan.events))
        if busy:
            # Imported only when free/busy intervals were actually supplied.
            from calendar_out.freebusy import annotate_conflicts

            plan = annotate_conflicts(plan, busy)
        yield (json.dumps(plan.model_dump(), indent=2), plan)
|
|
|
|
# Clock time: 1–2 digit hour, optional ":MM", optional am/pm.
# Groups: (hour, minute, meridiem). Also matches bare integers; _find_time filters those.
_TIME_RE = re.compile(r"\b(\d{1,2})(?::(\d{2}))?\s*(am|pm)?\b", re.IGNORECASE)
# A "Time: ..." / "Time - ..." line; group 1 is the rest of that line.
_TIME_LABEL_RE = re.compile(r"(?im)^\s*time\s*[:\-]\s*(.+)$")
# Month name or abbreviation (optional trailing "."), a 1–2 digit day with an
# optional ordinal suffix, and an optional ", YYYY".
_MONTH_DATE_RE = re.compile(
    r"\b(?:jan(?:uary)?|feb(?:ruary)?|mar(?:ch)?|apr(?:il)?|may|jun(?:e)?|jul(?:y)?|"
    r"aug(?:ust)?|sep(?:t(?:ember)?)?|oct(?:ober)?|nov(?:ember)?|dec(?:ember)?)\.?\s+"
    r"\d{1,2}(?:st|nd|rd|th)?(?:,?\s*\d{4})?\b", re.IGNORECASE)
# Full weekday names only (no abbreviations); group 1 is the name.
_WEEKDAY_RE = re.compile(
    r"\b(monday|tuesday|wednesday|thursday|friday|saturday|sunday)\b", re.IGNORECASE)
# A single line starting with "location"/"where"/"address" plus ":" or "-", or
# with the U+1F4CD pin character; group 1 is the remainder. No MULTILINE flag:
# it is applied line by line in _find_location.
_LOCATION_RE = re.compile(
    r"(?i)^\s*(?:(?:location|where|address)\s*[:\-]|\U0001F4CD)\s*(.*)$")
# A line opening with a short alphabetic label (at most 21 letters/spaces)
# followed by a colon and whitespace, e.g. "Duration: ".
_LABEL_LINE_RE = re.compile(r"^\s*[A-Za-z][A-Za-z ]{0,20}:\s")
# A "Duration: ..." / "Duration - ..." line; group 1 is the rest of that line.
_DURATION_RE = re.compile(r"(?im)^\s*duration\s*[:\-]\s*(.*)$")
# "arrive N min/minute(s) early"; group 1 is N (up to three digits).
_EARLY_RE = re.compile(r"(?i)arrive\s+(\d{1,3})\s*min(?:ute)?s?\s+early")
# Explicit reminder request such as "remind me 2 hours before".
# Groups: (amount, unit) where unit is a minutes or hours spelling.
_REMIND_EXPLICIT_RE = re.compile(
    r"(?i)\b(?:remind(?:er)?|notify|alert)\s*(?:me\s+)?(?:for\s+)?"
    r"(\d{1,3})\s*(min(?:ute)?s?|h(?:ou)?rs?)\s*(?:before|ahead|prior|early)")
# Keywords treated as a medical visit (reminder default 60).
_MEDICAL_RE = re.compile(
    r"(?i)\b(?:doctor|dr\b\.?|clinic|dentist|dental|pediatric\w*|physician|"
    r"medical|check-?up|primary\s+care|intake\s+forms?)")
# Keywords treated as a party (reminder default 30); "party of <digit>" is
# excluded by the negative lookahead.
_PARTY_RE = re.compile(
    r"(?i)\b(?:birthday|bday)\b|\bparty\b(?!\s+of\s+\d)")
# Keywords treated as carpool/school logistics (reminder default 45).
_CARPOOL_SCHOOL_RE = re.compile(r"(?i)\bcarpool\w*\b|\bschool\b|drive\s+the\s+kids")
|
|
|
|
def _find_time(thread: str) -> Optional[tuple[int, int]]:
    """Return the first plausible clock time as ``(hour, minute)`` in 24h form.

    When the text has a "Time:" line, only that line is searched; otherwise
    the whole thread is. A bare integer ("June 22", "112A") does not count:
    a candidate needs a minute part or an am/pm marker, and must be a valid
    hour/minute. Returns None when nothing qualifies.
    """
    labelled = _TIME_LABEL_RE.search(thread)
    haystack = thread if not labelled else labelled.group(1)

    for hit in _TIME_RE.finditer(haystack):
        hour_txt, minute_txt, meridiem = hit.group(1), hit.group(2), hit.group(3)
        if not minute_txt and not meridiem:
            continue

        hour = int(hour_txt)
        minute = int(minute_txt) if minute_txt else 0
        if hour > 23 or minute > 59:
            continue

        marker = meridiem.lower() if meridiem else ""
        if marker == "pm":
            if hour < 12:
                hour += 12
        elif marker == "am" and hour == 12:
            # 12am is midnight.
            hour = 0
        return hour, minute

    return None
|
|
|
|
def _find_date(thread: str, now: datetime):
    """Pick the event's calendar day from the text.

    Precedence: an explicit month-day date, then "tomorrow", then
    "today"/"tonight", then a weekday name, and finally tomorrow as the
    default. A date or weekday that fails to parse falls through to the next
    rule. Missing date parts are filled in from ``now``.
    """

    def _resolve(fragment: str):
        # Parse against ``now`` as the default; None signals "try the next rule".
        try:
            return dtparser.parse(fragment, default=now).date()
        except (ValueError, OverflowError):
            return None

    one_day = timedelta(days=1)

    explicit = _MONTH_DATE_RE.search(thread)
    if explicit:
        resolved = _resolve(explicit.group(0))
        if resolved is not None:
            return resolved

    if re.search(r"\btomorrow\b", thread, re.IGNORECASE):
        return (now + one_day).date()
    if re.search(r"\btoday\b|\btonight\b", thread, re.IGNORECASE):
        return now.date()

    weekday = _WEEKDAY_RE.search(thread)
    if weekday:
        resolved = _resolve(weekday.group(1))
        if resolved is not None:
            return resolved

    return (now + one_day).date()
|
|
|
|
def _find_location(lines: list[str]) -> tuple[Optional[str], set[int]]:
    """Extract a location and the indexes of the lines it was read from.

    The first line recognised as a location header starts the address; the
    lines after it are treated as a wrapped continuation until a blank line,
    a line starting with "(", or the next "Label:" line. Non-empty pieces are
    joined with ", ". Returns ``(None, set())`` when no header line exists,
    and ``(None, {header index})`` when the header carries no text at all.
    """
    for start, line in enumerate(lines):
        header = _LOCATION_RE.match(line)
        if header is None:
            continue

        fragments = [header.group(1).strip()]
        consumed = {start}
        follow = start + 1
        while follow < len(lines):
            candidate = lines[follow].strip()
            if (
                not candidate
                or candidate.startswith("(")
                or _LABEL_LINE_RE.match(lines[follow])
            ):
                break
            fragments.append(candidate)
            consumed.add(follow)
            follow += 1

        joined = ", ".join(filter(None, fragments))
        return (joined if joined else None), consumed

    return None, set()
|
|
|
|
def _find_duration_minutes(thread: str) -> Optional[int]:
    """Return the stated duration in minutes, or None when none is stated.

    Only a labelled "Duration: ..." line is considered. The first number on
    that line is used, so a range ("30–45 min") resolves to its lower bound.
    An hour unit after the number or range ("2 hours", "1.5 hrs", "1h") is
    converted to minutes, and minutes trailing an hour figure ("1 hour 30
    minutes", "1h30") are added. A figure with any other unit, or no unit,
    is taken as minutes.

    Returns:
        Whole minutes (rounded), or None if there is no duration line or the
        line contains no number.
    """
    labelled = _DURATION_RE.search(thread)
    if labelled is None:
        return None
    text = labelled.group(1)

    figure = re.search(
        r"(\d+(?:\.\d+)?)"  # first number: the lower bound when a range is given
        r"(?:\s*(?:[-–—]|to)\s*\d+(?:\.\d+)?)?"  # optional upper bound of a range
        r"\s*(h(?:(?:ou)?rs?)?(?![a-z]))?",  # optional hour unit: h, hr(s), hour(s)
        text,
        re.IGNORECASE,
    )
    if figure is None:
        return None

    total = float(figure.group(1))
    if figure.group(2):
        # The figure was in hours; without this "Duration: 2 hours" would be
        # read as two minutes.
        total *= 60
        extra = re.match(
            r"\s*(?:and\s+)?(\d{1,2})\s*(?:m|$)", text[figure.end():], re.IGNORECASE
        )
        if extra:
            total += int(extra.group(1))
    return int(round(total))
|
|
|
|
def _reminder_minutes(thread: str) -> int:
    """Choose the notification lead time, in minutes, for the stub plan.

    An explicit request in the text wins (hours are converted to minutes).
    Otherwise the first matching event type decides — medical 60, party 30,
    carpool/school 45, checked in that order — and 30 is the fallback.
    """
    explicit = _REMIND_EXPLICIT_RE.search(thread)
    if explicit is not None:
        amount = int(explicit.group(1))
        unit = explicit.group(2).lower()
        if unit.startswith("h"):
            return amount * 60
        return amount

    type_defaults = (
        (_MEDICAL_RE, 60),
        (_PARTY_RE, 30),
        (_CARPOOL_SCHOOL_RE, 45),
    )
    for pattern, lead in type_defaults:
        if pattern.search(thread):
            return lead
    return 30
|
|
|
|
def _is_date_line(line: str, now: datetime) -> bool:
    """Tell whether the whole line parses as a date/time.

    The line is handed to the dateutil parser with ``now`` as the default for
    missing parts; a parse failure means the line is not a date line.
    """
    try:
        dtparser.parse(line, default=now)
    except (ValueError, OverflowError):
        parsed = False
    else:
        parsed = True
    return parsed
|
|
|
|
| def _pick_title(lines: list[str], now: datetime, location_idx: set[int]) -> str: |
| nonempty = [(i, ln.strip()) for i, ln in enumerate(lines) if ln.strip()] |
| if not nonempty: |
| return "Event" |
| first_i, first = nonempty[0] |
| if not _is_date_line(first, now): |
| return first[:60] |
| |
| for i, ln in nonempty[1:]: |
| if i in location_idx or _LABEL_LINE_RE.match(ln) or ln.startswith("("): |
| continue |
| if _is_date_line(ln, now): |
| continue |
| return ln[:60] |
| return "Appointment" |
|
|
|
|
def _stub_plan(thread: str, now: datetime) -> ActionPlan:
    """Build a heuristic ActionPlan so the pipeline demos without a model.

    With no recognisable clock time the plan has no events. Otherwise it
    holds one event: the day comes from the text (defaulting relative to
    ``now``), the duration defaults to 60 minutes, and an "arrive N minutes
    early" instruction moves the start earlier while the end still counts
    from the stated appointment time.
    """
    clock = _find_time(thread)
    if clock is None:
        return ActionPlan(reasoning="No time found.", reply_draft="Got it, thanks!")

    hour, minute = clock
    lines = thread.strip().splitlines()
    location, location_rows = _find_location(lines)
    day = _find_date(thread, now)
    appointment = now.replace(
        year=day.year,
        month=day.month,
        day=day.day,
        hour=hour,
        minute=minute,
        second=0,
        microsecond=0,
    )
    length = _find_duration_minutes(thread) or 60

    arrive_early = _EARLY_RE.search(thread)
    if arrive_early:
        lead = arrive_early.group(1)
        start = appointment - timedelta(minutes=int(lead))
        notes = f"Appointment at {appointment.strftime('%H:%M')}; arrive {lead} min early"
    else:
        start = appointment
        notes = "(stub agent — wire the model to replace this)"

    event = Event(
        title=_pick_title(lines, now, location_rows),
        start=start.isoformat(),
        end=(appointment + timedelta(minutes=length)).isoformat(),
        location=location,
        reminder_minutes=_reminder_minutes(thread),
        notes=notes,
    )
    return ActionPlan(
        reasoning="(stub) parsed time/date/location heuristically.",
        events=[event],
        reply_draft="Sounds good, see you then!",
    )
|
|