Spaces:
Sleeping
Sleeping
| import hashlib | |
| import json | |
| import os | |
| from flask import Flask, render_template, request, redirect, url_for | |
| from sqlalchemy import create_engine, text | |
| from flask import render_template | |
| app = Flask(__name__) | |
| engine = create_engine(os.environ["DATABASE_URL"], pool_pre_ping=True) | |
| import uuid | |
| from datetime import datetime | |
| from decimal import Decimal, ROUND_HALF_UP, getcontext | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| from flask import Flask, jsonify, request, send_from_directory | |
# Directory that contains this module; used as the default data directory.
APP_ROOT = Path(__file__).parent.resolve()
# Location of the JSON store; the directory can be overridden with the
# DATA_DIR environment variable.
DATA_DIR = Path(os.environ.get("DATA_DIR", APP_ROOT))
DATA_FILE = DATA_DIR / "web_invoice_store.json"
# Maximum number of invoices kept in the store (api_invoices trims to this).
INVOICE_HISTORY_LIMIT = 200
# VAT code -> rate as a Decimal fraction. None means no rate is applied and
# the net price equals the gross price (see compute_invoice_items).
# NOTE(review): "ZW"/"NP" presumably stand for exempt / not subject to VAT
# -- confirm with the domain owner.
VAT_RATES: Dict[str, Optional[Decimal]] = {
    "23": Decimal("0.23"),
    "8": Decimal("0.08"),
    "5": Decimal("0.05"),
    "0": Decimal("0.00"),
    "ZW": None,
    "NP": None,
}
# Issued session tokens mapped to their creation time; held in memory only.
SESSION_TOKENS: Dict[str, datetime] = {}
# File names that serve_static is allowed to return.
ALLOWED_STATIC = {
    "index.html",
    "styles.css",
    "main.js",
    "favicon.ico",
    "Roboto-VariableFont_wdth,wght.ttf",
}
def index():
    """Render ``index.html`` with the 20 most recent notes, newest first.

    NOTE(review): no route decorator is visible on this view in the source
    as seen here; confirm it is registered with the Flask app.
    """
    query = text(
        "SELECT id, body, created_at FROM notes ORDER BY id DESC LIMIT 20"
    )
    # Fetch the latest notes and hand them to the HTML template.
    with engine.begin() as connection:
        result = connection.execute(query)
        notes = result.mappings().all()
    return render_template("index.html", notes=notes)
# Start the development server only when this file is executed as a script.
# NOTE(review): this guard appears above later definitions in the file and
# app.run() blocks until the server stops, so names defined below it are not
# yet loaded while this server is running -- confirm the placement.
if __name__ == "__main__":  # Check whether we are running the main script
    port = int(os.environ.get("PORT", "7860"))  # Indented under the condition
    app.run(host="0.0.0.0", port=port, debug=False)  # Indented under the condition
def add():
    """Insert the submitted note when it is non-blank, then go back to index.

    Reads the ``body`` form field, trims surrounding whitespace, and stores
    it in the ``notes`` table. Blank submissions are ignored.
    """
    note_text = request.form.get("body", "").strip()
    if not note_text:
        return redirect(url_for("index"))
    with engine.begin() as connection:
        statement = text("INSERT INTO notes (body) VALUES (:b)")
        connection.execute(statement, {"b": note_text})
    return redirect(url_for("index"))
# Limit decimal arithmetic to 10 significant digits.
# NOTE(review): decimal contexts are per-thread, so this assignment only
# affects the thread that executes it -- confirm whether request-handling
# threads are expected to run with this precision or the default one.
getcontext().prec = 10
| def _quantize(value: Decimal) -> Decimal: | |
| return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP) | |
| def _decimal(value: Any) -> Decimal: | |
| try: | |
| return Decimal(str(value)) | |
| except Exception as error: # pragma: no cover - defensive | |
| raise ValueError(f"Niepoprawna wartosc liczby: {value}") from error | |
def hash_password(password: str) -> str:
    """Return the hex SHA-256 digest of *password* encoded as UTF-8.

    NOTE(review): this is an unsalted, fast hash; moving to a salted KDF
    would change the stored hash format, so it is left as is here.
    """
    digest = hashlib.sha256()
    digest.update(password.encode("utf-8"))
    return digest.hexdigest()
def load_store() -> Dict[str, Any]:
    """Load the application store from ``DATA_FILE``.

    Returns:
        The parsed JSON content, or a fresh empty store
        (``business``/``password_hash`` set to None, no invoices) when the
        file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    # Open directly instead of checking exists() first: the file may vanish
    # between the check and the open.
    try:
        with DATA_FILE.open("r", encoding="utf-8") as handle:
            return json.load(handle)
    except FileNotFoundError:
        return {"business": None, "password_hash": None, "invoices": []}
def save_store(data: Dict[str, Any]) -> None:
    """Persist *data* to ``DATA_FILE`` as pretty-printed UTF-8 JSON.

    The content is written to a sibling temporary file first and then moved
    over the real file, so a crash or serialization error in the middle of
    the write cannot leave a truncated store behind.

    Args:
        data: JSON-serializable store content.

    Raises:
        TypeError: If *data* contains values ``json`` cannot serialize.
        OSError: If the directory or file cannot be written.
    """
    DATA_FILE.parent.mkdir(parents=True, exist_ok=True)
    temp_file = DATA_FILE.with_name(DATA_FILE.name + ".tmp")
    try:
        with temp_file.open("w", encoding="utf-8") as handle:
            json.dump(data, handle, ensure_ascii=False, indent=2)
        os.replace(temp_file, DATA_FILE)
    except Exception:
        # Do not leave a half-written temporary file behind.
        try:
            temp_file.unlink()
        except OSError:
            pass
        raise
def ensure_configured(data: Dict[str, Any]) -> None:
    """Verify that the store holds both business data and a password hash.

    Raises:
        ValueError: If either ``business`` or ``password_hash`` is missing
            or empty.
    """
    has_business = bool(data.get("business"))
    if has_business and data.get("password_hash"):
        return
    raise ValueError("Aplikacja nie zostala skonfigurowana.")
def parse_iso_date(value: Optional[str]) -> Optional[str]:
    """Normalize an ISO date or datetime string to ``YYYY-MM-DD``.

    Args:
        value: ISO-formatted date/datetime text. The value comes from
            request JSON, so any other type is tolerated and rejected.

    Returns:
        The date part as ``YYYY-MM-DD``, or None when *value* is empty, is
        not a string, or cannot be parsed.
    """
    # fromisoformat raises TypeError (not ValueError) for non-strings, so
    # filter those out up front.
    if not isinstance(value, str) or not value:
        return None
    try:
        parsed = datetime.fromisoformat(value)
    except ValueError:
        return None
    return parsed.strftime("%Y-%m-%d")
def compute_invoice_items(
    items_payload: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], Dict[str, Dict[str, Decimal]]]:
    """Validate invoice line items and compute their net/VAT/gross values.

    Prices are entered gross. For every line the gross value is
    ``unit gross x quantity`` rounded to cents, the VAT amount is taken out
    of that gross value (``gross * rate / (1 + rate)``), and the net value
    is the remainder. Deriving net from gross this way guarantees that
    ``net_total + vat_amount == gross_total`` on every line; rounding the
    three values independently does not (3 x 1.00 at 23% would give
    2.43 + 0.56 against a gross of 3.00).

    Args:
        items_payload: List of dicts with ``name``, ``quantity``,
            ``vat_code`` (a key of ``VAT_RATES``) and ``unit_price_gross``.

    Returns:
        A tuple ``(items, summary)``: ``items`` is the list of computed
        lines with all amounts as strings, ``summary`` maps each VAT label
        to Decimal ``net``/``vat``/``gross`` sums.

    Raises:
        ValueError: If the list is empty or malformed, or any line has a
            missing name, a non-positive quantity or price, or an unknown
            VAT code.
    """
    if not items_payload:
        raise ValueError("Dodaj przynajmniej jedna pozycje.")
    if not isinstance(items_payload, list):
        raise ValueError("Niepoprawna lista pozycji.")

    zero = Decimal("0.00")
    computed_items: List[Dict[str, Any]] = []
    summary: Dict[str, Dict[str, Decimal]] = {}

    for raw in items_payload:
        if not isinstance(raw, dict):
            raise ValueError("Niepoprawna pozycja faktury.")

        raw_name = raw.get("name")
        # A non-string name is treated the same as a missing one.
        name = raw_name.strip() if isinstance(raw_name, str) else ""
        if not name:
            raise ValueError("Kazda pozycja musi miec nazwe.")

        quantity = _decimal(raw.get("quantity", "0"))
        # is_finite() first: ordering comparisons on NaN raise.
        if not quantity.is_finite() or quantity <= 0:
            raise ValueError("Ilosc musi byc wieksza od zera.")

        vat_code = str(raw.get("vat_code", "")).upper()
        if vat_code not in VAT_RATES:
            raise ValueError(f"Nieznana stawka VAT: {vat_code}")

        raw_gross = _decimal(raw.get("unit_price_gross", "0"))
        # Round the entered price to cents before using it anywhere, so a
        # price that rounds to 0.00 is rejected instead of billed as zero.
        unit_price_gross = _quantize(raw_gross) if raw_gross.is_finite() else zero
        if unit_price_gross <= 0:
            raise ValueError("Cena brutto musi byc wieksza od zera.")

        rate = VAT_RATES[vat_code]
        gross_total = _quantize(unit_price_gross * quantity)
        if rate is None:
            # No rate applies: net equals gross and there is no VAT.
            unit_price_net = unit_price_gross
            vat_amount_total = zero
        else:
            divisor = Decimal("1.00") + rate
            unit_price_net = _quantize(unit_price_gross / divisor)
            vat_amount_total = _quantize(gross_total * rate / divisor)
        net_total = gross_total - vat_amount_total

        # NOTE(review): the 0% rate is labelled "ZW" here, the same as the
        # exempt code -- confirm this is intended, since it also merges the
        # two into one summary bucket.
        if vat_code in {"ZW", "0"}:
            vat_label = "ZW"
        elif vat_code == "NP":
            vat_label = "NP"
        else:
            vat_label = f"{vat_code}%"

        computed_items.append(
            {
                "name": name,
                "quantity": str(_quantize(quantity)),
                "vat_code": vat_code,
                "vat_label": vat_label,
                "unit_price_net": str(unit_price_net),
                "unit_price_gross": str(unit_price_gross),
                "net_total": str(net_total),
                "vat_amount": str(vat_amount_total),
                "gross_total": str(gross_total),
            }
        )

        bucket = summary.setdefault(
            vat_label,
            {"net": Decimal("0.00"), "vat": Decimal("0.00"), "gross": Decimal("0.00")},
        )
        bucket["net"] += net_total
        bucket["vat"] += vat_amount_total
        bucket["gross"] += gross_total

    return computed_items, summary
def computed_summary_to_serializable(
    summary: Dict[str, Dict[str, Decimal]],
) -> List[Dict[str, str]]:
    """Turn the per-VAT-label Decimal summary into JSON-friendly rows.

    Each row carries the label plus ``net_total``, ``vat_total`` and
    ``gross_total`` as two-decimal strings; rows are ordered by label.
    """
    rows = [
        {
            "vat_label": label,
            "net_total": str(_quantize(amounts["net"])),
            "vat_total": str(_quantize(amounts["vat"])),
            "gross_total": str(_quantize(amounts["gross"])),
        }
        for label, amounts in summary.items()
    ]
    return sorted(rows, key=lambda row: row["vat_label"])
def compute_invoice(payload: Dict[str, Any], business: Dict[str, Any]) -> Dict[str, Any]:
    """Build a complete invoice record from a request payload.

    Args:
        payload: Request JSON with ``items`` and optional ``sale_date``,
            ``client`` and ``exemption_note``.
        business: Seller data. Not used in the computation; kept for
            interface compatibility with callers.

    Returns:
        The invoice dict: id, issue and sale dates, computed items,
        per-rate summary, totals (as strings), client data and exemption
        note.

    Raises:
        ValueError: If the payload is malformed (wrong types) or the items
            fail validation in ``compute_invoice_items``. All input
            problems surface as ValueError so the caller can answer 400.
    """

    def clean_text(value: Any, field: str) -> str:
        # Empty/missing -> "", strings are trimmed, anything else is invalid.
        if not value:
            return ""
        if not isinstance(value, str):
            raise ValueError(f"Niepoprawna wartosc pola: {field}")
        return value.strip()

    if not isinstance(payload, dict):
        raise ValueError("Niepoprawny format danych.")

    computed_items, summary = compute_invoice_items(payload.get("items", []))
    net_sum = sum(Decimal(item["net_total"]) for item in computed_items)
    vat_sum = sum(Decimal(item["vat_amount"]) for item in computed_items)
    gross_sum = sum(Decimal(item["gross_total"]) for item in computed_items)

    issued_at = datetime.now()
    # NOTE(review): the id has one-second resolution, so two invoices issued
    # within the same second receive the same id -- confirm acceptable.
    invoice_id = issued_at.strftime("FV-%Y%m%d-%H%M%S")

    raw_sale_date = payload.get("sale_date")
    # Only strings are handed to the parser; other types mean "no date".
    parsed_sale_date = parse_iso_date(raw_sale_date) if isinstance(raw_sale_date, str) else None
    sale_date = parsed_sale_date or issued_at.strftime("%Y-%m-%d")

    client_payload = payload.get("client") or {}
    if not isinstance(client_payload, dict):
        raise ValueError("Niepoprawne dane klienta.")
    client = {
        field: clean_text(client_payload.get(field), field)
        for field in ("name", "address_line", "postal_code", "city", "tax_id")
    }

    return {
        "invoice_id": invoice_id,
        "issued_at": issued_at.strftime("%Y-%m-%d %H:%M"),
        "sale_date": sale_date,
        "items": computed_items,
        "summary": computed_summary_to_serializable(summary),
        "totals": {
            "net": str(_quantize(net_sum)),
            "vat": str(_quantize(vat_sum)),
            "gross": str(_quantize(gross_sum)),
        },
        "client": client,
        "exemption_note": clean_text(payload.get("exemption_note"), "exemption_note"),
    }
def create_token() -> str:
    """Issue a new random session token and record its creation time.

    The token is the 32-character hex form of a random UUID and is stored
    in the in-memory ``SESSION_TOKENS`` registry.
    """
    new_token = uuid.uuid4().hex
    created_at = datetime.now()
    SESSION_TOKENS[new_token] = created_at
    return new_token
def get_token() -> Optional[str]:
    """Extract the bearer token from the request's Authorization header.

    Returns:
        The text after the ``Bearer `` prefix with surrounding whitespace
        removed, or None when the header is absent or uses another scheme.
    """
    prefix = "Bearer "
    authorization = request.headers.get("Authorization", "")
    if authorization.startswith(prefix):
        return authorization[len(prefix):].strip()
    return None
def require_auth() -> str:
    """Return the caller's session token or refuse the request.

    Raises:
        PermissionError: If no bearer token was sent or the token is not a
            known session token.
    """
    candidate = get_token()
    if candidate and candidate in SESSION_TOKENS:
        return candidate
    raise PermissionError("Brak autoryzacji.")
def serve_static(path: str) -> Any:
    """Serve a whitelisted file from the app's static folder.

    Only names listed in ``ALLOWED_STATIC`` that exist as regular files are
    returned; everything else gets a JSON 404 response.
    """
    is_allowed = path in ALLOWED_STATIC
    # The file check only runs for whitelisted names.
    if is_allowed and (Path(app.static_folder) / path).is_file():
        return send_from_directory(app.static_folder, path)
    return jsonify({"error": "Nie znaleziono pliku."}), 404
def api_status() -> Any:
    """Report whether the application has been set up.

    Responds with ``{"configured": true}`` only when the store contains
    both business data and a password hash.
    """
    store = load_store()
    is_configured = bool(store.get("business")) and bool(store.get("password_hash"))
    return jsonify({"configured": is_configured})
def api_setup() -> Any:
    """Perform the one-time application setup (business data + password).

    Expects a JSON object with all required business fields and
    ``password``. Every value is trimmed before validation and storage.
    The password is trimmed as well, because ``api_login`` trims the
    password it receives; hashing the untrimmed value would make a
    password with surrounding whitespace impossible to log in with.

    Returns:
        A success message, or a 400 JSON error when the app is already
        configured, the body is not a JSON object, fields are missing, or
        the password is shorter than 4 characters.
    """
    data = load_store()
    if data.get("password_hash"):
        return jsonify({"error": "Aplikacja zostala juz skonfigurowana."}), 400

    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Niepoprawny format danych."}), 400

    required_fields = [
        "company_name",
        "owner_name",
        "address_line",
        "postal_code",
        "city",
        "tax_id",
        "bank_account",
        "password",
    ]
    # Non-string values count as missing instead of crashing on .strip().
    cleaned = {}
    for field in required_fields:
        value = payload.get(field)
        cleaned[field] = value.strip() if isinstance(value, str) else ""

    missing = [field for field in required_fields if not cleaned[field]]
    if missing:
        return jsonify({"error": f"Brakuje pol: {', '.join(missing)}"}), 400

    password = cleaned["password"]
    if len(password) < 4:
        return jsonify({"error": "Haslo musi miec co najmniej 4 znaki."}), 400

    data["business"] = {
        field: cleaned[field] for field in required_fields if field != "password"
    }
    data["password_hash"] = hash_password(password)
    data.setdefault("invoices", [])
    save_store(data)
    return jsonify({"message": "Dane zapisane. Mozesz sie zalogowac."})
def api_login() -> Any:
    """Check the submitted password and issue a session token.

    Expects a JSON object with ``password``. A body that is not a JSON
    object, or a non-string password, is treated as an empty password and
    therefore rejected, rather than crashing the request.

    Returns:
        ``{"token": ...}`` on success; 400 when the app is not configured;
        401 when the password does not match.
    """
    payload = request.get_json(force=True)
    raw_password = payload.get("password") if isinstance(payload, dict) else None
    password = raw_password.strip() if isinstance(raw_password, str) else ""

    data = load_store()
    if not data.get("password_hash"):
        return jsonify({"error": "Aplikacja nie zostala skonfigurowana."}), 400
    if hash_password(password) != data["password_hash"]:
        return jsonify({"error": "Nieprawidlowe haslo."}), 401

    return jsonify({"token": create_token()})
def api_business() -> Any:
    """Read (GET) or update (any other method) the stored business data.

    On update, each field takes the submitted value when it is non-empty,
    otherwise the currently stored one; all fields must end up non-empty.

    Returns:
        ``{"business": ...}`` on success; 401 without a valid session
        token; 400 when the app is not configured (GET), the body is not a
        JSON object, or a field would be left empty.
    """
    try:
        require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    data = load_store()
    if request.method == "GET":
        # The store can lose its configuration while tokens are still
        # valid; answer with a client error instead of an unhandled one.
        try:
            ensure_configured(data)
        except ValueError as error:
            return jsonify({"error": str(error)}), 400
        return jsonify({"business": data["business"]})

    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Niepoprawny format danych."}), 400

    current = data.get("business") or {}
    field_names = (
        "company_name",
        "owner_name",
        "address_line",
        "postal_code",
        "city",
        "tax_id",
        "bank_account",
    )
    updated = {}
    for field in field_names:
        value = payload.get(field) or current.get(field) or ""
        # Non-string values are reported as missing rather than crashing.
        updated[field] = value.strip() if isinstance(value, str) else ""

    missing = [field for field, value in updated.items() if not value]
    if missing:
        return jsonify({"error": f"Wypelnij wszystkie pola: {', '.join(missing)}"}), 400

    data["business"] = updated
    save_store(data)
    return jsonify({"business": updated})
def api_invoices() -> Any:
    """List stored invoices (GET) or create a new one (any other method).

    New invoices are appended to the store, which is trimmed to the most
    recent ``INVOICE_HISTORY_LIMIT`` entries before saving.

    Returns:
        ``{"invoices": [...]}`` for GET, ``{"invoice": {...}}`` after a
        successful creation; 401 without a valid session token; 400 when
        the app is not configured, the body is not a JSON object, or the
        invoice data fails validation.
    """
    try:
        require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    data = load_store()
    # An unconfigured store is a client-visible condition, not a crash.
    try:
        ensure_configured(data)
    except ValueError as error:
        return jsonify({"error": str(error)}), 400

    if request.method == "GET":
        return jsonify({"invoices": data.get("invoices", [])})

    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Niepoprawny format danych."}), 400
    try:
        invoice = compute_invoice(payload, data["business"])
    except ValueError as error:
        return jsonify({"error": str(error)}), 400

    invoices = data.setdefault("invoices", [])
    invoices.append(invoice)
    if len(invoices) > INVOICE_HISTORY_LIMIT:
        # Keep only the newest entries.
        data["invoices"] = invoices[-INVOICE_HISTORY_LIMIT:]
    save_store(data)
    return jsonify({"invoice": invoice})
if __name__ == "__main__":
    # Script entry point: serve on all interfaces on $PORT (default 7860).
    # Debug mode stays off: with host 0.0.0.0 the server is reachable from
    # other machines, and the debugger allows running arbitrary code.
    port = int(os.environ.get("PORT", "7860"))
    app.run(host="0.0.0.0", port=port, debug=False)
# NOTE(review): these two assignments repeat, with identical expressions, the
# DATA_DIR / DATA_FILE definitions made near the top of the file -- confirm
# whether the repetition is intentional.
DATA_DIR = Path(os.environ.get("DATA_DIR", APP_ROOT))
DATA_FILE = DATA_DIR / "web_invoice_store.json"