import base64 import binascii import hashlib import json import os import re import uuid from datetime import date, datetime, timedelta, timezone from decimal import Decimal, ROUND_HALF_UP, getcontext from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from flask import Flask, jsonify, request, send_from_directory from db import ( create_account, execute, fetch_all, fetch_one, fetch_business_logo, insert_invoice, search_clients, update_business, update_business_logo, upsert_client, ) APP_ROOT = Path(__file__).parent.resolve() DATA_DIR = Path(os.environ.get("DATA_DIR", APP_ROOT)) DATA_FILE = DATA_DIR / "web_invoice_store.json" INVOICE_HISTORY_LIMIT = 200 MAX_LOGO_SIZE = 512 * 1024 # 512 KB TOKEN_TTL = timedelta(hours=12) ALLOWED_LOGO_MIME_TYPES = {"image/png", "image/jpeg"} DATABASE_AVAILABLE = bool(os.environ.get("NEON_DATABASE_URL")) VAT_RATES: Dict[str, Optional[Decimal]] = { "23": Decimal("0.23"), "8": Decimal("0.08"), "5": Decimal("0.05"), "0": Decimal("0.00"), "ZW": None, "NP": None, } DEFAULT_UNIT = "szt." ALLOWED_UNITS = {"szt.", "godz."} PASSWORD_MIN_LENGTH = 4 SESSION_TOKENS: Dict[str, Dict[str, Any]] = {} ALLOWED_STATIC = { "index.html", "styles.css", "main.js", "favicon.ico", "Roboto-VariableFont_wdth,wght.ttf", } app = Flask(__name__, static_folder=str(APP_ROOT), static_url_path="") getcontext().prec = 10 def _quantize(value: Decimal) -> Decimal: return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) def _decimal(value: Any) -> Decimal: try: return Decimal(str(value)) except Exception as error: # pragma: no cover - defensive raise ValueError(f"Niepoprawna wartosc liczby: {value}") from error def _format_decimal_str(value: Any, default: str = "0.00") -> str: if value in (None, ""): return default if isinstance(value, Decimal): return str(_quantize(value)) try: return str(_quantize(Decimal(str(value)))) except Exception: return str(value) def hash_password(password: str) -> str: return hashlib.sha256(password.encode("utf-8")).hexdigest() EMAIL_PATTERN = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$") def normalize_email(raw_email: str) -> Tuple[str, str]: display_email = (raw_email or "").strip() if not display_email: raise ValueError("Email nie moze byc pusty.") if not EMAIL_PATTERN.fullmatch(display_email): raise ValueError("Podaj poprawny adres email.") return display_email.lower(), display_email def sanitize_filename(filename: Optional[str]) -> str: if not filename: return "logo" name = str(filename).split("/")[-1].split("\\")[-1] sanitized = re.sub(r"[^A-Za-z0-9._-]", "_", name).strip("._") return sanitized or "logo" def find_account_identifier(accounts: Dict[str, Any], identifier: str) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: key = (identifier or "").strip().lower() if not key: return None, None account = accounts.get(key) if account: return key, account for login_key, candidate in accounts.items(): candidate_login = (candidate.get("login") or "").strip().lower() candidate_email = (candidate.get("email") or "").strip().lower() if key in {candidate_login, candidate_email}: return login_key, candidate return None, None def load_store() -> Dict[str, Any]: if not DATA_FILE.exists(): return {"accounts": {}} try: with DATA_FILE.open("r", encoding="utf-8") as handle: data = json.load(handle) except json.JSONDecodeError: raise ValueError("Plik z danymi jest uszkodzony.") return data def save_store(data: Dict[str, Any]) -> None: DATA_DIR.mkdir(parents=True, exist_ok=True) tmp_path = DATA_FILE.with_suffix(".tmp") with tmp_path.open("w", encoding="utf-8") as handle: json.dump(data, handle, ensure_ascii=False, indent=2) tmp_path.replace(DATA_FILE) def get_account(data: Dict[str, Any], login_key: str) -> Dict[str, Any]: accounts = data.get("accounts") or {} account = accounts.get(login_key) if not account: raise KeyError("Nie znaleziono konta.") return account def get_account_row(login_key: str) -> Dict[str, Any]: row = fetch_one("SELECT id, login FROM accounts WHERE login = %s", (login_key,)) if not row: raise KeyError("Nie znaleziono konta.") return row def get_business_profile(account_id: int) -> Optional[Dict[str, Any]]: return fetch_one( """ SELECT company_name, owner_name, address_line, postal_code, city, tax_id, bank_account FROM business_profiles WHERE account_id = %s """, (account_id,), ) def require_auth() -> str: auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Bearer "): raise PermissionError("Brak tokenu.") token = auth_header.split(" ", 1)[1].strip() session = SESSION_TOKENS.get(token) if not session: raise PermissionError("Nieprawidlowy token.") if session["expires_at"] < datetime.utcnow(): SESSION_TOKENS.pop(token, None) raise PermissionError("Token wygasl.") return session["login_key"] @app.route("/") def index() -> Any: return send_from_directory(app.static_folder, "index.html") @app.route("/") def static_files(filename: str) -> Any: if filename not in ALLOWED_STATIC: return jsonify({"error": "Nie ma takiego zasobu."}), 404 return send_from_directory(app.static_folder, filename) @app.route("/api/register", methods=["POST"]) def api_register() -> Any: payload = request.get_json(force=True) email = payload.get("email") password = payload.get("password") confirm = payload.get("confirm_password") business_fields = [ "company_name", "owner_name", "address_line", "postal_code", "city", "tax_id", "bank_account", ] business_data: Dict[str, str] = {} for field in business_fields: value = (payload.get(field) or "").strip() if not value: return jsonify({"error": f"Pole {field} jest wymagane."}), 400 business_data[field] = value if password != confirm: return jsonify({"error": "Hasla musza byc identyczne."}), 400 if len(password or "") < PASSWORD_MIN_LENGTH: return jsonify({"error": "Haslo jest za krotkie."}), 400 login_key, display_email = normalize_email(email) password_hash = hash_password(password) if DATABASE_AVAILABLE: if fetch_one("SELECT 1 FROM accounts WHERE login = %s", (login_key,)): return jsonify({"error": "Konto o podanym emailu juz istnieje."}), 400 account_id = create_account(login_key, display_email, password_hash) update_business(account_id, business_data) return jsonify({"message": "Konto zostalo utworzone."}) data = load_store() accounts = data.setdefault("accounts", {}) if login_key in accounts: return jsonify({"error": "Konto o podanym emailu juz istnieje."}), 400 accounts[login_key] = { "login": login_key, "email": display_email, "password_hash": password_hash, "business": business_data, "invoices": [], "logo": None, "created_at": datetime.utcnow().isoformat(timespec="seconds"), } save_store(data) return jsonify({"message": "Konto zostalo utworzone."}) @app.route("/api/login", methods=["POST"]) def api_login() -> Any: payload = request.get_json(force=True) identifier = payload.get("identifier") or payload.get("email") password = payload.get("password") if not identifier or not password: return jsonify({"error": "Podaj email/login i haslo."}), 400 login_key, _ = normalize_email(identifier) if DATABASE_AVAILABLE: row = fetch_one( "SELECT id, password_hash FROM accounts WHERE login = %s", (login_key,), ) if not row or row["password_hash"] != hash_password(password): return jsonify({"error": "Niepoprawne dane logowania."}), 401 token = uuid.uuid4().hex SESSION_TOKENS[token] = { "login_key": login_key, "account_id": row["id"], "expires_at": datetime.utcnow() + TOKEN_TTL, } return jsonify({"token": token, "login": login_key}) data = load_store() accounts = data.get("accounts") or {} login_key, account = find_account_identifier(accounts, login_key) if not account or account.get("password_hash") != hash_password(password): return jsonify({"error": "Niepoprawne dane logowania."}), 401 token = uuid.uuid4().hex SESSION_TOKENS[token] = { "login_key": login_key, "expires_at": datetime.utcnow() + TOKEN_TTL, } return jsonify({"token": token, "login": account.get("login", login_key)}) @app.route("/api/logout", methods=["POST"]) def api_logout() -> Any: token = request.headers.get("Authorization", "").replace("Bearer ", "") SESSION_TOKENS.pop(token, None) return jsonify({"message": "Wylogowano."}) @app.route("/api/business", methods=["GET", "POST"]) def api_business() -> Any: try: login_key = require_auth() except PermissionError: return jsonify({"error": "Brak autoryzacji."}), 401 data = load_store() account = data.get("accounts", {}).get(login_key) account_row = None if DATABASE_AVAILABLE: try: account_row = get_account_row(login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 if request.method == "GET": if DATABASE_AVAILABLE: profile = fetch_one( """ SELECT company_name, owner_name, address_line, postal_code, city, tax_id, bank_account FROM business_profiles WHERE account_id = %s """, (account_row["id"],), ) return jsonify({"business": profile}) if not account: return jsonify({"business": None}) return jsonify({"business": account.get("business")}) payload = request.get_json(force=True) required_fields = [ "company_name", "owner_name", "address_line", "postal_code", "city", "tax_id", "bank_account", ] for field in required_fields: if not (payload.get(field) or "").strip(): return jsonify({"error": f"Pole {field} jest wymagane."}), 400 if DATABASE_AVAILABLE: update_business(account_row["id"], payload) return jsonify({"message": "Dane sprzedawcy zostaly zaktualizowane."}) if not account: return jsonify({"error": "Nie znaleziono konta."}), 404 account["business"] = payload save_store(data) return jsonify({"message": "Dane sprzedawcy zostaly zaktualizowane."}) @app.route("/api/clients", methods=["GET"]) def api_clients() -> Any: try: login_key = require_auth() except PermissionError: return jsonify({"error": "Brak autoryzacji."}), 401 if not DATABASE_AVAILABLE: return jsonify({"clients": []}) try: account_row = get_account_row(login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 search_term = (request.args.get("q") or "").strip() limit_param = request.args.get("limit", "10") try: limit_value = int(limit_param) except ValueError: limit_value = 10 limit_value = max(1, min(25, limit_value)) clients = search_clients(account_row["id"], search_term, limit_value) return jsonify({"clients": clients}) @app.route("/api/logo", methods=["GET", "POST", "DELETE"]) def api_logo() -> Any: try: login_key = require_auth() except PermissionError: return jsonify({"error": "Brak autoryzacji."}), 401 account_row = None account = None data = None if DATABASE_AVAILABLE: try: account_row = get_account_row(login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 else: data = load_store() try: account = get_account(data, login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 if request.method == "GET": if DATABASE_AVAILABLE: logo_row = fetch_business_logo(account_row["id"]) if not logo_row: return jsonify({"logo": None}) mime_type = logo_row["mime_type"] encoded = logo_row["data"] data_url = f"data:{mime_type};base64,{encoded}" if mime_type and encoded else None return jsonify( { "logo": { "filename": None, "mime_type": mime_type, "data": encoded, "data_url": data_url, "uploaded_at": None, } } ) logo = account.get("logo") if account else None if not logo: return jsonify({"logo": None}) encoded = logo.get("data") mime_type = logo.get("mime_type") data_url = None if encoded and mime_type: data_url = f"data:{mime_type};base64,{encoded}" return jsonify( { "logo": { "filename": logo.get("filename"), "mime_type": mime_type, "data": encoded, "data_url": data_url, "uploaded_at": logo.get("uploaded_at"), } } ) if request.method == "DELETE": if DATABASE_AVAILABLE: update_business_logo(account_row["id"], None, None) return jsonify({"message": "Logo zostalo usuniete."}) if not account or data is None: return jsonify({"error": "Nie znaleziono konta."}), 404 account["logo"] = None save_store(data) return jsonify({"message": "Logo zostalo usuniete."}) payload = request.get_json(force=True) raw_content = (payload.get("content") or payload.get("data") or "").strip() if not raw_content: return jsonify({"error": "Brak danych logo."}), 400 provided_mime = (payload.get("mime_type") or "").strip() filename = sanitize_filename(payload.get("filename")) if raw_content.startswith("data:"): try: header, encoded_content = raw_content.split(",", 1) except ValueError: return jsonify({"error": "Niepoprawny format danych logo."}), 400 header = header.strip() if ";base64" not in header: return jsonify({"error": "Niepoprawny format danych logo (oczekiwano base64)."}), 400 mime_type = header.split(";")[0].replace("data:", "", 1) or provided_mime base64_content = encoded_content.strip() else: mime_type = provided_mime base64_content = raw_content mime_type = (mime_type or "").lower() if mime_type not in ALLOWED_LOGO_MIME_TYPES: return jsonify({"error": "Dozwolone formaty logo to PNG lub JPG."}), 400 try: logo_bytes = base64.b64decode(base64_content, validate=True) except (ValueError, binascii.Error): return jsonify({"error": "Nie udalo sie odczytac danych logo (base64)."}), 400 if len(logo_bytes) > MAX_LOGO_SIZE: return jsonify({"error": f"Logo jest zbyt duze (maksymalnie {MAX_LOGO_SIZE // 1024} KB)."}), 400 encoded_logo = base64.b64encode(logo_bytes).decode("ascii") stored_logo = { "filename": filename, "mime_type": mime_type, "data": encoded_logo, "uploaded_at": datetime.utcnow().isoformat(timespec="seconds"), } if DATABASE_AVAILABLE: update_business_logo(account_row["id"], stored_logo["mime_type"], stored_logo["data"]) return jsonify({"logo": stored_logo}) account["logo"] = stored_logo save_store(data) return jsonify({"logo": stored_logo}) def normalize_phone(phone: Optional[str]) -> Optional[str]: if not phone: return None digits = re.sub(r"[^\d+]", "", phone) return digits or None def validate_client(payload: Dict[str, Any]) -> Dict[str, str]: client_payload = payload.get("client") or {} client = { "name": (client_payload.get("name") or payload.get("clientName") or "").strip(), "tax_id": (client_payload.get("tax_id") or payload.get("clientTaxId") or "").strip(), "address_line": (client_payload.get("address_line") or payload.get("clientAddress") or "").strip(), "postal_code": (client_payload.get("postal_code") or payload.get("clientPostalCode") or "").strip(), "city": (client_payload.get("city") or payload.get("clientCity") or "").strip(), "phone": normalize_phone(client_payload.get("phone") or payload.get("clientPhone")), } return client def build_invoice(payload: Dict[str, Any], business: Dict[str, Any], client: Dict[str, str]) -> Dict[str, Any]: now = datetime.now() invoice_id = f"FV-{now.strftime('%Y%m%d-%H%M%S')}" issued_at = now.strftime("%Y-%m-%d %H:%M") sale_date = payload.get("sale_date") or payload.get("saleDate") or date.today().isoformat() payment_term = int(payload.get("payment_term") or payload.get("paymentTerm") or 14) items = payload.get("items") or [] normalized_items: List[Dict[str, Any]] = [] for item in items: name = (item.get("name") or "").strip() if not name: raise ValueError("Nazwa pozycji nie moze byc pusta.") quantity = _quantize(_decimal(item.get("quantity") or "0")) if quantity <= Decimal("0"): raise ValueError("Ilosc musi byc dodatnia.") unit = item.get("unit") or DEFAULT_UNIT vat_code = str(item.get("vat_code") or item.get("vat") or item.get("vatCode") or "23") if vat_code not in VAT_RATES: raise ValueError("Niepoprawna stawka VAT.") unit_price_raw = item.get("unit_price_gross") if unit_price_raw in (None, ""): unit_price_raw = item.get("unitPrice") or item.get("unit_price") or item.get("price") unit_price_gross = _quantize(_decimal(unit_price_raw or "0")) if unit_price_gross <= Decimal("0"): raise ValueError("Cena musi byc dodatnia.") vat_rate = VAT_RATES[vat_code] if vat_rate is None: unit_price_net = unit_price_gross vat_amount = Decimal("0.00") else: unit_price_net = _quantize(unit_price_gross / (Decimal("1.0") + vat_rate)) vat_amount = _quantize(unit_price_gross - unit_price_net) net_total = _quantize(unit_price_net * quantity) vat_total = _quantize(vat_amount * quantity) gross_total = _quantize(unit_price_gross * quantity) normalized_items.append( { "name": name, "quantity": str(quantity), "unit": unit, "vat_code": vat_code, "vat_label": item.get("vatLabel") or vat_code, "unit_price_net": str(unit_price_net), "unit_price_gross": str(unit_price_gross), "net_total": str(net_total), "vat_amount": str(vat_amount), "gross_total": str(gross_total), } ) totals = {"net": Decimal("0"), "vat": Decimal("0"), "gross": Decimal("0")} summary: Dict[str, Dict[str, Decimal]] = {} for item in normalized_items: totals["net"] += Decimal(item["net_total"]) totals["vat"] += Decimal(item["vat_amount"]) totals["gross"] += Decimal(item["gross_total"]) label = item["vat_label"] summary.setdefault(label, {"net_total": Decimal("0"), "vat_total": Decimal("0"), "gross_total": Decimal("0")}) summary[label]["net_total"] += Decimal(item["net_total"]) summary[label]["vat_total"] += Decimal(item["vat_amount"]) summary[label]["gross_total"] += Decimal(item["gross_total"]) totals = {key: str(_quantize(value)) for key, value in totals.items()} summary_list = [ { "vat_label": label, "net_total": str(_quantize(values["net_total"])), "vat_total": str(_quantize(values["vat_total"])), "gross_total": str(_quantize(values["gross_total"])), } for label, values in summary.items() ] exemption_note = (payload.get("exemption_note") or payload.get("exemptionNote") or "").strip() return { "invoice_id": invoice_id, "issued_at": issued_at, "sale_date": sale_date, "payment_term": payment_term, "items": normalized_items, "summary": summary_list, "totals": totals, "client": client, "business": business, "exemption_note": exemption_note, } @app.route("/api/invoices", methods=["GET", "POST"]) def api_invoices() -> Any: try: login_key = require_auth() except PermissionError: return jsonify({"error": "Brak autoryzacji."}), 401 if request.method == "GET": if DATABASE_AVAILABLE: try: account_row = get_account_row(login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 invoice_rows = fetch_all( """ SELECT i.id, i.invoice_number, i.issued_at, i.sale_date, i.payment_term_days, i.exemption_note, i.total_net, i.total_vat, i.total_gross, c.name AS client_name, c.address_line AS client_address, c.postal_code AS client_postal_code, c.city AS client_city, c.tax_id AS client_tax_id, c.phone AS client_phone FROM invoices AS i LEFT JOIN clients AS c ON c.id = i.client_id WHERE i.account_id = %s ORDER BY i.issued_at DESC LIMIT %s """, (account_row["id"], INVOICE_HISTORY_LIMIT), ) if not invoice_rows: return jsonify({"invoices": []}) invoice_ids = [row["id"] for row in invoice_rows] items_map: Dict[int, List[Dict[str, Any]]] = {row_id: [] for row_id in invoice_ids} summary_map: Dict[int, List[Dict[str, str]]] = {row_id: [] for row_id in invoice_ids} if invoice_ids: item_rows = fetch_all( """ SELECT invoice_id, line_no, name, quantity, unit, vat_code, vat_label, unit_price_net, unit_price_gross, net_total, vat_amount, gross_total FROM invoice_items WHERE invoice_id = ANY(%s) ORDER BY line_no """, (invoice_ids,), ) for item in item_rows: items_map.setdefault(item["invoice_id"], []).append( { "name": item["name"], "quantity": _format_decimal_str(item.get("quantity"), "0.00"), "unit": item.get("unit") or DEFAULT_UNIT, "vat_code": item.get("vat_code"), "vat_label": item.get("vat_label") or item.get("vat_code"), "unit_price_net": _format_decimal_str(item.get("unit_price_net")), "unit_price_gross": _format_decimal_str(item.get("unit_price_gross")), "net_total": _format_decimal_str(item.get("net_total")), "vat_amount": _format_decimal_str(item.get("vat_amount")), "gross_total": _format_decimal_str(item.get("gross_total")), } ) summary_rows = fetch_all( """ SELECT invoice_id, vat_label, net_total, vat_total, gross_total FROM invoice_vat_summary WHERE invoice_id = ANY(%s) ORDER BY vat_label """, (invoice_ids,), ) for entry in summary_rows: summary_map.setdefault(entry["invoice_id"], []).append( { "vat_label": entry.get("vat_label"), "net_total": _format_decimal_str(entry.get("net_total")), "vat_total": _format_decimal_str(entry.get("vat_total")), "gross_total": _format_decimal_str(entry.get("gross_total")), } ) business_profile = get_business_profile(account_row["id"]) invoices: List[Dict[str, Any]] = [] for row in invoice_rows: issued_at_value = row.get("issued_at") sale_date_value = row.get("sale_date") if isinstance(issued_at_value, datetime): issued_at = issued_at_value.strftime("%Y-%m-%d %H:%M") else: issued_at = issued_at_value if hasattr(sale_date_value, "isoformat"): sale_date = sale_date_value.isoformat() else: sale_date = sale_date_value client = None if row.get("client_name"): client = { "name": row.get("client_name"), "address_line": row.get("client_address"), "postal_code": row.get("client_postal_code"), "city": row.get("client_city"), "tax_id": row.get("client_tax_id"), "phone": row.get("client_phone"), } invoices.append( { "invoice_id": row.get("invoice_number"), "issued_at": issued_at, "sale_date": sale_date, "payment_term": row.get("payment_term_days"), "exemption_note": row.get("exemption_note"), "items": items_map.get(row["id"], []), "summary": summary_map.get(row["id"], []), "totals": { "net": _format_decimal_str(row.get("total_net")), "vat": _format_decimal_str(row.get("total_vat")), "gross": _format_decimal_str(row.get("total_gross")), }, "client": client, "business": business_profile, } ) return jsonify({"invoices": invoices}) data = load_store() try: account = get_account(data, login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 invoices = account.get("invoices", [])[:INVOICE_HISTORY_LIMIT] return jsonify({"invoices": invoices}) payload = request.get_json(force=True) if DATABASE_AVAILABLE: try: account_row = get_account_row(login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 business = get_business_profile(account_row["id"]) if not business: return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400 client = validate_client(payload) try: invoice = build_invoice(payload, business, client) except ValueError as error: return jsonify({"error": str(error)}), 400 client_id = upsert_client( account_row["id"], { "name": client["name"], "address_line": client["address_line"], "postal_code": client["postal_code"], "city": client["city"], "tax_id": client["tax_id"], "phone": client.get("phone"), }, ) insert_invoice(account_row["id"], client_id, invoice) return jsonify({"message": "Faktura zostala zapisana.", "invoice": invoice}) data = load_store() try: account = get_account(data, login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 business = account.get("business") if not business: return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400 client = validate_client(payload) try: invoice = build_invoice(payload, business, client) except ValueError as error: return jsonify({"error": str(error)}), 400 invoices = account.setdefault("invoices", []) invoices.insert(0, invoice) account["invoices"] = invoices[:INVOICE_HISTORY_LIMIT] save_store(data) return jsonify({"message": "Faktura zostala zapisana.", "invoice": invoice}) @app.route("/api/invoices/", methods=["PUT", "DELETE"]) def api_invoice_detail(invoice_id: str) -> Any: try: login_key = require_auth() except PermissionError: return jsonify({"error": "Brak autoryzacji."}), 401 if DATABASE_AVAILABLE: try: account_row = get_account_row(login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 invoice_row = fetch_one( """ SELECT id, issued_at FROM invoices WHERE account_id = %s AND invoice_number = %s """, (account_row["id"], invoice_id), ) if not invoice_row: return jsonify({"error": "Nie znaleziono faktury."}), 404 if request.method == "DELETE": execute("DELETE FROM invoice_items WHERE invoice_id = %s", (invoice_row["id"],)) execute("DELETE FROM invoice_vat_summary WHERE invoice_id = %s", (invoice_row["id"],)) execute("DELETE FROM invoices WHERE id = %s", (invoice_row["id"],)) return jsonify({"message": "Faktura zostala usunieta."}) payload = request.get_json(force=True) business = get_business_profile(account_row["id"]) if not business: return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400 client = validate_client(payload) try: invoice = build_invoice(payload, business, client) except ValueError as error: return jsonify({"error": str(error)}), 400 invoice["invoice_id"] = invoice_id existing_issued_at = invoice_row.get("issued_at") if isinstance(existing_issued_at, datetime): invoice["issued_at"] = existing_issued_at.strftime("%Y-%m-%d %H:%M") elif existing_issued_at: invoice["issued_at"] = str(existing_issued_at) client_id = upsert_client( account_row["id"], { "name": client["name"], "address_line": client["address_line"], "postal_code": client["postal_code"], "city": client["city"], "tax_id": client["tax_id"], "phone": client.get("phone"), }, ) execute("DELETE FROM invoice_items WHERE invoice_id = %s", (invoice_row["id"],)) execute("DELETE FROM invoice_vat_summary WHERE invoice_id = %s", (invoice_row["id"],)) execute("DELETE FROM invoices WHERE id = %s", (invoice_row["id"],)) insert_invoice(account_row["id"], client_id, invoice) return jsonify({"message": "Faktura zostala zaktualizowana.", "invoice": invoice}) data = load_store() try: account = get_account(data, login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 invoices = account.get("invoices", []) invoice_index = next( (idx for idx, entry in enumerate(invoices) if entry.get("invoice_id") == invoice_id), None, ) if invoice_index is None: return jsonify({"error": "Nie znaleziono faktury."}), 404 if request.method == "DELETE": invoices.pop(invoice_index) save_store(data) return jsonify({"message": "Faktura zostala usunieta."}) payload = request.get_json(force=True) business = account.get("business") if not business: return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400 client = validate_client(payload) try: invoice = build_invoice(payload, business, client) except ValueError as error: return jsonify({"error": str(error)}), 400 invoice["invoice_id"] = invoice_id existing_invoice = invoices[invoice_index] if existing_invoice.get("issued_at"): invoice["issued_at"] = existing_invoice.get("issued_at") invoices[invoice_index] = invoice save_store(data) return jsonify({"message": "Faktura zostala zaktualizowana.", "invoice": invoice}) @app.route("/api/invoices/summary", methods=["GET"]) def api_invoice_summary() -> Any: try: login_key = require_auth() except PermissionError: return jsonify({"error": "Brak autoryzacji."}), 401 now = datetime.utcnow() last_month_start = now - timedelta(days=30) quarter_first_month = ((now.month - 1) // 3) * 3 + 1 quarter_start = now.replace(month=quarter_first_month, day=1, hour=0, minute=0, second=0, microsecond=0) year_start = now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0) def normalize_issued_at(value: Any) -> Optional[datetime]: if isinstance(value, datetime): tzinfo = value.tzinfo if tzinfo is not None and tzinfo.utcoffset(value) is not None: return value.astimezone(timezone.utc).replace(tzinfo=None) return value if isinstance(value, str): candidate = value.strip() if not candidate: return None for fmt in ("%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"): try: return datetime.strptime(candidate, fmt) except ValueError: continue return None def aggregate_from_rows(rows: List[Dict[str, Any]], start: datetime) -> Dict[str, Any]: count = 0 gross_total = Decimal("0.00") for row in rows: issued_dt = normalize_issued_at(row.get("issued_at")) if issued_dt is None or issued_dt < start: continue try: gross_total += _decimal(row.get("total_gross") or "0") except ValueError: continue count += 1 return {"count": count, "gross_total": str(_quantize(gross_total))} if DATABASE_AVAILABLE: try: account_row = get_account_row(login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 rows = fetch_all( """ SELECT issued_at, total_gross FROM invoices WHERE account_id = %s """, (account_row["id"],), ) summary = { "last_month": aggregate_from_rows(rows, last_month_start), "quarter": aggregate_from_rows(rows, quarter_start), "year": aggregate_from_rows(rows, year_start), } return jsonify({"summary": summary}) data = load_store() try: account = get_account(data, login_key) except KeyError: return jsonify({"error": "Nie znaleziono konta."}), 404 invoices = account.get("invoices", []) rows = [ { "issued_at": invoice.get("issued_at"), "total_gross": (invoice.get("totals") or {}).get("gross", "0"), } for invoice in invoices ] summary = { "last_month": aggregate_from_rows(rows, last_month_start), "quarter": aggregate_from_rows(rows, quarter_start), "year": aggregate_from_rows(rows, year_start), } return jsonify({"summary": summary}) if __name__ == "__main__": port = int(os.environ.get("PORT", "5000")) app.run(host="0.0.0.0", port=port, debug=True)