Spaces:
Sleeping
Sleeping
| import base64 | |
| import binascii | |
| import hashlib | |
| import json | |
| import os | |
| import re | |
| import uuid | |
| from datetime import date, datetime, timedelta | |
| from decimal import Decimal, ROUND_HALF_UP, getcontext | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from flask import Flask, jsonify, request, send_from_directory | |
| APP_ROOT = Path(__file__).parent.resolve() | |
| DATA_DIR = Path(os.environ.get("DATA_DIR", APP_ROOT)) | |
| DATA_FILE = DATA_DIR / "web_invoice_store.json" | |
| INVOICE_HISTORY_LIMIT = 200 | |
| MAX_LOGO_SIZE = 512 * 1024 # 512 KB | |
| TOKEN_TTL = timedelta(hours=12) | |
| ALLOWED_LOGO_MIME_TYPES = {"image/png", "image/jpeg"} | |
| VAT_RATES: Dict[str, Optional[Decimal]] = { | |
| "23": Decimal("0.23"), | |
| "8": Decimal("0.08"), | |
| "5": Decimal("0.05"), | |
| "0": Decimal("0.00"), | |
| "ZW": None, | |
| "NP": None, | |
| } | |
| DEFAULT_UNIT = "szt." | |
| ALLOWED_UNITS = {"szt.", "godz."} | |
| PASSWORD_MIN_LENGTH = 4 | |
| SESSION_TOKENS: Dict[str, Dict[str, Any]] = {} | |
| ALLOWED_STATIC = { | |
| "index.html", | |
| "styles.css", | |
| "main.js", | |
| "favicon.ico", | |
| "Roboto-VariableFont_wdth,wght.ttf", | |
| } | |
| app = Flask(__name__, static_folder=str(APP_ROOT), static_url_path="") | |
| getcontext().prec = 10 | |
| def _quantize(value: Decimal) -> Decimal: | |
| return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) | |
| def _decimal(value: Any) -> Decimal: | |
| try: | |
| return Decimal(str(value)) | |
| except Exception as error: # pragma: no cover - defensive | |
| raise ValueError(f"Niepoprawna wartosc liczby: {value}") from error | |
| def hash_password(password: str) -> str: | |
| return hashlib.sha256(password.encode("utf-8")).hexdigest() | |
| EMAIL_PATTERN = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$") | |
| def normalize_email(raw_email: str) -> Tuple[str, str]: | |
| display_email = (raw_email or "").strip() | |
| if not display_email: | |
| raise ValueError("Email nie moze byc pusty.") | |
| if not EMAIL_PATTERN.fullmatch(display_email): | |
| raise ValueError("Podaj poprawny adres email.") | |
| return display_email.lower(), display_email | |
| def sanitize_filename(filename: Optional[str]) -> str: | |
| if not filename: | |
| return "logo" | |
| name = str(filename).split("/")[-1].split("\\")[-1] | |
| sanitized = re.sub(r"[^A-Za-z0-9._-]", "_", name).strip("._") | |
| return sanitized or "logo" | |
| def find_account_identifier(accounts: Dict[str, Any], identifier: str) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: | |
| key = (identifier or "").strip().lower() | |
| if not key: | |
| return None, None | |
| account = accounts.get(key) | |
| if account: | |
| return key, account | |
| for login_key, candidate in accounts.items(): | |
| candidate_login = (candidate.get("login") or "").strip().lower() | |
| candidate_email = (candidate.get("email") or "").strip().lower() | |
| if key in {candidate_login, candidate_email}: | |
| return login_key, candidate | |
| return None, None | |
| def migrate_store_if_needed(data: Dict[str, Any]) -> Tuple[Dict[str, Any], bool]: | |
| if "accounts" in data: | |
| accounts = data.get("accounts") or {} | |
| for login, account in accounts.items(): | |
| account.setdefault("login", login) | |
| email = (account.get("email") or account.get("login") or "").strip() | |
| account["email"] = email | |
| account.setdefault("business", None) | |
| account.setdefault("password_hash", None) | |
| account.setdefault("invoices", []) | |
| account.setdefault("logo", None) | |
| account.setdefault("created_at", datetime.utcnow().isoformat(timespec="seconds")) | |
| data["accounts"] = accounts | |
| return data, False | |
| legacy_business = data.get("business") | |
| legacy_password = data.get("password_hash") | |
| legacy_invoices = data.get("invoices", []) | |
| legacy_logo = data.get("logo") | |
| accounts: Dict[str, Any] = {} | |
| legacy_login_hint = None | |
| if legacy_password: | |
| login_key = "admin" | |
| accounts[login_key] = { | |
| "login": login_key, | |
| "password_hash": legacy_password, | |
| "business": legacy_business, | |
| "invoices": legacy_invoices, | |
| "logo": legacy_logo, | |
| "created_at": datetime.utcnow().isoformat(timespec="seconds"), | |
| } | |
| legacy_login_hint = login_key | |
| migrated: Dict[str, Any] = {"accounts": accounts} | |
| if legacy_login_hint: | |
| migrated["legacy_login_hint"] = legacy_login_hint | |
| return migrated, True | |
| def load_store() -> Dict[str, Any]: | |
| if not DATA_FILE.exists(): | |
| return {"accounts": {}} | |
| with DATA_FILE.open("r", encoding="utf-8") as handle: | |
| data = json.load(handle) | |
| normalized, migrated = migrate_store_if_needed(data) | |
| if migrated: | |
| save_store(normalized) | |
| return normalized | |
| def save_store(data: Dict[str, Any]) -> None: | |
| DATA_FILE.parent.mkdir(parents=True, exist_ok=True) | |
| with DATA_FILE.open("w", encoding="utf-8") as handle: | |
| json.dump(data, handle, ensure_ascii=False, indent=2) | |
| def ensure_business_configured(account: Dict[str, Any]) -> None: | |
| if not account.get("business"): | |
| raise ValueError("Dane sprzedawcy nie zostaly uzupelnione.") | |
| def ensure_account_defaults(account: Dict[str, Any], login_key: str) -> Dict[str, Any]: | |
| account.setdefault("login", login_key) | |
| email_value = (account.get("email") or account.get("login") or "").strip() | |
| account["email"] = email_value | |
| account.setdefault("business", None) | |
| account.setdefault("password_hash", None) | |
| invoices = account.setdefault("invoices", []) | |
| if isinstance(invoices, list): | |
| for invoice in invoices: | |
| if not isinstance(invoice, dict): | |
| continue | |
| items = invoice.get("items") | |
| if not isinstance(items, list): | |
| continue | |
| for item in items: | |
| if not isinstance(item, dict): | |
| continue | |
| raw_quantity = str(item.get("quantity", "")).strip() | |
| try: | |
| quantity_decimal = _decimal(raw_quantity or "0") | |
| except ValueError: | |
| quantity_decimal = Decimal("0") | |
| if quantity_decimal > 0: | |
| quantity_integral = quantity_decimal.to_integral_value(rounding=ROUND_HALF_UP) | |
| item["quantity"] = str(int(quantity_integral)) | |
| unit_value = (item.get("unit") or "").strip() | |
| if unit_value not in ALLOWED_UNITS: | |
| unit_value = DEFAULT_UNIT | |
| item["unit"] = unit_value | |
| account.setdefault("logo", None) | |
| account.setdefault("created_at", datetime.utcnow().isoformat(timespec="seconds")) | |
| return account | |
| def get_accounts(data: Dict[str, Any]) -> Dict[str, Any]: | |
| accounts = data.setdefault("accounts", {}) | |
| for login_key, account in list(accounts.items()): | |
| accounts[login_key] = ensure_account_defaults(account, login_key) | |
| return accounts | |
| def get_account(data: Dict[str, Any], login_key: str) -> Dict[str, Any]: | |
| accounts = get_accounts(data) | |
| account = accounts.get(login_key) | |
| if not account: | |
| raise KeyError("Nie znaleziono konta.") | |
| return account | |
| def parse_iso_date(value: Optional[str]) -> Optional[str]: | |
| if not value: | |
| return None | |
| try: | |
| parsed = datetime.fromisoformat(value) | |
| return parsed.strftime("%Y-%m-%d") | |
| except ValueError: | |
| return None | |
| def compute_invoice_items(items_payload: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], Dict[str, Dict[str, Decimal]]]: | |
| if not items_payload: | |
| raise ValueError("Dodaj przynajmniej jedna pozycje.") | |
| computed_items: List[Dict[str, Any]] = [] | |
| summary: Dict[str, Dict[str, Decimal]] = {} | |
| for raw in items_payload: | |
| name = (raw.get("name") or "").strip() | |
| if not name: | |
| raise ValueError("Kazda pozycja musi miec nazwe.") | |
| quantity_decimal = _decimal(raw.get("quantity", "0")) | |
| if quantity_decimal <= 0: | |
| raise ValueError("Ilosc musi byc wieksza od zera.") | |
| quantity_integral = quantity_decimal.to_integral_value(rounding=ROUND_HALF_UP) | |
| if quantity_decimal != quantity_integral: | |
| raise ValueError("Ilosc musi byc liczba calkowita.") | |
| quantity = int(quantity_integral) | |
| unit_raw = str(raw.get("unit", "") or DEFAULT_UNIT).strip() | |
| unit = unit_raw if unit_raw in ALLOWED_UNITS else None | |
| if unit is None: | |
| raise ValueError("Wybrano nieprawidlowa jednostke.") | |
| vat_code = str(raw.get("vat_code", "")).upper() | |
| if vat_code not in VAT_RATES: | |
| raise ValueError(f"Nieznana stawka VAT: {vat_code}") | |
| unit_price_gross = _decimal(raw.get("unit_price_gross", "0")) | |
| if unit_price_gross <= 0: | |
| raise ValueError("Cena brutto musi byc wieksza od zera.") | |
| rate = VAT_RATES[vat_code] | |
| if rate is None: | |
| unit_price_net = unit_price_gross | |
| vat_amount = Decimal("0.00") | |
| else: | |
| unit_price_net = unit_price_gross / (Decimal("1.00") + rate) | |
| vat_amount = unit_price_gross - unit_price_net | |
| unit_price_net = _quantize(unit_price_net) | |
| unit_price_gross = _quantize(unit_price_gross) | |
| quantity_decimal_value = Decimal(quantity) | |
| net_total = _quantize(unit_price_net * quantity_decimal_value) | |
| vat_amount_total = _quantize(vat_amount * quantity_decimal_value if rate is not None else Decimal("0.00")) | |
| gross_total = _quantize(unit_price_gross * quantity_decimal_value) | |
| vat_label = "ZW" if vat_code in {"ZW", "0"} else ("NP" if vat_code == "NP" else f"{vat_code}%") | |
| computed_items.append( | |
| { | |
| "name": name, | |
| "unit": unit, | |
| "quantity": str(quantity), | |
| "vat_code": vat_code, | |
| "vat_label": vat_label, | |
| "unit_price_net": str(unit_price_net), | |
| "unit_price_gross": str(unit_price_gross), | |
| "net_total": str(net_total), | |
| "vat_amount": str(vat_amount_total), | |
| "gross_total": str(gross_total), | |
| } | |
| ) | |
| summary_key = vat_label | |
| bucket = summary.setdefault(summary_key, {"net": Decimal("0.00"), "vat": Decimal("0.00"), "gross": Decimal("0.00")}) | |
| bucket["net"] += net_total | |
| bucket["vat"] += vat_amount_total | |
| bucket["gross"] += gross_total | |
| return computed_items, summary | |
| def computed_summary_to_serializable(summary: Dict[str, Dict[str, Decimal]]) -> List[Dict[str, str]]: | |
| serialized = [] | |
| for vat_label, values in summary.items(): | |
| serialized.append( | |
| { | |
| "vat_label": vat_label, | |
| "net_total": str(_quantize(values["net"])), | |
| "vat_total": str(_quantize(values["vat"])), | |
| "gross_total": str(_quantize(values["gross"])), | |
| } | |
| ) | |
| serialized.sort(key=lambda item: item["vat_label"]) | |
| return serialized | |
| def compute_invoice(payload: Dict[str, Any], business: Dict[str, Any], *, invoice_id: Optional[str] = None, issued_at: Optional[str] = None) -> Dict[str, Any]: | |
| items_payload = payload.get("items", []) | |
| computed_items, summary = compute_invoice_items(items_payload) | |
| net_sum = sum(Decimal(item["net_total"]) for item in computed_items) | |
| vat_sum = sum(Decimal(item["vat_amount"]) for item in computed_items) | |
| gross_sum = sum(Decimal(item["gross_total"]) for item in computed_items) | |
| issued_timestamp = datetime.now() | |
| if issued_at: | |
| try: | |
| issued_timestamp = datetime.strptime(issued_at, "%Y-%m-%d %H:%M") | |
| except ValueError: | |
| issued_timestamp = datetime.now() | |
| generated_id = invoice_id or issued_timestamp.strftime("FV-%Y%m%d-%H%M%S") | |
| sale_date = parse_iso_date(payload.get("sale_date")) or issued_timestamp.strftime("%Y-%m-%d") | |
| payment_term = payload.get("payment_term") | |
| client_payload = payload.get("client") or {} | |
| client = { | |
| "name": (client_payload.get("name") or "").strip(), | |
| "address_line": (client_payload.get("address_line") or "").strip(), | |
| "postal_code": (client_payload.get("postal_code") or "").strip(), | |
| "city": (client_payload.get("city") or "").strip(), | |
| "tax_id": (client_payload.get("tax_id") or "").strip(), | |
| "phone": (client_payload.get("phone") or "").strip(), | |
| } | |
| invoice = { | |
| "invoice_id": generated_id, | |
| "issued_at": issued_timestamp.strftime("%Y-%m-%d %H:%M"), | |
| "sale_date": sale_date, | |
| "payment_term": payment_term, | |
| "items": computed_items, | |
| "summary": computed_summary_to_serializable(summary), | |
| "totals": { | |
| "net": str(_quantize(net_sum)), | |
| "vat": str(_quantize(vat_sum)), | |
| "gross": str(_quantize(gross_sum)), | |
| }, | |
| "client": client, | |
| "exemption_note": (payload.get("exemption_note") or "").strip(), | |
| } | |
| return invoice | |
| def cleanup_tokens() -> None: | |
| now = datetime.utcnow() | |
| expired = [token for token, payload in SESSION_TOKENS.items() if now - payload["issued_at"] > TOKEN_TTL] | |
| for token in expired: | |
| SESSION_TOKENS.pop(token, None) | |
| def create_token(login: str) -> str: | |
| cleanup_tokens() | |
| token = uuid.uuid4().hex | |
| SESSION_TOKENS[token] = {"login": login, "issued_at": datetime.utcnow()} | |
| return token | |
| def get_token() -> Optional[str]: | |
| header = request.headers.get("Authorization", "") | |
| if not header.startswith("Bearer "): | |
| return None | |
| return header.split(" ", 1)[1].strip() | |
| def require_auth() -> str: | |
| cleanup_tokens() | |
| token = get_token() | |
| if not token: | |
| raise PermissionError("Brak autoryzacji.") | |
| payload = SESSION_TOKENS.get(token) | |
| if not payload: | |
| raise PermissionError("Brak autoryzacji.") | |
| payload["issued_at"] = datetime.utcnow() | |
| return payload["login"] | |
| def serve_index() -> Any: | |
| return send_from_directory(app.static_folder, "index.html") | |
| def serve_static(path: str) -> Any: | |
| if path not in ALLOWED_STATIC: | |
| return jsonify({"error": "Nie znaleziono pliku."}), 404 | |
| target = Path(app.static_folder) / path | |
| if target.is_file(): | |
| return send_from_directory(app.static_folder, path) | |
| return jsonify({"error": "Nie znaleziono pliku."}), 404 | |
| def api_status() -> Any: | |
| data = load_store() | |
| accounts = get_accounts(data) | |
| response = { | |
| "configured": bool(accounts), | |
| "legacy_login_hint": data.get("legacy_login_hint"), | |
| "max_logo_size": MAX_LOGO_SIZE, | |
| } | |
| return jsonify(response) | |
| def api_setup() -> Any: | |
| data = load_store() | |
| payload = request.get_json(force=True) | |
| try: | |
| email_key, display_email = normalize_email(payload.get("email", "")) | |
| except ValueError as error: | |
| return jsonify({"error": str(error)}), 400 | |
| accounts = get_accounts(data) | |
| existing_key, existing_account = find_account_identifier(accounts, display_email) | |
| if existing_key and existing_account: | |
| return jsonify({"error": "Podany adres email jest juz zajety."}), 400 | |
| required_fields = [ | |
| "company_name", | |
| "owner_name", | |
| "address_line", | |
| "postal_code", | |
| "city", | |
| "tax_id", | |
| "bank_account", | |
| ] | |
| missing = [field for field in required_fields if not (payload.get(field) or "").strip()] | |
| if missing: | |
| return jsonify({"error": f"Brakuje pol: {', '.join(missing)}"}), 400 | |
| password = (payload.get("password") or "").strip() | |
| if len(password) < PASSWORD_MIN_LENGTH: | |
| return jsonify({"error": f"Haslo musi miec co najmniej {PASSWORD_MIN_LENGTH} znakow."}), 400 | |
| new_account = { | |
| "login": display_email, | |
| "email": display_email, | |
| "password_hash": hash_password(password), | |
| "business": { | |
| "company_name": payload["company_name"].strip(), | |
| "owner_name": payload["owner_name"].strip(), | |
| "address_line": payload["address_line"].strip(), | |
| "postal_code": payload["postal_code"].strip(), | |
| "city": payload["city"].strip(), | |
| "tax_id": payload["tax_id"].strip(), | |
| "bank_account": payload["bank_account"].strip(), | |
| }, | |
| "invoices": [], | |
| "logo": None, | |
| "created_at": datetime.utcnow().isoformat(timespec="seconds"), | |
| } | |
| accounts[email_key] = new_account | |
| data.pop("legacy_login_hint", None) | |
| save_store(data) | |
| return jsonify({"message": "Konto utworzone. Mozesz sie zalogowac."}) | |
| def api_login() -> Any: | |
| payload = request.get_json(force=True) | |
| identifier_raw = (payload.get("email") or payload.get("login") or "").strip() | |
| password = (payload.get("password") or "").strip() | |
| if not identifier_raw: | |
| return jsonify({"error": "Podaj adres email."}), 400 | |
| data = load_store() | |
| accounts = get_accounts(data) | |
| login_key, account = find_account_identifier(accounts, identifier_raw) | |
| if not account: | |
| return jsonify({"error": "Nieprawidlowy email lub haslo."}), 401 | |
| stored_hash = account.get("password_hash") | |
| if not stored_hash: | |
| return jsonify({"error": "Konto nie zostalo jeszcze skonfigurowane."}), 400 | |
| if hash_password(password) != stored_hash: | |
| return jsonify({"error": "Nieprawidlowy email lub haslo."}), 401 | |
| token = create_token(login_key or (identifier_raw.lower())) | |
| display_email = account.get("email") or account.get("login") or identifier_raw | |
| return jsonify({"token": token, "login": account.get("login", display_email), "email": display_email}) | |
| def api_business() -> Any: | |
| try: | |
| login_key = require_auth() | |
| except PermissionError: | |
| return jsonify({"error": "Brak autoryzacji."}), 401 | |
| data = load_store() | |
| try: | |
| account = get_account(data, login_key) | |
| except KeyError: | |
| return jsonify({"error": "Nie znaleziono konta."}), 404 | |
| if request.method == "GET": | |
| return jsonify({"business": account.get("business")}) | |
| payload = request.get_json(force=True) | |
| current = account.get("business") or {} | |
| updated = { | |
| "company_name": (payload.get("company_name") or current.get("company_name") or "").strip(), | |
| "owner_name": (payload.get("owner_name") or current.get("owner_name") or "").strip(), | |
| "address_line": (payload.get("address_line") or current.get("address_line") or "").strip(), | |
| "postal_code": (payload.get("postal_code") or current.get("postal_code") or "").strip(), | |
| "city": (payload.get("city") or current.get("city") or "").strip(), | |
| "tax_id": (payload.get("tax_id") or current.get("tax_id") or "").strip(), | |
| "bank_account": (payload.get("bank_account") or current.get("bank_account") or "").strip(), | |
| } | |
| missing = [field for field, value in updated.items() if not value] | |
| if missing: | |
| return jsonify({"error": f"Wypelnij wszystkie pola: {', '.join(missing)}"}), 400 | |
| account["business"] = updated | |
| save_store(data) | |
| return jsonify({"business": updated}) | |
| def api_invoices() -> Any: | |
| try: | |
| login_key = require_auth() | |
| except PermissionError: | |
| return jsonify({"error": "Brak autoryzacji."}), 401 | |
| data = load_store() | |
| try: | |
| account = get_account(data, login_key) | |
| except KeyError: | |
| return jsonify({"error": "Nie znaleziono konta."}), 404 | |
| try: | |
| ensure_business_configured(account) | |
| except ValueError as error: | |
| return jsonify({"error": str(error)}), 400 | |
| if request.method == "GET": | |
| invoices = list(account.get("invoices", [])) | |
| start_param = request.args.get("start_date") | |
| end_param = request.args.get("end_date") | |
| start_date: Optional[date] = None | |
| end_date: Optional[date] = None | |
| if start_param: | |
| try: | |
| start_date = datetime.fromisoformat(start_param).date() | |
| except ValueError: | |
| return jsonify({"error": "Niepoprawny format daty poczatkowej (YYYY-MM-DD)."}), 400 | |
| if end_param: | |
| try: | |
| end_date = datetime.fromisoformat(end_param).date() | |
| except ValueError: | |
| return jsonify({"error": "Niepoprawny format daty koncowej (YYYY-MM-DD)."}), 400 | |
| if start_date and end_date and start_date > end_date: | |
| return jsonify({"error": "Data poczatkowa nie moze byc pozniejsza niz data koncowa."}), 400 | |
| def issued_at_to_datetime(issued_at: str) -> Optional[datetime]: | |
| try: | |
| return datetime.strptime(issued_at, "%Y-%m-%d %H:%M") | |
| except (TypeError, ValueError): | |
| return None | |
| filtered: List[Dict[str, Any]] = [] | |
| for invoice in invoices: | |
| issued_at_str = invoice.get("issued_at") | |
| issued_dt = issued_at_to_datetime(issued_at_str) | |
| if issued_dt is None: | |
| filtered.append(invoice) | |
| continue | |
| issued_date = issued_dt.date() | |
| if start_date and issued_date < start_date: | |
| continue | |
| if end_date and issued_date > end_date: | |
| continue | |
| filtered.append(invoice) | |
| filtered.sort(key=lambda item: item.get("issued_at", ""), reverse=True) | |
| return jsonify({"invoices": filtered}) | |
| payload = request.get_json(force=True) | |
| try: | |
| invoice = compute_invoice(payload, account["business"]) | |
| except ValueError as error: | |
| return jsonify({"error": str(error)}), 400 | |
| invoices = account.setdefault("invoices", []) | |
| invoices.append(invoice) | |
| if len(invoices) > INVOICE_HISTORY_LIMIT: | |
| account["invoices"] = invoices[-INVOICE_HISTORY_LIMIT:] | |
| save_store(data) | |
| return jsonify({"invoice": invoice}) | |
| def api_invoice_detail(invoice_id: str) -> Any: | |
| try: | |
| login_key = require_auth() | |
| except PermissionError: | |
| return jsonify({"error": "Brak autoryzacji."}), 401 | |
| data = load_store() | |
| try: | |
| account = get_account(data, login_key) | |
| except KeyError: | |
| return jsonify({"error": "Nie znaleziono konta."}), 404 | |
| try: | |
| ensure_business_configured(account) | |
| except ValueError as error: | |
| return jsonify({"error": str(error)}), 400 | |
| invoices = account.setdefault("invoices", []) | |
| try: | |
| index = next(index for index, inv in enumerate(invoices) if inv.get("invoice_id") == invoice_id) | |
| except StopIteration: | |
| return jsonify({"error": "Nie znaleziono faktury."}), 404 | |
| current_invoice = invoices[index] | |
| if request.method == "GET": | |
| return jsonify({"invoice": current_invoice}) | |
| if request.method == "DELETE": | |
| invoices.pop(index) | |
| save_store(data) | |
| return jsonify({"message": "Faktura zostala usunieta."}) | |
| payload = request.get_json(force=True) | |
| try: | |
| updated_invoice = compute_invoice( | |
| payload, | |
| account["business"], | |
| invoice_id=current_invoice.get("invoice_id"), | |
| issued_at=current_invoice.get("issued_at"), | |
| ) | |
| except ValueError as error: | |
| return jsonify({"error": str(error)}), 400 | |
| invoices[index] = updated_invoice | |
| save_store(data) | |
| return jsonify({"invoice": updated_invoice}) | |
| def api_logo() -> Any: | |
| try: | |
| login_key = require_auth() | |
| except PermissionError: | |
| return jsonify({"error": "Brak autoryzacji."}), 401 | |
| data = load_store() | |
| try: | |
| account = get_account(data, login_key) | |
| except KeyError: | |
| return jsonify({"error": "Nie znaleziono konta."}), 404 | |
| if request.method == "GET": | |
| logo = account.get("logo") | |
| if not logo: | |
| return jsonify({"logo": None}) | |
| encoded = logo.get("data") | |
| mime_type = logo.get("mime_type") | |
| data_url = None | |
| if encoded and mime_type: | |
| data_url = f"data:{mime_type};base64,{encoded}" | |
| return jsonify( | |
| { | |
| "logo": { | |
| "filename": logo.get("filename"), | |
| "mime_type": mime_type, | |
| "data": encoded, | |
| "data_url": data_url, | |
| "uploaded_at": logo.get("uploaded_at"), | |
| } | |
| } | |
| ) | |
| if request.method == "DELETE": | |
| account["logo"] = None | |
| save_store(data) | |
| return jsonify({"message": "Logo zostalo usuniete."}) | |
| payload = request.get_json(force=True) | |
| raw_content = (payload.get("content") or payload.get("data") or "").strip() | |
| if not raw_content: | |
| return jsonify({"error": "Brak danych logo."}), 400 | |
| provided_mime = (payload.get("mime_type") or "").strip() | |
| filename = sanitize_filename(payload.get("filename")) | |
| if raw_content.startswith("data:"): | |
| try: | |
| header, encoded_content = raw_content.split(",", 1) | |
| except ValueError: | |
| return jsonify({"error": "Niepoprawny format danych logo."}), 400 | |
| header = header.strip() | |
| if ";base64" not in header: | |
| return jsonify({"error": "Niepoprawny format danych logo (oczekiwano base64)."}), 400 | |
| mime_type = header.split(";")[0].replace("data:", "", 1) or provided_mime | |
| base64_content = encoded_content.strip() | |
| else: | |
| mime_type = provided_mime | |
| base64_content = raw_content | |
| mime_type = (mime_type or "").lower() | |
| if mime_type not in ALLOWED_LOGO_MIME_TYPES: | |
| return jsonify({"error": "Dozwolone formaty logo to PNG lub JPG."}), 400 | |
| try: | |
| logo_bytes = base64.b64decode(base64_content, validate=True) | |
| except (ValueError, binascii.Error): | |
| return jsonify({"error": "Nie udalo sie odczytac danych logo (base64)."}), 400 | |
| if len(logo_bytes) > MAX_LOGO_SIZE: | |
| return jsonify({"error": f"Logo jest zbyt duze (maksymalnie {MAX_LOGO_SIZE // 1024} KB)."}), 400 | |
| stored_logo = { | |
| "filename": filename, | |
| "mime_type": mime_type, | |
| "data": base64.b64encode(logo_bytes).decode("ascii"), | |
| "uploaded_at": datetime.utcnow().isoformat(timespec="seconds"), | |
| } | |
| account["logo"] = stored_logo | |
| save_store(data) | |
| return jsonify({"logo": stored_logo}) | |
| def api_invoice_summary() -> Any: | |
| try: | |
| login_key = require_auth() | |
| except PermissionError: | |
| return jsonify({"error": "Brak autoryzacji."}), 401 | |
| data = load_store() | |
| try: | |
| account = get_account(data, login_key) | |
| except KeyError: | |
| return jsonify({"error": "Nie znaleziono konta."}), 404 | |
| try: | |
| ensure_business_configured(account) | |
| except ValueError as error: | |
| return jsonify({"error": str(error)}), 400 | |
| now = datetime.utcnow() | |
| last_month_start = now - timedelta(days=30) | |
| quarter_first_month = ((now.month - 1) // 3) * 3 + 1 | |
| quarter_start = now.replace(month=quarter_first_month, day=1, hour=0, minute=0, second=0, microsecond=0) | |
| year_start = now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0) | |
| def parse_issued_at(value: Optional[str]) -> Optional[datetime]: | |
| if not value: | |
| return None | |
| try: | |
| return datetime.strptime(value, "%Y-%m-%d %H:%M") | |
| except ValueError: | |
| return None | |
| def aggregate(start: datetime) -> Dict[str, Any]: | |
| count = 0 | |
| gross_total = Decimal("0.00") | |
| for invoice in account.get("invoices", []): | |
| issued_dt = parse_issued_at(invoice.get("issued_at")) | |
| if issued_dt is None or issued_dt < start: | |
| continue | |
| count += 1 | |
| gross_value = invoice.get("totals", {}).get("gross", "0") | |
| try: | |
| gross_total += _decimal(gross_value) | |
| except ValueError: | |
| continue | |
| return {"count": count, "gross_total": str(_quantize(gross_total))} | |
| summary = { | |
| "last_month": aggregate(last_month_start), | |
| "quarter": aggregate(quarter_start), | |
| "year": aggregate(year_start), | |
| } | |
| return jsonify({"summary": summary}) | |
| if __name__ == "__main__": | |
| port = int(os.environ.get("PORT", "5000")) | |
| app.run(host="0.0.0.0", port=port, debug=True) | |