# Test_DB / server.py
# Uploaded by Antoni09 ("Upload server.py", commit 40beee2, verified).
import base64
import binascii
import hashlib
import json
import os
import re
import uuid
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal, ROUND_HALF_UP, getcontext
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from flask import Flask, jsonify, request, send_from_directory
from db import (
create_account,
execute,
fetch_all,
fetch_one,
fetch_business_logo,
insert_invoice,
search_clients,
update_business,
update_business_logo,
upsert_client,
)
# --- Storage locations -------------------------------------------------------
# Directory containing this module; the front-end assets are served from here.
APP_ROOT = Path(__file__).parent.resolve()
# Directory of the JSON fallback store; overridable through the DATA_DIR
# environment variable and defaulting to APP_ROOT.
DATA_DIR = Path(os.environ.get("DATA_DIR", APP_ROOT))
DATA_FILE = DATA_DIR / "web_invoice_store.json"

# --- Limits ------------------------------------------------------------------
# Maximum number of invoices kept/returned per account.
INVOICE_HISTORY_LIMIT = 200
MAX_LOGO_SIZE = 512 * 1024  # 512 KB
# Lifetime of a session token issued at login.
TOKEN_TTL = timedelta(hours=12)
ALLOWED_LOGO_MIME_TYPES = {"image/png", "image/jpeg"}

# The SQL backend is used whenever NEON_DATABASE_URL is set to a non-empty
# value; otherwise the handlers fall back to the JSON file store.
DATABASE_AVAILABLE = bool(os.environ.get("NEON_DATABASE_URL"))

# VAT code -> rate as a fraction. ``None`` marks codes for which no VAT is
# calculated (the gross price is used as the net price).
VAT_RATES: Dict[str, Optional[Decimal]] = {
    "23": Decimal("0.23"),
    "8": Decimal("0.08"),
    "5": Decimal("0.05"),
    "0": Decimal("0.00"),
    "ZW": None,
    "NP": None,
}
DEFAULT_UNIT = "szt."
# NOTE(review): not referenced by any handler visible in this file; units sent
# by the client are currently stored without being checked against this set.
ALLOWED_UNITS = {"szt.", "godz."}
PASSWORD_MIN_LENGTH = 4

# In-memory session registry: token -> {"login_key", "expires_at", ...}.
# Sessions are lost on restart and are not shared between worker processes.
SESSION_TOKENS: Dict[str, Dict[str, Any]] = {}

# Whitelist of files the ``/<path:filename>`` route is allowed to serve.
ALLOWED_STATIC = {
    "index.html",
    "styles.css",
    "main.js",
    "favicon.ico",
    "Roboto-VariableFont_wdth,wght.ttf",
}

# Passing ``static_folder=APP_ROOT`` together with ``static_url_path=""`` to the
# constructor makes Flask register its own ``/<path:filename>`` static rule,
# which serves every file under APP_ROOT (source code and, by default, the JSON
# store) and is registered before the whitelisting route. Creating the app
# without a static folder and assigning the attribute afterwards keeps
# ``app.static_folder`` available to ``send_from_directory`` without that
# automatic route.
app = Flask(__name__, static_folder=None)
app.static_folder = str(APP_ROOT)

# NOTE(review): ten significant digits means amounts of 100,000,000.00 or more
# cannot be quantized to two decimal places (decimal.InvalidOperation).
getcontext().prec = 10
def _quantize(value: Decimal) -> Decimal:
return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
def _decimal(value: Any) -> Decimal:
try:
return Decimal(str(value))
except Exception as error: # pragma: no cover - defensive
raise ValueError(f"Niepoprawna wartosc liczby: {value}") from error
def _format_decimal_str(value: Any, default: str = "0.00") -> str:
    """Render ``value`` as a two-decimal string for JSON responses.

    ``None`` and the empty string yield ``default``. ``Decimal`` instances are
    quantized directly; any other value is parsed through ``str`` first and, if
    that fails, returned as its plain string form.
    """
    if value in (None, ""):
        return default
    if not isinstance(value, Decimal):
        try:
            parsed = Decimal(str(value))
            return str(_quantize(parsed))
        except Exception:
            # Best effort: hand back whatever the value prints as.
            return str(value)
    return str(_quantize(value))
def hash_password(password: str) -> str:
    """Return the hexadecimal SHA-256 digest of the UTF-8 encoded password.

    NOTE(review): the digest is unsalted and uses a fast hash; stored hashes
    depend on this exact format, so migrating to a password KDF needs a
    coordinated change in the login and registration handlers.
    """
    digest = hashlib.sha256()
    digest.update(password.encode("utf-8"))
    return digest.hexdigest()
# Pragmatic e-mail shape check: local part, "@", domain and a TLD of 2+ letters.
EMAIL_PATTERN = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")


def normalize_email(raw_email: str) -> Tuple[str, str]:
    """Validate an e-mail address and return ``(login_key, display_email)``.

    ``display_email`` is the input with surrounding whitespace removed and
    ``login_key`` is its lower-cased form.

    Raises:
        ValueError: if the address is empty or does not match EMAIL_PATTERN.
    """
    trimmed = raw_email.strip() if raw_email else ""
    if trimmed == "":
        raise ValueError("Email nie moze byc pusty.")
    if EMAIL_PATTERN.fullmatch(trimmed) is None:
        raise ValueError("Podaj poprawny adres email.")
    return trimmed.lower(), trimmed
def sanitize_filename(filename: Optional[str]) -> str:
    """Reduce a client-supplied file name to a safe base name.

    Any directory part (with ``/`` or ``\\`` separators) is dropped, characters
    outside ``A-Za-z0-9._-`` become underscores, and leading/trailing dots and
    underscores are stripped. ``"logo"`` is returned when nothing is left.
    """
    fallback = "logo"
    if not filename:
        return fallback
    base_name = str(filename).rpartition("/")[2].rpartition("\\")[2]
    cleaned = re.sub(r"[^A-Za-z0-9._-]", "_", base_name).strip("._")
    return cleaned if cleaned else fallback
def find_account_identifier(accounts: Dict[str, Any], identifier: str) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
    """Locate an account by store key, login or e-mail (case-insensitive).

    Returns ``(store_key, account)`` for the first match, or ``(None, None)``
    when the identifier is blank or nothing matches.
    """
    wanted = (identifier or "").strip().lower()
    if not wanted:
        return None, None

    # Fast path: the identifier is the store key itself.
    direct_hit = accounts.get(wanted)
    if direct_hit:
        return wanted, direct_hit

    def matches(candidate: Dict[str, Any]) -> bool:
        login = (candidate.get("login") or "").strip().lower()
        email = (candidate.get("email") or "").strip().lower()
        return wanted == login or wanted == email

    return next(
        ((store_key, candidate) for store_key, candidate in accounts.items() if matches(candidate)),
        (None, None),
    )
def load_store() -> Dict[str, Any]:
    """Read the JSON file store and return its content.

    Returns:
        The parsed store, or ``{"accounts": {}}`` when the file does not exist.

    Raises:
        ValueError: if the file is not valid JSON or its top-level value is not
            an object (callers immediately use ``dict`` methods on the result).
    """
    try:
        # Open directly instead of checking ``exists()`` first, so the file
        # cannot vanish between the check and the read.
        with DATA_FILE.open("r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (FileNotFoundError, NotADirectoryError):
        return {"accounts": {}}
    except json.JSONDecodeError as error:
        raise ValueError("Plik z danymi jest uszkodzony.") from error
    if not isinstance(data, dict):
        raise ValueError("Plik z danymi jest uszkodzony.")
    return data
def save_store(data: Dict[str, Any]) -> None:
    """Write ``data`` to the JSON file store.

    The content is first written to a sibling ``.tmp`` file which then replaces
    the real file, so readers never see a half-written store.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    staging_path = DATA_FILE.with_suffix(".tmp")
    with staging_path.open("w", encoding="utf-8") as staging_file:
        json.dump(data, staging_file, ensure_ascii=False, indent=2)
    staging_path.replace(DATA_FILE)
def get_account(data: Dict[str, Any], login_key: str) -> Dict[str, Any]:
    """Return the account stored under ``login_key`` in the file store.

    Raises:
        KeyError: if the account is missing or empty.
    """
    found = (data.get("accounts") or {}).get(login_key)
    if found:
        return found
    raise KeyError("Nie znaleziono konta.")
def get_account_row(login_key: str) -> Dict[str, Any]:
    """Fetch the ``id`` and ``login`` columns of the account with this login.

    Raises:
        KeyError: if the database has no such account.
    """
    account_row = fetch_one("SELECT id, login FROM accounts WHERE login = %s", (login_key,))
    if account_row:
        return account_row
    raise KeyError("Nie znaleziono konta.")
def get_business_profile(account_id: int) -> Optional[Dict[str, Any]]:
    """Return the seller profile row of an account, as given by ``fetch_one``."""
    # Adjacent literals are joined at compile time into the original query text.
    query = (
        "\n"
        "SELECT company_name, owner_name, address_line, postal_code,\n"
        "city, tax_id, bank_account\n"
        "FROM business_profiles\n"
        "WHERE account_id = %s\n"
    )
    params = (account_id,)
    return fetch_one(query, params)
def require_auth() -> str:
    """Authenticate the current request and return the session's login key.

    Expects an ``Authorization: Bearer <token>`` header whose token is present
    in SESSION_TOKENS and not expired. Expired tokens are removed on sight.

    Raises:
        PermissionError: if the header is missing or malformed, or the token is
            unknown or expired.
    """
    header = request.headers.get("Authorization")
    if not (header and header.startswith("Bearer ")):
        raise PermissionError("Brak tokenu.")

    token = header.partition(" ")[2].strip()
    session = SESSION_TOKENS.get(token)
    if not session:
        raise PermissionError("Nieprawidlowy token.")

    if datetime.utcnow() > session["expires_at"]:
        SESSION_TOKENS.pop(token, None)
        raise PermissionError("Token wygasl.")

    return session["login_key"]
@app.route("/")
def index() -> Any:
    """Serve the single-page front end."""
    landing_page = "index.html"
    return send_from_directory(app.static_folder, landing_page)
@app.route("/<path:filename>")
def static_files(filename: str) -> Any:
    """Serve a whitelisted front-end asset; anything else gets a JSON 404."""
    if filename in ALLOWED_STATIC:
        return send_from_directory(app.static_folder, filename)
    return jsonify({"error": "Nie ma takiego zasobu."}), 404
@app.route("/api/register", methods=["POST"])
def api_register() -> Any:
    """Create an account together with its seller (business) profile.

    Expects a JSON object with ``email``, ``password``, ``confirm_password``
    and every seller field listed below. Validation problems are reported as
    HTTP 400 with an ``error`` message; success returns a ``message``.
    """
    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        # ``null``, arrays and scalars are valid JSON but not a valid request.
        return jsonify({"error": "Niepoprawne dane zadania."}), 400

    business_fields = (
        "company_name",
        "owner_name",
        "address_line",
        "postal_code",
        "city",
        "tax_id",
        "bank_account",
    )
    business_data: Dict[str, str] = {}
    for field in business_fields:
        raw_value = payload.get(field)
        # Non-string values are treated as missing instead of crashing.
        value = raw_value.strip() if isinstance(raw_value, str) else ""
        if not value:
            return jsonify({"error": f"Pole {field} jest wymagane."}), 400
        business_data[field] = value

    password = payload.get("password")
    confirm = payload.get("confirm_password")
    if password != confirm:
        return jsonify({"error": "Hasla musza byc identyczne."}), 400
    if not isinstance(password, str) or len(password) < PASSWORD_MIN_LENGTH:
        return jsonify({"error": "Haslo jest za krotkie."}), 400

    email = payload.get("email")
    try:
        login_key, display_email = normalize_email(email if isinstance(email, str) else "")
    except ValueError as error:
        # An invalid address is a client error, not a server failure.
        return jsonify({"error": str(error)}), 400

    password_hash = hash_password(password)

    if DATABASE_AVAILABLE:
        if fetch_one("SELECT 1 FROM accounts WHERE login = %s", (login_key,)):
            return jsonify({"error": "Konto o podanym emailu juz istnieje."}), 400
        account_id = create_account(login_key, display_email, password_hash)
        update_business(account_id, business_data)
        return jsonify({"message": "Konto zostalo utworzone."})

    data = load_store()
    accounts = data.setdefault("accounts", {})
    if login_key in accounts:
        return jsonify({"error": "Konto o podanym emailu juz istnieje."}), 400
    accounts[login_key] = {
        "login": login_key,
        "email": display_email,
        "password_hash": password_hash,
        "business": business_data,
        "invoices": [],
        "logo": None,
        "created_at": datetime.utcnow().isoformat(timespec="seconds"),
    }
    save_store(data)
    return jsonify({"message": "Konto zostalo utworzone."})
@app.route("/api/login", methods=["POST"])
def api_login() -> Any:
    """Check credentials and issue a session token.

    Expects a JSON object with ``identifier`` (or ``email``) and ``password``.
    Returns ``{"token", "login"}`` on success, HTTP 400 when a credential is
    missing and HTTP 401 when the credentials do not match an account.
    """
    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Podaj email/login i haslo."}), 400

    identifier = payload.get("identifier") or payload.get("email")
    password = payload.get("password")
    if (
        not isinstance(identifier, str)
        or not isinstance(password, str)
        or not identifier.strip()
        or not password
    ):
        return jsonify({"error": "Podaj email/login i haslo."}), 400

    try:
        login_key, _ = normalize_email(identifier)
    except ValueError:
        # The identifier may be a plain login rather than an e-mail address;
        # look it up as typed (normalized) instead of failing the request.
        login_key = identifier.strip().lower()

    password_hash = hash_password(password)

    if DATABASE_AVAILABLE:
        row = fetch_one(
            "SELECT id, password_hash FROM accounts WHERE login = %s",
            (login_key,),
        )
        if not row or row["password_hash"] != password_hash:
            return jsonify({"error": "Niepoprawne dane logowania."}), 401
        token = uuid.uuid4().hex
        SESSION_TOKENS[token] = {
            "login_key": login_key,
            "account_id": row["id"],
            "expires_at": datetime.utcnow() + TOKEN_TTL,
        }
        return jsonify({"token": token, "login": login_key})

    data = load_store()
    accounts = data.get("accounts") or {}
    login_key, account = find_account_identifier(accounts, login_key)
    if not account or account.get("password_hash") != password_hash:
        return jsonify({"error": "Niepoprawne dane logowania."}), 401
    token = uuid.uuid4().hex
    SESSION_TOKENS[token] = {
        "login_key": login_key,
        "expires_at": datetime.utcnow() + TOKEN_TTL,
    }
    return jsonify({"token": token, "login": account.get("login", login_key)})
@app.route("/api/logout", methods=["POST"])
def api_logout() -> Any:
    """Invalidate the session token sent in the ``Authorization`` header.

    Always answers with a success message, whether or not the token existed.
    """
    auth_header = request.headers.get("Authorization", "")
    # Parse the token the same way ``require_auth`` does (prefix removed once,
    # surrounding whitespace stripped) so every accepted token can be revoked.
    if auth_header.startswith("Bearer "):
        token = auth_header.split(" ", 1)[1]
    else:
        token = auth_header
    SESSION_TOKENS.pop(token.strip(), None)
    return jsonify({"message": "Wylogowano."})
@app.route("/api/business", methods=["GET", "POST"])
def api_business() -> Any:
    """Read (GET) or replace (POST) the seller profile of the current account.

    GET returns ``{"business": ...}``. POST expects a JSON object in which every
    required seller field is a non-blank string and returns a ``message``.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    account_row = None
    data = None
    account = None
    if DATABASE_AVAILABLE:
        try:
            account_row = get_account_row(login_key)
        except KeyError:
            return jsonify({"error": "Nie znaleziono konta."}), 404
    else:
        # The JSON store is only read when it is the active backend.
        data = load_store()
        account = (data.get("accounts") or {}).get(login_key)

    if request.method == "GET":
        if DATABASE_AVAILABLE:
            return jsonify({"business": get_business_profile(account_row["id"])})
        if not account:
            return jsonify({"business": None})
        return jsonify({"business": account.get("business")})

    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Niepoprawne dane zadania."}), 400

    required_fields = (
        "company_name",
        "owner_name",
        "address_line",
        "postal_code",
        "city",
        "tax_id",
        "bank_account",
    )
    for field in required_fields:
        value = payload.get(field)
        # Non-string values count as missing instead of raising on ``strip``.
        if not isinstance(value, str) or not value.strip():
            return jsonify({"error": f"Pole {field} jest wymagane."}), 400

    if DATABASE_AVAILABLE:
        update_business(account_row["id"], payload)
        return jsonify({"message": "Dane sprzedawcy zostaly zaktualizowane."})

    if not account:
        return jsonify({"error": "Nie znaleziono konta."}), 404
    account["business"] = payload
    save_store(data)
    return jsonify({"message": "Dane sprzedawcy zostaly zaktualizowane."})
@app.route("/api/clients", methods=["GET"])
def api_clients() -> Any:
    """Search the current account's clients (database backend only).

    Query parameters: ``q`` is the search term and ``limit`` the maximum number
    of results, clamped to 1..25 (10 when it is not an integer). Without a
    database the list is always empty.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    if not DATABASE_AVAILABLE:
        return jsonify({"clients": []})

    try:
        account_row = get_account_row(login_key)
    except KeyError:
        return jsonify({"error": "Nie znaleziono konta."}), 404

    search_term = (request.args.get("q") or "").strip()
    try:
        requested_limit = int(request.args.get("limit", "10"))
    except ValueError:
        requested_limit = 10
    bounded_limit = min(max(requested_limit, 1), 25)

    matches = search_clients(account_row["id"], search_term, bounded_limit)
    return jsonify({"clients": matches})
def _build_logo_response(
    mime_type: Optional[str],
    encoded: Optional[str],
    filename: Optional[str] = None,
    uploaded_at: Optional[str] = None,
) -> Any:
    """Build the JSON response describing a stored logo."""
    data_url = f"data:{mime_type};base64,{encoded}" if mime_type and encoded else None
    return jsonify(
        {
            "logo": {
                "filename": filename,
                "mime_type": mime_type,
                "data": encoded,
                "data_url": data_url,
                "uploaded_at": uploaded_at,
            }
        }
    )


def _parse_logo_upload(payload: Any) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
    """Validate an upload payload; return ``(stored_logo, None)`` or ``(None, error)``."""
    if not isinstance(payload, dict):
        return None, "Brak danych logo."
    raw_content = payload.get("content") or payload.get("data") or ""
    # Non-string content is rejected like missing content instead of crashing.
    if not isinstance(raw_content, str) or not raw_content.strip():
        return None, "Brak danych logo."
    raw_content = raw_content.strip()

    provided_mime = payload.get("mime_type")
    provided_mime = provided_mime.strip() if isinstance(provided_mime, str) else ""
    filename = sanitize_filename(payload.get("filename"))

    if raw_content.startswith("data:"):
        # Data URL: "data:<mime>;base64,<payload>".
        header, separator, encoded_content = raw_content.partition(",")
        if not separator:
            return None, "Niepoprawny format danych logo."
        header = header.strip()
        if ";base64" not in header:
            return None, "Niepoprawny format danych logo (oczekiwano base64)."
        mime_type = header.split(";")[0].replace("data:", "", 1) or provided_mime
        base64_content = encoded_content.strip()
    else:
        mime_type = provided_mime
        base64_content = raw_content

    mime_type = (mime_type or "").lower()
    if mime_type not in ALLOWED_LOGO_MIME_TYPES:
        return None, "Dozwolone formaty logo to PNG lub JPG."

    try:
        logo_bytes = base64.b64decode(base64_content, validate=True)
    except (ValueError, binascii.Error):
        return None, "Nie udalo sie odczytac danych logo (base64)."
    if len(logo_bytes) > MAX_LOGO_SIZE:
        return None, f"Logo jest zbyt duze (maksymalnie {MAX_LOGO_SIZE // 1024} KB)."

    stored_logo = {
        "filename": filename,
        "mime_type": mime_type,
        # Re-encode so the stored value is canonical base64.
        "data": base64.b64encode(logo_bytes).decode("ascii"),
        "uploaded_at": datetime.utcnow().isoformat(timespec="seconds"),
    }
    return stored_logo, None


@app.route("/api/logo", methods=["GET", "POST", "DELETE"])
def api_logo() -> Any:
    """Read (GET), upload (POST) or remove (DELETE) the account's logo.

    Uploads are JSON objects whose ``content`` (or ``data``) is either raw
    base64 accompanied by ``mime_type`` or a base64 data URL. Only the MIME
    types in ALLOWED_LOGO_MIME_TYPES and at most MAX_LOGO_SIZE decoded bytes
    are accepted; every validation failure is an HTTP 400.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    account_row = None
    account = None
    data = None
    if DATABASE_AVAILABLE:
        try:
            account_row = get_account_row(login_key)
        except KeyError:
            return jsonify({"error": "Nie znaleziono konta."}), 404
    else:
        data = load_store()
        try:
            account = get_account(data, login_key)
        except KeyError:
            return jsonify({"error": "Nie znaleziono konta."}), 404

    if request.method == "GET":
        if DATABASE_AVAILABLE:
            logo_row = fetch_business_logo(account_row["id"])
            if not logo_row:
                return jsonify({"logo": None})
            # The database keeps only the MIME type and the encoded data.
            return _build_logo_response(logo_row["mime_type"], logo_row["data"])
        logo = account.get("logo") if account else None
        if not logo:
            return jsonify({"logo": None})
        return _build_logo_response(
            logo.get("mime_type"),
            logo.get("data"),
            filename=logo.get("filename"),
            uploaded_at=logo.get("uploaded_at"),
        )

    if request.method == "DELETE":
        if DATABASE_AVAILABLE:
            update_business_logo(account_row["id"], None, None)
            return jsonify({"message": "Logo zostalo usuniete."})
        if not account or data is None:
            return jsonify({"error": "Nie znaleziono konta."}), 404
        account["logo"] = None
        save_store(data)
        return jsonify({"message": "Logo zostalo usuniete."})

    stored_logo, error_message = _parse_logo_upload(request.get_json(force=True))
    if error_message is not None:
        return jsonify({"error": error_message}), 400

    if DATABASE_AVAILABLE:
        update_business_logo(account_row["id"], stored_logo["mime_type"], stored_logo["data"])
        return jsonify({"logo": stored_logo})
    account["logo"] = stored_logo
    save_store(data)
    return jsonify({"logo": stored_logo})
def normalize_phone(phone: Optional[str]) -> Optional[str]:
    """Keep only digits and ``+`` from a phone number.

    Returns ``None`` when the input is empty or nothing remains.
    """
    if phone:
        cleaned = re.sub(r"[^\d+]", "", phone)
        if cleaned:
            return cleaned
    return None
def validate_client(payload: Dict[str, Any]) -> Dict[str, str]:
    """Extract the buyer's details from an invoice payload.

    Each field is read from the nested ``client`` object first and from the
    flat camelCase key second. Text fields are stripped (empty string when
    absent); ``phone`` goes through ``normalize_phone`` and may be ``None``.
    Despite the name, no field is checked for presence or format.
    """
    nested = payload.get("client") or {}

    def pick(nested_key: str, flat_key: str) -> Any:
        return nested.get(nested_key) or payload.get(flat_key)

    def text(nested_key: str, flat_key: str) -> str:
        return (pick(nested_key, flat_key) or "").strip()

    return {
        "name": text("name", "clientName"),
        "tax_id": text("tax_id", "clientTaxId"),
        "address_line": text("address_line", "clientAddress"),
        "postal_code": text("postal_code", "clientPostalCode"),
        "city": text("city", "clientCity"),
        "phone": normalize_phone(pick("phone", "clientPhone")),
    }
def _parse_positive_amount(raw: Any, error_message: str) -> Decimal:
    """Parse ``raw`` as a two-decimal amount that must be greater than zero."""
    try:
        amount = _quantize(_decimal(raw or "0"))
        is_positive = amount > Decimal("0")
    except ArithmeticError as error:
        # decimal.InvalidOperation: NaN comparison or a value too large for the
        # configured precision. Report it as invalid input, not a server error.
        raise ValueError(error_message) from error
    if not is_positive:
        raise ValueError(error_message)
    return amount


def build_invoice(payload: Dict[str, Any], business: Dict[str, Any], client: Dict[str, str]) -> Dict[str, Any]:
    """Build a normalized invoice document from a request payload.

    Prices are taken as gross unit prices; net and VAT amounts are derived from
    the VAT code. All monetary values in the result are two-decimal strings.

    Args:
        payload: request body; reads ``items`` plus the optional ``sale_date``,
            ``payment_term`` and ``exemption_note`` keys (snake_case or
            camelCase).
        business: seller profile, embedded unchanged.
        client: buyer details, embedded unchanged.

    Returns:
        Dict with ``invoice_id``, ``issued_at``, ``sale_date``,
        ``payment_term``, ``items``, ``summary`` (per VAT label), ``totals``,
        ``client``, ``business`` and ``exemption_note``.

    Raises:
        ValueError: for any invalid item, amount, VAT code or payment term.
    """
    # NOTE(review): the id has one-second resolution and uses local time, so
    # two invoices created within the same second share an id.
    now = datetime.now()
    invoice_id = f"FV-{now.strftime('%Y%m%d-%H%M%S')}"
    issued_at = now.strftime("%Y-%m-%d %H:%M")
    sale_date = payload.get("sale_date") or payload.get("saleDate") or date.today().isoformat()

    raw_term = payload.get("payment_term") or payload.get("paymentTerm") or 14
    try:
        payment_term = int(raw_term)
    except (TypeError, ValueError) as error:
        raise ValueError("Niepoprawny termin platnosci.") from error

    items = payload.get("items") or []
    if not isinstance(items, list):
        raise ValueError("Niepoprawna lista pozycji.")

    normalized_items: List[Dict[str, Any]] = []
    totals = {"net": Decimal("0"), "vat": Decimal("0"), "gross": Decimal("0")}
    summary: Dict[str, Dict[str, Decimal]] = {}

    for item in items:
        if not isinstance(item, dict):
            raise ValueError("Niepoprawna pozycja faktury.")
        name = str(item.get("name") or "").strip()
        if not name:
            raise ValueError("Nazwa pozycji nie moze byc pusta.")
        quantity = _parse_positive_amount(item.get("quantity"), "Ilosc musi byc dodatnia.")
        unit = item.get("unit") or DEFAULT_UNIT
        vat_code = str(item.get("vat_code") or item.get("vat") or item.get("vatCode") or "23")
        if vat_code not in VAT_RATES:
            raise ValueError("Niepoprawna stawka VAT.")

        unit_price_raw = item.get("unit_price_gross")
        if unit_price_raw in (None, ""):
            unit_price_raw = item.get("unitPrice") or item.get("unit_price") or item.get("price")
        unit_price_gross = _parse_positive_amount(unit_price_raw, "Cena musi byc dodatnia.")

        vat_rate = VAT_RATES[vat_code]
        try:
            if vat_rate is None:
                # Codes without a rate: gross equals net, no VAT.
                unit_price_net = unit_price_gross
                unit_vat = Decimal("0.00")
            else:
                unit_price_net = _quantize(unit_price_gross / (Decimal("1.0") + vat_rate))
                unit_vat = _quantize(unit_price_gross - unit_price_net)
            net_total = _quantize(unit_price_net * quantity)
            vat_total = _quantize(unit_vat * quantity)
            gross_total = _quantize(unit_price_gross * quantity)
        except ArithmeticError as error:
            raise ValueError("Wartosc pozycji jest zbyt duza.") from error

        vat_label = item.get("vatLabel") or vat_code
        normalized_items.append(
            {
                "name": name,
                "quantity": str(quantity),
                "unit": unit,
                "vat_code": vat_code,
                "vat_label": vat_label,
                "unit_price_net": str(unit_price_net),
                "unit_price_gross": str(unit_price_gross),
                "net_total": str(net_total),
                # Line-level VAT. The per-unit amount used to be stored here,
                # which made every VAT total ignore the quantity.
                "vat_amount": str(vat_total),
                "gross_total": str(gross_total),
            }
        )

        totals["net"] += net_total
        totals["vat"] += vat_total
        totals["gross"] += gross_total
        bucket = summary.setdefault(
            vat_label,
            {"net_total": Decimal("0"), "vat_total": Decimal("0"), "gross_total": Decimal("0")},
        )
        bucket["net_total"] += net_total
        bucket["vat_total"] += vat_total
        bucket["gross_total"] += gross_total

    try:
        formatted_totals = {key: str(_quantize(value)) for key, value in totals.items()}
        summary_list = [
            {
                "vat_label": label,
                "net_total": str(_quantize(values["net_total"])),
                "vat_total": str(_quantize(values["vat_total"])),
                "gross_total": str(_quantize(values["gross_total"])),
            }
            for label, values in summary.items()
        ]
    except ArithmeticError as error:
        raise ValueError("Wartosc faktury jest zbyt duza.") from error

    exemption_note = str(payload.get("exemption_note") or payload.get("exemptionNote") or "").strip()
    return {
        "invoice_id": invoice_id,
        "issued_at": issued_at,
        "sale_date": sale_date,
        "payment_term": payment_term,
        "items": normalized_items,
        "summary": summary_list,
        "totals": formatted_totals,
        "client": client,
        "business": business,
        "exemption_note": exemption_note,
    }
# Most recent invoices of one account, joined with the buyer's details.
_INVOICE_LIST_SQL = """
SELECT i.id,
i.invoice_number,
i.issued_at,
i.sale_date,
i.payment_term_days,
i.exemption_note,
i.total_net,
i.total_vat,
i.total_gross,
c.name AS client_name,
c.address_line AS client_address,
c.postal_code AS client_postal_code,
c.city AS client_city,
c.tax_id AS client_tax_id,
c.phone AS client_phone
FROM invoices AS i
LEFT JOIN clients AS c ON c.id = i.client_id
WHERE i.account_id = %s
ORDER BY i.issued_at DESC
LIMIT %s
"""

# Line items of a set of invoices.
_INVOICE_ITEMS_SQL = """
SELECT invoice_id, line_no, name, quantity, unit,
vat_code, vat_label, unit_price_net,
unit_price_gross, net_total, vat_amount, gross_total
FROM invoice_items
WHERE invoice_id = ANY(%s)
ORDER BY line_no
"""

# Per-VAT-label summaries of a set of invoices.
_INVOICE_VAT_SUMMARY_SQL = """
SELECT invoice_id, vat_label, net_total, vat_total, gross_total
FROM invoice_vat_summary
WHERE invoice_id = ANY(%s)
ORDER BY vat_label
"""


def _serialize_db_invoice(
    row: Dict[str, Any],
    items: List[Dict[str, Any]],
    summary: List[Dict[str, str]],
    business_profile: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Convert one invoice row plus its child rows into the API representation."""
    issued_at = row.get("issued_at")
    if isinstance(issued_at, datetime):
        issued_at = issued_at.strftime("%Y-%m-%d %H:%M")
    sale_date = row.get("sale_date")
    if hasattr(sale_date, "isoformat"):
        sale_date = sale_date.isoformat()

    client = None
    if row.get("client_name"):
        client = {
            "name": row.get("client_name"),
            "address_line": row.get("client_address"),
            "postal_code": row.get("client_postal_code"),
            "city": row.get("client_city"),
            "tax_id": row.get("client_tax_id"),
            "phone": row.get("client_phone"),
        }

    return {
        "invoice_id": row.get("invoice_number"),
        "issued_at": issued_at,
        "sale_date": sale_date,
        "payment_term": row.get("payment_term_days"),
        "exemption_note": row.get("exemption_note"),
        "items": items,
        "summary": summary,
        "totals": {
            "net": _format_decimal_str(row.get("total_net")),
            "vat": _format_decimal_str(row.get("total_vat")),
            "gross": _format_decimal_str(row.get("total_gross")),
        },
        "client": client,
        "business": business_profile,
    }


def _load_db_invoices(account_id: Any) -> List[Dict[str, Any]]:
    """Load up to INVOICE_HISTORY_LIMIT invoices of an account from the database."""
    invoice_rows = fetch_all(_INVOICE_LIST_SQL, (account_id, INVOICE_HISTORY_LIMIT))
    if not invoice_rows:
        return []

    invoice_ids = [row["id"] for row in invoice_rows]
    items_map: Dict[int, List[Dict[str, Any]]] = {row_id: [] for row_id in invoice_ids}
    summary_map: Dict[int, List[Dict[str, str]]] = {row_id: [] for row_id in invoice_ids}

    # Child rows are fetched for all invoices at once and grouped in memory.
    for item in fetch_all(_INVOICE_ITEMS_SQL, (invoice_ids,)) or []:
        items_map.setdefault(item["invoice_id"], []).append(
            {
                "name": item["name"],
                "quantity": _format_decimal_str(item.get("quantity"), "0.00"),
                "unit": item.get("unit") or DEFAULT_UNIT,
                "vat_code": item.get("vat_code"),
                "vat_label": item.get("vat_label") or item.get("vat_code"),
                "unit_price_net": _format_decimal_str(item.get("unit_price_net")),
                "unit_price_gross": _format_decimal_str(item.get("unit_price_gross")),
                "net_total": _format_decimal_str(item.get("net_total")),
                "vat_amount": _format_decimal_str(item.get("vat_amount")),
                "gross_total": _format_decimal_str(item.get("gross_total")),
            }
        )
    for entry in fetch_all(_INVOICE_VAT_SUMMARY_SQL, (invoice_ids,)) or []:
        summary_map.setdefault(entry["invoice_id"], []).append(
            {
                "vat_label": entry.get("vat_label"),
                "net_total": _format_decimal_str(entry.get("net_total")),
                "vat_total": _format_decimal_str(entry.get("vat_total")),
                "gross_total": _format_decimal_str(entry.get("gross_total")),
            }
        )

    # Every listed invoice carries the seller's current profile.
    business_profile = get_business_profile(account_id)
    return [
        _serialize_db_invoice(
            row,
            items_map.get(row["id"], []),
            summary_map.get(row["id"], []),
            business_profile,
        )
        for row in invoice_rows
    ]


@app.route("/api/invoices", methods=["GET", "POST"])
def api_invoices() -> Any:
    """List the account's invoices (GET) or create a new one (POST).

    GET returns ``{"invoices": [...]}`` limited to INVOICE_HISTORY_LIMIT
    entries. POST expects a JSON object understood by ``validate_client`` and
    ``build_invoice`` and returns the stored invoice; invalid input yields
    HTTP 400. A seller profile must exist before invoices can be created.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    if request.method == "GET":
        if DATABASE_AVAILABLE:
            try:
                account_row = get_account_row(login_key)
            except KeyError:
                return jsonify({"error": "Nie znaleziono konta."}), 404
            return jsonify({"invoices": _load_db_invoices(account_row["id"])})
        data = load_store()
        try:
            account = get_account(data, login_key)
        except KeyError:
            return jsonify({"error": "Nie znaleziono konta."}), 404
        invoices = account.get("invoices", [])[:INVOICE_HISTORY_LIMIT]
        return jsonify({"invoices": invoices})

    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        # ``null``, arrays and scalars would crash the invoice builder.
        return jsonify({"error": "Niepoprawne dane faktury."}), 400

    if DATABASE_AVAILABLE:
        try:
            account_row = get_account_row(login_key)
        except KeyError:
            return jsonify({"error": "Nie znaleziono konta."}), 404
        business = get_business_profile(account_row["id"])
        if not business:
            return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400
        client = validate_client(payload)
        try:
            invoice = build_invoice(payload, business, client)
        except ValueError as error:
            return jsonify({"error": str(error)}), 400
        client_id = upsert_client(
            account_row["id"],
            {
                "name": client["name"],
                "address_line": client["address_line"],
                "postal_code": client["postal_code"],
                "city": client["city"],
                "tax_id": client["tax_id"],
                "phone": client.get("phone"),
            },
        )
        insert_invoice(account_row["id"], client_id, invoice)
        return jsonify({"message": "Faktura zostala zapisana.", "invoice": invoice})

    data = load_store()
    try:
        account = get_account(data, login_key)
    except KeyError:
        return jsonify({"error": "Nie znaleziono konta."}), 404
    business = account.get("business")
    if not business:
        return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400
    client = validate_client(payload)
    try:
        invoice = build_invoice(payload, business, client)
    except ValueError as error:
        return jsonify({"error": str(error)}), 400
    # Newest first; the history is capped at INVOICE_HISTORY_LIMIT entries.
    invoices = account.setdefault("invoices", [])
    invoices.insert(0, invoice)
    account["invoices"] = invoices[:INVOICE_HISTORY_LIMIT]
    save_store(data)
    return jsonify({"message": "Faktura zostala zapisana.", "invoice": invoice})
# Looks up one invoice of an account by its public invoice number.
_INVOICE_LOOKUP_SQL = """
SELECT id, issued_at
FROM invoices
WHERE account_id = %s AND invoice_number = %s
"""


def _delete_invoice_rows(invoice_db_id: Any) -> None:
    """Delete an invoice and its child rows (children first)."""
    execute("DELETE FROM invoice_items WHERE invoice_id = %s", (invoice_db_id,))
    execute("DELETE FROM invoice_vat_summary WHERE invoice_id = %s", (invoice_db_id,))
    execute("DELETE FROM invoices WHERE id = %s", (invoice_db_id,))


@app.route("/api/invoices/<invoice_id>", methods=["PUT", "DELETE"])
def api_invoice_detail(invoice_id: str) -> Any:
    """Replace (PUT) or delete (DELETE) one invoice of the current account.

    A replaced invoice keeps its number and original ``issued_at``; everything
    else is rebuilt from the request payload. Unknown invoices yield HTTP 404
    and invalid payloads HTTP 400.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    if DATABASE_AVAILABLE:
        try:
            account_row = get_account_row(login_key)
        except KeyError:
            return jsonify({"error": "Nie znaleziono konta."}), 404
        invoice_row = fetch_one(_INVOICE_LOOKUP_SQL, (account_row["id"], invoice_id))
        if not invoice_row:
            return jsonify({"error": "Nie znaleziono faktury."}), 404

        if request.method == "DELETE":
            _delete_invoice_rows(invoice_row["id"])
            return jsonify({"message": "Faktura zostala usunieta."})

        payload = request.get_json(force=True)
        if not isinstance(payload, dict):
            return jsonify({"error": "Niepoprawne dane faktury."}), 400
        business = get_business_profile(account_row["id"])
        if not business:
            return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400
        client = validate_client(payload)
        try:
            invoice = build_invoice(payload, business, client)
        except ValueError as error:
            return jsonify({"error": str(error)}), 400

        invoice["invoice_id"] = invoice_id
        existing_issued_at = invoice_row.get("issued_at")
        if isinstance(existing_issued_at, datetime):
            invoice["issued_at"] = existing_issued_at.strftime("%Y-%m-%d %H:%M")
        elif existing_issued_at:
            invoice["issued_at"] = str(existing_issued_at)

        client_id = upsert_client(
            account_row["id"],
            {
                "name": client["name"],
                "address_line": client["address_line"],
                "postal_code": client["postal_code"],
                "city": client["city"],
                "tax_id": client["tax_id"],
                "phone": client.get("phone"),
            },
        )
        # NOTE(review): the old rows are removed before the new ones are
        # written; whether these calls share a transaction depends on the db
        # module. If they do not, a failing insert loses the invoice.
        _delete_invoice_rows(invoice_row["id"])
        insert_invoice(account_row["id"], client_id, invoice)
        return jsonify({"message": "Faktura zostala zaktualizowana.", "invoice": invoice})

    data = load_store()
    try:
        account = get_account(data, login_key)
    except KeyError:
        return jsonify({"error": "Nie znaleziono konta."}), 404
    invoices = account.get("invoices", [])
    invoice_index = next(
        (idx for idx, entry in enumerate(invoices) if entry.get("invoice_id") == invoice_id),
        None,
    )
    if invoice_index is None:
        return jsonify({"error": "Nie znaleziono faktury."}), 404

    if request.method == "DELETE":
        invoices.pop(invoice_index)
        save_store(data)
        return jsonify({"message": "Faktura zostala usunieta."})

    payload = request.get_json(force=True)
    if not isinstance(payload, dict):
        return jsonify({"error": "Niepoprawne dane faktury."}), 400
    business = account.get("business")
    if not business:
        return jsonify({"error": "Ustaw dane sprzedawcy przed dodaniem faktury."}), 400
    client = validate_client(payload)
    try:
        invoice = build_invoice(payload, business, client)
    except ValueError as error:
        return jsonify({"error": str(error)}), 400

    invoice["invoice_id"] = invoice_id
    existing_invoice = invoices[invoice_index]
    if existing_invoice.get("issued_at"):
        invoice["issued_at"] = existing_invoice.get("issued_at")
    invoices[invoice_index] = invoice
    save_store(data)
    return jsonify({"message": "Faktura zostala zaktualizowana.", "invoice": invoice})
@app.route("/api/invoices/summary", methods=["GET"])
def api_invoice_summary() -> Any:
    """Return invoice counts and gross totals for three reporting windows.

    The windows are the last 30 days (``last_month``), the current calendar
    quarter (``quarter``) and the current calendar year (``year``), all measured
    against the current naive UTC time.
    """
    try:
        login_key = require_auth()
    except PermissionError:
        return jsonify({"error": "Brak autoryzacji."}), 401

    now = datetime.utcnow()
    first_month_of_quarter = 3 * ((now.month - 1) // 3) + 1
    # Window label -> inclusive lower bound (naive datetime).
    window_starts = {
        "last_month": now - timedelta(days=30),
        "quarter": datetime(now.year, first_month_of_quarter, 1),
        "year": datetime(now.year, 1, 1),
    }
    known_formats = ("%Y-%m-%d %H:%M", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d")

    def normalize_issued_at(value: Any) -> Optional[datetime]:
        """Turn a stored ``issued_at`` value into a naive datetime, or ``None``."""
        if isinstance(value, datetime):
            zone = value.tzinfo
            if zone is None or zone.utcoffset(value) is None:
                return value
            # Aware values are converted to UTC and made naive for comparison.
            return value.astimezone(timezone.utc).replace(tzinfo=None)
        if not isinstance(value, str):
            return None
        text = value.strip()
        if not text:
            return None
        for pattern in known_formats:
            try:
                return datetime.strptime(text, pattern)
            except ValueError:
                pass
        return None

    def aggregate_from_rows(rows: List[Dict[str, Any]], start: datetime) -> Dict[str, Any]:
        """Count the rows issued at or after ``start`` and sum their gross totals."""
        matched = 0
        running_gross = Decimal("0.00")
        for row in rows:
            issued = normalize_issued_at(row.get("issued_at"))
            if issued is not None and issued >= start:
                try:
                    running_gross += _decimal(row.get("total_gross") or "0")
                except ValueError:
                    # Rows with an unparsable total are left out entirely.
                    continue
                matched += 1
        return {"count": matched, "gross_total": str(_quantize(running_gross))}

    def summarize(rows: List[Dict[str, Any]]) -> Any:
        report = {label: aggregate_from_rows(rows, start) for label, start in window_starts.items()}
        return jsonify({"summary": report})

    if DATABASE_AVAILABLE:
        try:
            account_row = get_account_row(login_key)
        except KeyError:
            return jsonify({"error": "Nie znaleziono konta."}), 404
        db_rows = fetch_all(
            "\n"
            "SELECT issued_at, total_gross\n"
            "FROM invoices\n"
            "WHERE account_id = %s\n",
            (account_row["id"],),
        )
        return summarize(db_rows)

    data = load_store()
    try:
        account = get_account(data, login_key)
    except KeyError:
        return jsonify({"error": "Nie znaleziono konta."}), 404

    # Shape the stored invoices like database rows so both paths share code.
    file_rows = []
    for stored in account.get("invoices", []):
        file_rows.append(
            {
                "issued_at": stored.get("issued_at"),
                "total_gross": (stored.get("totals") or {}).get("gross", "0"),
            }
        )
    return summarize(file_rows)
if __name__ == "__main__":
    # Development entry point: listens on every interface on $PORT (5000 by
    # default).
    port = int(os.environ.get("PORT", "5000"))
    # The Werkzeug debugger allows arbitrary code execution from the browser,
    # so it must not be enabled unconditionally on a server bound to 0.0.0.0.
    # Opt in explicitly with FLASK_DEBUG=1 when developing locally.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host="0.0.0.0", port=port, debug=debug_enabled)