"""Small Flask backend for issuing invoices, backed by a single JSON file.

Accounts, their seller ("business") data, invoices and logos are kept in
``DATA_FILE``. Session tokens live only in process memory.
"""

import base64
import binascii
import hashlib
import hmac
import json
import os
import re
import secrets
import uuid
from datetime import date, datetime, timedelta
from decimal import Decimal, ROUND_HALF_UP, getcontext
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from flask import Flask, jsonify, request, send_from_directory

APP_ROOT = Path(__file__).parent.resolve()
DATA_DIR = Path(os.environ.get("DATA_DIR", APP_ROOT))
DATA_FILE = DATA_DIR / "web_invoice_store.json"
INVOICE_HISTORY_LIMIT = 200
MAX_LOGO_SIZE = 512 * 1024  # 512 KB
TOKEN_TTL = timedelta(hours=12)
ALLOWED_LOGO_MIME_TYPES = {"image/png", "image/jpeg"}
# VAT code -> rate; ``None`` marks codes for which no VAT is calculated.
VAT_RATES: Dict[str, Optional[Decimal]] = {
    "23": Decimal("0.23"),
    "8": Decimal("0.08"),
    "5": Decimal("0.05"),
    "0": Decimal("0.00"),
    "ZW": None,
    "NP": None,
}
DEFAULT_UNIT = "szt."
ALLOWED_UNITS = {"szt.", "godz."}
PASSWORD_MIN_LENGTH = 4
# token -> {"login": account key, "issued_at": naive UTC datetime}
SESSION_TOKENS: Dict[str, Dict[str, Any]] = {}
# The only files the application is willing to serve from APP_ROOT.
ALLOWED_STATIC = {
    "index.html",
    "styles.css",
    "main.js",
    "favicon.ico",
    "Roboto-VariableFont_wdth,wght.ttf",
}

# Flask's automatic static route is disabled on purpose: with the static
# folder set to APP_ROOT and an empty static URL path it would expose every
# file in the application directory (the JSON store with password hashes and
# the source code included), bypassing ALLOWED_STATIC. ``serve_static`` below
# is the only file-serving route. ``app.static_folder`` is still set so code
# reading that attribute keeps working.
app = Flask(__name__, static_folder=None, static_url_path="")
app.static_folder = str(APP_ROOT)

# NOTE(review): 10 significant digits bounds every Decimal operation below;
# amounts needing more digits are rounded or make ``quantize`` raise.
# Left unchanged because it affects stored rounding results - confirm.
getcontext().prec = 10


def _quantize(value: Decimal) -> Decimal:
    """Round *value* to two decimal places using ROUND_HALF_UP."""
    return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def _decimal(value: Any) -> Decimal:
    """Convert *value* to a finite ``Decimal``.

    Raises:
        ValueError: if the value cannot be parsed or is NaN/Infinity.
    """
    try:
        number = Decimal(str(value))
    except (ArithmeticError, TypeError, ValueError) as error:
        raise ValueError(f"Niepoprawna wartosc liczby: {value}") from error
    # "NaN" and "Infinity" parse successfully but break later comparisons and
    # int() conversion with non-ValueError exceptions, so reject them here.
    if not number.is_finite():
        raise ValueError(f"Niepoprawna wartosc liczby: {value}")
    return number


def _text(value: Any) -> str:
    """Return *value* as a stripped string; falsy values become ``""``."""
    return str(value or "").strip()


def _json_object() -> Dict[str, Any]:
    """Return the request's JSON body, or ``{}`` when it is not a JSON object."""
    payload = request.get_json(force=True)
    return payload if isinstance(payload, dict) else {}


def hash_password(password: str) -> str:
    """Return the hex SHA-256 digest of *password*.

    NOTE(review): unsalted SHA-256 is weak for password storage. It is kept
    because existing stored hashes use this format; migrating to a salted
    KDF (e.g. ``hashlib.scrypt``) needs a data migration.
    """
    return hashlib.sha256(password.encode("utf-8")).hexdigest()


EMAIL_PATTERN = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")


def normalize_email(raw_email: str) -> Tuple[str, str]:
    """Validate an email address.

    Returns:
        ``(lookup_key, display_email)`` where the key is the lower-cased
        address and the display value is the stripped original.

    Raises:
        ValueError: if the address is empty or does not match EMAIL_PATTERN.
    """
    display_email = _text(raw_email)
    if not display_email:
        raise ValueError("Email nie moze byc pusty.")
    if not EMAIL_PATTERN.fullmatch(display_email):
        raise ValueError("Podaj poprawny adres email.")
    return display_email.lower(), display_email


def sanitize_filename(filename: Optional[str]) -> str:
    """Reduce *filename* to a safe base name, falling back to ``"logo"``."""
    if not filename:
        return "logo"
    # Drop any directory part, whichever separator was used.
    name = str(filename).split("/")[-1].split("\\")[-1]
    sanitized = re.sub(r"[^A-Za-z0-9._-]", "_", name).strip("._")
    return sanitized or "logo"


def find_account_identifier(
    accounts: Dict[str, Any], identifier: str
) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
    """Find an account by store key, login or email (case-insensitive).

    Returns:
        ``(store_key, account)`` or ``(None, None)`` when nothing matches.
    """
    key = (identifier or "").strip().lower()
    if not key:
        return None, None
    account = accounts.get(key)
    if account:
        return key, account
    for login_key, candidate in accounts.items():
        candidate_login = (candidate.get("login") or "").strip().lower()
        candidate_email = (candidate.get("email") or "").strip().lower()
        if key in {candidate_login, candidate_email}:
            return login_key, candidate
    return None, None


def migrate_store_if_needed(data: Dict[str, Any]) -> Tuple[Dict[str, Any], bool]:
    """Bring loaded store data to the multi-account layout.

    Returns:
        ``(store, migrated)``. ``migrated`` is True only when a legacy
        single-account store was converted and should be written back.
    """
    if "accounts" in data:
        accounts = data.get("accounts") or {}
        for login, account in accounts.items():
            account.setdefault("login", login)
            email = (account.get("email") or account.get("login") or "").strip()
            account["email"] = email
            account.setdefault("business", None)
            account.setdefault("password_hash", None)
            account.setdefault("invoices", [])
            account.setdefault("logo", None)
            account.setdefault("created_at", datetime.utcnow().isoformat(timespec="seconds"))
        data["accounts"] = accounts
        return data, False

    legacy_password = data.get("password_hash")
    migrated_accounts: Dict[str, Any] = {}
    legacy_login_hint = None
    if legacy_password:
        # The legacy layout had a single password-protected account.
        login_key = "admin"
        migrated_accounts[login_key] = {
            "login": login_key,
            "password_hash": legacy_password,
            "business": data.get("business"),
            "invoices": data.get("invoices", []),
            "logo": data.get("logo"),
            "created_at": datetime.utcnow().isoformat(timespec="seconds"),
        }
        legacy_login_hint = login_key

    migrated: Dict[str, Any] = {"accounts": migrated_accounts}
    if legacy_login_hint:
        migrated["legacy_login_hint"] = legacy_login_hint
    return migrated, True


def load_store() -> Dict[str, Any]:
    """Load the JSON store, migrating (and re-saving) a legacy layout."""
    if not DATA_FILE.exists():
        return {"accounts": {}}
    with DATA_FILE.open("r", encoding="utf-8") as handle:
        data = json.load(handle)
    normalized, migrated = migrate_store_if_needed(data)
    if migrated:
        save_store(normalized)
    return normalized


def save_store(data: Dict[str, Any]) -> None:
    """Write the store to DATA_FILE.

    The data is written to a sibling temporary file and then moved over the
    target, so an interrupted write cannot leave a truncated store behind.
    """
    DATA_FILE.parent.mkdir(parents=True, exist_ok=True)
    temp_file = DATA_FILE.with_name(DATA_FILE.name + ".tmp")
    with temp_file.open("w", encoding="utf-8") as handle:
        json.dump(data, handle, ensure_ascii=False, indent=2)
    os.replace(temp_file, DATA_FILE)


def ensure_business_configured(account: Dict[str, Any]) -> None:
    """Raise ``ValueError`` when the account has no seller data."""
    if not account.get("business"):
        raise ValueError("Dane sprzedawcy nie zostaly uzupelnione.")


def ensure_account_defaults(account: Dict[str, Any], login_key: str) -> Dict[str, Any]:
    """Fill in missing account fields and normalize stored invoice items.

    Positive item quantities are rewritten as whole-number strings and units
    outside ALLOWED_UNITS are replaced with DEFAULT_UNIT. The account is
    modified in place and returned.
    """
    account.setdefault("login", login_key)
    email_value = (account.get("email") or account.get("login") or "").strip()
    account["email"] = email_value
    account.setdefault("business", None)
    account.setdefault("password_hash", None)
    invoices = account.setdefault("invoices", [])
    if isinstance(invoices, list):
        for invoice in invoices:
            if not isinstance(invoice, dict):
                continue
            items = invoice.get("items")
            if not isinstance(items, list):
                continue
            for item in items:
                if not isinstance(item, dict):
                    continue
                raw_quantity = str(item.get("quantity", "")).strip()
                try:
                    quantity_decimal = _decimal(raw_quantity or "0")
                except ValueError:
                    quantity_decimal = Decimal("0")
                if quantity_decimal > 0:
                    quantity_integral = quantity_decimal.to_integral_value(
                        rounding=ROUND_HALF_UP
                    )
                    item["quantity"] = str(int(quantity_integral))
                unit_value = (item.get("unit") or "").strip()
                if unit_value not in ALLOWED_UNITS:
                    unit_value = DEFAULT_UNIT
                item["unit"] = unit_value
    account.setdefault("logo", None)
    account.setdefault("created_at", datetime.utcnow().isoformat(timespec="seconds"))
    return account


def get_accounts(data: Dict[str, Any]) -> Dict[str, Any]:
    """Return the accounts mapping with defaults applied to every account."""
    accounts = data.setdefault("accounts", {})
    for login_key, account in list(accounts.items()):
        accounts[login_key] = ensure_account_defaults(account, login_key)
    return accounts


def get_account(data: Dict[str, Any], login_key: str) -> Dict[str, Any]:
    """Return the account stored under *login_key*.

    Raises:
        KeyError: if no such account exists.
    """
    accounts = get_accounts(data)
    account = accounts.get(login_key)
    if not account:
        raise KeyError("Nie znaleziono konta.")
    return account


def parse_iso_date(value: Optional[str]) -> Optional[str]:
    """Return *value* as ``YYYY-MM-DD``, or ``None`` if it is not an ISO date."""
    if not value:
        return None
    try:
        parsed = datetime.fromisoformat(value)
    except (TypeError, ValueError):
        # TypeError covers non-string JSON values such as numbers.
        return None
    return parsed.strftime("%Y-%m-%d")


def compute_invoice_items(
    items_payload: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], Dict[str, Dict[str, Decimal]]]:
    """Validate invoice lines and compute their net/VAT/gross amounts.

    Prices are given gross per unit; the net price is derived from the VAT
    rate of the line's ``vat_code``.

    Returns:
        ``(items, summary)``: the computed lines (amounts as strings) and
        per-VAT-label totals keyed by label with ``net``/``vat``/``gross``.

    Raises:
        ValueError: if the list is empty or malformed, or any line has a
            missing name, a non-positive or fractional quantity, an unknown
            unit or VAT code, or a non-positive price.
    """
    if not items_payload or not isinstance(items_payload, list):
        raise ValueError("Dodaj przynajmniej jedna pozycje.")

    computed_items: List[Dict[str, Any]] = []
    summary: Dict[str, Dict[str, Decimal]] = {}
    for raw in items_payload:
        if not isinstance(raw, dict):
            raise ValueError("Kazda pozycja musi miec nazwe.")
        name = _text(raw.get("name"))
        if not name:
            raise ValueError("Kazda pozycja musi miec nazwe.")

        quantity_decimal = _decimal(raw.get("quantity", "0"))
        if quantity_decimal <= 0:
            raise ValueError("Ilosc musi byc wieksza od zera.")
        quantity_integral = quantity_decimal.to_integral_value(rounding=ROUND_HALF_UP)
        if quantity_decimal != quantity_integral:
            raise ValueError("Ilosc musi byc liczba calkowita.")
        quantity = int(quantity_integral)

        unit = str(raw.get("unit", "") or DEFAULT_UNIT).strip()
        if unit not in ALLOWED_UNITS:
            raise ValueError("Wybrano nieprawidlowa jednostke.")

        vat_code = str(raw.get("vat_code", "")).upper()
        if vat_code not in VAT_RATES:
            raise ValueError(f"Nieznana stawka VAT: {vat_code}")

        unit_price_gross = _decimal(raw.get("unit_price_gross", "0"))
        if unit_price_gross <= 0:
            raise ValueError("Cena brutto musi byc wieksza od zera.")

        rate = VAT_RATES[vat_code]
        if rate is None:
            unit_price_net = unit_price_gross
            vat_amount = Decimal("0.00")
        else:
            unit_price_net = unit_price_gross / (Decimal("1.00") + rate)
            vat_amount = unit_price_gross - unit_price_net
        unit_price_net = _quantize(unit_price_net)
        unit_price_gross = _quantize(unit_price_gross)

        quantity_value = Decimal(quantity)
        # NOTE(review): net is built from the rounded unit price while VAT is
        # built from the unrounded per-unit VAT, so net + VAT can differ from
        # gross by 0.01 on some lines - confirm the intended rounding method.
        net_total = _quantize(unit_price_net * quantity_value)
        vat_amount_total = _quantize(
            vat_amount * quantity_value if rate is not None else Decimal("0.00")
        )
        gross_total = _quantize(unit_price_gross * quantity_value)

        if vat_code in {"ZW", "0"}:
            vat_label = "ZW"
        elif vat_code == "NP":
            vat_label = "NP"
        else:
            vat_label = f"{vat_code}%"

        computed_items.append(
            {
                "name": name,
                "unit": unit,
                "quantity": str(quantity),
                "vat_code": vat_code,
                "vat_label": vat_label,
                "unit_price_net": str(unit_price_net),
                "unit_price_gross": str(unit_price_gross),
                "net_total": str(net_total),
                "vat_amount": str(vat_amount_total),
                "gross_total": str(gross_total),
            }
        )

        bucket = summary.setdefault(
            vat_label,
            {"net": Decimal("0.00"), "vat": Decimal("0.00"), "gross": Decimal("0.00")},
        )
        bucket["net"] += net_total
        bucket["vat"] += vat_amount_total
        bucket["gross"] += gross_total

    return computed_items, summary


def computed_summary_to_serializable(
    summary: Dict[str, Dict[str, Decimal]],
) -> List[Dict[str, str]]:
    """Turn the per-VAT-label summary into a list of string rows sorted by label."""
    serialized = [
        {
            "vat_label": vat_label,
            "net_total": str(_quantize(values["net"])),
            "vat_total": str(_quantize(values["vat"])),
            "gross_total": str(_quantize(values["gross"])),
        }
        for vat_label, values in summary.items()
    ]
    serialized.sort(key=lambda item: item["vat_label"])
    return serialized


def compute_invoice(
    payload: Dict[str, Any],
    business: Dict[str, Any],
    *,
    invoice_id: Optional[str] = None,
    issued_at: Optional[str] = None,
) -> Dict[str, Any]:
    """Build a complete invoice record from a request payload.

    Args:
        payload: request data with ``items`` and optional ``client``,
            ``sale_date``, ``payment_term`` and ``exemption_note``.
        business: seller data of the account (currently not read here).
        invoice_id: existing ID to keep when an invoice is being edited.
        issued_at: existing ``"%Y-%m-%d %H:%M"`` timestamp to keep; an
            unparsable value falls back to the current local time.

    Raises:
        ValueError: propagated from ``compute_invoice_items``.
    """
    computed_items, summary = compute_invoice_items(payload.get("items", []))
    zero = Decimal("0.00")
    net_sum = sum((Decimal(item["net_total"]) for item in computed_items), zero)
    vat_sum = sum((Decimal(item["vat_amount"]) for item in computed_items), zero)
    gross_sum = sum((Decimal(item["gross_total"]) for item in computed_items), zero)

    issued_timestamp = datetime.now()
    if issued_at:
        try:
            issued_timestamp = datetime.strptime(issued_at, "%Y-%m-%d %H:%M")
        except (TypeError, ValueError):
            issued_timestamp = datetime.now()

    generated_id = invoice_id or issued_timestamp.strftime("FV-%Y%m%d-%H%M%S")
    sale_date = parse_iso_date(payload.get("sale_date")) or issued_timestamp.strftime(
        "%Y-%m-%d"
    )

    client_payload = payload.get("client") or {}
    if not isinstance(client_payload, dict):
        client_payload = {}
    client = {
        "name": _text(client_payload.get("name")),
        "address_line": _text(client_payload.get("address_line")),
        "postal_code": _text(client_payload.get("postal_code")),
        "city": _text(client_payload.get("city")),
        "tax_id": _text(client_payload.get("tax_id")),
        "phone": _text(client_payload.get("phone")),
    }

    return {
        "invoice_id": generated_id,
        "issued_at": issued_timestamp.strftime("%Y-%m-%d %H:%M"),
        "sale_date": sale_date,
        "payment_term": payload.get("payment_term"),
        "items": computed_items,
        "summary": computed_summary_to_serializable(summary),
        "totals": {
            "net": str(_quantize(net_sum)),
            "vat": str(_quantize(vat_sum)),
            "gross": str(_quantize(gross_sum)),
        },
        "client": client,
        "exemption_note": _text(payload.get("exemption_note")),
    }


def cleanup_tokens() -> None:
    """Drop session tokens older than TOKEN_TTL."""
    now = datetime.utcnow()
    expired = [
        token
        for token, payload in SESSION_TOKENS.items()
        if now - payload["issued_at"] > TOKEN_TTL
    ]
    for token in expired:
        SESSION_TOKENS.pop(token, None)


def create_token(login: str) -> str:
    """Create and register a new session token for *login*."""
    cleanup_tokens()
    # ``secrets`` is the stdlib source intended for security tokens; the
    # result keeps the previous shape (32 hexadecimal characters).
    token = secrets.token_hex(16)
    SESSION_TOKENS[token] = {"login": login, "issued_at": datetime.utcnow()}
    return token


def get_token() -> Optional[str]:
    """Return the bearer token from the Authorization header, if any."""
    header = request.headers.get("Authorization", "")
    if not header.startswith("Bearer "):
        return None
    return header.split(" ", 1)[1].strip()


def require_auth() -> str:
    """Return the account key of the authenticated request.

    A successful check refreshes the token's timestamp, so the TTL counts
    from the last authenticated request.

    Raises:
        PermissionError: if the token is missing, unknown or expired.
    """
    cleanup_tokens()
    token = get_token()
    if not token:
        raise PermissionError("Brak autoryzacji.")
    payload = SESSION_TOKENS.get(token)
    if not payload:
        raise PermissionError("Brak autoryzacji.")
    payload["issued_at"] = datetime.utcnow()
    return payload["login"]


@app.route("/")
def serve_index() -> Any:
    """Serve the single-page frontend."""
    return send_from_directory(app.static_folder, "index.html")


# The rule needs the ``path`` URL variable that the view function requires;
# without it the rule duplicated "/" and the view could never be called.
@app.route("/<path:path>")
def serve_static(path: str) -> Any:
    """Serve a whitelisted frontend file; anything else is a 404."""
    if path not in ALLOWED_STATIC:
        return jsonify({"error": "Nie znaleziono pliku."}), 404
    target = Path(app.static_folder) / path
    if target.is_file():
        return send_from_directory(app.static_folder, path)
    return jsonify({"error": "Nie znaleziono pliku."}), 404


@app.route("/api/status", methods=["GET"])
def api_status() -> Any:
    """Report whether any account exists, plus client-side limits."""
    data = load_store()
    accounts = get_accounts(data)
    response = {
        "configured": bool(accounts),
        "legacy_login_hint": data.get("legacy_login_hint"),
        "max_logo_size": MAX_LOGO_SIZE,
    }
    return jsonify(response)


@app.route("/api/setup", methods=["POST"])
def api_setup() -> Any:
    """Create a new account together with its seller data."""
    data = load_store()
    payload = _json_object()
    try:
        email_key, display_email = normalize_email(payload.get("email", ""))
    except ValueError as error:
        return jsonify({"error": str(error)}), 400

    accounts = get_accounts(data)
    existing_key, existing_account = find_account_identifier(accounts, display_email)
    if existing_key and existing_account:
        return jsonify({"error": "Podany adres email jest juz zajety."}), 400

    required_fields = [
        "company_name",
        "owner_name",
        "address_line",
        "postal_code",
        "city",
        "tax_id",
        "bank_account",
    ]
    business = {field: _text(payload.get(field)) for field in required_fields}
    missing = [field for field in required_fields if not business[field]]
    if missing:
        return jsonify({"error": f"Brakuje pol: {', '.join(missing)}"}), 400

    password = _text(payload.get("password"))
    if len(password) < PASSWORD_MIN_LENGTH:
        return (
            jsonify({"error": f"Haslo musi miec co najmniej {PASSWORD_MIN_LENGTH} znakow."}),
            400,
        )

    accounts[email_key] = {
        "login": display_email,
        "email": display_email,
        "password_hash": hash_password(password),
        "business": business,
        "invoices": [],
        "logo": None,
        "created_at": datetime.utcnow().isoformat(timespec="seconds"),
    }
    data.pop("legacy_login_hint", None)
    save_store(data)
    return jsonify({"message": "Konto utworzone. Mozesz sie zalogowac."})


@app.route("/api/login", methods=["POST"])
def api_login() -> Any:
    """Check credentials and issue a session token."""
    payload = _json_object()
    identifier_raw = _text(payload.get("email") or payload.get("login"))
    password = _text(payload.get("password"))
    if not identifier_raw:
        return jsonify({"error": "Podaj adres email."}), 400

    data = load_store()
    accounts = get_accounts(data)
    login_key, account = find_account_identifier(accounts, identifier_raw)
    if not account:
        return jsonify({"error": "Nieprawidlowy email lub haslo."}), 401

    stored_hash = account.get("password_hash")
    if not stored_hash:
        return jsonify({"error": "Konto nie zostalo jeszcze skonfigurowane."}), 400
    # Constant-time comparison avoids leaking hash prefixes through timing.
    hashes_match = hmac.compare_digest(
        hash_password(password).encode("utf-8"), str(stored_hash).encode("utf-8")
    )
    if not hashes_match:
        return jsonify({"error": "Nieprawidlowy email lub haslo."}), 401

    token = create_token(login_key or identifier_raw.lower())
    display_email = account.get("email") or account.get("login") or identifier_raw
    return jsonify(
        {"token": token, "login": account.get("login", display_email), "email": display_email}
    )


@app.route("/api/business", methods=["GET", "PUT"])
def api_business() -> Any:
    """Read (GET) or update (PUT) the seller data of the logged-in account."""
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401
    data = load_store()
    try:
        account = get_account(data, login_key)
    except KeyError:
        return jsonify({"error": "Nie znaleziono konta."}), 404

    if request.method == "GET":
        return jsonify({"business": account.get("business")})

    payload = _json_object()
    current = account.get("business") or {}
    fields = [
        "company_name",
        "owner_name",
        "address_line",
        "postal_code",
        "city",
        "tax_id",
        "bank_account",
    ]
    # A field missing from the payload keeps its current value.
    updated = {field: _text(payload.get(field) or current.get(field)) for field in fields}
    missing = [field for field, value in updated.items() if not value]
    if missing:
        return jsonify({"error": f"Wypelnij wszystkie pola: {', '.join(missing)}"}), 400

    account["business"] = updated
    save_store(data)
    return jsonify({"business": updated})


@app.route("/api/invoices", methods=["POST", "GET"])
def api_invoices() -> Any:
    """List invoices (GET, optional date range) or create one (POST)."""
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401
    data = load_store()
    try:
        account = get_account(data, login_key)
    except KeyError:
        return jsonify({"error": "Nie znaleziono konta."}), 404
    try:
        ensure_business_configured(account)
    except ValueError as error:
        return jsonify({"error": str(error)}), 400

    if request.method == "GET":
        invoices = list(account.get("invoices", []))
        start_param = request.args.get("start_date")
        end_param = request.args.get("end_date")
        start_date: Optional[date] = None
        end_date: Optional[date] = None
        if start_param:
            try:
                start_date = datetime.fromisoformat(start_param).date()
            except ValueError:
                return (
                    jsonify({"error": "Niepoprawny format daty poczatkowej (YYYY-MM-DD)."}),
                    400,
                )
        if end_param:
            try:
                end_date = datetime.fromisoformat(end_param).date()
            except ValueError:
                return (
                    jsonify({"error": "Niepoprawny format daty koncowej (YYYY-MM-DD)."}),
                    400,
                )
        if start_date and end_date and start_date > end_date:
            return (
                jsonify({"error": "Data poczatkowa nie moze byc pozniejsza niz data koncowa."}),
                400,
            )

        def issued_at_to_datetime(issued_at: str) -> Optional[datetime]:
            try:
                return datetime.strptime(issued_at, "%Y-%m-%d %H:%M")
            except (TypeError, ValueError):
                return None

        filtered: List[Dict[str, Any]] = []
        for invoice in invoices:
            issued_dt = issued_at_to_datetime(invoice.get("issued_at"))
            if issued_dt is None:
                # Invoices without a parsable timestamp are always listed.
                filtered.append(invoice)
                continue
            issued_date = issued_dt.date()
            if start_date and issued_date < start_date:
                continue
            if end_date and issued_date > end_date:
                continue
            filtered.append(invoice)
        # ``or ""`` keeps the sort working when a stored value is null.
        filtered.sort(key=lambda item: item.get("issued_at") or "", reverse=True)
        return jsonify({"invoices": filtered})

    payload = _json_object()
    try:
        invoice = compute_invoice(payload, account["business"])
    except ValueError as error:
        return jsonify({"error": str(error)}), 400

    invoices = account.setdefault("invoices", [])
    # Generated IDs have one-second resolution; add a numeric suffix when two
    # invoices are created within the same second so each stays addressable.
    existing_ids = {inv.get("invoice_id") for inv in invoices if isinstance(inv, dict)}
    base_id = invoice["invoice_id"]
    suffix = 2
    while invoice["invoice_id"] in existing_ids:
        invoice["invoice_id"] = f"{base_id}-{suffix}"
        suffix += 1

    invoices.append(invoice)
    if len(invoices) > INVOICE_HISTORY_LIMIT:
        account["invoices"] = invoices[-INVOICE_HISTORY_LIMIT:]
    save_store(data)
    return jsonify({"invoice": invoice})


# The rule needs the ``invoice_id`` URL variable that the view function requires.
@app.route("/api/invoices/<invoice_id>", methods=["GET", "PUT", "DELETE"])
def api_invoice_detail(invoice_id: str) -> Any:
    """Read, replace or delete a single invoice of the logged-in account."""
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401
    data = load_store()
    try:
        account = get_account(data, login_key)
    except KeyError:
        return jsonify({"error": "Nie znaleziono konta."}), 404
    try:
        ensure_business_configured(account)
    except ValueError as error:
        return jsonify({"error": str(error)}), 400

    invoices = account.setdefault("invoices", [])
    try:
        index = next(
            position
            for position, inv in enumerate(invoices)
            if inv.get("invoice_id") == invoice_id
        )
    except StopIteration:
        return jsonify({"error": "Nie znaleziono faktury."}), 404
    current_invoice = invoices[index]

    if request.method == "GET":
        return jsonify({"invoice": current_invoice})

    if request.method == "DELETE":
        invoices.pop(index)
        save_store(data)
        return jsonify({"message": "Faktura zostala usunieta."})

    payload = _json_object()
    try:
        # An edit keeps the original ID and issue timestamp.
        updated_invoice = compute_invoice(
            payload,
            account["business"],
            invoice_id=current_invoice.get("invoice_id"),
            issued_at=current_invoice.get("issued_at"),
        )
    except ValueError as error:
        return jsonify({"error": str(error)}), 400
    invoices[index] = updated_invoice
    save_store(data)
    return jsonify({"invoice": updated_invoice})

@app.route("/api/logo", methods=["GET", "POST", "DELETE"])
def api_logo() -> Any:
    """Read (GET), upload (POST) or remove (DELETE) the account's logo.

    An upload is a JSON object whose ``content`` (or ``data``) field holds
    either a ``data:<mime>;base64,...`` URL or bare base64 accompanied by
    ``mime_type``. Only MIME types in ALLOWED_LOGO_MIME_TYPES and payloads of
    at most MAX_LOGO_SIZE decoded bytes are accepted.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401
    data = load_store()
    try:
        account = get_account(data, login_key)
    except KeyError:
        return jsonify({"error": "Nie znaleziono konta."}), 404

    if request.method == "GET":
        logo = account.get("logo")
        if not logo:
            return jsonify({"logo": None})
        encoded = logo.get("data")
        mime_type = logo.get("mime_type")
        data_url = None
        if encoded and mime_type:
            data_url = f"data:{mime_type};base64,{encoded}"
        return jsonify(
            {
                "logo": {
                    "filename": logo.get("filename"),
                    "mime_type": mime_type,
                    "data": encoded,
                    "data_url": data_url,
                    "uploaded_at": logo.get("uploaded_at"),
                }
            }
        )

    if request.method == "DELETE":
        account["logo"] = None
        save_store(data)
        return jsonify({"message": "Logo zostalo usuniete."})

    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        # A JSON body that is not an object carries no logo fields.
        payload = {}
    raw_content = payload.get("content") or payload.get("data") or ""
    if not isinstance(raw_content, str):
        return jsonify({"error": "Niepoprawny format danych logo."}), 400
    raw_content = raw_content.strip()
    if not raw_content:
        return jsonify({"error": "Brak danych logo."}), 400

    provided_mime = str(payload.get("mime_type") or "").strip()
    filename = sanitize_filename(payload.get("filename"))

    if raw_content.startswith("data:"):
        try:
            header, encoded_content = raw_content.split(",", 1)
        except ValueError:
            return jsonify({"error": "Niepoprawny format danych logo."}), 400
        header = header.strip()
        if ";base64" not in header:
            return (
                jsonify({"error": "Niepoprawny format danych logo (oczekiwano base64)."}),
                400,
            )
        # The MIME type embedded in the data URL wins over the separate field.
        mime_type = header.split(";")[0].replace("data:", "", 1) or provided_mime
        base64_content = encoded_content.strip()
    else:
        mime_type = provided_mime
        base64_content = raw_content

    mime_type = (mime_type or "").lower()
    if mime_type not in ALLOWED_LOGO_MIME_TYPES:
        return jsonify({"error": "Dozwolone formaty logo to PNG lub JPG."}), 400

    try:
        logo_bytes = base64.b64decode(base64_content, validate=True)
    except (ValueError, binascii.Error):
        return jsonify({"error": "Nie udalo sie odczytac danych logo (base64)."}), 400

    if len(logo_bytes) > MAX_LOGO_SIZE:
        return (
            jsonify(
                {"error": f"Logo jest zbyt duze (maksymalnie {MAX_LOGO_SIZE // 1024} KB)."}
            ),
            400,
        )

    stored_logo = {
        "filename": filename,
        "mime_type": mime_type,
        # Re-encode so the stored value is canonical base64.
        "data": base64.b64encode(logo_bytes).decode("ascii"),
        "uploaded_at": datetime.utcnow().isoformat(timespec="seconds"),
    }
    account["logo"] = stored_logo
    save_store(data)
    return jsonify({"logo": stored_logo})


@app.route("/api/invoices/summary", methods=["GET"])
def api_invoice_summary() -> Any:
    """Return invoice counts and gross totals for three recent periods.

    The periods are the last 30 days, the current calendar quarter and the
    current calendar year; each entry has ``count`` and ``gross_total``.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401
    data = load_store()
    try:
        account = get_account(data, login_key)
    except KeyError:
        return jsonify({"error": "Nie znaleziono konta."}), 404
    try:
        ensure_business_configured(account)
    except ValueError as error:
        return jsonify({"error": str(error)}), 400

    # ``issued_at`` is written by compute_invoice from datetime.now() (local
    # time), so period boundaries must be computed in local time as well;
    # using UTC shifted them by the server's UTC offset.
    now = datetime.now()
    last_month_start = now - timedelta(days=30)
    quarter_first_month = ((now.month - 1) // 3) * 3 + 1
    quarter_start = now.replace(
        month=quarter_first_month, day=1, hour=0, minute=0, second=0, microsecond=0
    )
    year_start = now.replace(month=1, day=1, hour=0, minute=0, second=0, microsecond=0)

    def parse_issued_at(value: Optional[str]) -> Optional[datetime]:
        """Parse a stored ``"%Y-%m-%d %H:%M"`` timestamp; ``None`` if invalid."""
        if not value:
            return None
        try:
            return datetime.strptime(value, "%Y-%m-%d %H:%M")
        except (TypeError, ValueError):
            return None

    def aggregate(start: datetime) -> Dict[str, Any]:
        """Count invoices issued at or after *start* and sum their gross totals."""
        count = 0
        gross_total = Decimal("0.00")
        for invoice in account.get("invoices", []):
            if not isinstance(invoice, dict):
                continue
            issued_dt = parse_issued_at(invoice.get("issued_at"))
            if issued_dt is None or issued_dt < start:
                continue
            count += 1
            totals = invoice.get("totals")
            gross_value = totals.get("gross", "0") if isinstance(totals, dict) else "0"
            try:
                gross_total += _decimal(gross_value)
            except ValueError:
                # An unreadable amount still counts as an invoice.
                continue
        return {"count": count, "gross_total": str(_quantize(gross_total))}

    summary = {
        "last_month": aggregate(last_month_start),
        "quarter": aggregate(quarter_start),
        "year": aggregate(year_start),
    }
    return jsonify({"summary": summary})


if __name__ == "__main__":
    port = int(os.environ.get("PORT", "5000"))
    # The interactive debugger allows arbitrary code execution, so it must not
    # be on by default while listening on every interface; opt in explicitly.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {
        "1",
        "true",
        "yes",
        "on",
    }
    app.run(host="0.0.0.0", port=port, debug=debug_enabled)