Spaces:
Sleeping
Sleeping
File size: 6,690 Bytes
32d42b3 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 | """
app/core/context.py
-------------------
Unified request context for NotiFlow Autonomous.
The context object is the single source of truth for every request.
It is created at the API boundary, threaded through every agent, and
returned to the caller as part of the final response.
Structure
---------
{
"message": str, # original raw message
"intent": str | None, # detected intent (filled by IntentAgent)
"data": dict, # extracted + validated fields
"event": dict, # skill execution result
"state": str, # pipeline lifecycle state
"history": list[dict], # audit-level agent execution log (upgraded; see entry shape below)
"errors": list[str], # non-fatal errors accumulated during run
"priority": str, # starts as "normal"; "low" | "medium" | "high" once derived by UrgencyAgent
"priority_score": int, # additive score 0-100 (contributed by multiple agents)
"priority_score_reasons": list[dict], # audit trail of score contributions
"plan": list[dict], # ordered main plan steps (set by Planner)
"autonomy_plan": list[dict], # ordered autonomy steps (set by AutonomyPlanner)
"metadata": dict, # source, sheet_updated, model, etc.
}
History entry shape (audit-ready):
{
"agent": str, # agent name
"action": str, # what the agent did (human-readable)
"input_keys": list[str], # context keys read by this agent
"output_keys": list[str], # context keys written by this agent
"status": str, # "success" | "error" | "skipped"
"detail": str, # error message or extra note
"timestamp": str, # ISO-8601 UTC
}
Pipeline states:
initialized → intent_detected → extracted → validated → routed → completed
Any stage can transition to: failed
Public API (unchanged from Phase 1 — fully backward compatible)
----------
create_context(message, source) -> dict
update_context(ctx, **kwargs) -> dict
log_step(ctx, agent, status, detail, *, action, input_keys, output_keys) -> None
add_error(ctx, error) -> None
"""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from app.core.event_bus import push_live_log
# ---------------------------------------------------------------------------
# Factory
# ---------------------------------------------------------------------------
def create_context(message: str, source: str = "system") -> dict[str, Any]:
    """
    Build the initial context dict for one incoming message.

    Args:
        message: Raw business message; surrounding whitespace is stripped
            before it is stored.
        source: Notification source label (e.g. "whatsapp", "gpay"). It is
            stored both at the top level and under ``metadata["source"]``.

    Returns:
        A new context dict in the ``"initialized"`` state. Every container
        is a fresh object, so contexts never share mutable state.
    """
    # -- Request identity ---------------------------------------------------
    identity: dict[str, Any] = {"message": message.strip(), "source": source}

    # -- Intent and extraction (multi-intent aware) -------------------------
    # "intent" and "data" mirror the primary intent and are kept alongside
    # the multi-intent fields for backward compatibility.
    understanding: dict[str, Any] = {
        "intents": [],
        "intent": None,
        "multi_data": {},
        "data": {},
    }

    # -- Execution results --------------------------------------------------
    results: dict[str, Any] = {
        "event": {},
        "invoice": None,
        "payment": None,
        "events": [],
        "live_logs": [],
    }

    # -- Lifecycle and audit ------------------------------------------------
    lifecycle: dict[str, Any] = {
        "state": "initialized",
        "history": [],
        "errors": [],
    }

    # -- Priority: label plus additive score and its audit trail ------------
    urgency: dict[str, Any] = {
        "priority": "normal",
        "priority_score": 0,
        "priority_score_reasons": [],
    }

    # -- Plans: both stay empty until their planner fills them --------------
    planning: dict[str, Any] = {"plan": [], "autonomy_plan": []}

    # -- Bookkeeping --------------------------------------------------------
    metadata: dict[str, Any] = {
        "source": source,
        "sheet_updated": False,
        "model": None,
        "retry_count": 0,  # replan loop counter
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

    # Merge in this exact order so the resulting key order is stable.
    return {
        **identity,
        **understanding,
        **results,
        **lifecycle,
        **urgency,
        **planning,
        "metadata": metadata,
    }
# ---------------------------------------------------------------------------
# Mutators
# ---------------------------------------------------------------------------
def update_context(ctx: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
    """
    Apply keyword updates to the context (in-place, returns ctx).

    Plain keys are assigned directly. A key containing a double underscore
    addresses one level of nesting; only the first ``__`` splits, so
    ``metadata__a__b`` writes the key ``"a__b"`` inside ``ctx["metadata"]``:

        update_context(ctx, metadata__source="whatsapp")
        update_context(ctx, intent="order", state="intent_detected")

    Nested-update rules:
        * parent is a dict            -> the child key is set on it.
        * parent is missing or None   -> a new dict is created for it, so
          the update is not silently lost (e.g. ``invoice__id=7`` when
          ``ctx["invoice"]`` is still ``None``).
        * parent holds any other type -> the update is skipped rather than
          overwriting an unrelated value.
        * key starts with ``__`` (empty parent) -> stored as a plain key.

    Args:
        ctx: The active context dict; mutated in place.
        **kwargs: Updates to apply, in the order given.

    Returns:
        The same ``ctx`` object, to allow call chaining.
    """
    for key, value in kwargs.items():
        parent, separator, child = key.partition("__")

        # No separator, or nothing before it: this is an ordinary flat key.
        if not separator or not parent:
            ctx[key] = value
            continue

        nested = ctx.get(parent)
        if nested is None:
            # Parent absent or still at its None placeholder: create it.
            nested = ctx[parent] = {}

        if isinstance(nested, dict):
            nested[child] = value
        # Otherwise the parent is a non-dict value (str, list, ...); it is
        # left untouched on purpose so existing data is never clobbered.
    return ctx
def log_step(
    ctx: dict[str, Any],
    agent: str,
    status: str,
    detail: str = "",
    *,
    action: str = "",
    input_keys: list[str] | None = None,
    output_keys: list[str] | None = None,
) -> None:
    """
    Record one agent execution step in ``ctx["history"]``.

    The positional ``(ctx, agent, status, detail)`` form is enough for a
    basic entry; the keyword-only arguments add audit detail.

    Args:
        ctx: The active context dict; its ``"history"`` list is appended to.
        agent: Agent name (e.g. "IntentAgent").
        status: "success" | "error" | "skipped".
        detail: Error message or human-readable note.
        action: What the agent did; when empty it falls back to
            ``"<agent> executed"``.
        input_keys: Context keys the agent read; stored as ``[]`` when
            omitted or empty.
        output_keys: Context keys the agent wrote; stored as ``[]`` when
            omitted or empty.

    Side effects:
        After the entry is appended, the same entry object is passed to
        ``push_live_log(ctx, entry)`` from ``app.core.event_bus``.
    """
    record: dict[str, Any] = dict(
        agent=agent,
        action=action if action else f"{agent} executed",
        input_keys=input_keys if input_keys else [],
        output_keys=output_keys if output_keys else [],
        status=status,
        detail=detail,
        # Timezone-aware UTC timestamp in ISO-8601 form.
        timestamp=datetime.now(timezone.utc).isoformat(),
    )

    history = ctx["history"]
    history.append(record)

    # The live-log call receives the very object stored in the history.
    push_live_log(ctx, record)
def add_error(ctx: dict[str, Any], error: str) -> None:
    """
    Append a non-fatal error message to ``ctx["errors"]``.

    Args:
        ctx: The active context dict; its ``"errors"`` list is mutated.
        error: The message to record.
    """
    error_log = ctx["errors"]
    error_log.append(error)
|