Spaces:
Sleeping
Sleeping
| import base64 | |
| import binascii | |
| import hashlib | |
| import json | |
| import os | |
| import re | |
| import uuid | |
| from datetime import date, datetime, timedelta, timezone | |
| from decimal import Decimal, ROUND_HALF_UP, getcontext | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from flask import Flask, jsonify, request, send_from_directory | |
| from db import ( | |
| create_account, | |
| execute, | |
| fetch_all, | |
| fetch_one, | |
| fetch_business_logo, | |
| insert_invoice, | |
| search_clients, | |
| update_business, | |
| update_business_logo, | |
| upsert_client, | |
| ) | |
# --- Filesystem locations -------------------------------------------------
# Directory containing this module; also used as the static folder.
APP_ROOT = Path(__file__).parent.resolve()
# JSON-store directory; overridable through the DATA_DIR environment variable.
DATA_DIR = Path(os.environ.get("DATA_DIR", APP_ROOT))
DATA_FILE = DATA_DIR.joinpath("web_invoice_store.json")

# --- Storage backend ------------------------------------------------------
# True when NEON_DATABASE_URL is set to a non-empty value; handlers then use
# the helpers imported from ``db`` instead of the JSON file.
DATABASE_AVAILABLE = bool(os.environ.get("NEON_DATABASE_URL"))

# --- Limits ---------------------------------------------------------------
INVOICE_HISTORY_LIMIT = 200
MAX_LOGO_SIZE = 512 * 1024  # 512 KB
PASSWORD_MIN_LENGTH = 4
TOKEN_TTL = timedelta(hours=12)

# --- Accepted values ------------------------------------------------------
ALLOWED_LOGO_MIME_TYPES = {"image/png", "image/jpeg"}
ALLOWED_STATIC = {
    "index.html",
    "styles.css",
    "main.js",
    "favicon.ico",
    "Roboto-VariableFont_wdth,wght.ttf",
}
DEFAULT_UNIT = "szt."
ALLOWED_UNITS = {"szt.", "godz."}

# VAT code -> rate as a fraction; ``None`` marks codes with no numeric rate.
VAT_RATES: Dict[str, Optional[Decimal]] = {
    "23": Decimal("0.23"),
    "8": Decimal("0.08"),
    "5": Decimal("0.05"),
    "0": Decimal("0.00"),
    "ZW": None,
    "NP": None,
}

# --- Runtime state --------------------------------------------------------
# In-memory session registry: bearer token -> session data.
SESSION_TOKENS: Dict[str, Dict[str, Any]] = {}
# Flask application; static files are served from APP_ROOT at the URL root.
app = Flask(
    __name__,
    static_folder=str(APP_ROOT),
    static_url_path="",
)

# Limit Decimal arithmetic to 10 significant digits for the current context.
# NOTE(review): with this precision, quantizing to 0.01 raises
# InvalidOperation once a value needs more than 10 digits (>= 100,000,000.00)
# - confirm this ceiling is intended.
getcontext().prec = 10
def _quantize(value: Decimal) -> Decimal:
    """Round ``value`` to two decimal places, rounding ties away from zero."""
    cent = Decimal("0.01")
    return value.quantize(cent, rounding=ROUND_HALF_UP)
def _decimal(value: Any) -> Decimal:
    """Convert ``value`` to a finite ``Decimal`` via its string form.

    Args:
        value: Anything whose ``str()`` is a decimal literal (str, int, float,
            Decimal, ...).

    Returns:
        The parsed, finite ``Decimal``.

    Raises:
        ValueError: If the value cannot be parsed, or parses to NaN or an
            infinity. Those special values are rejected here because ordering
            comparisons and ``quantize`` on them raise ``InvalidOperation``,
            which callers do not handle.
    """
    try:
        number = Decimal(str(value))
    except Exception as error:  # pragma: no cover - defensive
        raise ValueError(f"Niepoprawna wartosc liczby: {value}") from error
    if not number.is_finite():
        raise ValueError(f"Niepoprawna wartosc liczby: {value}")
    return number
def _format_decimal_str(value: Any, default: str = "0.00") -> str:
    """Render ``value`` as a two-decimal string for JSON output.

    ``None`` and the empty string yield ``default``. ``Decimal`` inputs are
    quantized directly. Anything else is parsed through ``Decimal(str(value))``;
    if that fails, the plain ``str(value)`` is returned unchanged.
    """
    if value in (None, ""):
        return default
    if not isinstance(value, Decimal):
        try:
            parsed = Decimal(str(value))
            return str(_quantize(parsed))
        except Exception:
            # Best-effort formatting: fall back to the raw textual form.
            return str(value)
    return str(_quantize(value))
def hash_password(password: str) -> str:
    """Return the hex SHA-256 digest of the UTF-8 encoded ``password``.

    NOTE(review): this is an unsalted, fast hash. Moving to a salted KDF would
    also require changing the equality checks in the login handler.
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode("utf-8"))
    return hasher.hexdigest()
# Simple syntactic email check: local part, "@", domain, and a 2+ letter TLD.
EMAIL_PATTERN = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")


def normalize_email(raw_email: str) -> Tuple[str, str]:
    """Validate an email address and return its lookup and display forms.

    Args:
        raw_email: Address as typed by the user; ``None`` or empty is treated
            as missing.

    Returns:
        ``(login_key, display_email)``: the lower-cased address and the
        trimmed original.

    Raises:
        ValueError: If the address is empty or does not match
            ``EMAIL_PATTERN``.
    """
    display_email = raw_email.strip() if raw_email else ""
    if display_email == "":
        raise ValueError("Email nie moze byc pusty.")
    if EMAIL_PATTERN.fullmatch(display_email) is None:
        raise ValueError("Podaj poprawny adres email.")
    login_key = display_email.lower()
    return login_key, display_email
def sanitize_filename(filename: Optional[str]) -> str:
    """Reduce a client-supplied file name to a safe base name.

    Directory components (``/`` or ``\\`` separated) are dropped, every
    character outside ``A-Za-z0-9._-`` becomes ``_``, and leading/trailing
    dots and underscores are stripped. Returns ``"logo"`` when nothing usable
    remains.
    """
    fallback = "logo"
    if not filename:
        return fallback
    # Keep only the text after the last forward slash, then the last backslash.
    base_name = str(filename).rsplit("/", 1)[-1].rsplit("\\", 1)[-1]
    cleaned = re.sub(r"[^A-Za-z0-9._-]", "_", base_name)
    cleaned = cleaned.strip("._")
    return cleaned if cleaned else fallback
def find_account_identifier(accounts: Dict[str, Any], identifier: str) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
    """Locate an account by store key, login, or email (case-insensitive).

    Args:
        accounts: Mapping of store key to account record.
        identifier: Value typed by the user.

    Returns:
        ``(store_key, account)`` for the first match, or ``(None, None)`` when
        the identifier is blank or nothing matches.
    """
    needle = (identifier or "").strip().lower()
    if not needle:
        return None, None

    # Fast path: the normalized identifier is itself the store key.
    direct_hit = accounts.get(needle)
    if direct_hit:
        return needle, direct_hit

    # Otherwise compare against each record's stored login and email.
    for stored_key, record in accounts.items():
        aliases = {
            (record.get("login") or "").strip().lower(),
            (record.get("email") or "").strip().lower(),
        }
        if needle in aliases:
            return stored_key, record
    return None, None
def load_store() -> Dict[str, Any]:
    """Read the JSON store from ``DATA_FILE``.

    Returns:
        The parsed store, or ``{"accounts": {}}`` when the file does not exist.

    Raises:
        ValueError: If the file is not valid JSON or its top-level value is not
            an object.
    """
    # Open directly instead of checking existence first, so a file removed
    # between the check and the open cannot raise an unexpected error.
    try:
        with DATA_FILE.open("r", encoding="utf-8") as handle:
            data = json.load(handle)
    except FileNotFoundError:
        return {"accounts": {}}
    except json.JSONDecodeError as error:
        raise ValueError("Plik z danymi jest uszkodzony.") from error
    # Callers use dict methods on the result; reject any other JSON type here.
    if not isinstance(data, dict):
        raise ValueError("Plik z danymi jest uszkodzony.")
    return data
def save_store(data: Dict[str, Any]) -> None:
    """Write ``data`` to ``DATA_FILE`` as indented UTF-8 JSON.

    The content is first written to a sibling ``.tmp`` file, which then
    replaces ``DATA_FILE``. ``DATA_DIR`` is created if missing.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    staging_path = DATA_FILE.with_suffix(".tmp")
    with staging_path.open("w", encoding="utf-8") as staging_file:
        json.dump(data, staging_file, ensure_ascii=False, indent=2)
    # Swap the finished file into place only after the write has completed.
    staging_path.replace(DATA_FILE)
def get_account(data: Dict[str, Any], login_key: str) -> Dict[str, Any]:
    """Return the account stored under ``login_key`` in the JSON store.

    Raises:
        KeyError: If the store has no accounts, the key is absent, or the
            stored record is empty.
    """
    account = (data.get("accounts") or {}).get(login_key)
    if account:
        return account
    raise KeyError("Nie znaleziono konta.")
def get_account_row(login_key: str) -> Dict[str, Any]:
    """Fetch the ``id`` and ``login`` of the account with the given login.

    Raises:
        KeyError: If ``fetch_one`` returns no row.
    """
    query = "SELECT id, login FROM accounts WHERE login = %s"
    account_row = fetch_one(query, (login_key,))
    if account_row:
        return account_row
    raise KeyError("Nie znaleziono konta.")
def get_business_profile(account_id: int) -> Optional[Dict[str, Any]]:
    """Fetch the seller profile for ``account_id``.

    Returns whatever ``fetch_one`` yields for the query (declared as an
    optional dict).
    """
    profile_query = """
        SELECT company_name, owner_name, address_line, postal_code,
               city, tax_id, bank_account
        FROM business_profiles
        WHERE account_id = %s
        """
    params = (account_id,)
    return fetch_one(profile_query, params)
def require_auth() -> str:
    """Authenticate the current request through its bearer token.

    Returns:
        The ``login_key`` stored in the matching session.

    Raises:
        PermissionError: If the ``Authorization`` header is missing or not a
            ``Bearer`` header, the token is unknown, or the session has
            expired (expired sessions are removed from ``SESSION_TOKENS``).
    """
    header_value = request.headers.get("Authorization")
    if not (header_value and header_value.startswith("Bearer ")):
        raise PermissionError("Brak tokenu.")

    # Everything after the first space is the token.
    _, _, raw_token = header_value.partition(" ")
    token = raw_token.strip()

    session = SESSION_TOKENS.get(token)
    if not session:
        raise PermissionError("Nieprawidlowy token.")

    # Sessions store a naive UTC expiry timestamp.
    if session["expires_at"] < datetime.utcnow():
        SESSION_TOKENS.pop(token, None)
        raise PermissionError("Token wygasl.")
    return session["login_key"]
def index() -> Any:
    """Serve ``index.html`` from the application's static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, "index.html")
def static_files(filename: str) -> Any:
    """Serve a static file only if its name is listed in ``ALLOWED_STATIC``.

    Any other name gets a JSON 404 response.
    """
    if filename in ALLOWED_STATIC:
        return send_from_directory(app.static_folder, filename)
    return jsonify({"error": "Nie ma takiego zasobu."}), 404
def api_register() -> Any:
    """Create a new account from the JSON body of the current request.

    Expected keys: ``email``, ``password``, ``confirm_password`` and the seven
    seller fields listed in ``business_fields``. The account goes to the
    database when ``DATABASE_AVAILABLE`` is true, otherwise to the JSON store.

    Returns:
        A JSON success message, or a JSON error with status 400 when the body
        is malformed, a required field is missing, the passwords differ or are
        too short, the email is invalid, or the account already exists.
    """
    payload = request.get_json(force=True)
    # Valid JSON may still be a list, string or number; only objects work.
    if not isinstance(payload, dict):
        return jsonify({"error": "Niepoprawne dane rejestracji."}), 400

    business_fields = [
        "company_name",
        "owner_name",
        "address_line",
        "postal_code",
        "city",
        "tax_id",
        "bank_account",
    ]
    business_data: Dict[str, str] = {}
    for field in business_fields:
        raw_value = payload.get(field)
        # Non-string values count as missing instead of crashing on .strip().
        value = raw_value.strip() if isinstance(raw_value, str) else ""
        if not value:
            return jsonify({"error": f"Pole {field} jest wymagane."}), 400
        business_data[field] = value

    password = payload.get("password")
    confirm = payload.get("confirm_password")
    if password != confirm:
        return jsonify({"error": "Hasla musza byc identyczne."}), 400
    if not isinstance(password, str) or len(password) < PASSWORD_MIN_LENGTH:
        return jsonify({"error": "Haslo jest za krotkie."}), 400

    email = payload.get("email")
    # An invalid email is a client error, not a server failure.
    try:
        login_key, display_email = normalize_email(email if isinstance(email, str) else "")
    except ValueError as error:
        return jsonify({"error": str(error)}), 400

    password_hash = hash_password(password)

    if DATABASE_AVAILABLE:
        if fetch_one("SELECT 1 FROM accounts WHERE login = %s", (login_key,)):
            return jsonify({"error": "Konto o podanym emailu juz istnieje."}), 400
        account_id = create_account(login_key, display_email, password_hash)
        update_business(account_id, business_data)
        return jsonify({"message": "Konto zostalo utworzone."})

    data = load_store()
    accounts = data.setdefault("accounts", {})
    if login_key in accounts:
        return jsonify({"error": "Konto o podanym emailu juz istnieje."}), 400
    accounts[login_key] = {
        "login": login_key,
        "email": display_email,
        "password_hash": password_hash,
        "business": business_data,
        "invoices": [],
        "logo": None,
        "created_at": datetime.utcnow().isoformat(timespec="seconds"),
    }
    save_store(data)
    return jsonify({"message": "Konto zostalo utworzone."})
def api_login() -> Any:
    """Authenticate a user and issue a bearer token.

    Reads ``identifier`` (or ``email``) and ``password`` from the JSON body.
    On success a new token is stored in ``SESSION_TOKENS`` with an expiry of
    ``TOKEN_TTL`` from now (naive UTC) and returned with the login.

    Returns:
        ``{"token", "login"}`` on success; a JSON error with status 400 when
        credentials are missing or malformed, or 401 when they do not match.
    """
    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Podaj email/login i haslo."}), 400

    identifier = payload.get("identifier") or payload.get("email")
    password = payload.get("password")
    # Non-string values would crash later in .strip()/.encode().
    if not isinstance(identifier, str) or not isinstance(password, str):
        return jsonify({"error": "Podaj email/login i haslo."}), 400
    if not identifier or not password:
        return jsonify({"error": "Podaj email/login i haslo."}), 400

    # A malformed identifier cannot match any account: answer with the same
    # generic 401 instead of letting the ValueError become a 500.
    try:
        login_key, _ = normalize_email(identifier)
    except ValueError:
        return jsonify({"error": "Niepoprawne dane logowania."}), 401

    if DATABASE_AVAILABLE:
        row = fetch_one(
            "SELECT id, password_hash FROM accounts WHERE login = %s",
            (login_key,),
        )
        if not row or row["password_hash"] != hash_password(password):
            return jsonify({"error": "Niepoprawne dane logowania."}), 401
        token = uuid.uuid4().hex
        SESSION_TOKENS[token] = {
            "login_key": login_key,
            "account_id": row["id"],
            "expires_at": datetime.utcnow() + TOKEN_TTL,
        }
        return jsonify({"token": token, "login": login_key})

    data = load_store()
    accounts = data.get("accounts") or {}
    login_key, account = find_account_identifier(accounts, login_key)
    if not account or account.get("password_hash") != hash_password(password):
        return jsonify({"error": "Niepoprawne dane logowania."}), 401
    token = uuid.uuid4().hex
    SESSION_TOKENS[token] = {
        "login_key": login_key,
        "expires_at": datetime.utcnow() + TOKEN_TTL,
    }
    return jsonify({"token": token, "login": account.get("login", login_key)})
def api_logout() -> Any:
    """Invalidate the session token carried by the current request.

    The token is parsed the same way ``require_auth`` parses it (text after
    the ``Bearer `` prefix, whitespace-trimmed), so any token that
    authenticates can also be logged out. A header without the prefix is
    treated as the raw token. Unknown tokens are ignored; the response is
    always the same success message.
    """
    auth_header = request.headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        token = auth_header.split(" ", 1)[1]
    else:
        token = auth_header
    SESSION_TOKENS.pop(token.strip(), None)
    return jsonify({"message": "Wylogowano."})
def api_business() -> Any:
    """Read (GET) or replace (any other method) the seller profile.

    Requires a valid bearer token. In database mode the profile is read with
    ``get_business_profile`` and written with ``update_business``; otherwise
    it is kept under the account's ``"business"`` key in the JSON store.

    Returns:
        ``{"business": ...}`` for GET or a success message for updates; a JSON
        error with status 401 (not authenticated), 404 (account not found) or
        400 (malformed body / missing required field).
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    is_read = request.method == "GET"

    account_row = None
    data = None
    account = None
    if DATABASE_AVAILABLE:
        try:
            account_row = get_account_row(login_key)
        except KeyError:
            return jsonify({"error": "Nie znaleziono konta."}), 404
        if is_read:
            return jsonify({"business": get_business_profile(account_row["id"])})
    else:
        # The JSON store is only consulted when no database is configured.
        data = load_store()
        account = (data.get("accounts") or {}).get(login_key)
        if is_read:
            if not account:
                return jsonify({"business": None})
            return jsonify({"business": account.get("business")})

    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Niepoprawne dane sprzedawcy."}), 400
    required_fields = [
        "company_name",
        "owner_name",
        "address_line",
        "postal_code",
        "city",
        "tax_id",
        "bank_account",
    ]
    for field in required_fields:
        raw_value = payload.get(field)
        # Non-string values count as missing instead of crashing on .strip().
        if not (isinstance(raw_value, str) and raw_value.strip()):
            return jsonify({"error": f"Pole {field} jest wymagane."}), 400

    if DATABASE_AVAILABLE:
        update_business(account_row["id"], payload)
        return jsonify({"message": "Dane sprzedawcy zostaly zaktualizowane."})

    if not account:
        return jsonify({"error": "Nie znaleziono konta."}), 404
    account["business"] = payload
    save_store(data)
    return jsonify({"message": "Dane sprzedawcy zostaly zaktualizowane."})
def api_clients() -> Any:
    """Return clients of the authenticated account via ``search_clients``.

    Query parameters: ``q`` (search text, trimmed) and ``limit`` (integer,
    defaults to 10 when missing or non-numeric, clamped to 1..25). Without a
    database the list is always empty.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    if not DATABASE_AVAILABLE:
        return jsonify({"clients": []})

    try:
        account_row = get_account_row(login_key)
    except KeyError:
        return jsonify({"error": "Nie znaleziono konta."}), 404

    search_term = (request.args.get("q") or "").strip()
    try:
        requested_limit = int(request.args.get("limit", "10"))
    except ValueError:
        requested_limit = 10
    bounded_limit = min(max(requested_limit, 1), 25)

    matches = search_clients(account_row["id"], search_term, bounded_limit)
    return jsonify({"clients": matches})
def api_logo() -> Any:
    """Read (GET), remove (DELETE) or upload (other methods) the seller logo.

    Requires a valid bearer token. Uploads carry base64 data in ``content`` or
    ``data`` - either raw base64 with a separate ``mime_type``, or a
    ``data:<mime>;base64,<payload>`` URL. Only MIME types in
    ``ALLOWED_LOGO_MIME_TYPES`` and decoded sizes up to ``MAX_LOGO_SIZE`` are
    accepted. Storage is the database when available, else the JSON store.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    def account_missing() -> Any:
        return jsonify({"error": "Nie znaleziono konta."}), 404

    def describe_logo(filename: Any, mime_type: Any, encoded: Any, uploaded_at: Any) -> Dict[str, Any]:
        # A data URL is only offered when both parts are present.
        data_url = None
        if mime_type and encoded:
            data_url = f"data:{mime_type};base64,{encoded}"
        return {
            "logo": {
                "filename": filename,
                "mime_type": mime_type,
                "data": encoded,
                "data_url": data_url,
                "uploaded_at": uploaded_at,
            }
        }

    # Resolve the account in whichever backend is active.
    account_row = None
    account = None
    data = None
    if DATABASE_AVAILABLE:
        try:
            account_row = get_account_row(login_key)
        except KeyError:
            return account_missing()
    else:
        data = load_store()
        try:
            account = get_account(data, login_key)
        except KeyError:
            return account_missing()

    method = request.method

    if method == "GET":
        if DATABASE_AVAILABLE:
            logo_row = fetch_business_logo(account_row["id"])
            if not logo_row:
                return jsonify({"logo": None})
            # The database row carries no filename or upload timestamp.
            return jsonify(describe_logo(None, logo_row["mime_type"], logo_row["data"], None))
        logo = account.get("logo") if account else None
        if not logo:
            return jsonify({"logo": None})
        return jsonify(
            describe_logo(
                logo.get("filename"),
                logo.get("mime_type"),
                logo.get("data"),
                logo.get("uploaded_at"),
            )
        )

    if method == "DELETE":
        if DATABASE_AVAILABLE:
            update_business_logo(account_row["id"], None, None)
            return jsonify({"message": "Logo zostalo usuniete."})
        if not account or data is None:
            return account_missing()
        account["logo"] = None
        save_store(data)
        return jsonify({"message": "Logo zostalo usuniete."})

    # Upload path.
    payload = request.get_json(force=True)
    raw_content = (payload.get("content") or payload.get("data") or "").strip()
    if not raw_content:
        return jsonify({"error": "Brak danych logo."}), 400
    provided_mime = (payload.get("mime_type") or "").strip()
    filename = sanitize_filename(payload.get("filename"))

    if raw_content.startswith("data:"):
        header, separator, encoded_content = raw_content.partition(",")
        if not separator:
            return jsonify({"error": "Niepoprawny format danych logo."}), 400
        header = header.strip()
        if ";base64" not in header:
            return jsonify({"error": "Niepoprawny format danych logo (oczekiwano base64)."}), 400
        # Prefer the MIME type embedded in the data URL over the explicit one.
        mime_type = header.split(";")[0].replace("data:", "", 1) or provided_mime
        base64_content = encoded_content.strip()
    else:
        mime_type = provided_mime
        base64_content = raw_content

    mime_type = (mime_type or "").lower()
    if mime_type not in ALLOWED_LOGO_MIME_TYPES:
        return jsonify({"error": "Dozwolone formaty logo to PNG lub JPG."}), 400

    try:
        logo_bytes = base64.b64decode(base64_content, validate=True)
    except (ValueError, binascii.Error):
        return jsonify({"error": "Nie udalo sie odczytac danych logo (base64)."}), 400
    if len(logo_bytes) > MAX_LOGO_SIZE:
        return jsonify({"error": f"Logo jest zbyt duze (maksymalnie {MAX_LOGO_SIZE // 1024} KB)."}), 400

    # Re-encode so the stored value is canonical base64.
    stored_logo = {
        "filename": filename,
        "mime_type": mime_type,
        "data": base64.b64encode(logo_bytes).decode("ascii"),
        "uploaded_at": datetime.utcnow().isoformat(timespec="seconds"),
    }
    if DATABASE_AVAILABLE:
        update_business_logo(account_row["id"], stored_logo["mime_type"], stored_logo["data"])
    else:
        account["logo"] = stored_logo
        save_store(data)
    return jsonify({"logo": stored_logo})
def normalize_phone(phone: Optional[str]) -> Optional[str]:
    """Strip a phone number down to digits and ``+`` signs.

    Args:
        phone: Phone as received from the client. JSON clients may send a
            number instead of a string, so the value is converted with
            ``str()`` before filtering rather than failing inside ``re.sub``.

    Returns:
        The filtered string, or ``None`` when the input is empty/falsy or
        contains no digits or ``+``.
    """
    if not phone:
        return None
    digits = re.sub(r"[^\d+]", "", str(phone))
    return digits or None
def validate_client(payload: Dict[str, Any]) -> Dict[str, str]:
    """Extract the buyer's details from an invoice payload.

    Each field is taken from the nested ``client`` object when present and
    truthy, otherwise from the flat camelCase key (``clientName``,
    ``clientTaxId``, ...). Text fields are trimmed; the phone goes through
    ``normalize_phone`` and may therefore be ``None``. No field is required.
    """
    nested = payload.get("client") or {}

    def pick(nested_key: str, flat_key: str) -> Any:
        return nested.get(nested_key) or payload.get(flat_key)

    def text(nested_key: str, flat_key: str) -> str:
        return (pick(nested_key, flat_key) or "").strip()

    return {
        "name": text("name", "clientName"),
        "tax_id": text("tax_id", "clientTaxId"),
        "address_line": text("address_line", "clientAddress"),
        "postal_code": text("postal_code", "clientPostalCode"),
        "city": text("city", "clientCity"),
        "phone": normalize_phone(pick("phone", "clientPhone")),
    }
def _parse_invoice_amount(raw: Any) -> Decimal:
    """Parse a quantity or price into a finite two-decimal ``Decimal``."""
    amount = _quantize(_decimal(raw or "0"))
    # NaN passes through quantize silently and would blow up on comparison.
    if not amount.is_finite():
        raise ValueError(f"Niepoprawna wartosc liczby: {raw}")
    return amount


def _normalize_invoice_item(item: Any) -> Dict[str, Any]:
    """Validate one invoice line and compute its net, VAT and gross amounts."""
    if not isinstance(item, dict):
        raise ValueError("Niepoprawna pozycja faktury.")

    name = str(item.get("name") or "").strip()
    if not name:
        raise ValueError("Nazwa pozycji nie moze byc pusta.")

    quantity = _parse_invoice_amount(item.get("quantity"))
    if quantity <= Decimal("0"):
        raise ValueError("Ilosc musi byc dodatnia.")

    unit = item.get("unit") or DEFAULT_UNIT

    vat_code = str(item.get("vat_code") or item.get("vat") or item.get("vatCode") or "23")
    if vat_code not in VAT_RATES:
        raise ValueError("Niepoprawna stawka VAT.")

    # Prices are gross; accept the legacy key spellings as fallbacks.
    unit_price_raw = item.get("unit_price_gross")
    if unit_price_raw in (None, ""):
        unit_price_raw = item.get("unitPrice") or item.get("unit_price") or item.get("price")
    unit_price_gross = _parse_invoice_amount(unit_price_raw)
    if unit_price_gross <= Decimal("0"):
        raise ValueError("Cena musi byc dodatnia.")

    vat_rate = VAT_RATES[vat_code]
    if vat_rate is None:
        # Codes without a numeric rate: net equals gross.
        unit_price_net = unit_price_gross
    else:
        unit_price_net = _quantize(unit_price_gross / (Decimal("1.0") + vat_rate))

    net_total = _quantize(unit_price_net * quantity)
    gross_total = _quantize(unit_price_gross * quantity)
    # Line VAT is the difference of the rounded totals, so that
    # net + VAT == gross holds for every line and for the invoice totals.
    vat_total = gross_total - net_total

    return {
        "name": name,
        "quantity": str(quantity),
        "unit": unit,
        "vat_code": vat_code,
        "vat_label": str(item.get("vatLabel") or vat_code),
        "unit_price_net": str(unit_price_net),
        "unit_price_gross": str(unit_price_gross),
        "net_total": str(net_total),
        "vat_amount": str(_quantize(vat_total)),
        "gross_total": str(gross_total),
    }


def _summarize_invoice_items(items: List[Dict[str, Any]]) -> Tuple[Dict[str, str], List[Dict[str, str]]]:
    """Sum line amounts into invoice totals and a per-VAT-label breakdown."""
    totals = {"net": Decimal("0"), "vat": Decimal("0"), "gross": Decimal("0")}
    summary: Dict[str, Dict[str, Decimal]] = {}
    for item in items:
        net = Decimal(item["net_total"])
        vat = Decimal(item["vat_amount"])
        gross = Decimal(item["gross_total"])
        totals["net"] += net
        totals["vat"] += vat
        totals["gross"] += gross
        bucket = summary.setdefault(
            item["vat_label"],
            {"net_total": Decimal("0"), "vat_total": Decimal("0"), "gross_total": Decimal("0")},
        )
        bucket["net_total"] += net
        bucket["vat_total"] += vat
        bucket["gross_total"] += gross

    totals_text = {key: str(_quantize(value)) for key, value in totals.items()}
    summary_list = [
        {
            "vat_label": label,
            "net_total": str(_quantize(values["net_total"])),
            "vat_total": str(_quantize(values["vat_total"])),
            "gross_total": str(_quantize(values["gross_total"])),
        }
        for label, values in summary.items()
    ]
    return totals_text, summary_list


def build_invoice(payload: Dict[str, Any], business: Dict[str, Any], client: Dict[str, str]) -> Dict[str, Any]:
    """Build a complete invoice document from a request payload.

    Args:
        payload: Request data. Reads ``sale_date``/``saleDate``,
            ``payment_term``/``paymentTerm`` (default 14), ``items`` and
            ``exemption_note``/``exemptionNote``.
        business: Seller profile, embedded unchanged.
        client: Buyer details, embedded unchanged.

    Returns:
        A dict with ``invoice_id`` (``FV-`` plus the current local timestamp),
        ``issued_at``, ``sale_date``, ``payment_term``, normalized ``items``,
        per-VAT-label ``summary``, ``totals``, ``client``, ``business`` and
        ``exemption_note``. All amounts are two-decimal strings; each item's
        ``vat_amount`` is the VAT for the whole line.

    Raises:
        ValueError: For any invalid input - bad payment term, malformed item,
            empty name, non-positive quantity/price, unknown VAT code, or an
            amount that cannot be represented.
    """
    now = datetime.now()
    # NOTE(review): second resolution means two invoices created in the same
    # second get the same id - confirm uniqueness is enforced elsewhere.
    invoice_id = f"FV-{now.strftime('%Y%m%d-%H%M%S')}"
    issued_at = now.strftime("%Y-%m-%d %H:%M")
    sale_date = payload.get("sale_date") or payload.get("saleDate") or date.today().isoformat()

    raw_term = payload.get("payment_term") or payload.get("paymentTerm") or 14
    try:
        payment_term = int(raw_term)
    except (TypeError, ValueError, OverflowError) as error:
        raise ValueError("Niepoprawny termin platnosci.") from error

    items = payload.get("items") or []
    if not isinstance(items, list):
        raise ValueError("Niepoprawna lista pozycji.")

    # Decimal failures (e.g. amounts beyond the context precision) are
    # ArithmeticError subclasses; report them as ValueError so callers'
    # existing 400 handling applies.
    try:
        normalized_items = [_normalize_invoice_item(item) for item in items]
        totals, summary_list = _summarize_invoice_items(normalized_items)
    except ArithmeticError as error:
        raise ValueError("Niepoprawna wartosc liczby.") from error

    exemption_note = (payload.get("exemption_note") or payload.get("exemptionNote") or "").strip()
    return {
        "invoice_id": invoice_id,
        "issued_at": issued_at,
        "sale_date": sale_date,
        "payment_term": payment_term,
        "items": normalized_items,
        "summary": summary_list,
        "totals": totals,
        "client": client,
        "business": business,
        "exemption_note": exemption_note,
    }
def api_invoices() -> Any:
    """List invoices (GET) or create a new one (any other method).

    Requires a valid bearer token. Listing returns at most
    ``INVOICE_HISTORY_LIMIT`` invoices. In database mode they are assembled
    from the ``invoices``, ``invoice_items`` and ``invoice_vat_summary``
    tables, newest first; otherwise they come from the account's stored list.
    Creation builds the invoice with ``build_invoice`` and persists it in the
    active backend; invalid input yields a 400 with the validation message.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    def account_missing() -> Any:
        return jsonify({"error": "Nie znaleziono konta."}), 404

    def money(record: Dict[str, Any], column: str) -> str:
        return _format_decimal_str(record.get(column))

    creating = request.method != "GET"

    # ---- Listing, JSON store ----
    if not creating and not DATABASE_AVAILABLE:
        data = load_store()
        try:
            account = get_account(data, login_key)
        except KeyError:
            return account_missing()
        stored = account.get("invoices", [])
        return jsonify({"invoices": stored[:INVOICE_HISTORY_LIMIT]})

    # ---- Listing, database ----
    if not creating:
        try:
            account_row = get_account_row(login_key)
        except KeyError:
            return account_missing()
        invoice_rows = fetch_all(
            """
            SELECT i.id,
                   i.invoice_number,
                   i.issued_at,
                   i.sale_date,
                   i.payment_term_days,
                   i.exemption_note,
                   i.total_net,
                   i.total_vat,
                   i.total_gross,
                   c.name AS client_name,
                   c.address_line AS client_address,
                   c.postal_code AS client_postal_code,
                   c.city AS client_city,
                   c.tax_id AS client_tax_id,
                   c.phone AS client_phone
            FROM invoices AS i
            LEFT JOIN clients AS c ON c.id = i.client_id
            WHERE i.account_id = %s
            ORDER BY i.issued_at DESC
            LIMIT %s
            """,
            (account_row["id"], INVOICE_HISTORY_LIMIT),
        )
        if not invoice_rows:
            return jsonify({"invoices": []})

        invoice_ids = [row["id"] for row in invoice_rows]
        items_map: Dict[int, List[Dict[str, Any]]] = {row_id: [] for row_id in invoice_ids}
        summary_map: Dict[int, List[Dict[str, str]]] = {row_id: [] for row_id in invoice_ids}

        # Load lines and VAT summaries for all listed invoices in two queries.
        item_rows = fetch_all(
            """
            SELECT invoice_id, line_no, name, quantity, unit,
                   vat_code, vat_label, unit_price_net,
                   unit_price_gross, net_total, vat_amount, gross_total
            FROM invoice_items
            WHERE invoice_id = ANY(%s)
            ORDER BY line_no
            """,
            (invoice_ids,),
        )
        for item in item_rows:
            line = {
                "name": item["name"],
                "quantity": _format_decimal_str(item.get("quantity"), "0.00"),
                "unit": item.get("unit") or DEFAULT_UNIT,
                "vat_code": item.get("vat_code"),
                "vat_label": item.get("vat_label") or item.get("vat_code"),
                "unit_price_net": money(item, "unit_price_net"),
                "unit_price_gross": money(item, "unit_price_gross"),
                "net_total": money(item, "net_total"),
                "vat_amount": money(item, "vat_amount"),
                "gross_total": money(item, "gross_total"),
            }
            items_map.setdefault(item["invoice_id"], []).append(line)

        summary_rows = fetch_all(
            """
            SELECT invoice_id, vat_label, net_total, vat_total, gross_total
            FROM invoice_vat_summary
            WHERE invoice_id = ANY(%s)
            ORDER BY vat_label
            """,
            (invoice_ids,),
        )
        for entry in summary_rows:
            bucket = {
                "vat_label": entry.get("vat_label"),
                "net_total": money(entry, "net_total"),
                "vat_total": money(entry, "vat_total"),
                "gross_total": money(entry, "gross_total"),
            }
            summary_map.setdefault(entry["invoice_id"], []).append(bucket)

        # The current seller profile is attached to every listed invoice.
        business_profile = get_business_profile(account_row["id"])

        invoices: List[Dict[str, Any]] = []
        for row in invoice_rows:
            issued_at = row.get("issued_at")
            if isinstance(issued_at, datetime):
                issued_at = issued_at.strftime("%Y-%m-%d %H:%M")
            sale_date = row.get("sale_date")
            if hasattr(sale_date, "isoformat"):
                sale_date = sale_date.isoformat()

            client = None
            if row.get("client_name"):
                client = {
                    "name": row.get("client_name"),
                    "address_line": row.get("client_address"),
                    "postal_code": row.get("client_postal_code"),
                    "city": row.get("client_city"),
                    "tax_id": row.get("client_tax_id"),
                    "phone": row.get("client_phone"),
                }

            invoices.append(
                {
                    "invoice_id": row.get("invoice_number"),
                    "issued_at": issued_at,
                    "sale_date": sale_date,
                    "payment_term": row.get("payment_term_days"),
                    "exemption_note": row.get("exemption_note"),
                    "items": items_map.get(row["id"], []),
                    "summary": summary_map.get(row["id"], []),
                    "totals": {
                        "net": money(row, "total_net"),
                        "vat": money(row, "total_vat"),
                        "gross": money(row, "total_gross"),
                    },
                    "client": client,
                    "business": business_profile,
                }
            )
        return jsonify({"invoices": invoices})

    # ---- Creation ----
    payload = request.get_json(force=True)

    if DATABASE_AVAILABLE:
        try:
            account_row = get_account_row(login_key)
        except KeyError:
            return account_missing()
        business = get_business_profile(account_row["id"])
        if not business:
            return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400
        client = validate_client(payload)
        try:
            invoice = build_invoice(payload, business, client)
        except ValueError as error:
            return jsonify({"error": str(error)}), 400
        client_record = {
            field: client[field]
            for field in ("name", "address_line", "postal_code", "city", "tax_id")
        }
        client_record["phone"] = client.get("phone")
        client_id = upsert_client(account_row["id"], client_record)
        insert_invoice(account_row["id"], client_id, invoice)
        return jsonify({"message": "Faktura zostala zapisana.", "invoice": invoice})

    data = load_store()
    try:
        account = get_account(data, login_key)
    except KeyError:
        return account_missing()
    business = account.get("business")
    if not business:
        return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400
    client = validate_client(payload)
    try:
        invoice = build_invoice(payload, business, client)
    except ValueError as error:
        return jsonify({"error": str(error)}), 400
    # Newest first, trimmed to the history limit.
    history = account.setdefault("invoices", [])
    history.insert(0, invoice)
    account["invoices"] = history[:INVOICE_HISTORY_LIMIT]
    save_store(data)
    return jsonify({"message": "Faktura zostala zapisana.", "invoice": invoice})
def api_invoice_detail(invoice_id: str) -> Any:
    """Delete (DELETE) or replace (any other method) a single invoice.

    Requires a valid bearer token. The invoice is looked up by its number
    within the authenticated account. A replacement is rebuilt with
    ``build_invoice`` but keeps the original ``invoice_id`` and, when present,
    the original ``issued_at``.

    NOTE(review): in database mode a replacement deletes the old rows and then
    inserts the new invoice as separate statements - confirm whether ``db``
    wraps these in a transaction.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    def account_missing() -> Any:
        return jsonify({"error": "Nie znaleziono konta."}), 404

    def invoice_missing() -> Any:
        return jsonify({"error": "Nie znaleziono faktury."}), 404

    def purge_invoice_rows(row_id: Any) -> None:
        # Child rows first, then the invoice itself.
        execute("DELETE FROM invoice_items WHERE invoice_id = %s", (row_id,))
        execute("DELETE FROM invoice_vat_summary WHERE invoice_id = %s", (row_id,))
        execute("DELETE FROM invoices WHERE id = %s", (row_id,))

    deleting = request.method == "DELETE"

    if DATABASE_AVAILABLE:
        try:
            account_row = get_account_row(login_key)
        except KeyError:
            return account_missing()
        invoice_row = fetch_one(
            """
            SELECT id, issued_at
            FROM invoices
            WHERE account_id = %s AND invoice_number = %s
            """,
            (account_row["id"], invoice_id),
        )
        if not invoice_row:
            return invoice_missing()

        if deleting:
            purge_invoice_rows(invoice_row["id"])
            return jsonify({"message": "Faktura zostala usunieta."})

        payload = request.get_json(force=True)
        business = get_business_profile(account_row["id"])
        if not business:
            return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400
        client = validate_client(payload)
        try:
            invoice = build_invoice(payload, business, client)
        except ValueError as error:
            return jsonify({"error": str(error)}), 400

        # Preserve the identity and issue time of the invoice being replaced.
        invoice["invoice_id"] = invoice_id
        previous_issued_at = invoice_row.get("issued_at")
        if isinstance(previous_issued_at, datetime):
            invoice["issued_at"] = previous_issued_at.strftime("%Y-%m-%d %H:%M")
        elif previous_issued_at:
            invoice["issued_at"] = str(previous_issued_at)

        client_record = {
            field: client[field]
            for field in ("name", "address_line", "postal_code", "city", "tax_id")
        }
        client_record["phone"] = client.get("phone")
        client_id = upsert_client(account_row["id"], client_record)

        purge_invoice_rows(invoice_row["id"])
        insert_invoice(account_row["id"], client_id, invoice)
        return jsonify({"message": "Faktura zostala zaktualizowana.", "invoice": invoice})

    # ---- JSON store ----
    data = load_store()
    try:
        account = get_account(data, login_key)
    except KeyError:
        return account_missing()

    invoices = account.get("invoices", [])
    position = None
    for idx, entry in enumerate(invoices):
        if entry.get("invoice_id") == invoice_id:
            position = idx
            break
    if position is None:
        return invoice_missing()

    if deleting:
        invoices.pop(position)
        save_store(data)
        return jsonify({"message": "Faktura zostala usunieta."})

    payload = request.get_json(force=True)
    business = account.get("business")
    if not business:
        return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400
    client = validate_client(payload)
    try:
        invoice = build_invoice(payload, business, client)
    except ValueError as error:
        return jsonify({"error": str(error)}), 400

    invoice["invoice_id"] = invoice_id
    previous = invoices[position]
    if previous.get("issued_at"):
        invoice["issued_at"] = previous.get("issued_at")
    invoices[position] = invoice
    save_store(data)
    return jsonify({"message": "Faktura zostala zaktualizowana.", "invoice": invoice})
def api_invoice_summary() -> Any:
    """Return invoice counts and gross totals for three time windows.

    Requires a valid bearer token. The windows are the last 30 days
    (``last_month``), the current calendar quarter (``quarter``) and the
    current calendar year (``year``), all measured against the naive UTC
    "now". Rows come from the ``invoices`` table or from the account's stored
    invoices, depending on the active backend.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    now = datetime.utcnow()
    midnight = {"hour": 0, "minute": 0, "second": 0, "microsecond": 0}
    first_month_of_quarter = ((now.month - 1) // 3) * 3 + 1
    window_starts = {
        "last_month": now - timedelta(days=30),
        "quarter": now.replace(month=first_month_of_quarter, day=1, **midnight),
        "year": now.replace(month=1, day=1, **midnight),
    }

    def normalize_issued_at(value: Any) -> Optional[datetime]:
        """Coerce a stored issue time to a naive datetime, or ``None``."""
        if isinstance(value, datetime):
            tzinfo = value.tzinfo
            if tzinfo is not None and tzinfo.utcoffset(value) is not None:
                # Aware timestamps are converted to UTC and made naive.
                return value.astimezone(timezone.utc).replace(tzinfo=None)
            return value
        if not isinstance(value, str):
            return None
        candidate = value.strip()
        if not candidate:
            return None
        for fmt in ("%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"):
            try:
                return datetime.strptime(candidate, fmt)
            except ValueError:
                continue
        return None

    def aggregate_from_rows(rows: List[Dict[str, Any]], start: datetime) -> Dict[str, Any]:
        """Count rows issued at or after ``start`` and sum their gross totals."""
        matched = 0
        gross_sum = Decimal("0.00")
        for row in rows:
            issued_dt = normalize_issued_at(row.get("issued_at"))
            if issued_dt is None or issued_dt < start:
                continue
            try:
                gross_sum += _decimal(row.get("total_gross") or "0")
            except ValueError:
                # Rows with an unparsable amount are skipped entirely.
                continue
            matched += 1
        return {"count": matched, "gross_total": str(_quantize(gross_sum))}

    if DATABASE_AVAILABLE:
        try:
            account_row = get_account_row(login_key)
        except KeyError:
            return jsonify({"error": "Nie znaleziono konta."}), 404
        rows = fetch_all(
            """
            SELECT issued_at, total_gross
            FROM invoices
            WHERE account_id = %s
            """,
            (account_row["id"],),
        )
    else:
        data = load_store()
        try:
            account = get_account(data, login_key)
        except KeyError:
            return jsonify({"error": "Nie znaleziono konta."}), 404
        rows = [
            {
                "issued_at": invoice.get("issued_at"),
                "total_gross": (invoice.get("totals") or {}).get("gross", "0"),
            }
            for invoice in account.get("invoices", [])
        ]

    summary = {
        label: aggregate_from_rows(rows, start)
        for label, start in window_starts.items()
    }
    return jsonify({"summary": summary})
if __name__ == "__main__":
    # Local entry point: listen on all interfaces, port from PORT (default 5000).
    port = int(os.environ.get("PORT", "5000"))
    # The Werkzeug debugger allows arbitrary code execution, so it must never
    # be on by default while binding to 0.0.0.0. Opt in with FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=port, debug=debug_enabled)