# flow-pilot/backend/storage/database.py
# DevelopedBy-Siva
# change Ui
# b032b2d
from __future__ import annotations
from copy import deepcopy
from datetime import datetime, timezone
from uuid import uuid4
class InMemoryDatabase:
    """Dictionary-backed, in-memory store for owners and their related records.

    Every collection is a plain ``dict`` keyed by owner id. Nothing is
    persisted and no locking is performed.

    Copy semantics:
        * Caller-supplied payloads are deep-copied before being stored, and
          list/get accessors return deep copies, so callers cannot change
          stored records by mutating objects they hold.
        * ``ensure_owner`` and ``save_owner`` are the exceptions: they hand
          back / store the very dict object involved (see their docstrings).
    """

    def __init__(self) -> None:
        """Create an empty database."""
        self.reset()

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string."""
        return datetime.now(timezone.utc).isoformat()

    def reset(self) -> None:
        """Drop all stored data and re-create every collection empty."""
        self.owners: dict[str, dict] = {}
        self.workflows: dict[str, list[dict]] = {}
        self.data_files: dict[str, list[dict]] = {}
        self.execution_logs: dict[str, list[dict]] = {}
        self.escalations: dict[str, list[dict]] = {}
        self.sheets: dict[str, dict[str, list[dict]]] = {}
        self.outbound_emails: dict[str, list[dict]] = {}

    # ------------------------------------------------------------------
    # Owners
    # ------------------------------------------------------------------
    def ensure_owner(self, owner_id: str, email: str) -> dict:
        """Return the owner record for *owner_id*, creating it if missing.

        An existing owner only has its ``email`` refreshed. A new owner is
        created with default settings and its sheets are (re)initialised
        with a small demo ``Inventory`` sheet and an empty ``Orders`` sheet.

        Returns:
            The stored owner dict itself (not a copy), so mutations made by
            the caller are visible in the database.
        """
        owner = self.owners.get(owner_id)
        if owner is not None:
            owner["email"] = email
            return owner

        owner = {
            "id": owner_id,
            "email": email,
            "business_description": "",
            "business_analysis": {},
            "spreadsheet_id": "demo-sheet",
            "spreadsheet_config": {
                "connected": True,
                "inventory_sheet": "Inventory",
                "orders_sheet": "Orders",
            },
            "preferred_tone": "friendly",
            "created_at": self._utc_timestamp(),
            "state": "onboarding",
        }
        self.owners[owner_id] = owner
        # Seed demo data; this replaces any sheets already stored under
        # this owner id.
        self.sheets[owner_id] = {
            "Inventory": [
                {"item": "Honeycrisp apples", "stock": 120},
                {"item": "Fuji apples", "stock": 90},
            ],
            "Orders": [],
        }
        return owner

    def save_owner(self, owner: dict) -> dict:
        """Store *owner* under ``owner["id"]`` and return it.

        The dict is stored as-is (no copy is made).

        Raises:
            KeyError: if *owner* has no ``"id"`` key.
        """
        self.owners[owner["id"]] = owner
        return owner

    def get_owner(self, owner_id: str) -> dict:
        """Return a deep copy of the owner record.

        Raises:
            KeyError: if no owner with *owner_id* exists.
        """
        if owner_id not in self.owners:
            raise KeyError(f"owner {owner_id} not found")
        return deepcopy(self.owners[owner_id])

    # ------------------------------------------------------------------
    # Workflows
    # ------------------------------------------------------------------
    def save_workflow(self, owner_id: str, workflow: dict) -> dict:
        """Store a copy of *workflow* for *owner_id* and return a copy of it.

        The stored record always receives a freshly generated ``id`` and
        ``status == "active"``, overriding any values passed in.
        """
        record = deepcopy(workflow)
        record["id"] = str(uuid4())
        record["status"] = "active"
        self.workflows.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_workflows(self, owner_id: str) -> list[dict]:
        """Return deep copies of all workflows of *owner_id* (may be empty)."""
        return deepcopy(self.workflows.get(owner_id, []))

    def deactivate_workflows(self, owner_id: str) -> list[dict]:
        """Mark every workflow of *owner_id* inactive and return copies."""
        workflows = self.workflows.get(owner_id, [])
        for workflow in workflows:
            workflow["status"] = "inactive"
        return deepcopy(workflows)

    def get_workflow(self, owner_id: str, workflow_id: str) -> dict:
        """Return a deep copy of one workflow of *owner_id*.

        Raises:
            KeyError: if the owner has no workflow with *workflow_id*.
        """
        for workflow in self.workflows.get(owner_id, []):
            if workflow["id"] == workflow_id:
                return deepcopy(workflow)
        raise KeyError(f"workflow {workflow_id} not found")

    # ------------------------------------------------------------------
    # Data files
    # ------------------------------------------------------------------
    def save_data_file(
        self,
        owner_id: str,
        filename: str,
        file_type: str,
        purpose: str,
        parsed_data: dict,
    ) -> dict:
        """Record an uploaded data file for *owner_id* and return a copy.

        *parsed_data* is deep-copied so later changes to the caller's
        object do not alter the stored record.
        """
        record = {
            "id": str(uuid4()),
            "filename": filename,
            "file_type": file_type,
            "purpose": purpose,
            "parsed_data": deepcopy(parsed_data),
            "uploaded_at": self._utc_timestamp(),
        }
        self.data_files.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_data_files(self, owner_id: str) -> list[dict]:
        """Return deep copies of all data-file records of *owner_id*."""
        return deepcopy(self.data_files.get(owner_id, []))

    # ------------------------------------------------------------------
    # Execution logs
    # ------------------------------------------------------------------
    def save_execution_log(
        self,
        owner_id: str,
        workflow_id: str,
        trigger_data: dict,
        steps_executed: list,
        outcome: str,
        error_message: str | None,
    ) -> dict:
        """Append an execution-log entry for *owner_id* and return a copy.

        *trigger_data* and *steps_executed* are deep-copied so the log
        entry cannot be changed by later mutation of the caller's objects.
        """
        record = {
            "id": str(uuid4()),
            "workflow_id": workflow_id,
            "trigger_data": deepcopy(trigger_data),
            "steps_executed": deepcopy(steps_executed),
            "outcome": outcome,
            "error_message": error_message,
            "executed_at": self._utc_timestamp(),
        }
        self.execution_logs.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_execution_logs(self, owner_id: str) -> list[dict]:
        """Return deep copies of all execution-log entries of *owner_id*."""
        return deepcopy(self.execution_logs.get(owner_id, []))

    # ------------------------------------------------------------------
    # Escalations
    # ------------------------------------------------------------------
    def create_escalation(
        self,
        owner_id: str,
        workflow_id: str,
        execution_id: str,
        payload: dict,
    ) -> dict:
        """Create a pending escalation for *owner_id* and return a copy.

        ``payload["message"]`` becomes the escalation ``reason``; the
        optional ``payload["options"]`` becomes ``options`` (default
        ``[]``). A deep copy of the whole payload is kept as ``context``.

        Raises:
            KeyError: if *payload* has no ``"message"`` key.
        """
        # Snapshot the payload so the stored record is independent of the
        # caller's dict.
        context = deepcopy(payload)
        record = {
            "id": str(uuid4()),
            "workflow_id": workflow_id,
            "execution_id": execution_id,
            "reason": context["message"],
            "context": context,
            "options": context.get("options", []),
            "owner_response": None,
            "status": "pending",
            "created_at": self._utc_timestamp(),
        }
        self.escalations.setdefault(owner_id, []).append(record)
        return deepcopy(record)

    def list_escalations(self, owner_id: str) -> list[dict]:
        """Return deep copies of all escalations of *owner_id*."""
        return deepcopy(self.escalations.get(owner_id, []))

    def resolve_escalation(self, escalation_id: str, response: str) -> dict:
        """Mark an escalation resolved with *response* and return a copy.

        The escalation is looked up by id across all owners.

        Raises:
            KeyError: if no escalation with *escalation_id* exists.
        """
        for owner_escalations in self.escalations.values():
            for item in owner_escalations:
                if item["id"] == escalation_id:
                    item["owner_response"] = response
                    item["status"] = "resolved"
                    return deepcopy(item)
        raise KeyError(f"escalation {escalation_id} not found")

    # ------------------------------------------------------------------
    # Sheets
    # ------------------------------------------------------------------
    def read_sheet(self, owner_id: str, sheet_name: str) -> list[dict]:
        """Return deep copies of the rows of a sheet.

        An unknown owner or sheet yields ``[]``; reading never creates
        entries in the store.
        """
        return deepcopy(self.sheets.get(owner_id, {}).get(sheet_name, []))

    def append_sheet_row(self, owner_id: str, sheet_name: str, row: dict) -> None:
        """Append a deep copy of *row*, creating the sheet if necessary."""
        self.sheets.setdefault(owner_id, {}).setdefault(sheet_name, []).append(deepcopy(row))

    def update_sheet_row(
        self,
        owner_id: str,
        sheet_name: str,
        lookup_column: str,
        lookup_value,
        update_column: str,
        update_value,
    ) -> None:
        """Set *update_column* on every row whose *lookup_column* matches.

        All matching rows are updated. If the owner, the sheet or a
        matching row does not exist, nothing happens and nothing is
        created.
        """
        rows = self.sheets.get(owner_id, {}).get(sheet_name, [])
        for row in rows:
            if row.get(lookup_column) == lookup_value:
                # Copy per row so rows never share one mutable value.
                row[update_column] = deepcopy(update_value)

    # ------------------------------------------------------------------
    # Outbound email
    # ------------------------------------------------------------------
    def record_outbound_email(self, owner_id: str, message: dict) -> None:
        """Append a deep copy of *message* to the outbound emails of *owner_id*."""
        self.outbound_emails.setdefault(owner_id, []).append(deepcopy(message))
# Module-level database instance, created when this module is imported.
db = InMemoryDatabase()