File size: 6,081 Bytes
83fe4f9
 
 
426e9e3
83fe4f9
 
 
 
 
426e9e3
 
 
83fe4f9
 
 
 
 
 
 
 
 
 
 
f0c7697
83fe4f9
 
 
 
 
 
 
 
 
426e9e3
83fe4f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f0c7697
83fe4f9
 
 
 
 
 
 
 
 
 
 
 
b032b2d
 
 
 
 
 
83fe4f9
 
 
 
 
 
 
 
 
 
 
 
 
426e9e3
83fe4f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
426e9e3
83fe4f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
426e9e3
83fe4f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from __future__ import annotations

from copy import deepcopy
from datetime import datetime, timezone
from uuid import uuid4


class InMemoryDatabase:
    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        self.owners: dict[str, dict] = {}
        self.workflows: dict[str, list[dict]] = {}
        self.data_files: dict[str, list[dict]] = {}
        self.execution_logs: dict[str, list[dict]] = {}
        self.escalations: dict[str, list[dict]] = {}
        self.sheets: dict[str, dict[str, list[dict]]] = {}
        self.outbound_emails: dict[str, list[dict]] = {}

    def ensure_owner(self, owner_id: str, email: str) -> dict:
        owner = self.owners.get(owner_id)
        if owner:
            owner["email"] = email
            return owner
        owner = {
            "id": owner_id,
            "email": email,
            "business_description": "",
            "business_analysis": {},
            "spreadsheet_id": "demo-sheet",
            "spreadsheet_config": {"connected": True, "inventory_sheet": "Inventory", "orders_sheet": "Orders"},
            "preferred_tone": "friendly",
            "created_at": datetime.now(timezone.utc).isoformat(),
            "state": "onboarding",
        }
        self.owners[owner_id] = owner
        self.sheets[owner_id] = {
            "Inventory": [
                {"item": "Honeycrisp apples", "stock": 120},
                {"item": "Fuji apples", "stock": 90},
            ],
            "Orders": [],
        }
        return owner

    def save_owner(self, owner: dict) -> dict:
        self.owners[owner["id"]] = owner
        return owner

    def get_owner(self, owner_id: str) -> dict:
        if owner_id not in self.owners:
            raise KeyError(f"owner {owner_id} not found")
        return deepcopy(self.owners[owner_id])

    def save_workflow(self, owner_id: str, workflow: dict) -> dict:
        record = deepcopy(workflow)
        record["id"] = str(uuid4())
        record["status"] = "active"
        self.workflows.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_workflows(self, owner_id: str) -> list[dict]:
        return deepcopy(self.workflows.get(owner_id, []))

    def deactivate_workflows(self, owner_id: str) -> list[dict]:
        workflows = self.workflows.get(owner_id, [])
        for workflow in workflows:
            workflow["status"] = "inactive"
        return deepcopy(workflows)

    def get_workflow(self, owner_id: str, workflow_id: str) -> dict:
        for workflow in self.workflows.get(owner_id, []):
            if workflow["id"] == workflow_id:
                return deepcopy(workflow)
        raise KeyError(f"workflow {workflow_id} not found")

    def save_data_file(self, owner_id: str, filename: str, file_type: str, purpose: str, parsed_data: dict) -> dict:
        record = {
            "id": str(uuid4()),
            "filename": filename,
            "file_type": file_type,
            "purpose": purpose,
            "parsed_data": parsed_data,
            "uploaded_at": datetime.now(timezone.utc).isoformat(),
        }
        self.data_files.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_data_files(self, owner_id: str) -> list[dict]:
        return deepcopy(self.data_files.get(owner_id, []))

    def save_execution_log(self, owner_id: str, workflow_id: str, trigger_data: dict, steps_executed: list, outcome: str, error_message: str | None) -> dict:
        record = {
            "id": str(uuid4()),
            "workflow_id": workflow_id,
            "trigger_data": trigger_data,
            "steps_executed": steps_executed,
            "outcome": outcome,
            "error_message": error_message,
            "executed_at": datetime.now(timezone.utc).isoformat(),
        }
        self.execution_logs.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_execution_logs(self, owner_id: str) -> list[dict]:
        return deepcopy(self.execution_logs.get(owner_id, []))

    def create_escalation(self, owner_id: str, workflow_id: str, execution_id: str, payload: dict) -> dict:
        record = {
            "id": str(uuid4()),
            "workflow_id": workflow_id,
            "execution_id": execution_id,
            "reason": payload["message"],
            "context": payload,
            "options": payload.get("options", []),
            "owner_response": None,
            "status": "pending",
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        self.escalations.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_escalations(self, owner_id: str) -> list[dict]:
        return deepcopy(self.escalations.get(owner_id, []))

    def resolve_escalation(self, escalation_id: str, response: str) -> dict:
        for owner_escalations in self.escalations.values():
            for item in owner_escalations:
                if item["id"] == escalation_id:
                    item["owner_response"] = response
                    item["status"] = "resolved"
                    return deepcopy(item)
        raise KeyError(f"escalation {escalation_id} not found")

    def read_sheet(self, owner_id: str, sheet_name: str) -> list[dict]:
        return deepcopy(self.sheets.setdefault(owner_id, {}).setdefault(sheet_name, []))

    def append_sheet_row(self, owner_id: str, sheet_name: str, row: dict) -> None:
        self.sheets.setdefault(owner_id, {}).setdefault(sheet_name, []).append(deepcopy(row))

    def update_sheet_row(self, owner_id: str, sheet_name: str, lookup_column: str, lookup_value, update_column: str, update_value) -> None:
        for row in self.sheets.setdefault(owner_id, {}).setdefault(sheet_name, []):
            if row.get(lookup_column) == lookup_value:
                row[update_column] = update_value

    def record_outbound_email(self, owner_id: str, message: dict) -> None:
        self.outbound_emails.setdefault(owner_id, []).append(deepcopy(message))


# Module-level store instance, created (empty) when this module is imported.
# NOTE(review): presumably imported elsewhere as the shared database — confirm.
db = InMemoryDatabase()