from __future__ import annotations

from copy import deepcopy
from datetime import datetime, timezone
from uuid import uuid4


class InMemoryDatabase:
    """Dictionary-backed store keyed by owner id.

    Holds owners, workflows, uploaded data files, execution logs,
    escalations, per-owner sheets, and recorded outbound emails. Records
    are deep-copied when they are written and when they are read, so the
    stored state does not alias caller-owned objects. The exceptions are
    ``ensure_owner`` and ``save_owner``, which hand out / keep the very
    dict object involved (see their docstrings).
    """

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Replace every collection with a fresh, empty one."""
        self.owners: dict[str, dict] = {}
        self.workflows: dict[str, list[dict]] = {}
        self.data_files: dict[str, list[dict]] = {}
        self.execution_logs: dict[str, list[dict]] = {}
        self.escalations: dict[str, list[dict]] = {}
        self.sheets: dict[str, dict[str, list[dict]]] = {}
        self.outbound_emails: dict[str, list[dict]] = {}

    @staticmethod
    def _timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    # ------------------------------------------------------------------
    # Owners
    # ------------------------------------------------------------------

    def ensure_owner(self, owner_id: str, email: str) -> dict:
        """Return the owner for *owner_id*, creating it on first use.

        An existing owner only has its ``email`` refreshed. A new owner is
        created with default settings and two seeded sheets ("Inventory"
        with two rows, "Orders" empty).

        The returned dict is the stored object itself, not a copy, so
        mutations made by the caller are visible in the store.
        """
        owner = self.owners.get(owner_id)
        if owner:
            owner["email"] = email
            return owner

        owner = {
            "id": owner_id,
            "email": email,
            "business_description": "",
            "business_analysis": {},
            "spreadsheet_id": "demo-sheet",
            "spreadsheet_config": {
                "connected": True,
                "inventory_sheet": "Inventory",
                "orders_sheet": "Orders",
            },
            "preferred_tone": "friendly",
            "created_at": self._timestamp(),
            "state": "onboarding",
        }
        self.owners[owner_id] = owner
        self.sheets[owner_id] = {
            "Inventory": [
                {"item": "Honeycrisp apples", "stock": 120},
                {"item": "Fuji apples", "stock": 90},
            ],
            "Orders": [],
        }
        return owner

    def save_owner(self, owner: dict) -> dict:
        """Store *owner* under ``owner["id"]`` and return the same object.

        The dict is kept by reference (no copy is made).

        Raises:
            KeyError: if *owner* has no ``"id"`` key.
        """
        self.owners[owner["id"]] = owner
        return owner

    def get_owner(self, owner_id: str) -> dict:
        """Return a deep copy of the owner record.

        Raises:
            KeyError: if no owner with *owner_id* exists.
        """
        if owner_id not in self.owners:
            raise KeyError(f"owner {owner_id} not found")
        return deepcopy(self.owners[owner_id])

    # ------------------------------------------------------------------
    # Workflows
    # ------------------------------------------------------------------

    def save_workflow(self, owner_id: str, workflow: dict) -> dict:
        """Store a copy of *workflow* and return a copy of the stored record.

        The stored record gets a fresh UUID ``id`` and ``status`` set to
        ``"active"``, overriding any such keys in *workflow*.
        """
        record = deepcopy(workflow)
        record["id"] = str(uuid4())
        record["status"] = "active"
        self.workflows.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_workflows(self, owner_id: str) -> list[dict]:
        """Return copies of all workflows for *owner_id* (empty if none)."""
        return deepcopy(self.workflows.get(owner_id, []))

    def deactivate_workflows(self, owner_id: str) -> list[dict]:
        """Mark every workflow of *owner_id* inactive and return copies."""
        workflows = self.workflows.get(owner_id, [])
        for workflow in workflows:
            workflow["status"] = "inactive"
        return deepcopy(workflows)

    def get_workflow(self, owner_id: str, workflow_id: str) -> dict:
        """Return a copy of one workflow belonging to *owner_id*.

        Raises:
            KeyError: if the owner has no workflow with *workflow_id*.
        """
        for workflow in self.workflows.get(owner_id, []):
            if workflow["id"] == workflow_id:
                return deepcopy(workflow)
        raise KeyError(f"workflow {workflow_id} not found")

    # ------------------------------------------------------------------
    # Data files
    # ------------------------------------------------------------------

    def save_data_file(
        self,
        owner_id: str,
        filename: str,
        file_type: str,
        purpose: str,
        parsed_data: dict,
    ) -> dict:
        """Record an uploaded file and return a copy of the new record."""
        record = {
            "id": str(uuid4()),
            "filename": filename,
            "file_type": file_type,
            "purpose": purpose,
            # Copy on the way in so later mutation of the caller's dict
            # cannot rewrite the stored record.
            "parsed_data": deepcopy(parsed_data),
            "uploaded_at": self._timestamp(),
        }
        self.data_files.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_data_files(self, owner_id: str) -> list[dict]:
        """Return copies of all data-file records for *owner_id*."""
        return deepcopy(self.data_files.get(owner_id, []))

    # ------------------------------------------------------------------
    # Execution logs
    # ------------------------------------------------------------------

    def save_execution_log(
        self,
        owner_id: str,
        workflow_id: str,
        trigger_data: dict,
        steps_executed: list,
        outcome: str,
        error_message: str | None,
    ) -> dict:
        """Record one workflow execution and return a copy of the record."""
        record = {
            "id": str(uuid4()),
            "workflow_id": workflow_id,
            # Copied so the log is a snapshot, not a view of caller state.
            "trigger_data": deepcopy(trigger_data),
            "steps_executed": deepcopy(steps_executed),
            "outcome": outcome,
            "error_message": error_message,
            "executed_at": self._timestamp(),
        }
        self.execution_logs.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_execution_logs(self, owner_id: str) -> list[dict]:
        """Return copies of all execution logs for *owner_id*."""
        return deepcopy(self.execution_logs.get(owner_id, []))

    # ------------------------------------------------------------------
    # Escalations
    # ------------------------------------------------------------------

    def create_escalation(
        self,
        owner_id: str,
        workflow_id: str,
        execution_id: str,
        payload: dict,
    ) -> dict:
        """Create a pending escalation and return a copy of it.

        ``payload["message"]`` becomes the ``reason``; ``payload["options"]``
        (default ``[]``) becomes ``options``; the whole payload is kept as
        ``context``.

        Raises:
            KeyError: if *payload* has no ``"message"`` key.
        """
        # Snapshot the payload once; reason/options are read from the
        # snapshot so the stored record never references caller objects.
        context = deepcopy(payload)
        record = {
            "id": str(uuid4()),
            "workflow_id": workflow_id,
            "execution_id": execution_id,
            "reason": context["message"],
            "context": context,
            "options": context.get("options", []),
            "owner_response": None,
            "status": "pending",
            "created_at": self._timestamp(),
        }
        self.escalations.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_escalations(self, owner_id: str) -> list[dict]:
        """Return copies of all escalations for *owner_id*."""
        return deepcopy(self.escalations.get(owner_id, []))

    def resolve_escalation(self, escalation_id: str, response: str) -> dict:
        """Mark an escalation resolved and return a copy of it.

        The escalation is looked up by id across every owner.

        Raises:
            KeyError: if no escalation with *escalation_id* exists.
        """
        for owner_escalations in self.escalations.values():
            for item in owner_escalations:
                if item["id"] == escalation_id:
                    item["owner_response"] = response
                    item["status"] = "resolved"
                    return deepcopy(item)
        raise KeyError(f"escalation {escalation_id} not found")

    # ------------------------------------------------------------------
    # Sheets
    # ------------------------------------------------------------------

    def _sheet_rows(self, owner_id: str, sheet_name: str) -> list[dict]:
        """Return the live row list for a sheet, creating it if absent."""
        return self.sheets.setdefault(owner_id, {}).setdefault(sheet_name, [])

    def read_sheet(self, owner_id: str, sheet_name: str) -> list[dict]:
        """Return copies of a sheet's rows.

        A missing owner entry or sheet is created empty as a side effect.
        """
        return deepcopy(self._sheet_rows(owner_id, sheet_name))

    def append_sheet_row(self, owner_id: str, sheet_name: str, row: dict) -> None:
        """Append a copy of *row* to the sheet, creating the sheet if needed."""
        self._sheet_rows(owner_id, sheet_name).append(deepcopy(row))

    def update_sheet_row(
        self,
        owner_id: str,
        sheet_name: str,
        lookup_column: str,
        lookup_value: object,
        update_column: str,
        update_value: object,
    ) -> None:
        """Set *update_column* on every row whose *lookup_column* matches.

        Rows lacking *lookup_column* compare as ``None``. Nothing happens
        (and nothing is raised) when no row matches.
        """
        for row in self._sheet_rows(owner_id, sheet_name):
            if row.get(lookup_column) == lookup_value:
                # Copy per row so rows never share one mutable value with
                # each other or with the caller.
                row[update_column] = deepcopy(update_value)

    # ------------------------------------------------------------------
    # Outbound email
    # ------------------------------------------------------------------

    def record_outbound_email(self, owner_id: str, message: dict) -> None:
        """Append a copy of *message* to the owner's outbound-email list."""
        self.outbound_emails.setdefault(owner_id, []).append(deepcopy(message))


db = InMemoryDatabase()