from flask import Flask, render_template_string, request, redirect, url_for, jsonify, flash, abort, Response, session
import json
import os
import logging
import threading
import time
from datetime import datetime, timedelta
import pytz
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import RepositoryNotFoundError, HfHubHTTPError
import uuid
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
import functools
from functools import wraps
from collections import defaultdict
from urllib.parse import quote
# Flask application object; every route below is registered on it.
app = Flask(__name__)
# NOTE(review): the secret key is freshly generated at import time, so it differs
# between process starts — presumably existing session cookies stop validating
# after a restart; confirm this is intended.
app.secret_key = os.urandom(24)
app.permanent_session_lifetime = timedelta(days=7)
# Admin password; falls back to the literal "admin" when ADMIN_PASS is unset.
ADMIN_PASS = os.getenv("ADMIN_PASS", "admin")
# Local directory holding the JSON data files; created on import if missing.
DATA_DIR = 'pos_data'
os.makedirs(DATA_DIR, exist_ok=True)
INVENTORY_FILE = os.path.join(DATA_DIR, 'inventory.json')
TRANSACTIONS_FILE = os.path.join(DATA_DIR, 'transactions.json')
USERS_FILE = os.path.join(DATA_DIR, 'users.json')
KASSAS_FILE = os.path.join(DATA_DIR, 'kassas.json')
EXPENSES_FILE = os.path.join(DATA_DIR, 'expenses.json')
# Maps a logical data-set key to (file path, lock). load_json_data/save_json_data
# take the lock for the duration of a single read or write of that file.
DATA_FILES = {
'inventory': (INVENTORY_FILE, threading.Lock()),
'transactions': (TRANSACTIONS_FILE, threading.Lock()),
'users': (USERS_FILE, threading.Lock()),
'kassas': (KASSAS_FILE, threading.Lock()),
'expenses': (EXPENSES_FILE, threading.Lock()),
}
# Hugging Face tokens; the placeholder defaults mean "not configured"
# (get_hf_api returns None when the write token is the placeholder).
HF_TOKEN_WRITE = os.getenv("HF_TOKEN_WRITE", "YOUR_WRITE_TOKEN_HERE")
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ", "YOUR_READ_TOKEN_HERE")
# Dataset repository the JSON files are downloaded from and uploaded to.
REPO_ID = "Kgshop/morshenbasekz"
# NOTE(review): despite its name, this constant is the Asia/Almaty zone.
BISHKEK_TZ = pytz.timezone('Asia/Almaty')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def get_current_time():
    """Return the current time as a timezone-aware datetime in ``BISHKEK_TZ``."""
    now = datetime.now(tz=BISHKEK_TZ)
    return now
class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that writes ``Decimal`` values as their string form."""

    def default(self, obj):
        """Return ``str(obj)`` for a Decimal; defer any other type to the base encoder."""
        if not isinstance(obj, Decimal):
            # The base implementation raises TypeError for unsupported types.
            return super().default(obj)
        return str(obj)
def to_decimal(value_str, default='0.00'):
    """Parse *value_str* into a finite ``Decimal``, falling back to *default*.

    A comma is accepted as the decimal separator. ``None``, the empty string,
    unparseable input and non-finite values (``NaN``, ``Infinity``) all yield
    ``Decimal(default)``.

    Args:
        value_str: Value to convert; it is passed through ``str()`` first.
        default: String used to build the fallback Decimal.

    Returns:
        The parsed Decimal, or ``Decimal(default)`` when parsing is not possible.
    """
    if value_str is None or value_str == '':
        return Decimal(default)
    try:
        parsed = Decimal(str(value_str).replace(',', '.'))
    except InvalidOperation:
        logging.warning(f"Could not convert '{value_str}' to Decimal. Returned {default}.")
        return Decimal(default)
    # Decimal() happily parses "NaN" and "Infinity"; such values break ordering
    # comparisons and int() conversion downstream, so treat them as invalid input.
    if not parsed.is_finite():
        logging.warning(f"Could not convert '{value_str}' to Decimal. Returned {default}.")
        return Decimal(default)
    return parsed
def load_json_data(file_key):
    """Refresh one data file from the Hugging Face dataset and return its parsed JSON.

    While holding the file's lock, the remote copy is force-downloaded into
    ``DATA_DIR`` and the local file is then read. Download failures are logged
    (HTTP 404 is ignored silently) and the existing local copy is used instead.

    Args:
        file_key: Key into ``DATA_FILES``.

    Returns:
        The decoded JSON content, or ``[]`` when the local file is missing or
        is not valid JSON.
    """
    path, guard = DATA_FILES[file_key]
    remote_name = os.path.basename(path)
    with guard:
        try:
            hf_hub_download(
                repo_id=REPO_ID,
                filename=remote_name,
                repo_type="dataset",
                token=HF_TOKEN_READ,
                local_dir=DATA_DIR,
                local_dir_use_symlinks=False,
                force_download=True,
            )
        except HfHubHTTPError as http_err:
            not_found = http_err.response.status_code == 404
            if not not_found:
                logging.error(f"HTTP error downloading {remote_name}: {http_err}")
        except Exception as err:
            logging.error(f"Unknown error downloading {remote_name}: {err}")
        try:
            with open(path, 'r', encoding='utf-8') as fh:
                payload = json.load(fh)
        except (FileNotFoundError, json.JSONDecodeError):
            return []
        return payload
def save_json_data(file_key, data):
    """Write *data* as JSON to the file registered under *file_key*.

    The content goes to a ``.tmp`` sibling first and is then moved over the
    target with ``os.replace``. Any exception is logged and swallowed, so
    callers are not told when a save fails.

    Args:
        file_key: Key into ``DATA_FILES``.
        data: JSON-serializable object; Decimals are encoded via ``DecimalEncoder``.
    """
    target, guard = DATA_FILES[file_key]
    with guard:
        try:
            scratch = target + ".tmp"
            with open(scratch, 'w', encoding='utf-8') as fh:
                json.dump(data, fh, ensure_ascii=False, indent=4, cls=DecimalEncoder)
            os.replace(scratch, target)
        except Exception as err:
            logging.error(f"Critical error saving {target}: {err}", exc_info=True)
@functools.lru_cache(maxsize=1)
def get_hf_api():
    """Return an ``HfApi`` client, or ``None`` when uploads are not possible.

    ``None`` is returned when the write token is empty or still the placeholder,
    or when constructing the client raises. The result (including ``None``) is
    memoized by ``lru_cache``.
    """
    token_missing = not HF_TOKEN_WRITE or HF_TOKEN_WRITE == "YOUR_WRITE_TOKEN_HERE"
    if token_missing:
        return None
    try:
        client = HfApi()
    except Exception as err:
        logging.error(f"Error initializing HfApi: {err}")
        return None
    return client
def upload_db_to_hf(file_key):
    """Start an upload of one local data file to the dataset repository.

    Does nothing when no API client is available or the local file does not
    exist. The upload is requested with ``run_as_future=True``; errors raised
    while initiating it are logged, not propagated.

    Args:
        file_key: Key into ``DATA_FILES``.
    """
    client = get_hf_api()
    if not client:
        return
    local_path, _ = DATA_FILES[file_key]
    if not os.path.exists(local_path):
        return
    try:
        remote_name = os.path.basename(local_path)
        stamp = get_current_time().strftime('%Y-%m-%d %H:%M:%S %Z%z')
        message = f"Automated backup {remote_name} {stamp}"
        client.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=remote_name,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN_WRITE,
            commit_message=message,
            run_as_future=True,
        )
    except Exception as err:
        logging.error(f"Error initiating upload of {local_path}: {err}")
def periodic_backup():
    """Loop forever, uploading every data file after each 1800-second sleep.

    Errors raised during a backup round are logged and the loop continues.
    """
    pause_seconds = 1800
    while True:
        time.sleep(pause_seconds)
        try:
            for file_key in DATA_FILES:
                upload_db_to_hf(file_key)
        except Exception as err:
            logging.error(f"Error during scheduled backup: {err}", exc_info=True)
def find_item_by_field(data, field, value):
    """Return the first dict in *data* whose *field* equals *value*, else ``None``.

    Non-dict entries are skipped. The returned object is the element itself,
    not a copy.
    """
    candidates = (
        entry for entry in data
        if isinstance(entry, dict) and entry.get(field) == value
    )
    return next(candidates, None)
def find_user_by_pin(pin):
    """Load the users file and return the first user whose ``pin`` equals *pin*, or ``None``."""
    return find_item_by_field(load_json_data('users'), 'pin', pin)
def format_currency_py(value):
    """Format *value* with two decimals, space thousands separators and a decimal comma.

    The value is converted with ``to_decimal``; if formatting raises
    ``InvalidOperation``, ``TypeError`` or ``ValueError`` the string ``"0,00"``
    is returned.
    """
    try:
        amount = to_decimal(value)
        grouped = format(amount, ",.2f")
        # Swap the grouping comma for a space first, then the point for a comma.
        return grouped.replace(",", " ").replace(".", ",")
    except (InvalidOperation, TypeError, ValueError):
        return "0,00"
def generate_receipt_html(transaction):
    """Render the receipt markup for a transaction.

    The result is stored on the transaction as ``receipt_html`` and later
    served by ``view_receipt`` with a ``text/html`` mimetype, so free-text
    values (item names, cashier name, id, quantity) are HTML-escaped before
    interpolation. Monetary values go through ``format_currency_py``.

    Args:
        transaction: Dict with ``id``, ``timestamp`` (ISO 8601 string),
            ``user_name``, ``total_amount``, ``payment_method`` and ``items``;
            each item needs ``name``, ``quantity``, ``price_at_sale`` and ``total``.

    Returns:
        The receipt as a string.
    """
    # Local import so this block does not depend on a new file-level import.
    from html import escape

    rows = []
    for item in transaction['items']:
        rows.append(f"""
| {escape(str(item['name']))} |
{escape(str(item['quantity']))} |
{format_currency_py(item['price_at_sale'])} |
{format_currency_py(item['total'])} |
""")
    items_html = "".join(rows)
    receipt_no = escape(str(transaction['id'][:8]))
    return f"""
Чек {receipt_no}
Компания Morshen Group
--------------------------------
Чек №: {receipt_no}
Дата: {datetime.fromisoformat(transaction['timestamp']).strftime('%d.%m.%Y %H:%M')}
Кассир: {escape(str(transaction['user_name']))}
--------------------------------
| Товар |
Кол. |
Цена |
Сумма |
{items_html}
--------------------------------
Итого: {format_currency_py(transaction['total_amount'])} ₸
Способ оплаты: {'Наличные' if transaction['payment_method'] == 'cash' else 'Карта'}
--------------------------------
Спасибо за покупку!
"""
def admin_required(f):
    """Decorator that lets a view run only when the session has ``admin_logged_in``.

    Otherwise a warning is flashed and the client is redirected to
    ``admin_login`` with the current URL passed as ``next``.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if 'admin_logged_in' in session:
            return f(*args, **kwargs)
        flash("Для доступа к этой странице требуется аутентификация.", "warning")
        return redirect(url_for('admin_login', next=request.url))
    return guarded_view
@app.context_processor
def inject_utils():
    """Expose ``format_currency_py``, ``get_current_time`` and ``quote`` to every template."""
    helpers = dict(
        format_currency_py=format_currency_py,
        get_current_time=get_current_time,
        quote=quote,
    )
    return helpers
@app.route('/')
def sales_screen():
    """Render the point-of-sale screen.

    Only products that have at least one variant with positive stock are
    passed to the template, ordered by lower-cased name.
    """
    inventory = load_json_data('inventory')
    users = load_json_data('users')
    kassas = load_json_data('kassas')

    def has_stock(product):
        return any(v.get('stock', 0) > 0 for v in product.get('variants', []))

    in_stock = [p for p in inventory if isinstance(p, dict) and has_stock(p)]
    in_stock.sort(key=lambda p: p.get('name', '').lower())
    page = (
        BASE_TEMPLATE
        .replace('__TITLE__', "Касса")
        .replace('__CONTENT__', SALES_SCREEN_CONTENT)
        .replace('__SCRIPTS__', SALES_SCREEN_SCRIPTS)
    )
    return render_template_string(page, inventory=in_stock, users=users, kassas=kassas)
@app.route('/inventory', methods=['GET', 'POST'])
@admin_required
def inventory_management():
    """List the inventory (GET) or add a new product with its variants (POST).

    On POST the form must carry ``name``, ``barcode`` and parallel
    ``variant_*[]`` lists; rows with a blank variant name are skipped. The
    barcode must be unused and at least one variant must remain. Every POST
    ends with a redirect back to this page, with the outcome flashed.
    """
    if request.method != 'POST':
        stock_list = load_json_data('inventory')
        stock_list.sort(key=lambda entry: entry.get('name', '').lower())
        page = (
            BASE_TEMPLATE
            .replace('__TITLE__', "Склад")
            .replace('__CONTENT__', INVENTORY_CONTENT)
            .replace('__SCRIPTS__', INVENTORY_SCRIPTS)
        )
        return render_template_string(page, inventory=stock_list)

    try:
        name = request.form.get('name', '').strip()
        barcode = request.form.get('barcode', '').strip()
        if not (name and barcode):
            flash("Название и штрих-код - обязательные поля.", "danger")
            return redirect(url_for('inventory_management'))
        inventory = load_json_data('inventory')
        if find_item_by_field(inventory, 'barcode', barcode):
            flash(f"Товар со штрих-кодом {barcode} уже существует.", "warning")
            return redirect(url_for('inventory_management'))
        names = request.form.getlist('variant_name[]')
        prices = request.form.getlist('variant_price[]')
        cost_prices = request.form.getlist('variant_cost_price[]')
        stocks = request.form.getlist('variant_stock[]')
        variants = []
        for idx, raw_name in enumerate(names):
            label = raw_name.strip()
            if not label:
                continue
            # The other lists are indexed positionally; a shorter list raises
            # IndexError, which the outer handler reports to the user.
            variants.append({
                'id': uuid.uuid4().hex,
                'option_name': "Вариант",
                'option_value': label,
                'price': str(to_decimal(prices[idx])),
                'cost_price': str(to_decimal(cost_prices[idx])),
                'stock': int(to_decimal(stocks[idx], '0')),
            })
        if not variants:
            flash("Нужно добавить хотя бы один вариант товара.", "danger")
            return redirect(url_for('inventory_management'))
        inventory.append({
            'id': uuid.uuid4().hex,
            'name': name,
            'barcode': barcode,
            'variants': variants,
            'timestamp_added': get_current_time().isoformat(),
            'timestamp_updated': get_current_time().isoformat(),
        })
        save_json_data('inventory', inventory)
        upload_db_to_hf('inventory')
        flash(f"Товар '{name}' успешно добавлен.", "success")
    except Exception as e:
        logging.error(f"Error adding product: {e}", exc_info=True)
        flash(f"Ошибка при добавлении товара: {e}", "danger")
    return redirect(url_for('inventory_management'))
@app.route('/inventory/edit/<product_id>', methods=['POST'])
@admin_required
def edit_product(product_id):
    """Update a product's name, barcode and variant list from the posted form.

    The URL rule carries ``<product_id>`` so Flask can supply the view's
    argument. The barcode must not belong to a different product. Variant
    rows with a blank name are dropped; rows without an id get a new one.
    Nothing is saved when validation fails or an exception occurs.

    Args:
        product_id: Id of the product to edit, taken from the URL.

    Returns:
        A redirect to the inventory page, with the outcome flashed.
    """
    inventory = load_json_data('inventory')
    product_found = False
    for product in inventory:
        if not (isinstance(product, dict) and product.get('id') == product_id):
            continue
        try:
            name = request.form.get('name', '').strip()
            barcode = request.form.get('barcode', '').strip()
            if not name or not barcode:
                flash("Название и штрих-код обязательны.", "danger")
                return redirect(url_for('inventory_management'))
            barcode_owner = find_item_by_field(inventory, 'barcode', barcode)
            if barcode_owner and barcode_owner.get('id') != product_id:
                flash(f"Штрих-код {barcode} уже используется другим товаром.", "warning")
                return redirect(url_for('inventory_management'))
            product['name'] = name
            product['barcode'] = barcode
            variant_ids = request.form.getlist('variant_id[]')
            variant_names = request.form.getlist('variant_name[]')
            variant_prices = request.form.getlist('variant_price[]')
            variant_cost_prices = request.form.getlist('variant_cost_price[]')
            variant_stocks = request.form.getlist('variant_stock[]')
            new_variants = []
            for j, existing_id in enumerate(variant_ids):
                v_name = variant_names[j].strip()
                if not v_name:
                    continue
                new_variants.append({
                    # An empty id marks a row added in the form.
                    'id': existing_id or uuid.uuid4().hex,
                    'option_name': "Вариант",
                    'option_value': v_name,
                    'price': str(to_decimal(variant_prices[j])),
                    'cost_price': str(to_decimal(variant_cost_prices[j])),
                    'stock': int(to_decimal(variant_stocks[j], '0')),
                })
            product['variants'] = new_variants
            product['timestamp_updated'] = get_current_time().isoformat()
            product_found = True
            break
        except Exception as e:
            logging.error(f"Error updating product: {e}", exc_info=True)
            flash(f"Ошибка при обновлении товара: {e}", "danger")
            return redirect(url_for('inventory_management'))
    if product_found:
        save_json_data('inventory', inventory)
        upload_db_to_hf('inventory')
        flash("Товар успешно обновлен.", "success")
    else:
        flash("Товар не найден.", "danger")
    return redirect(url_for('inventory_management'))
@app.route('/inventory/delete/<product_id>', methods=['POST'])
@admin_required
def delete_product(product_id):
    """Remove the product with the given id from the inventory.

    The URL rule carries ``<product_id>`` so Flask can supply the view's
    argument. The file is saved and uploaded only when something was removed.

    Args:
        product_id: Id of the product to delete, taken from the URL.

    Returns:
        A redirect to the inventory page, with the outcome flashed.
    """
    inventory = load_json_data('inventory')
    remaining = [
        p for p in inventory
        if not (isinstance(p, dict) and p.get('id') == product_id)
    ]
    if len(remaining) < len(inventory):
        save_json_data('inventory', remaining)
        upload_db_to_hf('inventory')
        flash("Товар удален.", "success")
    else:
        flash("Товар не найден.", "warning")
    return redirect(url_for('inventory_management'))
@app.route('/inventory/stock_in', methods=['POST'])
@admin_required
def stock_in():
    """Add received quantity to a variant's stock and optionally re-average its cost.

    When ``cost_price`` is posted, the variant's cost becomes the
    quantity-weighted average of the previous cost and the new one, rounded
    half-up to two decimals. Any exception is logged and flashed. Always
    redirects to the inventory page.
    """
    try:
        product_id = request.form.get('product_id')
        variant_id = request.form.get('variant_id')
        quantity = int(request.form.get('quantity', 0))
        cost_price_str = request.form.get('cost_price')
        if not product_id or not variant_id or quantity <= 0:
            flash("Неверные данные для оприходования.", "danger")
            return redirect(url_for('inventory_management'))
        inventory = load_json_data('inventory')
        product = find_item_by_field(inventory, 'id', product_id)
        if not product:
            flash("Товар не найден.", "danger")
            return redirect(url_for('inventory_management'))
        matched = None
        for candidate in product.get('variants', []):
            if candidate.get('id') != variant_id:
                continue
            candidate['stock'] = candidate.get('stock', 0) + quantity
            if cost_price_str:
                incoming_cost = to_decimal(cost_price_str)
                # Stock was already increased above, so subtract to get the prior level.
                prior_stock = candidate['stock'] - quantity
                prior_cost = to_decimal(candidate.get('cost_price', '0'))
                combined_qty = prior_stock + quantity
                if combined_qty > 0:
                    weighted = ((prior_cost * prior_stock) + (incoming_cost * quantity)) / combined_qty
                    candidate['cost_price'] = str(
                        weighted.quantize(Decimal('0.01'), rounding=ROUND_HALF_UP)
                    )
            matched = candidate
            break
        if matched is None:
            flash("Вариант товара не найден.", "danger")
            return redirect(url_for('inventory_management'))
        product['timestamp_updated'] = get_current_time().isoformat()
        save_json_data('inventory', inventory)
        upload_db_to_hf('inventory')
        flash(f"Остаток товара '{product['name']} ({matched['option_value']})' увеличен на {quantity}.", "success")
    except Exception as e:
        logging.error(f"Error stocking in: {e}", exc_info=True)
        flash(f"Ошибка при оприходовании: {e}", "danger")
    return redirect(url_for('inventory_management'))
@app.route('/api/product_by_barcode/<barcode>')
def get_product_by_barcode(barcode):
    """Return the product with the given barcode as JSON, limited to in-stock variants.

    The URL rule carries ``<barcode>`` so Flask can supply the view's argument.

    Args:
        barcode: Barcode to look up, taken from the URL.

    Returns:
        ``{'success': True, 'product': ...}`` when the product has a variant
        with positive stock; otherwise a 404 JSON response whose message says
        whether the product is unknown or out of stock.
    """
    inventory = load_json_data('inventory')
    product = find_item_by_field(inventory, 'barcode', barcode)
    if not product:
        return jsonify({'success': False, 'message': 'Товар не найден'}), 404
    active_variants = [v for v in product.get('variants', []) if v.get('stock', 0) > 0]
    if not active_variants:
        return jsonify({'success': False, 'message': 'Товар закончился на складе'}), 404
    # Shallow copy so the filtered variant list does not touch the loaded inventory.
    product_copy = product.copy()
    product_copy['variants'] = active_variants
    return jsonify({'success': True, 'product': product_copy})
@app.route('/api/complete_sale', methods=['POST'])
def complete_sale():
    """Register a sale from a JSON cart and update stock and the cash till.

    Expected JSON body: ``cart`` (object mapping variant id to
    ``{'productId': ..., 'quantity': ...}``), ``userId``, ``kassaId`` and an
    optional ``paymentMethod`` (defaults to ``'cash'``).

    The whole cart is validated before anything is modified: every product
    and variant must exist, every quantity must be a positive integer and must
    not exceed the current stock. Prices are taken from the inventory, not
    from the request. For cash payments the till balance is increased and a
    history entry is appended.

    Returns:
        JSON with ``success``, ``message`` and, on success, ``transactionId``
        and ``receiptUrl``. Validation failures give 400/404; unexpected
        errors give 500.
    """
    try:
        # silent=True: a missing or malformed JSON body becomes None and is
        # reported as a 400 below instead of surfacing as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'message': 'Неполные данные для продажи.'}), 400
        cart = data.get('cart', [])
        user_id = data.get('userId')
        kassa_id = data.get('kassaId')
        payment_method = data.get('paymentMethod', 'cash')
        if not cart or not isinstance(cart, dict) or not user_id or not kassa_id:
            return jsonify({'success': False, 'message': 'Неполные данные для продажи.'}), 400
        inventory = load_json_data('inventory')
        users = load_json_data('users')
        kassas = load_json_data('kassas')
        user = find_item_by_field(users, 'id', user_id)
        kassa = find_item_by_field(kassas, 'id', kassa_id)
        if not user or not kassa:
            return jsonify({'success': False, 'message': 'Кассир или касса не найдены.'}), 404
        sale_items = []
        total_amount = Decimal('0.00')
        # (product, variant, new_stock) triples, applied only after the whole cart validates.
        pending_stock = []
        for variant_id, cart_item in cart.items():
            if not isinstance(cart_item, dict) or 'productId' not in cart_item:
                return jsonify({'success': False, 'message': 'Неполные данные для продажи.'}), 400
            product = find_item_by_field(inventory, 'id', cart_item['productId'])
            if not product:
                return jsonify({'success': False, 'message': f"Товар с ID {cart_item['productId']} не найден."}), 404
            variant = find_item_by_field(product.get('variants', []), 'id', variant_id)
            if not variant:
                return jsonify({'success': False, 'message': f"Вариант товара с ID {variant_id} не найден."}), 404
            quantity_sold = cart_item.get('quantity')
            # A zero/negative quantity would pass the stock check below and then
            # raise stock and lower the till; bool is excluded because it is an int subclass.
            if isinstance(quantity_sold, bool) or not isinstance(quantity_sold, int) or quantity_sold <= 0:
                return jsonify({'success': False, 'message': f"Некорректное количество для товара '{product['name']} ({variant['option_value']})'."}), 400
            current_stock = variant.get('stock', 0)
            if quantity_sold > current_stock:
                return jsonify({'success': False, 'message': f"Недостаточно товара '{product['name']} ({variant['option_value']})'. В наличии: {current_stock}"}), 400
            price_at_sale = to_decimal(variant.get('price', '0'))
            cost_price_at_sale = to_decimal(variant.get('cost_price', '0'))
            item_total = price_at_sale * Decimal(quantity_sold)
            total_amount += item_total
            sale_items.append({
                'product_id': product['id'],
                'variant_id': variant_id,
                'name': f"{product['name']} ({variant['option_value']})",
                'barcode': product.get('barcode'),
                'quantity': quantity_sold,
                'price_at_sale': str(price_at_sale),
                'cost_price_at_sale': str(cost_price_at_sale),
                'total': str(item_total)
            })
            pending_stock.append((product, variant, current_stock - quantity_sold))
        now_iso = get_current_time().isoformat()
        new_transaction = {
            'id': uuid.uuid4().hex,
            'timestamp': now_iso,
            'type': 'sale',
            'status': 'completed',
            'original_transaction_id': None,
            'user_id': user_id,
            'user_name': user.get('name', 'N/A'),
            'kassa_id': kassa_id,
            'kassa_name': kassa.get('name', 'N/A'),
            'items': sale_items,
            'total_amount': str(total_amount),
            'payment_method': payment_method
        }
        new_transaction['receipt_html'] = generate_receipt_html(new_transaction)
        transactions = load_json_data('transactions')
        transactions.append(new_transaction)
        # product/variant are the dict objects inside `inventory`, so updating
        # them in place updates the list that is saved below.
        for product, variant, new_stock in pending_stock:
            variant['stock'] = new_stock
            product['timestamp_updated'] = now_iso
        if payment_method == 'cash':
            current_balance = to_decimal(kassa.get('balance', '0'))
            kassa['balance'] = str(current_balance + total_amount)
            if not isinstance(kassa.get('history'), list):
                kassa['history'] = []
            kassa['history'].append({
                'type': 'sale',
                'amount': str(total_amount),
                'timestamp': now_iso,
                'transaction_id': new_transaction['id']
            })
        save_json_data('transactions', transactions)
        save_json_data('inventory', inventory)
        save_json_data('kassas', kassas)
        upload_db_to_hf('transactions')
        upload_db_to_hf('inventory')
        upload_db_to_hf('kassas')
        receipt_url = url_for('view_receipt', transaction_id=new_transaction['id'], _external=True)
        return jsonify({
            'success': True,
            'message': 'Продажа успешно зарегистрирована.',
            'transactionId': new_transaction['id'],
            'receiptUrl': receipt_url
        })
    except Exception as e:
        logging.error(f"Error completing sale: {e}", exc_info=True)
        return jsonify({'success': False, 'message': f'Внутренняя ошибка сервера: {e}'}), 500
@app.route('/receipt/<transaction_id>')
def view_receipt(transaction_id):
    """Serve the stored receipt markup of a transaction as ``text/html``.

    The URL rule carries ``<transaction_id>`` so Flask can supply the view's
    argument and ``url_for('view_receipt', transaction_id=...)`` builds a
    path rather than a query string.

    Args:
        transaction_id: Id of the transaction, taken from the URL.

    Returns:
        The receipt response; aborts with 404 when the transaction is unknown
        or has no ``receipt_html``.
    """
    transactions = load_json_data('transactions')
    transaction = find_item_by_field(transactions, 'id', transaction_id)
    if transaction and 'receipt_html' in transaction:
        return Response(transaction['receipt_html'], mimetype='text/html')
    abort(404, description="Чек не найден")
@app.route('/transactions')
def transaction_history():
    """Render all transactions, newest first by their ``timestamp`` string."""
    history = load_json_data('transactions')
    history.sort(key=lambda entry: entry.get('timestamp', ''), reverse=True)
    page = (
        BASE_TEMPLATE
        .replace('__TITLE__', "История транзакций")
        .replace('__CONTENT__', TRANSACTIONS_CONTENT)
        .replace('__SCRIPTS__', TRANSACTIONS_SCRIPTS)
    )
    return render_template_string(page, transactions=history)
def _parse_report_day(raw_value, fallback):
    """Parse a ``YYYY-MM-DD`` string into a date, returning *fallback* when it is missing or malformed."""
    try:
        return datetime.strptime(raw_value, '%Y-%m-%d').date()
    except (TypeError, ValueError):
        return fallback


@app.route('/reports')
@admin_required
def reports():
    """Render the financial report for a date range.

    Query parameters ``start_date`` and ``end_date`` (``YYYY-MM-DD``,
    inclusive) default to the first day of the current month and today;
    missing or malformed values fall back to those defaults.

    The report contains revenue, cost of goods sold, gross profit, expenses,
    per-cashier sales, cashier payouts (a percentage of each transaction, or
    a monthly salary pro-rated as 1/30 per day of the period) and net profit.
    """
    today = get_current_time().date()
    start_day = _parse_report_day(request.args.get('start_date'), today.replace(day=1))
    end_day = _parse_report_day(request.args.get('end_date'), today)
    start_date_str = start_day.strftime('%Y-%m-%d')
    end_date_str = end_day.strftime('%Y-%m-%d')
    # pytz zones must be attached with localize(); replace(tzinfo=...) does not
    # apply the zone's current UTC offset, which shifts the range boundaries.
    midnight = datetime.min.time()
    start_date = BISHKEK_TZ.localize(datetime.combine(start_day, midnight))
    # Exclusive upper bound: midnight after the last requested day.
    end_date = BISHKEK_TZ.localize(datetime.combine(end_day + timedelta(days=1), midnight))
    num_days = (end_day - start_day).days + 1
    transactions = load_json_data('transactions')
    expenses = load_json_data('expenses')
    users = load_json_data('users')
    filtered_transactions = [
        t for t in transactions
        if start_date <= datetime.fromisoformat(t['timestamp']) < end_date
    ]
    filtered_expenses = [
        e for e in expenses
        if start_date <= datetime.fromisoformat(e['timestamp']) < end_date
    ]
    total_revenue = sum(to_decimal(t['total_amount']) for t in filtered_transactions)
    total_cogs = sum(
        to_decimal(item.get('cost_price_at_sale', '0')) * to_decimal(str(item['quantity']))
        for t in filtered_transactions for item in t['items']
    )
    gross_profit = total_revenue - total_cogs
    total_expenses = sum(to_decimal(e['amount']) for e in filtered_expenses)
    sales_by_cashier = defaultdict(lambda: {'count': 0, 'total': Decimal(0)})
    # A return lowers the cashier's sale count; its (negative) amount is added like a sale's.
    count_delta = {'sale': 1, 'return': -1}
    for t in filtered_transactions:
        delta = count_delta.get(t.get('type'))
        if delta is None:
            continue
        cashier_name = t.get('user_name', 'Неизвестный')
        sales_by_cashier[cashier_name]['count'] += delta
        sales_by_cashier[cashier_name]['total'] += to_decimal(t['total_amount'])
    cashier_payouts = defaultdict(Decimal)
    for t in filtered_transactions:
        user = find_item_by_field(users, 'id', t.get('user_id'))
        if not user:
            continue
        payment_value = to_decimal(user.get('payment_value', '0'))
        if user.get('payment_type') == 'percentage' and payment_value > 0:
            payout = to_decimal(t['total_amount']) * (payment_value / Decimal(100))
            cashier_payouts[user['name']] += payout
    for user in users:
        if user.get('payment_type') != 'salary':
            continue
        monthly_salary = to_decimal(user.get('payment_value', '0'))
        if monthly_salary > 0:
            daily_salary = monthly_salary / Decimal(30)
            cashier_payouts[user['name']] += daily_salary * Decimal(num_days)
    total_salary_expenses = sum(cashier_payouts.values())
    net_profit = gross_profit - total_expenses - total_salary_expenses
    stats = {
        'total_revenue': total_revenue,
        'total_cogs': total_cogs,
        'gross_profit': gross_profit,
        'total_expenses': total_expenses,
        'total_salary_expenses': total_salary_expenses,
        'net_profit': net_profit,
        'sales_by_cashier': sorted(sales_by_cashier.items(), key=lambda item: item[1]['total'], reverse=True),
        'cashier_payouts': sorted(cashier_payouts.items(), key=lambda item: item[1], reverse=True)
    }
    filtered_expenses.sort(key=lambda x: x['timestamp'], reverse=True)
    html = BASE_TEMPLATE.replace('__TITLE__', "Отчеты").replace('__CONTENT__', REPORTS_CONTENT).replace('__SCRIPTS__', REPORTS_SCRIPTS)
    return render_template_string(html, stats=stats, start_date=start_date_str, end_date=end_date_str, expenses=filtered_expenses)
@app.route('/reports/product_roi')
@admin_required
def product_roi_report():
    """Render a per-variant payback report.

    For every variant, revenue, cost of goods and quantity are summed over the
    items of all sale and return transactions that reference it. Investment is
    that cost plus the value of the remaining stock at the current cost price;
    payback is revenue minus investment. Rows are ordered by payback, highest first.
    """
    inventory = load_json_data('inventory')
    transactions = load_json_data('transactions')
    rows = []
    for product in inventory:
        for variant in product.get('variants', []):
            revenue = Decimal('0.00')
            cogs = Decimal('0.00')
            qty_sold = 0
            for txn in transactions:
                if txn['type'] not in ['sale', 'return']:
                    continue
                for line in txn['items']:
                    if line.get('variant_id') != variant['id']:
                        continue
                    revenue += to_decimal(line['total'])
                    cogs += to_decimal(line.get('cost_price_at_sale', '0')) * to_decimal(str(line['quantity']))
                    # Sales and returns both add the item's own quantity.
                    qty_sold += line['quantity']
            stock_on_hand = to_decimal(str(variant.get('stock', 0)))
            unit_cost = to_decimal(variant.get('cost_price', '0'))
            stock_value = stock_on_hand * unit_cost
            investment = cogs + stock_value
            rows.append({
                'name': product['name'],
                'variant_name': variant['option_value'],
                'total_qty_sold': qty_sold,
                'total_revenue': revenue,
                'total_investment': investment,
                'inventory_value': stock_value,
                'payback': revenue - investment
            })
    rows.sort(key=lambda row: row['payback'], reverse=True)
    page = (
        BASE_TEMPLATE
        .replace('__TITLE__', "Отчет по окупаемости товаров")
        .replace('__CONTENT__', PRODUCT_ROI_CONTENT)
        .replace('__SCRIPTS__', '')
    )
    return render_template_string(page, stats=rows)
@app.route('/admin', methods=['GET'])
@admin_required
def admin_panel():
    """Render the admin panel with all users, all tills and the ten newest expenses."""
    users = load_json_data('users')
    kassas = load_json_data('kassas')
    expenses = load_json_data('expenses')
    expenses.sort(key=lambda entry: entry.get('timestamp', ''), reverse=True)
    latest_expenses = expenses[:10]
    page = (
        BASE_TEMPLATE
        .replace('__TITLE__', "Админ-панель")
        .replace('__CONTENT__', ADMIN_CONTENT)
        .replace('__SCRIPTS__', ADMIN_SCRIPTS)
    )
    return render_template_string(page, users=users, kassas=kassas, expenses=latest_expenses)
@app.route('/admin/user', methods=['POST'])
@admin_required
def manage_user():
    """Add, edit or delete a cashier according to the posted ``action``.

    ``add`` requires a name, an all-digit PIN and a payment type. ``edit``
    overwrites the stored fields of the user with the posted id. ``delete``
    removes that user. The users file is saved and uploaded after every
    request, whatever the outcome. Redirects to the admin panel.
    """
    action = request.form.get('action')
    users = load_json_data('users')
    if action == 'add':
        name = request.form.get('name', '').strip()
        pin = request.form.get('pin', '').strip()
        payment_type = request.form.get('payment_type')
        payment_value = to_decimal(request.form.get('payment_value', '0'))
        fields_ok = name and pin and pin.isdigit() and payment_type
        if not fields_ok:
            flash("Все поля обязательны.", "danger")
        else:
            users.append({
                'id': uuid.uuid4().hex,
                'name': name,
                'pin': pin,
                'payment_type': payment_type,
                'payment_value': str(payment_value),
            })
            flash(f"Кассир '{name}' добавлен.", "success")
    elif action == 'edit':
        target = find_item_by_field(users, 'id', request.form.get('id'))
        if not target:
            flash("Кассир не найден.", "warning")
        else:
            target['name'] = request.form.get('name', '').strip()
            target['pin'] = request.form.get('pin', '').strip()
            target['payment_type'] = request.form.get('payment_type')
            target['payment_value'] = str(to_decimal(request.form.get('payment_value', '0')))
            flash(f"Данные кассира '{target['name']}' обновлены.", "success")
    elif action == 'delete':
        doomed_id = request.form.get('id')
        count_before = len(users)
        users = [u for u in users if u.get('id') != doomed_id]
        if len(users) < count_before:
            flash("Кассир удален.", "success")
        else:
            flash("Кассир не найден.", "warning")
    save_json_data('users', users)
    upload_db_to_hf('users')
    return redirect(url_for('admin_panel'))
@app.route('/admin/kassa', methods=['POST'])
@admin_required
def manage_kassa():
    """Add or delete a cash till according to the posted ``action``.

    ``add`` requires a name; a positive opening balance is also recorded as a
    ``deposit`` history entry. ``delete`` removes the till with the posted id.
    The tills file is saved and uploaded after every request. Redirects to
    the admin panel.
    """
    action = request.form.get('action')
    kassas = load_json_data('kassas')
    if action == 'add':
        name = request.form.get('name', '').strip()
        balance = to_decimal(request.form.get('balance', '0'))
        if not name:
            flash("Название кассы обязательно.", "danger")
        else:
            till = {
                'id': uuid.uuid4().hex,
                'name': name,
                'balance': str(balance),
                'history': [],
            }
            if balance > 0:
                till['history'].append({
                    'type': 'deposit',
                    'amount': str(balance),
                    'timestamp': get_current_time().isoformat(),
                    'description': 'Начальный баланс'
                })
            kassas.append(till)
            flash(f"Касса '{name}' добавлена.", "success")
    elif action == 'delete':
        doomed_id = request.form.get('id')
        count_before = len(kassas)
        kassas = [k for k in kassas if k.get('id') != doomed_id]
        if len(kassas) < count_before:
            flash("Касса удалена.", "success")
        else:
            flash("Касса не найдена.", "warning")
    save_json_data('kassas', kassas)
    upload_db_to_hf('kassas')
    return redirect(url_for('admin_panel'))
@app.route('/admin/kassa_op', methods=['POST'])
@admin_required
def kassa_operation():
    """Deposit money into, or withdraw money from, a cash till.

    Form fields: ``kassa_id``, ``op_type`` (``deposit`` or ``withdrawal``),
    ``amount`` (must be positive) and an optional ``description``. Any other
    ``op_type`` is rejected, so no history entry can be written for an
    operation that leaves the balance unchanged. A withdrawal may not exceed
    the current balance.

    Returns:
        A redirect to the admin panel, with the outcome flashed.
    """
    kassa_id = request.form.get('kassa_id')
    op_type = request.form.get('op_type')
    amount = to_decimal(request.form.get('amount', '0'))
    description = request.form.get('description', '').strip()
    if not kassa_id or not op_type or amount <= 0:
        flash("Выберите кассу, тип операции и укажите положительную сумму.", "danger")
        return redirect(url_for('admin_panel'))
    if op_type not in ('deposit', 'withdrawal'):
        flash("Неизвестный тип операции.", "danger")
        return redirect(url_for('admin_panel'))
    kassas = load_json_data('kassas')
    kassa = find_item_by_field(kassas, 'id', kassa_id)
    if not kassa:
        flash("Касса не найдена.", "danger")
        return redirect(url_for('admin_panel'))
    current_balance = to_decimal(kassa.get('balance', '0'))
    if op_type == 'deposit':
        new_balance = current_balance + amount
    else:
        if amount > current_balance:
            flash("Сумма изъятия превышает баланс кассы.", "danger")
            return redirect(url_for('admin_panel'))
        new_balance = current_balance - amount
    kassa['balance'] = str(new_balance)
    if 'history' not in kassa:
        kassa['history'] = []
    kassa['history'].append({
        'type': op_type,
        'amount': str(amount),
        'timestamp': get_current_time().isoformat(),
        'description': description or f"{'Внесение' if op_type == 'deposit' else 'Изъятие'} средств"
    })
    save_json_data('kassas', kassas)
    upload_db_to_hf('kassas')
    flash("Операция по кассе успешно проведена.", "success")
    return redirect(url_for('admin_panel'))
@app.route('/admin/expense', methods=['POST'])
@admin_required
def manage_expense():
    """Record a new expense from the posted ``amount`` and ``description``.

    Both are required and the amount must be positive; otherwise an error is
    flashed and nothing is stored. Redirects to the admin panel.
    """
    amount = to_decimal(request.form.get('amount', '0'))
    description = request.form.get('description', '').strip()
    if not description or amount <= 0:
        flash("Укажите положительную сумму и описание расхода.", "danger")
        return redirect(url_for('admin_panel'))
    record = {
        'id': uuid.uuid4().hex,
        'timestamp': get_current_time().isoformat(),
        'amount': str(amount),
        'description': description,
    }
    ledger = load_json_data('expenses')
    ledger.append(record)
    save_json_data('expenses', ledger)
    upload_db_to_hf('expenses')
    flash("Расход успешно добавлен.", "success")
    return redirect(url_for('admin_panel'))
@app.route('/cashier_login', methods=['GET', 'POST'])
def cashier_login():
    """Show the cashier PIN form and, on POST, redirect to the matching cashier's dashboard.

    A missing or blank PIN is rejected without a lookup: looking up ``None``
    would match any user record that has no ``pin`` key. The posted PIN is
    stripped, matching how ``manage_user`` stores PINs.
    """
    if request.method == 'POST':
        pin = (request.form.get('pin') or '').strip()
        user = find_user_by_pin(pin) if pin else None
        if user:
            return redirect(url_for('cashier_dashboard', user_id=user['id']))
        flash("Неверный ПИН-код.", "danger")
    html = BASE_TEMPLATE.replace('__TITLE__', "Вход для кассира").replace('__CONTENT__', CASHIER_LOGIN_CONTENT).replace('__SCRIPTS__', '')
    return render_template_string(html)
@app.route('/cashier_dashboard/<user_id>')
def cashier_dashboard(user_id):
    """Render one cashier's transactions, newest first.

    The URL rule carries ``<user_id>`` so Flask can supply the view's argument
    and ``url_for('cashier_dashboard', user_id=...)`` builds a path.

    Args:
        user_id: Id of the cashier, taken from the URL.

    Returns:
        The rendered dashboard; aborts with 404 when the cashier is unknown.
    """
    users = load_json_data('users')
    user = find_item_by_field(users, 'id', user_id)
    if not user:
        abort(404, "Кассир не найден")
    transactions = load_json_data('transactions')
    user_transactions = [t for t in transactions if t.get('user_id') == user_id]
    # .get() keeps a record without a timestamp from breaking the sort,
    # matching what transaction_history does.
    user_transactions.sort(key=lambda x: x.get('timestamp', ''), reverse=True)
    html = BASE_TEMPLATE.replace('__TITLE__', f"Продажи кассира: {user['name']}").replace('__CONTENT__', CASHIER_DASHBOARD_CONTENT).replace('__SCRIPTS__', '')
    return render_template_string(html, user=user, transactions=user_transactions)
@app.route('/return_transaction/<transaction_id>', methods=['POST'])
def return_transaction(transaction_id):
    """Refund a whole sale: restock its items, mark it returned and reverse the cash.

    The URL rule carries ``<transaction_id>`` so Flask can supply the view's
    argument. A new ``return`` transaction with negated quantities and amounts
    is appended, the original is marked ``returned``, stock is increased for
    every item whose product and variant still exist, and for cash sales the
    till balance is reduced.

    Only sales can be refunded: a refund record carries negative amounts, so
    refunding it would undo the refund and move stock and cash the wrong way.

    Args:
        transaction_id: Id of the sale to refund, taken from the URL.

    Returns:
        A redirect to the cashier's dashboard (or to the cashier login when
        ``cashier_id`` is missing from the form), with the outcome flashed.
    """
    cashier_id = request.form.get('cashier_id')
    if not cashier_id:
        flash("Не удалось определить кассира для оформления возврата.", "danger")
        return redirect(url_for('cashier_login'))
    transactions = load_json_data('transactions')
    inventory = load_json_data('inventory')
    kassas = load_json_data('kassas')
    original_transaction = find_item_by_field(transactions, 'id', transaction_id)
    if not original_transaction:
        flash("Оригинальная транзакция не найдена.", "danger")
        return redirect(url_for('cashier_dashboard', user_id=cashier_id))
    if original_transaction.get('type') == 'return':
        flash("Нельзя оформить возврат на операцию возврата.", "warning")
        return redirect(url_for('cashier_dashboard', user_id=cashier_id))
    if original_transaction.get('status') == 'returned':
        flash("Эта продажа уже была возвращена.", "warning")
        return redirect(url_for('cashier_dashboard', user_id=cashier_id))
    total_amount = to_decimal(original_transaction['total_amount'])
    return_items = []
    inventory_updates = {}
    for item in original_transaction['items']:
        return_items.append({**item, 'quantity': -item['quantity'], 'total': str(-to_decimal(item['total']))})
        product = find_item_by_field(inventory, 'id', item['product_id'])
        if product:
            variant = find_item_by_field(product.get('variants', []), 'id', item['variant_id'])
            if variant:
                inventory_updates[item['variant_id']] = {'product_id': item['product_id'], 'new_stock': variant.get('stock', 0) + item['quantity']}
    now_iso = get_current_time().isoformat()
    # Named `refund` so the local does not shadow this view function's own name.
    refund = {
        'id': uuid.uuid4().hex,
        'timestamp': now_iso,
        'type': 'return',
        'status': 'completed',
        'original_transaction_id': transaction_id,
        'user_id': original_transaction['user_id'],
        'user_name': original_transaction['user_name'],
        'kassa_id': original_transaction['kassa_id'],
        'kassa_name': original_transaction['kassa_name'],
        'items': return_items,
        'total_amount': str(-total_amount),
        'payment_method': original_transaction['payment_method']
    }
    transactions.append(refund)
    # original_transaction is the dict stored in `transactions`, so this marks it in place.
    original_transaction['status'] = 'returned'
    for variant_id, update_info in inventory_updates.items():
        for p in inventory:
            if p.get('id') == update_info['product_id']:
                for v in p.get('variants', []):
                    if v.get('id') == variant_id:
                        v['stock'] = update_info['new_stock']
                        p['timestamp_updated'] = now_iso
                        break
                break
    if original_transaction['payment_method'] == 'cash':
        for i, k in enumerate(kassas):
            if k['id'] == original_transaction['kassa_id']:
                current_balance = to_decimal(k.get('balance', '0'))
                kassas[i]['balance'] = str(current_balance - total_amount)
                kassas[i].setdefault('history', []).append({
                    'type': 'return', 'amount': str(-total_amount), 'timestamp': now_iso,
                    'transaction_id': refund['id']
                })
                break
    save_json_data('transactions', transactions)
    save_json_data('inventory', inventory)
    save_json_data('kassas', kassas)
    upload_db_to_hf('transactions')
    upload_db_to_hf('inventory')
    upload_db_to_hf('kassas')
    flash("Возврат успешно оформлен.", "success")
    return redirect(url_for('cashier_dashboard', user_id=cashier_id))
@app.route('/backup', methods=['POST'])
@admin_required
def backup_hf():
    """Start an upload of every data file and redirect to the admin panel."""
    try:
        for file_key in DATA_FILES:
            upload_db_to_hf(file_key)
        file_count = len(DATA_FILES)
        flash(f"Резервное копирование {file_count} файлов инициировано.", "success")
    except Exception as err:
        flash(f"Ошибка при резервном копировании: {err}", "danger")
    return redirect(url_for('admin_panel'))
@app.route('/download', methods=['GET'])
@admin_required
def download_hf():
    """Re-download every data file from the Hugging Face dataset repo.

    For each entry of ``DATA_FILES`` the file's basename is fetched from
    ``REPO_ID`` into ``DATA_DIR`` with ``force_download=True``. Failures are
    collected per file so that one failed download does not stop the others.
    A success message is flashed when at least one file was fetched, an
    error summary when at least one failed, and the admin is redirected to
    the admin panel.
    """
    failures = []
    downloaded = 0
    for local_path, _lock in DATA_FILES.values():
        remote_name = os.path.basename(local_path)
        try:
            hf_hub_download(
                repo_id=REPO_ID,
                filename=remote_name,
                repo_type="dataset",
                token=HF_TOKEN_READ,
                local_dir=DATA_DIR,
                local_dir_use_symlinks=False,
                force_download=True,
            )
        except Exception as exc:
            failures.append(f"Ошибка загрузки {remote_name}: {exc}")
        else:
            downloaded += 1
    if downloaded > 0:
        flash(f"Успешно загружено {downloaded} файлов. Данные перезаписаны.", "success")
    if failures:
        flash("Произошли ошибки: " + "; ".join(failures), "danger")
    return redirect(url_for('admin_panel'))
def _is_safe_redirect_target(target):
    """Return True when *target* may be used as a post-login redirect.

    Accepted targets are either a site-relative path (starts with a single
    ``/``) or an absolute http(s) URL whose host equals the current request
    host. Everything else is rejected so that a crafted ``?next=`` link
    cannot bounce a freshly authenticated admin to a foreign site.
    """
    from urllib.parse import urlparse

    if not target:
        return False
    # Browsers treat "\" like "/" and drop control characters, which would
    # turn e.g. "/\evil.com" into the protocol-relative "//evil.com".
    if '\\' in target or any(ord(ch) < 0x20 or ord(ch) == 0x7f for ch in target):
        return False
    parts = urlparse(target)
    if not parts.scheme and not parts.netloc:
        return target.startswith('/') and not target.startswith('//')
    return parts.scheme in ('http', 'https') and parts.netloc == request.host


@app.route('/admin_login', methods=['GET', 'POST'])
def admin_login():
    """Render the admin login form and process password submissions.

    GET renders the login page built from ``BASE_TEMPLATE`` and
    ``ADMIN_LOGIN_CONTENT``. POST compares the submitted ``password`` form
    field with ``ADMIN_PASS``; on success the session is marked as a
    permanent admin session and the user is redirected to the ``next``
    query parameter when it is a safe same-site target, otherwise to the
    admin panel. On failure an error is flashed and the form is rendered
    again.
    """
    import hmac

    if request.method == 'POST':
        password = request.form.get('password')
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        password_ok = password is not None and hmac.compare_digest(
            password.encode('utf-8'), ADMIN_PASS.encode('utf-8')
        )
        if password_ok:
            session['admin_logged_in'] = True
            session.permanent = True
            next_url = request.args.get('next')
            flash("Вы успешно вошли в систему.", "success")
            # ``next`` is attacker-controllable: only follow it when it
            # stays on this site, otherwise fall back to the admin panel.
            if not _is_safe_redirect_target(next_url):
                next_url = url_for('admin_panel')
            return redirect(next_url)
        flash("Неверный пароль.", "danger")
    html = (
        BASE_TEMPLATE
        .replace('__TITLE__', "Вход для администратора")
        .replace('__CONTENT__', ADMIN_LOGIN_CONTENT)
        .replace('__SCRIPTS__', '')
    )
    return render_template_string(html)
@app.route('/admin_logout')
def admin_logout():
    """Drop the admin flag from the session and return to the sales screen.

    Removing the flag is a no-op when the visitor was not logged in; the
    informational message is flashed either way.
    """
    session.pop('admin_logged_in', None)
    flash("Вы вышли из системы.", "info")
    landing_url = url_for('sales_screen')
    return redirect(landing_url)
# Page shell used to assemble a full page before rendering.
# __TITLE__, __CONTENT__ and __SCRIPTS__ are literal placeholders that are
# filled in with str.replace() and the result is then handed to
# render_template_string (see admin_login). The Jinja section in between
# prints every flashed message; categories are requested but only the
# message text is emitted here.
BASE_TEMPLATE = """
__TITLE__ - POS
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
{{ message }}
{% endfor %}
{% endif %}
{% endwith %}
__CONTENT__
__SCRIPTS__
"""
# Jinja fragment for the sales screen (presumably substituted into the
# __CONTENT__ slot of BASE_TEMPLATE, as admin_login does with its own
# fragment). Expects `inventory` and `format_currency_py` in the template
# context. For each product it prints the name and then:
#   * more than one variant -> "от <minimum variant price>"
#   * exactly one variant   -> that variant's price
#   * no variants           -> "Нет в наличии"
# NOTE(review): `|min` is applied to the raw `price` values; if prices are
# stored as strings the comparison is lexicographic - confirm the stored type.
SALES_SCREEN_CONTENT = """
{% for p in inventory %}
{{ p.name }}
{% if p.variants|length > 1 %}
от {{ format_currency_py(p.variants|map(attribute='price')|min) }} ₸
{% elif p.variants|length == 1 %}
{{ format_currency_py(p.variants[0].price) }} ₸
{% else %}
Нет в наличии
{% endif %}
{% endfor %}
Итого:
0,00 ₸
"""
# Script fragment paired with the sales screen; it currently holds only a
# newline.
SALES_SCREEN_SCRIPTS = """
"""
# Jinja fragment for the inventory page. Expects `inventory` and
# `format_currency_py` in the template context. For every product it emits
# a table header and one row per variant (option value, price, cost price,
# stock); the for/else branch prints "Нет вариантов" when a product has no
# variants. The second loop over `inventory` has an empty body in this text.
INVENTORY_CONTENT = """
{% for p in inventory %}
| Вариант | Цена | Себест. | Остаток |
{% for v in p.variants %}
| {{ v.option_value }} |
{{ format_currency_py(v.price) }} ₸ |
{{ format_currency_py(v.cost_price) }} ₸ |
{{ v.stock }} |
{% else %}
| Нет вариантов |
{% endfor %}
{% endfor %}
{% for p in inventory %}
{% endfor %}
"""
# Script fragment paired with the inventory page; it currently holds only a
# newline.
INVENTORY_SCRIPTS = """
"""
# Jinja fragment listing transactions. Expects `transactions` and
# `format_currency_py` in the template context. Each row shows the first
# 8 characters of the id, the first 16 characters of the timestamp with
# 'T' replaced by a space, the type label ('Продажа' for type == 'sale',
# otherwise 'Возврат'), cashier and register names, the total, the status
# and a list of items (name, quantity, price at sale).
# `t['items']` uses subscript syntax because `t.items` would resolve to the
# dict method when `t` is a dict.
TRANSACTIONS_CONTENT = """
| ID | Дата | Тип | Кассир | Касса | Сумма | Статус | Позиции |
{% for t in transactions %}
| {{ t.id[:8] }} |
{{ t.timestamp[:16]|replace('T', ' ') }} |
{{'Продажа' if t.type == 'sale' else 'Возврат'}} |
{{ t.user_name }} | {{ t.kassa_name }} |
{{ format_currency_py(t.total_amount) }} ₸ |
{{t.status}} |
{% for item in t['items'] %}- {{ item.name }} ({{ item.quantity }}x{{ format_currency_py(item.price_at_sale) }})
{% endfor %}
|
{% endfor %}
"""
# No page-specific scripts for the transactions page.
TRANSACTIONS_SCRIPTS = ""
# Jinja fragment for the reports page. Expects `stats`, `expenses` and
# `format_currency_py` in the template context. It prints, in order:
#   * summary figures read from `stats` (total_revenue, total_cogs,
#     gross_profit, total_expenses, total_salary_expenses, net_profit);
#   * cashier payouts, iterating `stats.cashier_payouts` as (name, payout);
#   * sales per cashier, iterating `stats.sales_by_cashier` as (name, data)
#     with `data.count` and `data.total`;
#   * the expense list (first 10 characters of the timestamp, description,
#     amount).
# Each loop has an else branch that prints a "no data" row when empty.
REPORTS_CONTENT = """
- Выручка (за вычетом возвратов) {{ format_currency_py(stats.total_revenue) }} ₸
- Себестоимость проданных товаров {{ format_currency_py(stats.total_cogs) }} ₸
- Валовая прибыль {{ format_currency_py(stats.gross_profit) }} ₸
- Расходы (операционные) -{{ format_currency_py(stats.total_expenses) }} ₸
- Расходы (зарплаты) -{{ format_currency_py(stats.total_salary_expenses) }} ₸
- Чистая прибыль {{ format_currency_py(stats.net_profit) }} ₸
| Кассир | К выплате |
{% for name, payout in stats.cashier_payouts %}
| {{ name }} | {{ format_currency_py(payout) }} ₸ |
{% else %}| Нет данных для расчета. |
{% endfor %}
| Кассир | Чеков | Сумма |
{% for name, data in stats.sales_by_cashier %}
| {{ name }} | {{ data.count }} | {{ format_currency_py(data.total) }} ₸ |
{% else %}| Нет продаж за выбранный период. |
{% endfor %}
| Дата | Описание | Сумма |
{% for expense in expenses %}
| {{ expense.timestamp[:10] }} |
{{ expense.description }} |
{{ format_currency_py(expense.amount) }} ₸ |
{% else %}| Нет расходов за выбранный период. |
{% endfor %}
"""
# No page-specific scripts for the reports page.
REPORTS_SCRIPTS = ""
# Jinja fragment for the per-product payback table. Expects `stats` (an
# iterable of rows) and `format_currency_py` in the template context. Each
# row prints name, variant_name, total_qty_sold, total_revenue,
# inventory_value, total_investment and payback; the for/else branch prints
# "Нет данных для анализа." when `stats` is empty.
PRODUCT_ROI_CONTENT = """
| Товар (вариант) |
Продано, шт |
Выручка |
Стоимость на складе |
Общие вложения |
Результат (Окупаемость) |
{% for item in stats %}
{{ item.name }} {{ item.variant_name }} |
{{ item.total_qty_sold }} |
{{ format_currency_py(item.total_revenue) }} ₸ |
{{ format_currency_py(item.inventory_value) }} ₸ |
{{ format_currency_py(item.total_investment) }} ₸ |
{{ format_currency_py(item.payback) }} ₸
|
{% else %}
| Нет данных для анализа. |
{% endfor %}
"""
# Jinja fragment for the admin panel. Expects `expenses`, `users` and
# `format_currency_py` in the template context. It lists the supplied
# expenses (timestamp cut to 16 characters with 'T' replaced by a space,
# description, amount) under a "last 10 expenses" heading; the template
# itself does not limit the count, so the caller is presumably expected to
# pass at most 10. The loop over `users` has an empty body in this text.
ADMIN_CONTENT = """
Последние 10 расходов:
{% for e in expenses %}
| {{ e.timestamp[:16]|replace('T', ' ') }} |
{{ e.description }} |
{{ format_currency_py(e.amount) }} ₸ |
{% else %}
| Расходов пока нет |
{% endfor %}
Данные периодически сохраняются в облако. Можно сделать это вручную.
{% for u in users %}
{% endfor %}
"""
# No page-specific scripts for the admin panel.
ADMIN_SCRIPTS = ""
# Content fragment substituted into BASE_TEMPLATE's __CONTENT__ slot by
# admin_login; it currently holds only a newline.
ADMIN_LOGIN_CONTENT = """
"""
# Content fragment for the cashier login page (presumably used the same
# way); it currently holds only a newline.
CASHIER_LOGIN_CONTENT = """
"""
# Jinja fragment for the cashier dashboard. Expects `transactions` and
# `format_currency_py` in the template context. Each row shows the short
# id, trimmed timestamp, type label ('Продажа' for type == 'sale',
# otherwise 'Возврат'), total and status. The action cell is guarded by
# `t.type == 'sale' and t.status == 'completed'`; the guarded body is empty
# in this text.
CASHIER_DASHBOARD_CONTENT = """
| ID | Дата | Тип | Сумма | Статус | Действие |
{% for t in transactions %}
| {{ t.id[:8] }} |
{{ t.timestamp[:16]|replace('T', ' ') }} |
{{'Продажа' if t.type == 'sale' else 'Возврат'}} |
{{ format_currency_py(t.total_amount) }} ₸ |
{{t.status}} |
{% if t.type == 'sale' and t.status == 'completed' %}
{% endif %}
|
{% endfor %}
"""
if __name__ == '__main__':
    # Start the periodic backup worker first; as a daemon thread it does not
    # keep the process alive once the server stops.
    backup_thread = threading.Thread(
        target=periodic_backup,
        daemon=True,
    )
    backup_thread.start()
    # Call load_json_data once for every registered data file before serving.
    for data_key in DATA_FILES:
        load_json_data(data_key)
    # Listen on all interfaces, port 7860, with debug mode and the reloader
    # turned off.
    app.run(host='0.0.0.0', port=7860, debug=False, use_reloader=False)