"""SQLite storage layer. One table, `jobs`. Amounts are stored as plain numbers; balance owing is always derived (`amount_charged - amount_paid`) so there is a single source of truth and partial payments can never drift out of sync. """ from __future__ import annotations import json import sqlite3 from datetime import datetime from typing import Any, Optional from config import DB_PATH _SCHEMA = """ CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TEXT NOT NULL, customer TEXT NOT NULL, services TEXT NOT NULL DEFAULT '[]', -- JSON array of strings hours REAL, amount_charged REAL NOT NULL DEFAULT 0, amount_paid REAL NOT NULL DEFAULT 0, payment_method TEXT, -- cash | card | transfer | '' next_appointment TEXT, supplies TEXT NOT NULL DEFAULT '[]', -- JSON array of strings notes TEXT, raw_transcript TEXT ); CREATE INDEX IF NOT EXISTS idx_jobs_created ON jobs(created_at); CREATE INDEX IF NOT EXISTS idx_jobs_customer ON jobs(customer); CREATE TABLE IF NOT EXISTS bought_supplies ( name TEXT PRIMARY KEY COLLATE NOCASE, bought_at TEXT NOT NULL ); """ def get_conn() -> sqlite3.Connection: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db() -> None: with get_conn() as conn: conn.executescript(_SCHEMA) def _round(x: Any) -> float: try: return round(float(x or 0), 2) except (TypeError, ValueError): return 0.0 def _row_to_dict(row: sqlite3.Row) -> dict: d = dict(row) d["services"] = json.loads(d.get("services") or "[]") d["supplies"] = json.loads(d.get("supplies") or "[]") d["amount_charged"] = _round(d.get("amount_charged")) d["amount_paid"] = _round(d.get("amount_paid")) d["balance"] = _round(d["amount_charged"] - d["amount_paid"]) if d["balance"] <= 0: d["payment_status"] = "paid" elif d["amount_paid"] > 0: d["payment_status"] = "partial" else: d["payment_status"] = "unpaid" return d def add_job(job: dict) -> int: """Insert one job. `job` is a normalized dict (see extractor.JOB_FIELDS).""" init_db() with get_conn() as conn: cur = conn.execute( """ INSERT INTO jobs (created_at, customer, services, hours, amount_charged, amount_paid, payment_method, next_appointment, supplies, notes, raw_transcript) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( job.get("created_at") or datetime.now().isoformat(timespec="seconds"), (job.get("customer") or "Customer").strip(), json.dumps(job.get("services") or []), job.get("hours"), _round(job.get("amount_charged")), _round(job.get("amount_paid")), (job.get("payment_method") or "").strip(), (job.get("next_appointment") or "").strip() or None, json.dumps(job.get("supplies") or []), (job.get("notes") or "").strip() or None, job.get("raw_transcript"), ), ) return int(cur.lastrowid) def list_jobs(limit: int = 100) -> list[dict]: with get_conn() as conn: rows = conn.execute( "SELECT * FROM jobs ORDER BY created_at DESC, id DESC LIMIT ?", (limit,) ).fetchall() return [_row_to_dict(r) for r in rows] def get_outstanding() -> list[dict]: """Every job not fully paid — the 'Who owes me?' view, most owed first.""" with get_conn() as conn: rows = conn.execute( """ SELECT * FROM jobs WHERE (amount_charged - amount_paid) > 0.005 ORDER BY (amount_charged - amount_paid) DESC, created_at ASC """ ).fetchall() return [_row_to_dict(r) for r in rows] def mark_paid(job_id: int) -> None: """Settle a job: set amount_paid = amount_charged.""" with get_conn() as conn: conn.execute( "UPDATE jobs SET amount_paid = amount_charged WHERE id = ?", (job_id,) ) def get_earnings() -> dict: """Charged / collected / outstanding for today, this week, this month.""" init_db() periods = { "today": "date(created_at) = date('now', 'localtime')", "week": "date(created_at) >= date('now', 'localtime', 'weekday 0', '-6 days')", "month": "strftime('%Y-%m', created_at) = strftime('%Y-%m', 'now', 'localtime')", } out: dict[str, dict] = {} with get_conn() as conn: for label, where in periods.items(): row = conn.execute( f""" SELECT COALESCE(SUM(amount_charged), 0) AS charged, COALESCE(SUM(amount_paid), 0) AS collected, COUNT(*) AS jobs FROM jobs WHERE {where} """ ).fetchone() charged, collected = _round(row["charged"]), _round(row["collected"]) out[label] = { "charged": charged, "collected": collected, "outstanding": _round(charged - collected), "jobs": int(row["jobs"]), } return out def get_open_supplies(limit_jobs: int = 50) -> list[str]: """De-duplicated supplies across recent jobs, excluding ones marked bought.""" init_db() with get_conn() as conn: bought = {r["name"].lower() for r in conn.execute("SELECT name FROM bought_supplies")} seen: dict[str, None] = {} for job in list_jobs(limit_jobs): for item in job.get("supplies") or []: key = item.strip() low = key.lower() if key and low not in bought and low not in {k.lower() for k in seen}: seen[key] = None return list(seen.keys()) def mark_supplies_bought(names) -> None: """Cross items off the shopping list (persisted).""" rows = [(str(n).strip(), datetime.now().isoformat(timespec="seconds")) for n in (names or []) if str(n).strip()] if not rows: return init_db() with get_conn() as conn: conn.executemany( "INSERT OR IGNORE INTO bought_supplies (name, bought_at) VALUES (?, ?)", rows ) def reset_bought_supplies() -> None: """Clear all crossed-off items, bringing the full list back.""" init_db() with get_conn() as conn: conn.execute("DELETE FROM bought_supplies")