Spaces:
Sleeping
Sleeping
File size: 6,081 Bytes
83fe4f9 426e9e3 83fe4f9 426e9e3 83fe4f9 f0c7697 83fe4f9 426e9e3 83fe4f9 f0c7697 83fe4f9 b032b2d 83fe4f9 426e9e3 83fe4f9 426e9e3 83fe4f9 426e9e3 83fe4f9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | from __future__ import annotations
from copy import deepcopy
from datetime import datetime, timezone
from uuid import uuid4
class InMemoryDatabase:
def __init__(self) -> None:
self.reset()
def reset(self) -> None:
self.owners: dict[str, dict] = {}
self.workflows: dict[str, list[dict]] = {}
self.data_files: dict[str, list[dict]] = {}
self.execution_logs: dict[str, list[dict]] = {}
self.escalations: dict[str, list[dict]] = {}
self.sheets: dict[str, dict[str, list[dict]]] = {}
self.outbound_emails: dict[str, list[dict]] = {}
def ensure_owner(self, owner_id: str, email: str) -> dict:
owner = self.owners.get(owner_id)
if owner:
owner["email"] = email
return owner
owner = {
"id": owner_id,
"email": email,
"business_description": "",
"business_analysis": {},
"spreadsheet_id": "demo-sheet",
"spreadsheet_config": {"connected": True, "inventory_sheet": "Inventory", "orders_sheet": "Orders"},
"preferred_tone": "friendly",
"created_at": datetime.now(timezone.utc).isoformat(),
"state": "onboarding",
}
self.owners[owner_id] = owner
self.sheets[owner_id] = {
"Inventory": [
{"item": "Honeycrisp apples", "stock": 120},
{"item": "Fuji apples", "stock": 90},
],
"Orders": [],
}
return owner
def save_owner(self, owner: dict) -> dict:
self.owners[owner["id"]] = owner
return owner
def get_owner(self, owner_id: str) -> dict:
if owner_id not in self.owners:
raise KeyError(f"owner {owner_id} not found")
return deepcopy(self.owners[owner_id])
def save_workflow(self, owner_id: str, workflow: dict) -> dict:
record = deepcopy(workflow)
record["id"] = str(uuid4())
record["status"] = "active"
self.workflows.setdefault(owner_id, []).append(record)
return deepcopy(record)
def list_workflows(self, owner_id: str) -> list[dict]:
return deepcopy(self.workflows.get(owner_id, []))
def deactivate_workflows(self, owner_id: str) -> list[dict]:
workflows = self.workflows.get(owner_id, [])
for workflow in workflows:
workflow["status"] = "inactive"
return deepcopy(workflows)
def get_workflow(self, owner_id: str, workflow_id: str) -> dict:
for workflow in self.workflows.get(owner_id, []):
if workflow["id"] == workflow_id:
return deepcopy(workflow)
raise KeyError(f"workflow {workflow_id} not found")
def save_data_file(self, owner_id: str, filename: str, file_type: str, purpose: str, parsed_data: dict) -> dict:
record = {
"id": str(uuid4()),
"filename": filename,
"file_type": file_type,
"purpose": purpose,
"parsed_data": parsed_data,
"uploaded_at": datetime.now(timezone.utc).isoformat(),
}
self.data_files.setdefault(owner_id, []).append(record)
return deepcopy(record)
def list_data_files(self, owner_id: str) -> list[dict]:
return deepcopy(self.data_files.get(owner_id, []))
def save_execution_log(self, owner_id: str, workflow_id: str, trigger_data: dict, steps_executed: list, outcome: str, error_message: str | None) -> dict:
record = {
"id": str(uuid4()),
"workflow_id": workflow_id,
"trigger_data": trigger_data,
"steps_executed": steps_executed,
"outcome": outcome,
"error_message": error_message,
"executed_at": datetime.now(timezone.utc).isoformat(),
}
self.execution_logs.setdefault(owner_id, []).append(record)
return deepcopy(record)
def list_execution_logs(self, owner_id: str) -> list[dict]:
return deepcopy(self.execution_logs.get(owner_id, []))
def create_escalation(self, owner_id: str, workflow_id: str, execution_id: str, payload: dict) -> dict:
record = {
"id": str(uuid4()),
"workflow_id": workflow_id,
"execution_id": execution_id,
"reason": payload["message"],
"context": payload,
"options": payload.get("options", []),
"owner_response": None,
"status": "pending",
"created_at": datetime.now(timezone.utc).isoformat(),
}
self.escalations.setdefault(owner_id, []).append(record)
return deepcopy(record)
def list_escalations(self, owner_id: str) -> list[dict]:
return deepcopy(self.escalations.get(owner_id, []))
def resolve_escalation(self, escalation_id: str, response: str) -> dict:
for owner_escalations in self.escalations.values():
for item in owner_escalations:
if item["id"] == escalation_id:
item["owner_response"] = response
item["status"] = "resolved"
return deepcopy(item)
raise KeyError(f"escalation {escalation_id} not found")
def read_sheet(self, owner_id: str, sheet_name: str) -> list[dict]:
return deepcopy(self.sheets.setdefault(owner_id, {}).setdefault(sheet_name, []))
def append_sheet_row(self, owner_id: str, sheet_name: str, row: dict) -> None:
self.sheets.setdefault(owner_id, {}).setdefault(sheet_name, []).append(deepcopy(row))
def update_sheet_row(self, owner_id: str, sheet_name: str, lookup_column: str, lookup_value, update_column: str, update_value) -> None:
for row in self.sheets.setdefault(owner_id, {}).setdefault(sheet_name, []):
if row.get(lookup_column) == lookup_value:
row[update_column] = update_value
def record_outbound_email(self, owner_id: str, message: dict) -> None:
self.outbound_emails.setdefault(owner_id, []).append(deepcopy(message))
db = InMemoryDatabase()
|