Spaces:
Sleeping
Sleeping
| """ | |
| app/core/context.py | |
| ------------------- | |
| Unified request context for NotiFlow Autonomous. | |
| The context object is the single source of truth for every request. | |
| It is created at the API boundary, threaded through every agent, and | |
| returned to the caller as part of the final response. | |
| Structure | |
| --------- | |
| { | |
| "message": str, # original raw message | |
| "intent": str | None, # detected intent (filled by IntentAgent) | |
| "data": dict, # extracted + validated fields | |
| "event": dict, # skill execution result | |
| "state": str, # pipeline lifecycle state | |
| "history": list[dict], # audit-level agent execution log β UPGRADED | |
| "errors": list[str], # non-fatal errors accumulated during run | |
| "priority": str, # "low" | "medium" | "high" (derived by UrgencyAgent) | |
| "priority_score": int, # additive score 0-100 (contributed by multiple agents) | |
| "priority_score_reasons": list[dict], # audit trail of score contributions | |
| "plan": list[dict], # ordered main plan steps (set by Planner) | |
| "autonomy_plan": list[dict], # ordered autonomy steps (set by AutonomyPlanner) | |
| "metadata": dict, # source, sheet_updated, model, etc. | |
| } | |
| History entry shape (audit-ready): | |
| { | |
| "agent": str, # agent name | |
| "action": str, # what the agent did (human-readable) | |
| "input_keys": list[str], # context keys read by this agent | |
| "output_keys": list[str], # context keys written by this agent | |
| "status": str, # "success" | "error" | "skipped" | |
| "detail": str, # error message or extra note | |
| "timestamp": str, # ISO-8601 UTC | |
| } | |
| Pipeline states: | |
| initialized β intent_detected β extracted β validated β routed β completed | |
| Any stage can transition to: failed | |
| Public API (unchanged from Phase 1 β fully backward compatible) | |
| ---------- | |
| create_context(message, source) -> dict | |
| update_context(ctx, **kwargs) -> dict | |
| log_step(ctx, agent, status, detail, *, action, input_keys, output_keys) -> None | |
| add_error(ctx, error) -> None | |
| """ | |
| from __future__ import annotations | |
| from datetime import datetime, timezone | |
| from typing import Any | |
| from app.core.event_bus import push_live_log | |
| # --------------------------------------------------------------------------- | |
| # Factory | |
| # --------------------------------------------------------------------------- | |
| def create_context(message: str, source: str = "system") -> dict[str, Any]: | |
| """ | |
| Create a fresh request context for a new message. | |
| Args: | |
| message: Raw business message (Hinglish or English). | |
| source: Notification source (e.g. "whatsapp", "gpay"). | |
| Returns: | |
| Fully initialised context dict. | |
| """ | |
| return { | |
| "message": message.strip(), | |
| "source": source, | |
| # ββ Intent (Phase 5: multi-intent) ββββββββββββββββββββββββββββββββ | |
| "intents": [], # all detected intents, ordered (set by IntentAgent) | |
| "intent": None, # primary intent = intents[0] β kept for backward compat | |
| # ββ Extraction ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| "multi_data": {}, # per-intent extracted fields {intent_name: {fields}} | |
| "data": {}, # primary intent extraction β kept for backward compat | |
| "event": {}, | |
| "invoice": None, | |
| "payment": None, | |
| "events": [], | |
| "live_logs": [], | |
| "state": "initialized", | |
| "history": [], | |
| "errors": [], | |
| "priority": "normal", # final derived label (set by UrgencyAgent) | |
| "priority_score": 0, # additive score 0-100 (Phase 3 fix) | |
| "priority_score_reasons": [], # audit trail of score contributions | |
| # plan is [] until Planner fills it; list[dict] in Phase 2 | |
| "plan": [], | |
| "autonomy_plan": [], # filled by autonomy_planner | |
| "metadata": { | |
| "source": source, | |
| "sheet_updated": False, | |
| "model": None, | |
| "retry_count": 0, # replan loop counter | |
| "created_at": datetime.now(timezone.utc).isoformat(), | |
| }, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Mutators | |
| # --------------------------------------------------------------------------- | |
| def update_context(ctx: dict[str, Any], **kwargs: Any) -> dict[str, Any]: | |
| """ | |
| Apply keyword updates to the context (in-place, returns ctx). | |
| Supports double-underscore for nested dicts: | |
| update_context(ctx, metadata__source="whatsapp") | |
| update_context(ctx, intent="order", state="intent_detected") | |
| """ | |
| for key, value in kwargs.items(): | |
| if "__" in key: | |
| parent, child = key.split("__", 1) | |
| if parent in ctx and isinstance(ctx[parent], dict): | |
| ctx[parent][child] = value | |
| else: | |
| ctx[key] = value | |
| return ctx | |
| def log_step( | |
| ctx: dict[str, Any], | |
| agent: str, | |
| status: str, | |
| detail: str = "", | |
| *, | |
| action: str = "", | |
| input_keys: list[str] | None = None, | |
| output_keys: list[str] | None = None, | |
| ) -> None: | |
| """ | |
| Append an audit-level execution entry to context["history"]. | |
| Backward compatible: the original 4-arg signature still works. | |
| New callers can supply keyword-only audit fields for richer logs. | |
| Args: | |
| ctx: The active context dict. | |
| agent: Agent name (e.g. "IntentAgent"). | |
| status: "success" | "error" | "skipped". | |
| detail: Error message or human-readable note. | |
| action: What the agent did (e.g. "classified intent as payment"). | |
| input_keys: Context keys the agent READ (e.g. ["message"]). | |
| output_keys: Context keys the agent WROTE (e.g. ["intent", "state"]). | |
| """ | |
| entry = { | |
| "agent": agent, | |
| "action": action or f"{agent} executed", | |
| "input_keys": input_keys or [], | |
| "output_keys": output_keys or [], | |
| "status": status, | |
| "detail": detail, | |
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| } | |
| ctx["history"].append(entry) | |
| push_live_log(ctx, entry) | |
| def add_error(ctx: dict[str, Any], error: str) -> None: | |
| """Record a non-fatal error in context["errors"].""" | |
| ctx["errors"].append(error) | |