File size: 4,238 Bytes
83fe4f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from __future__ import annotations

from backend.ai.classifier import classify_email
from backend.ai.composer import compose_reply
from backend.ai.extractor import extract_email_data


def email_received(trigger: dict) -> dict:
    """Return the email-received trigger payload unchanged.

    Args:
        trigger: The trigger payload handed to this handler.

    Returns:
        The very same ``trigger`` object (no copy is made).
    """
    return trigger


def schedule(trigger: dict) -> dict:
    """Return the schedule trigger payload unchanged.

    Args:
        trigger: The trigger payload handed to this handler.

    Returns:
        The very same ``trigger`` object (no copy is made).
    """
    return trigger


def file_uploaded(trigger: dict) -> dict:
    """Return the file-uploaded trigger payload unchanged.

    Args:
        trigger: The trigger payload handed to this handler.

    Returns:
        The very same ``trigger`` object (no copy is made).
    """
    return trigger


def manual_trigger(trigger: dict) -> dict:
    """Return the manual trigger payload unchanged.

    Args:
        trigger: The trigger payload handed to this handler.

    Returns:
        The very same ``trigger`` object (no copy is made).
    """
    return trigger


def ai_extract(params: dict, runtime: dict) -> dict:
    """Run ``extract_email_data`` on the email stored in the trigger.

    Args:
        params: Step parameters; not read by this handler.
        runtime: Execution context; ``runtime["trigger"]["email"]`` must
            exist and contain a ``"from"`` key. ``"subject"`` and
            ``"body"`` default to ``""`` when absent.

    Returns:
        Whatever ``extract_email_data`` returns for the sender, subject
        and body.

    Raises:
        KeyError: If the trigger, the email, or its ``"from"`` field is
            missing.
    """
    message = runtime["trigger"]["email"]
    sender = message["from"]
    subject = message.get("subject", "")
    body = message.get("body", "")
    return extract_email_data(sender, subject, body)


def ai_classify(params: dict, runtime: dict) -> dict:
    """Run ``classify_email`` on the email stored in the trigger.

    Args:
        params: Step parameters; not read by this handler.
        runtime: Execution context. Reads ``runtime["trigger"]["email"]``
            (``"from"`` required; ``"subject"``/``"body"`` default to
            ``""``) and ``runtime["owner"]`` (``"business_description"``
            defaults to ``""``).

    Returns:
        Whatever ``classify_email`` returns for the sender, subject, body
        and business description.

    Raises:
        KeyError: If the trigger, email, ``"from"`` field or owner entry
            is missing.
    """
    message = runtime["trigger"]["email"]
    sender = message["from"]
    subject = message.get("subject", "")
    body = message.get("body", "")
    business_description = runtime["owner"].get("business_description", "")
    return classify_email(sender, subject, body, business_description)


def ai_compose(params: dict, runtime: dict) -> dict:
    """Compose a reply body from the resolved ``context`` parameter.

    Args:
        params: Step parameters; the optional ``"context"`` entry is
            passed through ``_resolve_value`` first.
        runtime: Execution context used for placeholder resolution.

    Returns:
        ``{"reply_body": ...}`` holding the result of ``compose_reply``.
    """
    reply_context = _resolve_value(params.get("context"), runtime)
    if not isinstance(reply_context, dict):
        # Anything that is not a dict (including a missing context) is
        # replaced by a generic greeting context.
        reply_context = {"customer_name": "there"}
    return {"reply_body": compose_reply(reply_context)}


def sheets_read(params: dict, runtime: dict) -> dict:
    """Read a sheet for the current owner through ``runtime["db"]``.

    Args:
        params: Step parameters; ``"sheet_name"`` is required.
        runtime: Execution context providing ``"db"`` and ``"owner_id"``.

    Returns:
        ``{"rows": ...}`` with the value returned by ``db.read_sheet``.

    Raises:
        KeyError: If ``"sheet_name"``, ``"db"`` or ``"owner_id"`` is
            missing.
    """
    target_sheet = params["sheet_name"]
    database = runtime["db"]
    fetched = database.read_sheet(runtime["owner_id"], target_sheet)
    return {"rows": fetched}


def sheets_write(params: dict, runtime: dict) -> dict:
    """Append one resolved row to a sheet for the current owner.

    Args:
        params: Step parameters; ``"data"`` (placeholders resolved via
            ``_resolve_value``) and ``"sheet_name"`` are required.
        runtime: Execution context providing ``"db"`` and ``"owner_id"``.

    Returns:
        ``{"written": row}`` where ``row`` is the resolved data that was
        passed to ``db.append_sheet_row``.

    Raises:
        KeyError: If a required parameter or runtime entry is missing.
    """
    row = _resolve_value(params["data"], runtime)
    target_sheet = params["sheet_name"]
    database = runtime["db"]
    database.append_sheet_row(runtime["owner_id"], target_sheet, row)
    return {"written": row}


def sheets_update(params: dict, runtime: dict) -> dict:
    """Update a sheet row for the current owner via ``db.update_sheet_row``.

    Args:
        params: Step parameters. Requires ``"sheet_name"``,
            ``"lookup_column"``, ``"lookup_value"``, ``"update_column"``
            and ``"update_value"``; the two ``*_value`` entries have
            their placeholders resolved, the column names are passed
            verbatim.
        runtime: Execution context providing ``"db"`` and ``"owner_id"``.

    Returns:
        ``{"updated": True}`` unconditionally; the db call's own result
        is not inspected.

    Raises:
        KeyError: If a required parameter or runtime entry is missing.
    """
    update_row = runtime["db"].update_sheet_row
    owner_id = runtime["owner_id"]
    target_sheet = params["sheet_name"]
    lookup_column = params["lookup_column"]
    lookup_value = _resolve_value(params["lookup_value"], runtime)
    update_column = params["update_column"]
    update_value = _resolve_value(params["update_value"], runtime)
    update_row(
        owner_id,
        target_sheet,
        lookup_column,
        lookup_value,
        update_column,
        update_value,
    )
    return {"updated": True}


def send_email(params: dict, runtime: dict) -> dict:
    """Build an outbound email and record it through ``runtime["db"]``.

    Args:
        params: Step parameters; ``"to"``, ``"subject"`` and ``"body"``
            are required. ``"to"`` and ``"body"`` go through
            ``_resolve_value``.
        runtime: Execution context providing ``"db"`` and ``"owner_id"``.

    Returns:
        The message dict (``"to"``, ``"subject"``, ``"body"``) that was
        passed to ``db.record_outbound_email``.

    Raises:
        KeyError: If a required parameter or runtime entry is missing.
    """
    recipient = _resolve_value(params["to"], runtime)
    # NOTE(review): the subject is used verbatim -- unlike "to" and
    # "body" it is not placeholder-resolved. Confirm this is intended.
    subject = params["subject"]
    body = _resolve_value(params["body"], runtime)
    outbound = {"to": recipient, "subject": subject, "body": body}
    runtime["db"].record_outbound_email(runtime["owner_id"], outbound)
    return outbound


def notify_owner(params: dict, runtime: dict) -> dict:
    """Create an escalation for the owner through ``runtime["db"]``.

    Args:
        params: Step parameters; ``"message"`` is required,
            ``"severity"`` defaults to ``"info"`` and ``"options"`` to
            an empty list.
        runtime: Execution context providing ``"db"``, ``"owner_id"``
            and ``"workflow_id"``; ``"execution_id"`` falls back to
            ``"pending"`` when absent.

    Returns:
        The payload dict that was passed to ``db.create_escalation``.

    Raises:
        KeyError: If ``"message"`` or a required runtime entry is
            missing.
    """
    escalation = {
        "message": params["message"],
        "severity": params.get("severity", "info"),
        "options": params.get("options", []),
    }
    database = runtime["db"]
    database.create_escalation(
        runtime["owner_id"],
        runtime["workflow_id"],
        runtime.get("execution_id", "pending"),
        escalation,
    )
    return escalation


def condition(params: dict, runtime: dict) -> dict:
    """Pick the next step id based on ``params["check"]``.

    Only one kind of check is actually evaluated: when
    ``"needs_clarification"`` occurs in ``check``, the result stored
    under ``runtime["step_results"]["step_extract"]`` is consulted and a
    truthy ``"needs_clarification"`` selects ``params["if_false"]``.
    Every other case selects ``params["if_true"]``.

    NOTE(review): the step name ``step_extract`` is hard-coded and any
    other check expression is treated as true -- confirm this is the
    intended scope.

    Args:
        params: Step parameters; ``"check"`` is required, plus whichever
            of ``"if_true"`` / ``"if_false"`` ends up being selected.
        runtime: Execution context; ``"step_results"`` is read only for
            the ``needs_clarification`` check.

    Returns:
        ``{"next_step": <selected step id>}``.
    """
    check = params["check"]
    branch = "if_true"
    if "needs_clarification" in check:
        extracted = runtime["step_results"].get("step_extract", {})
        if extracted.get("needs_clarification"):
            branch = "if_false"
    # Only the selected branch key is looked up, so the other may be absent.
    return {"next_step": params[branch]}


def loop(params: dict, runtime: dict) -> dict:
    """Resolve the ``items`` parameter and hand it back for iteration.

    Args:
        params: Step parameters; ``"items"`` is required and has its
            placeholders resolved via ``_resolve_value``.
        runtime: Execution context used for placeholder resolution.

    Returns:
        ``{"items": <resolved items>}``. This handler does not iterate
        itself.

    Raises:
        KeyError: If ``"items"`` is missing.
    """
    return {"items": _resolve_value(params["items"], runtime)}


def wait_for_input(params: dict, runtime: dict) -> dict:
    """Ask the owner for input by raising a ``"warning"`` notification.

    Delegates to ``notify_owner`` with the prompt as the message.

    Args:
        params: Step parameters; both ``"prompt"`` and ``"options"`` are
            required.
        runtime: Execution context forwarded to ``notify_owner``.

    Returns:
        The payload returned by ``notify_owner``.

    Raises:
        KeyError: If ``"prompt"`` or ``"options"`` is missing.
    """
    request = {
        "message": params["prompt"],
        "severity": "warning",
        "options": params["options"],
    }
    return notify_owner(request, runtime)


def _resolve_value(value, runtime: dict):
    """Recursively replace ``{{dotted.path}}`` placeholders with runtime data.

    Dicts and lists are rebuilt with each contained value resolved. A
    string that both starts with ``{{`` and ends with ``}}`` is treated
    as a dotted path; every other value is returned as-is. Placeholders
    embedded inside a longer string are not substituted.

    Each path segment is looked up in this order:

    1. as a key of the current dict,
    2. the literal ``trigger`` -> ``runtime["trigger"]``,
    3. a key of ``runtime["step_results"]``,
    4. the literal ``config`` -> ``runtime["owner"]``.

    NOTE(review): rules 2-4 are consulted at every depth, not only for
    the first segment -- kept as-is; confirm that is intended.

    Args:
        value: Any value; only dicts, lists and placeholder strings are
            transformed.
        runtime: Execution context the paths are resolved against.

    Returns:
        The resolved value, or the original placeholder string when any
        segment cannot be resolved (including when the path tries to
        descend into a non-dict value or when the runtime lacks the
        ``trigger`` / ``step_results`` / ``owner`` entry it would need).
    """
    if isinstance(value, dict):
        return {key: _resolve_value(item, runtime) for key, item in value.items()}
    if isinstance(value, list):
        return [_resolve_value(item, runtime) for item in value]
    if not (isinstance(value, str) and value.startswith("{{") and value.endswith("}}")):
        return value

    # Tolerate a runtime without step results instead of raising KeyError.
    step_results = runtime.get("step_results") or {}
    current = runtime
    for part in value[2:-2].strip().split("."):
        # Only dicts are indexed by key: a bare ``part in current`` on a
        # str/list would succeed as a substring/membership test and then
        # crash on ``current[part]``, and would raise on scalars.
        if isinstance(current, dict) and part in current:
            current = current[part]
        elif part == "trigger" and "trigger" in runtime:
            current = runtime["trigger"]
        elif part in step_results:
            current = step_results[part]
        elif part == "config" and "owner" in runtime:
            current = runtime["owner"]
        else:
            # Unresolvable path: leave the placeholder untouched.
            return value
    return current