Spaces:
Running
Running
File size: 6,636 Bytes
"""SQLite storage layer.

Two tables: `jobs` holds one row per job, and `bought_supplies` records supply
items that have been crossed off the shopping list. Amounts are stored as plain
numbers; balance owing is always derived (`amount_charged - amount_paid`) so
there is a single source of truth and partial payments can never drift out of sync.
"""
from __future__ import annotations

import contextlib
import json
import math
import sqlite3
from datetime import datetime
from typing import Any, Optional

from config import DB_PATH
# DDL script run by init_db() through executescript(). Every statement uses
# IF NOT EXISTS, so applying it to an already-initialised database is a no-op.
#   jobs            - one row per job; `services` and `supplies` hold JSON arrays.
#   bought_supplies - supply names crossed off the list; the primary key is
#                     case-insensitive (COLLATE NOCASE).
_SCHEMA = """
CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL,
customer TEXT NOT NULL,
services TEXT NOT NULL DEFAULT '[]', -- JSON array of strings
hours REAL,
amount_charged REAL NOT NULL DEFAULT 0,
amount_paid REAL NOT NULL DEFAULT 0,
payment_method TEXT, -- cash | card | transfer | ''
next_appointment TEXT,
supplies TEXT NOT NULL DEFAULT '[]', -- JSON array of strings
notes TEXT,
raw_transcript TEXT
);
CREATE INDEX IF NOT EXISTS idx_jobs_created ON jobs(created_at);
CREATE INDEX IF NOT EXISTS idx_jobs_customer ON jobs(customer);
CREATE TABLE IF NOT EXISTS bought_supplies (
name TEXT PRIMARY KEY COLLATE NOCASE,
bought_at TEXT NOT NULL
);
"""
def get_conn() -> sqlite3.Connection:
    """Open a fresh connection to the database at ``DB_PATH``.

    Rows come back as ``sqlite3.Row`` so columns can be read by name.
    The caller owns the returned connection.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
def init_db() -> None:
    """Create the tables and indexes from ``_SCHEMA`` if they do not exist yet.

    Safe to call repeatedly: every statement in the script is guarded by
    ``IF NOT EXISTS``.
    """
    # sqlite3's own connection context manager only commits or rolls back;
    # closing() guarantees the handle is released as well.
    with contextlib.closing(get_conn()) as conn:
        conn.executescript(_SCHEMA)
def _round(x: Any) -> float:
try:
return round(float(x or 0), 2)
except (TypeError, ValueError):
return 0.0
def _row_to_dict(row: sqlite3.Row) -> dict:
    """Turn a ``jobs`` row into a plain dict with derived payment fields.

    The JSON text in ``services`` and ``supplies`` is decoded, both amounts
    are rounded, and two keys are added: ``balance`` (charged minus paid) and
    ``payment_status`` ("paid", "partial" or "unpaid").
    """
    record = dict(row)
    for column in ("services", "supplies"):
        record[column] = json.loads(record.get(column) or "[]")

    charged = _round(record.get("amount_charged"))
    paid = _round(record.get("amount_paid"))
    owing = _round(charged - paid)

    if owing <= 0:
        status = "paid"
    elif paid > 0:
        status = "partial"
    else:
        status = "unpaid"

    record.update(
        amount_charged=charged,
        amount_paid=paid,
        balance=owing,
        payment_status=status,
    )
    return record
def add_job(job: dict) -> int:
    """Insert one job and return the id of the new row.

    `job` is a normalized dict (see extractor.JOB_FIELDS). Missing or blank
    fields fall back to defaults: ``created_at`` to the current local time,
    ``customer`` to "Customer", the list fields to ``[]``, the amounts to 0,
    and ``next_appointment`` / ``notes`` to NULL.
    """
    init_db()
    params = (
        job.get("created_at") or datetime.now().isoformat(timespec="seconds"),
        # Strip before applying the fallback so a whitespace-only name does not
        # end up stored as an empty string.
        (job.get("customer") or "").strip() or "Customer",
        json.dumps(job.get("services") or []),
        job.get("hours"),
        _round(job.get("amount_charged")),
        _round(job.get("amount_paid")),
        (job.get("payment_method") or "").strip(),
        (job.get("next_appointment") or "").strip() or None,
        json.dumps(job.get("supplies") or []),
        (job.get("notes") or "").strip() or None,
        job.get("raw_transcript"),
    )
    # The inner `conn` context commits (or rolls back on error); closing()
    # releases the connection afterwards, which sqlite3 does not do itself.
    with contextlib.closing(get_conn()) as conn, conn:
        cur = conn.execute(
            """
            INSERT INTO jobs
            (created_at, customer, services, hours, amount_charged, amount_paid,
            payment_method, next_appointment, supplies, notes, raw_transcript)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            params,
        )
        job_id = int(cur.lastrowid)
    return job_id
def list_jobs(limit: int = 100) -> list[dict]:
    """Return up to ``limit`` jobs, newest first, as dicts from ``_row_to_dict``.

    Ties on ``created_at`` are broken by the higher ``id`` first.
    """
    # Ensure the schema exists, as the other readers do, so a fresh database
    # yields an empty list instead of a "no such table" error.
    init_db()
    # closing() releases the connection; sqlite3's own context manager would not.
    with contextlib.closing(get_conn()) as conn:
        rows = conn.execute(
            "SELECT * FROM jobs ORDER BY created_at DESC, id DESC LIMIT ?", (limit,)
        ).fetchall()
    return [_row_to_dict(r) for r in rows]
def get_outstanding() -> list[dict]:
    """Every job not fully paid — the 'Who owes me?' view, most owed first.

    A job counts as outstanding when charged minus paid exceeds 0.005; jobs
    owing the same amount are listed oldest first.
    """
    # Ensure the schema exists so a fresh database returns [] rather than
    # raising "no such table".
    init_db()
    # closing() releases the connection; sqlite3's own context manager would not.
    with contextlib.closing(get_conn()) as conn:
        rows = conn.execute(
            """
            SELECT * FROM jobs
            WHERE (amount_charged - amount_paid) > 0.005
            ORDER BY (amount_charged - amount_paid) DESC, created_at ASC
            """
        ).fetchall()
    return [_row_to_dict(r) for r in rows]
def mark_paid(job_id: int) -> None:
    """Settle a job: set amount_paid = amount_charged.

    An unknown ``job_id`` matches no row and changes nothing.
    """
    # Ensure the schema exists so this is a harmless no-op on a fresh database.
    init_db()
    # The inner `conn` context commits the update; closing() then releases the
    # connection, which sqlite3's context manager does not do by itself.
    with contextlib.closing(get_conn()) as conn, conn:
        conn.execute(
            "UPDATE jobs SET amount_paid = amount_charged WHERE id = ?", (job_id,)
        )
def get_earnings() -> dict:
    """Charged / collected / outstanding for today, this week, this month.

    Returns a dict keyed by "today", "week" and "month". Each value is a dict
    with "charged", "collected", "outstanding" (charged minus collected) and
    "jobs" (number of rows in the period).
    """
    init_db()
    # Constant SQL fragments (never user input), so interpolating them into
    # the query below is safe.
    periods = {
        "today": "date(created_at) = date('now', 'localtime')",
        # 'weekday 0' moves forward to the coming Sunday (staying put on a
        # Sunday); stepping back six days lands on that week's Monday.
        "week": "date(created_at) >= date('now', 'localtime', 'weekday 0', '-6 days')",
        "month": "strftime('%Y-%m', created_at) = strftime('%Y-%m', 'now', 'localtime')",
    }
    out: dict[str, dict] = {}
    # closing() releases the connection; sqlite3's own context manager would not.
    with contextlib.closing(get_conn()) as conn:
        for label, where in periods.items():
            row = conn.execute(
                f"""
                SELECT
                COALESCE(SUM(amount_charged), 0) AS charged,
                COALESCE(SUM(amount_paid), 0) AS collected,
                COUNT(*) AS jobs
                FROM jobs WHERE {where}
                """
            ).fetchone()
            charged, collected = _round(row["charged"]), _round(row["collected"])
            out[label] = {
                "charged": charged,
                "collected": collected,
                "outstanding": _round(charged - collected),
                "jobs": int(row["jobs"]),
            }
    return out
def get_open_supplies(limit_jobs: int = 50) -> list[str]:
    """De-duplicated supplies across recent jobs, excluding ones marked bought.

    Looks at the ``limit_jobs`` most recent jobs. Matching is case-insensitive;
    the first spelling encountered is the one returned, in order of first
    appearance.
    """
    init_db()
    # Read the bought list and release the connection before list_jobs()
    # opens its own.
    with contextlib.closing(get_conn()) as conn:
        bought = {
            r["name"].lower() for r in conn.execute("SELECT name FROM bought_supplies")
        }

    open_items: list[str] = []
    # Lower-cased names already emitted, kept alongside the result so each
    # membership check is O(1) instead of rebuilding a set per item.
    seen_lower: set[str] = set()
    for job in list_jobs(limit_jobs):
        for item in job.get("supplies") or []:
            key = item.strip()
            low = key.lower()
            if not key or low in bought or low in seen_lower:
                continue
            seen_lower.add(low)
            open_items.append(key)
    return open_items
def mark_supplies_bought(names) -> None:
    """Cross items off the shopping list (persisted).

    ``names`` is an iterable of supply names; a single string is treated as
    one name. ``None`` entries and blank names are skipped, and names already
    recorded are left untouched (``INSERT OR IGNORE``).
    """
    if isinstance(names, str):
        # A bare string would otherwise be iterated character by character.
        names = [names]
    stamp = datetime.now().isoformat(timespec="seconds")
    cleaned = (str(n).strip() for n in (names or []) if n is not None)
    rows = [(name, stamp) for name in cleaned if name]
    if not rows:
        return
    init_db()
    # The inner `conn` context commits the inserts; closing() then releases
    # the connection, which sqlite3's context manager does not do by itself.
    with contextlib.closing(get_conn()) as conn, conn:
        conn.executemany(
            "INSERT OR IGNORE INTO bought_supplies (name, bought_at) VALUES (?, ?)", rows
        )
def reset_bought_supplies() -> None:
    """Clear all crossed-off items, bringing the full list back."""
    init_db()
    # The inner `conn` context commits the delete; closing() then releases the
    # connection, which sqlite3's context manager does not do by itself.
    with contextlib.closing(get_conn()) as conn, conn:
        conn.execute("DELETE FROM bought_supplies")
|