File size: 6,690 Bytes
32d42b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
"""
app/core/context.py
-------------------
Unified request context for NotiFlow Autonomous.

The context object is the single source of truth for every request.
It is created at the API boundary, threaded through every agent, and
returned to the caller as part of the final response.

Structure
---------
{
    "message":   str,           # original raw message
    "intent":    str | None,    # detected intent (filled by IntentAgent)
    "data":      dict,          # extracted + validated fields
    "event":     dict,          # skill execution result
    "state":     str,           # pipeline lifecycle state
    "history":   list[dict],    # audit-level agent execution log  ← UPGRADED
    "errors":    list[str],     # non-fatal errors accumulated during run
    "priority":              str,           # "low" | "medium" | "high" (derived by UrgencyAgent)
    "priority_score":        int,           # additive score 0-100 (contributed by multiple agents)
    "priority_score_reasons": list[dict],  # audit trail of score contributions
    "plan":          list[dict],   # ordered main plan steps (set by Planner)
    "autonomy_plan": list[dict],   # ordered autonomy steps (set by AutonomyPlanner)
    "metadata":  dict,          # source, sheet_updated, model, etc.
}

History entry shape (audit-ready):
    {
        "agent":       str,       # agent name
        "action":      str,       # what the agent did (human-readable)
        "input_keys":  list[str], # context keys read by this agent
        "output_keys": list[str], # context keys written by this agent
        "status":      str,       # "success" | "error" | "skipped"
        "detail":      str,       # error message or extra note
        "timestamp":   str,       # ISO-8601 UTC
    }

Pipeline states:
    initialized → intent_detected → extracted → validated → routed → completed
    Any stage can transition to: failed

Public API (unchanged from Phase 1 — fully backward compatible)
----------
create_context(message, source) -> dict
update_context(ctx, **kwargs)   -> dict
log_step(ctx, agent, status, detail, *, action, input_keys, output_keys) -> None
add_error(ctx, error) -> None
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Any

from app.core.event_bus import push_live_log


# ---------------------------------------------------------------------------
# Factory
# ---------------------------------------------------------------------------

def create_context(message: str, source: str = "system") -> dict[str, Any]:
    """
    Build the initial context dict for one incoming message.

    Every key is pre-populated with an empty/default value, and the
    pipeline state starts at "initialized".

    Args:
        message: Raw business message; surrounding whitespace is stripped.
        source:  Notification source (e.g. "whatsapp", "gpay"). Stored both
                 at the top level and under ``metadata["source"]``.

    Returns:
        A newly built context dict (fresh containers on every call).
    """
    cleaned_message = message.strip()

    # Request-level bookkeeping, kept apart from the business payload.
    request_metadata: dict[str, Any] = {
        "source":        source,
        "sheet_updated": False,
        "model":         None,
        "retry_count":   0,         # replan loop counter
        "created_at":    datetime.now(timezone.utc).isoformat(),  # ISO-8601, UTC
    }

    context: dict[str, Any] = {
        "message":  cleaned_message,
        "source":   source,
        # -- Intent (Phase 5: multi-intent) --------------------------------
        "intents":  [],     # every detected intent, in order (set by IntentAgent)
        "intent":   None,   # primary intent = intents[0] — kept for backward compat
        # -- Extraction ----------------------------------------------------
        "multi_data": {},   # extracted fields per intent: {intent_name: {fields}}
        "data":     {},     # primary intent extraction — kept for backward compat
        "event":    {},
        "invoice":  None,
        "payment":  None,
        "events":   [],
        "live_logs": [],
        "state":    "initialized",
        "history":  [],
        "errors":   [],
        # -- Priority ------------------------------------------------------
        "priority": "normal",           # starting label; final one is set by UrgencyAgent
        "priority_score": 0,            # additive score 0-100 (Phase 3 fix)
        "priority_score_reasons": [],   # audit trail of score contributions
        # -- Planning ------------------------------------------------------
        "plan":          [],            # empty until Planner fills it; list[dict] in Phase 2
        "autonomy_plan": [],            # filled by autonomy_planner
        "metadata": request_metadata,
    }
    return context


# ---------------------------------------------------------------------------
# Mutators
# ---------------------------------------------------------------------------

def update_context(ctx: dict[str, Any], **kwargs: Any) -> dict[str, Any]:
    """
    Write keyword updates into the context, mutating it in place.

    A plain keyword sets (or creates) a top-level key. A keyword containing
    a double underscore is split at the FIRST "__" into ``parent`` and
    ``child`` and written as ``ctx[parent][child]``:

        update_context(ctx, intent="order", state="intent_detected")
        update_context(ctx, metadata__source="whatsapp")

    A nested update is silently ignored when ``parent`` is absent from the
    context or its current value is not a dict.

    Args:
        ctx:      The active context dict.
        **kwargs: Updates to apply, in the order given.

    Returns:
        The same ``ctx`` object that was passed in.
    """
    for name, new_value in kwargs.items():
        parent, separator, child = name.partition("__")

        if not separator:
            # No "__" present: ordinary top-level assignment.
            ctx[name] = new_value
            continue

        nested = ctx.get(parent)
        if isinstance(nested, dict):
            nested[child] = new_value
        # Otherwise: parent missing or not a dict, so the update is dropped.

    return ctx


def log_step(
    ctx:         dict[str, Any],
    agent:       str,
    status:      str,
    detail:      str = "",
    *,
    action:      str = "",
    input_keys:  list[str] | None = None,
    output_keys: list[str] | None = None,
) -> None:
    """
    Record one agent execution step in context["history"].

    The same entry is also handed to ``push_live_log`` after it has been
    appended. Callers using only the positional (ctx, agent, status, detail)
    form keep working; the keyword-only arguments add richer audit fields.

    Args:
        ctx:         The active context dict (must contain a "history" list).
        agent:       Agent name (e.g. "IntentAgent").
        status:      "success" | "error" | "skipped".
        detail:      Error message or human-readable note.
        action:      What the agent did (e.g. "classified intent as payment").
                     Falls back to "<agent> executed" when empty.
        input_keys:  Context keys the agent READ  (e.g. ["message"]).
                     Falls back to an empty list when omitted or empty.
        output_keys: Context keys the agent WROTE (e.g. ["intent", "state"]).
                     Falls back to an empty list when omitted or empty.
    """
    # Resolve the optional audit fields to their fallbacks up front.
    if not action:
        action = f"{agent} executed"
    keys_read = input_keys if input_keys else []
    keys_written = output_keys if output_keys else []
    logged_at = datetime.now(timezone.utc).isoformat()  # ISO-8601, UTC

    record = {
        "agent":       agent,
        "action":      action,
        "input_keys":  keys_read,
        "output_keys": keys_written,
        "status":      status,
        "detail":      detail,
        "timestamp":   logged_at,
    }

    ctx["history"].append(record)
    push_live_log(ctx, record)


def add_error(ctx: dict[str, Any], error: str) -> None:
    """
    Append a non-fatal error message to context["errors"].

    Args:
        ctx:   The active context dict (must contain an "errors" list).
        error: The error message to record.
    """
    recorded_errors = ctx["errors"]
    recorded_errors.append(error)