home-kitchen-admin / database.py
Nguyen Minh Nhat
After-Shift Admin Assistant
11b9749
Raw
History Blame Contribute Delete
6.64 kB
"""SQLite storage layer.
Two tables: `jobs` (one row per job) and `bought_supplies` (supply names crossed
off the shopping list). Amounts are stored as plain numbers; balance owing is
always derived (`amount_charged - amount_paid`) so there is a single source of
truth and partial payments can never drift out of sync.
"""
from __future__ import annotations

import json
import math
import sqlite3
from contextlib import closing
from datetime import datetime
from typing import Any, Optional

from config import DB_PATH
# DDL script run by init_db() through executescript(). Every statement uses
# IF NOT EXISTS, so running it against an existing database is a no-op.
#   * jobs            - one row per job; `services` and `supplies` hold JSON
#                       arrays serialized as text, indexed by created_at and
#                       customer.
#   * bought_supplies - supply names crossed off the shopping list; the name is
#                       the primary key and is compared with NOCASE collation.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL,
customer TEXT NOT NULL,
services TEXT NOT NULL DEFAULT '[]', -- JSON array of strings
hours REAL,
amount_charged REAL NOT NULL DEFAULT 0,
amount_paid REAL NOT NULL DEFAULT 0,
payment_method TEXT, -- cash | card | transfer | ''
next_appointment TEXT,
supplies TEXT NOT NULL DEFAULT '[]', -- JSON array of strings
notes TEXT,
raw_transcript TEXT
);
CREATE INDEX IF NOT EXISTS idx_jobs_created ON jobs(created_at);
CREATE INDEX IF NOT EXISTS idx_jobs_customer ON jobs(customer);
CREATE TABLE IF NOT EXISTS bought_supplies (
name TEXT PRIMARY KEY COLLATE NOCASE,
bought_at TEXT NOT NULL
);
"""
def get_conn() -> sqlite3.Connection:
    """Open a new connection to the database file at `DB_PATH`.

    Rows come back as `sqlite3.Row`, so columns can be read by name. The
    connection is returned open; this function never closes it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db() -> None:
    """Create the tables and indexes from `_SCHEMA` if they do not exist yet.

    Safe to call repeatedly: every statement in the schema uses IF NOT EXISTS.
    """
    # A sqlite3 connection used as a context manager only commits/rolls back;
    # `closing` additionally closes it so no handle lingers until GC.
    with closing(get_conn()) as conn, conn:
        conn.executescript(_SCHEMA)
def _round(x: Any) -> float:
try:
return round(float(x or 0), 2)
except (TypeError, ValueError):
return 0.0
def _row_to_dict(row: sqlite3.Row) -> dict:
    """Convert a `jobs` row into a plain dict with derived payment fields.

    The JSON text columns `services` and `supplies` are decoded into Python
    values, both amounts are normalized through `_round`, and two keys are
    added: `balance` (charged minus paid) and `payment_status`, which is one
    of "paid", "partial" or "unpaid".
    """
    record = dict(row)

    # Missing or empty JSON columns decode to an empty list.
    for list_field in ("services", "supplies"):
        record[list_field] = json.loads(record.get(list_field) or "[]")

    charged = _round(record.get("amount_charged"))
    paid = _round(record.get("amount_paid"))
    owing = _round(charged - paid)
    record["amount_charged"] = charged
    record["amount_paid"] = paid
    record["balance"] = owing

    # Nothing owing (or overpaid) counts as paid; otherwise any payment at all
    # makes the job partially paid.
    if owing <= 0:
        status = "paid"
    else:
        status = "partial" if paid > 0 else "unpaid"
    record["payment_status"] = status
    return record
def add_job(job: dict) -> int:
    """Insert one job and return its new row id.

    `job` is a normalized dict (see extractor.JOB_FIELDS). Missing values fall
    back to defaults: `created_at` to the current local time, `customer` to
    "Customer", list fields to an empty JSON array, amounts to 0. Blank
    `next_appointment` / `notes` are stored as NULL.
    """
    init_db()

    # Strip before applying the default so a whitespace-only name falls back
    # to "Customer" instead of being stored as an empty string.
    customer = (job.get("customer") or "").strip() or "Customer"
    params = (
        job.get("created_at") or datetime.now().isoformat(timespec="seconds"),
        customer,
        json.dumps(job.get("services") or []),
        job.get("hours"),
        _round(job.get("amount_charged")),
        _round(job.get("amount_paid")),
        (job.get("payment_method") or "").strip(),
        (job.get("next_appointment") or "").strip() or None,
        json.dumps(job.get("supplies") or []),
        (job.get("notes") or "").strip() or None,
        job.get("raw_transcript"),
    )

    # The inner `conn` context commits (or rolls back on error); `closing`
    # then closes the connection, which sqlite3's context manager never does.
    with closing(get_conn()) as conn, conn:
        cur = conn.execute(
            """
            INSERT INTO jobs
            (created_at, customer, services, hours, amount_charged, amount_paid,
            payment_method, next_appointment, supplies, notes, raw_transcript)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            params,
        )
        return int(cur.lastrowid)
def list_jobs(limit: int = 100) -> list[dict]:
    """Return up to `limit` jobs, newest first, as dicts from `_row_to_dict`.

    Ties on `created_at` are broken by `id`, highest first.
    """
    # Like the other entry points, make sure the schema exists so a fresh
    # database yields an empty list rather than "no such table".
    init_db()
    # `closing` releases the connection; sqlite3's own context manager would
    # only commit and leave it open.
    with closing(get_conn()) as conn:
        rows = conn.execute(
            "SELECT * FROM jobs ORDER BY created_at DESC, id DESC LIMIT ?", (limit,)
        ).fetchall()
    return [_row_to_dict(r) for r in rows]
def get_outstanding() -> list[dict]:
    """Every job not fully paid — the 'Who owes me?' view, most owed first.

    A job counts as outstanding when more than half a cent is still owing;
    jobs owing the same amount are listed oldest first.
    """
    # Make sure the schema exists so a fresh database yields an empty list
    # rather than "no such table", matching the other entry points.
    init_db()
    # `closing` releases the connection; sqlite3's own context manager would
    # only commit and leave it open.
    with closing(get_conn()) as conn:
        rows = conn.execute(
            """
            SELECT * FROM jobs
            WHERE (amount_charged - amount_paid) > 0.005
            ORDER BY (amount_charged - amount_paid) DESC, created_at ASC
            """
        ).fetchall()
    return [_row_to_dict(r) for r in rows]
def mark_paid(job_id: int) -> None:
    """Settle a job: set amount_paid = amount_charged.

    An id that matches no row updates nothing and raises no error.
    """
    # Make sure the schema exists, matching the other write entry points.
    init_db()
    # The inner `conn` context commits (or rolls back on error); `closing`
    # then closes the connection, which sqlite3's context manager never does.
    with closing(get_conn()) as conn, conn:
        conn.execute(
            "UPDATE jobs SET amount_paid = amount_charged WHERE id = ?", (job_id,)
        )
def get_earnings() -> dict:
    """Charged / collected / outstanding for today, this week, this month.

    Returns a dict keyed by "today", "week" and "month". Each value is a dict
    with "charged", "collected", "outstanding" (charged minus collected) and
    "jobs" (number of rows in the period). Periods are evaluated by SQLite in
    local time.
    """
    init_db()
    # Constant SQL fragments defined right here (never caller input), so
    # interpolating them into the query below is safe.
    periods = {
        "today": "date(created_at) = date('now', 'localtime')",
        # 'weekday 0' moves forward to the coming Sunday (or stays put on a
        # Sunday); stepping back 6 days lands on that week's Monday.
        "week": "date(created_at) >= date('now', 'localtime', 'weekday 0', '-6 days')",
        "month": "strftime('%Y-%m', created_at) = strftime('%Y-%m', 'now', 'localtime')",
    }
    out: dict[str, dict] = {}
    # `closing` releases the connection; sqlite3's own context manager would
    # only commit and leave it open.
    with closing(get_conn()) as conn:
        for label, where in periods.items():
            row = conn.execute(
                f"""
                SELECT
                COALESCE(SUM(amount_charged), 0) AS charged,
                COALESCE(SUM(amount_paid), 0) AS collected,
                COUNT(*) AS jobs
                FROM jobs WHERE {where}
                """
            ).fetchone()
            charged, collected = _round(row["charged"]), _round(row["collected"])
            out[label] = {
                "charged": charged,
                "collected": collected,
                "outstanding": _round(charged - collected),
                "jobs": int(row["jobs"]),
            }
    return out
def get_open_supplies(limit_jobs: int = 50) -> list[str]:
    """De-duplicated supplies across recent jobs, excluding ones marked bought.

    Looks at the `limit_jobs` most recent jobs (in `list_jobs` order).
    Matching is case-insensitive and ignores surrounding whitespace; the
    first spelling encountered is the one returned. Entries that are not
    strings are skipped.
    """
    init_db()
    # `closing` releases the connection as soon as the bought list is read;
    # sqlite3's own context manager would leave it open.
    with closing(get_conn()) as conn:
        bought = {
            row["name"].lower()
            for row in conn.execute("SELECT name FROM bought_supplies")
        }

    # One running set of lowercase names gives O(1) duplicate checks. Seeding
    # it with the bought names excludes those through the same test.
    excluded = set(bought)
    open_items: list[str] = []
    for job in list_jobs(limit_jobs):
        for item in job.get("supplies") or []:
            if not isinstance(item, str):
                # Malformed entry (e.g. null or a number in the JSON array).
                continue
            name = item.strip()
            low = name.lower()
            if not name or low in excluded:
                continue
            excluded.add(low)
            open_items.append(name)
    return open_items
def mark_supplies_bought(names) -> None:
    """Cross items off the shopping list (persisted).

    `names` may be an iterable of names or a single name given as a string.
    Entries are converted with `str()` and stripped; `None` and blank entries
    are ignored. Names already recorded are left untouched (INSERT OR IGNORE).
    Does nothing, and does not touch the database, when no usable name remains.
    """
    # A bare string would otherwise be iterated character by character.
    if isinstance(names, str):
        names = [names]

    cleaned = [str(n).strip() for n in (names or []) if n is not None]
    cleaned = [name for name in cleaned if name]
    if not cleaned:
        return

    # One timestamp for the whole batch rather than one clock read per row.
    bought_at = datetime.now().isoformat(timespec="seconds")
    init_db()
    # The inner `conn` context commits (or rolls back on error); `closing`
    # then closes the connection, which sqlite3's context manager never does.
    with closing(get_conn()) as conn, conn:
        conn.executemany(
            "INSERT OR IGNORE INTO bought_supplies (name, bought_at) VALUES (?, ?)",
            [(name, bought_at) for name in cleaned],
        )
def reset_bought_supplies() -> None:
    """Clear all crossed-off items, bringing the full list back."""
    init_db()
    # The inner `conn` context commits (or rolls back on error); `closing`
    # then closes the connection, which sqlite3's context manager never does.
    with closing(get_conn()) as conn, conn:
        conn.execute("DELETE FROM bought_supplies")