Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import pandas as pd | |
| import os | |
| import json | |
| import random | |
| import string | |
| from datetime import datetime, timedelta | |
| import time | |
| import threading | |
| from huggingface_hub import HfApi, hf_hub_download | |
| # --- Настройки Hugging Face --- | |
| REPO_ID = "Testbase1/testsett" # Замените на ваш репозиторий | |
| DB_FILENAME = "auth_system.json" # JSON база данных | |
| HF_TOKEN_WRITE = os.getenv("HF_TOKEN") # Токен для записи (загрузки) | |
| HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") # Токен для чтения (скачивания) | |
| # Глобальный блокировщик для доступа к JSON-файлу | |
| db_lock = threading.Lock() | |
| # --- Кеширование функций для повышения производительности --- | |
| def load_db(): | |
| """Загружает базу данных из JSON-файла.""" | |
| if not os.path.exists(DB_FILENAME): | |
| data = {"users": [], "products": [], "cart": [], "sales": []} | |
| save_db(data) | |
| return data | |
| try: | |
| with open(DB_FILENAME, "r", encoding="utf8") as f: | |
| data = json.load(f) | |
| except json.JSONDecodeError: | |
| data = {"users": [], "products": [], "cart": [], "sales": []} | |
| save_db(data) | |
| return data | |
| def save_db(data): | |
| """Сохраняет базу данных в JSON-файл.""" | |
| with db_lock: | |
| with open(DB_FILENAME, "w", encoding="utf8") as f: | |
| json.dump(data, f, ensure_ascii=False, indent=4) | |
| def upload_db_to_hf(): | |
| """Загружает JSON-файл базы данных в репозиторий Hugging Face.""" | |
| try: | |
| api = HfApi() | |
| api.upload_file( | |
| path_or_fileobj=DB_FILENAME, | |
| path_in_repo=DB_FILENAME, | |
| repo_id=REPO_ID, | |
| repo_type="dataset", | |
| token=HF_TOKEN_WRITE, | |
| commit_message=f"Автоматическое резервное копирование базы данных {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" | |
| ) | |
| print("Резервная копия JSON базы успешно загружена на Hugging Face.") | |
| except Exception as e: | |
| print("Ошибка при загрузке резервной копии:", e) | |
| def download_db_from_hf(): | |
| """Скачивает JSON-файл базы данных из репозитория Hugging Face.""" | |
| try: | |
| hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename=DB_FILENAME, | |
| repo_type="dataset", | |
| token=HF_TOKEN_READ, | |
| local_dir=".", | |
| local_dir_use_symlinks=False | |
| ) | |
| print("JSON база успешно скачана из Hugging Face.") | |
| return True | |
| except Exception as e: | |
| print("Ошибка при скачивании JSON базы:", e) | |
| return False | |
| def periodic_backup(): | |
| """ | |
| Периодически, каждые 15 секунд, вызывается функция upload_db_to_hf(). | |
| """ | |
| while True: | |
| upload_db_to_hf() | |
| time.sleep(100) | |
| # --- Вспомогательные функции --- | |
| def generate_token(): | |
| """Генерирует случайный токен.""" | |
| return ''.join(random.choices(string.ascii_letters + string.digits, k=13)) | |
| def get_new_id(items): | |
| """Возвращает новый ID для элемента.""" | |
| return max([item.get("id", 0) for item in items], default=0) + 1 | |
| # --- Функции для работы с продажами --- | |
| def record_sales(): | |
| """Записывает информацию о продажах, обновляет базу данных и очищает корзину.""" | |
| with db_lock: | |
| data = load_db() | |
| cart_items = data["cart"] | |
| if not cart_items: | |
| return None, None, 0.0 | |
| sales_details = [] | |
| total_amount = 0.0 | |
| now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| sale_id = generate_token() | |
| for item in cart_items: | |
| product = next((p for p in data["products"] if p["id"] == item["product_id"]), None) | |
| if product: | |
| sale_amount = item["quantity"] * product["sale_price"] | |
| total_amount += sale_amount | |
| sales_details.append([product["id"], item["quantity"], sale_amount, now]) | |
| new_sale_id = get_new_id(data["sales"]) | |
| data["sales"].append({ | |
| "id": new_sale_id, | |
| "sale_id": sale_id, | |
| "product_id": product["id"], | |
| "quantity": item["quantity"], | |
| "total_amount": sale_amount, | |
| "sale_date": now | |
| }) | |
| data["cart"] = [] | |
| save_db(data) | |
| return sales_details, sale_id, total_amount | |
| # --- Страницы приложения --- | |
| def register(): | |
| """Страница регистрации.""" | |
| st.title('Регистрация') | |
| username = st.text_input("Введите ваше имя пользователя") | |
| admin_password = st.text_input("Введите пароль администратора", type="password") | |
| if st.button("Зарегистрироваться"): | |
| if admin_password == "morshenfullsumflpol": | |
| token = generate_token() | |
| with db_lock: | |
| data = load_db() | |
| if any(u["username"] == username for u in data["users"]): | |
| st.error("Это имя пользователя уже занято. Попробуйте другое.") | |
| else: | |
| new_id = get_new_id(data["users"]) | |
| data["users"].append({"id": new_id, "username": username, "token": token}) | |
| save_db(data) | |
| st.success(f"Регистрация успешна! Ваш токен: {token}") | |
| else: | |
| st.error("Неверный пароль администратора!") | |
| def login(): | |
| """Страница авторизации.""" | |
| st.title('Авторизация') | |
| token_input = st.text_input("Введите ваш токен") | |
| if st.button("Войти"): | |
| data = load_db() | |
| user = next((u for u in data["users"] if u["token"] == token_input), None) | |
| if user: | |
| st.session_state.logged_in = True | |
| st.session_state.username = user["username"] | |
| st.session_state.user_id = user["id"] | |
| st.success("Добро пожаловать!") | |
| else: | |
| st.error("Неверный токен!") | |
| def add_product(): | |
| """Страница добавления товара.""" | |
| st.title("Добавление товара") | |
| product_name = st.text_input("Название товара").strip().lower() | |
| product_description = st.text_area("Описание товара") | |
| purchase_price = st.number_input("Приходная цена", min_value=0.0, step=0.01) | |
| sale_price = st.number_input("Отпускная цена", min_value=0.0, step=0.01) | |
| product_quantity = st.number_input("Количество на складе", min_value=0, step=1) | |
| if st.button("Добавить товар"): | |
| if product_name and sale_price: | |
| with db_lock: | |
| data = load_db() | |
| new_id = get_new_id(data["products"]) | |
| data["products"].append({ | |
| "id": new_id, | |
| "name": product_name, | |
| "description": product_description, | |
| "purchase_price": purchase_price, | |
| "sale_price": sale_price, | |
| "quantity_in_stock": product_quantity, | |
| "user_id": st.session_state.user_id | |
| }) | |
| save_db(data) | |
| st.success("Товар успешно добавлен!") | |
| else: | |
| st.error("Пожалуйста, введите все обязательные данные.") | |
| def edit_products(): | |
| """Страница редактирования товара.""" | |
| st.title("Редактирование товара") | |
| data = load_db() | |
| products = [p for p in data["products"] if p["user_id"] == st.session_state.user_id] | |
| if products: | |
| product_names = [p["name"] for p in products] | |
| selected_product_name = st.selectbox("Выберите товар", product_names) | |
| product = next(p for p in products if p["name"] == selected_product_name) | |
| new_name = st.text_input("Новое название товара", product["name"]) | |
| new_description = st.text_area("Новое описание товара", product["description"]) | |
| new_purchase_price = st.number_input("Новая приходная цена", min_value=0.0, step=0.01, value=product["purchase_price"]) | |
| new_sale_price = st.number_input("Новая отпускная цена", min_value=0.0, step=0.01, value=product["sale_price"]) | |
| new_quantity_in_stock = st.number_input("Новое количество на складе", min_value=0, step=1, value=product["quantity_in_stock"]) | |
| if st.button("Сохранить изменения"): | |
| with db_lock: | |
| data = load_db() | |
| for p in data["products"]: | |
| if p["id"] == product["id"]: | |
| p["name"] = new_name | |
| p["description"] = new_description | |
| p["purchase_price"] = new_purchase_price | |
| p["sale_price"] = new_sale_price | |
| p["quantity_in_stock"] = new_quantity_in_stock | |
| break | |
| save_db(data) | |
| st.success("Товар успешно обновлен!") | |
| if st.button("Удалить товар"): | |
| with db_lock: | |
| data = load_db() | |
| data["products"] = [p for p in data["products"] if p["id"] != product["id"]] | |
| save_db(data) | |
| st.success("Товар успешно удален!") | |
| else: | |
| st.info("У вас пока нет добавленных товаров.") | |
| def add_to_cart(): | |
| """Страница добавления товара в корзину.""" | |
| st.title("Отпуск товара") | |
| search_term = st.text_input("Поиск товара").strip().lower() | |
| data = load_db() | |
| products = [p for p in data["products"] if p["user_id"] == st.session_state.user_id and search_term in p["name"].lower()] | |
| if products: | |
| for product in products: | |
| cols = st.columns(4) | |
| with cols[0]: | |
| st.write(product["name"]) | |
| st.write(f"**Отпускная цена**: {product['sale_price']:.2f}") | |
| st.write(f"**Остаток на складе**: {product['quantity_in_stock']}") | |
| with cols[1]: | |
| quantity = st.number_input(f"Количество для '{product['name']}'", min_value=0, max_value=product["quantity_in_stock"], key=f"quantity_{product['id']}") | |
| with cols[2]: | |
| add_button = st.button(f"Добавить в корзину", key=f"add_{product['id']}") | |
| if add_button: | |
| if quantity <= product["quantity_in_stock"]: | |
| with db_lock: | |
| data = load_db() | |
| cart = data["cart"] | |
| existing = next((item for item in cart if item["product_id"] == product["id"]), None) | |
| if existing: | |
| existing["quantity"] += quantity | |
| else: | |
| new_cart_id = get_new_id(cart) | |
| cart.append({"id": new_cart_id, "product_id": product["id"], "quantity": quantity}) | |
| for p in data["products"]: | |
| if p["id"] == product["id"]: | |
| p["quantity_in_stock"] -= quantity | |
| break | |
| save_db(data) | |
| st.success(f"Товар '{product['name']}' успешно добавлен в корзину!") | |
| else: | |
| st.error(f"Недостаточное количество товара '{product['name']}' на складе!") | |
| with cols[3]: | |
| remove_button = st.button(f"Удалить из корзины", key=f"remove_{product['id']}") | |
| if remove_button: | |
| if quantity > 0: | |
| with db_lock: | |
| data = load_db() | |
| cart = data["cart"] | |
| existing = next((item for item in cart if item["product_id"] == product["id"]), None) | |
| if existing: | |
| if existing["quantity"] > quantity: | |
| existing["quantity"] -= quantity | |
| else: | |
| cart.remove(existing) | |
| for p in data["products"]: | |
| if p["id"] == product["id"]: | |
| p["quantity_in_stock"] += quantity | |
| break | |
| save_db(data) | |
| st.success(f"Товар '{product['name']}' успешно удалён из корзины!") | |
| else: | |
| st.error(f"Введите количество для удаления товара '{product['name']}'.") | |
| st.subheader("Состояние корзины") | |
| data = load_db() | |
| cart_items = [] | |
| for item in data["cart"]: | |
| prod = next((p for p in data["products"] if p["id"] == item["product_id"]), None) | |
| if prod: | |
| total = item["quantity"] * prod["sale_price"] | |
| cart_items.append([prod["name"], item["quantity"], prod["sale_price"], total]) | |
| if cart_items: | |
| df = pd.DataFrame(cart_items, columns=["Название", "Количество", "Цена за единицу", "Итого"]) | |
| st.dataframe(df.style.format({"Цена за единицу": "{:.2f}", "Итого": "{:.2f}"}), use_container_width=True) | |
| total_quantity = sum(item[1] for item in cart_items) | |
| total_price = sum(item[3] for item in cart_items) | |
| st.write(f"Общее количество: {total_quantity}, Общая стоимость: {total_price:.2f}") | |
| if st.button("Пробить"): | |
| sales_details, sale_id, total_amount = record_sales() | |
| if sales_details: | |
| st.write("**Детали сделок:**") | |
| sale_details_df = pd.DataFrame(sales_details, columns=["ID товара", "Количество", "Сумма", "Дата и время"]) | |
| st.dataframe(sale_details_df.style.format({"Сумма": "{:.2f}", | |
| "Дата и время": lambda x: pd.to_datetime(x).strftime('%d-%m-%Y %H:%M:%S')}), | |
| use_container_width=True) | |
| st.write(f"**Общая сумма сделки:** {total_amount:.2f}") | |
| st.success("Корзина успешно пробита! Все товары добавлены в отчет и корзина очищена.") | |
| else: | |
| st.info("Корзина пуста.") | |
| else: | |
| st.info("По вашему запросу товары не найдены.") | |
| def monthly_report(): | |
| """Страница отчета о продажах за месяц.""" | |
| st.title("Отчет о продажах за месяц") | |
| data = load_db() | |
| start_date = datetime.now().replace(day=1) | |
| end_date = (datetime.now() + timedelta(days=31)).replace(day=1) | |
| sales = [] | |
| for sale in data["sales"]: | |
| sale_date = datetime.strptime(sale["sale_date"], "%Y-%m-%d %H:%M:%S") | |
| if start_date <= sale_date < end_date: | |
| prod = next((p for p in data["products"] if p["id"] == sale["product_id"]), None) | |
| if prod and prod["user_id"] == st.session_state.user_id: | |
| sales.append({ | |
| "name": prod["name"], | |
| "quantity": sale["quantity"], | |
| "sale_price": prod["sale_price"], | |
| "total_sales": sale["total_amount"], | |
| "profit": sale["quantity"] * (prod["sale_price"] - prod["purchase_price"]) | |
| }) | |
| report = {} | |
| for s in sales: | |
| key = s["name"] | |
| if key not in report: | |
| report[key] = {"quantity": 0, "sale_price": s["sale_price"], "total_sales": 0, "profit": 0} | |
| report[key]["quantity"] += s["quantity"] | |
| report[key]["total_sales"] += s["total_sales"] | |
| report[key]["profit"] += s["profit"] | |
| report_data = [] | |
| for name, values in report.items(): | |
| report_data.append([name, values["quantity"], values["sale_price"], values["total_sales"], values["profit"]]) | |
| total_sales = sum(item[3] for item in report_data) | |
| total_profit = sum(item[4] for item in report_data) | |
| if report_data: | |
| df = pd.DataFrame(report_data, columns=["Название", "Общее количество", "Отпускная цена", "Общие продажи", "Прибыль"]) | |
| st.write(f"Отчет за {datetime.now().strftime('%B %Y')}") | |
| st.dataframe(df.style.format({"Отпускная цена": "{:.2f}", "Общие продажи": "{:.2f}", "Прибыль": "{:.2f}"}), use_container_width=True) | |
| st.write(f"**Общая сумма продаж**: {total_sales:.2f}") | |
| st.write(f"**Общая сумма прибыли**: {total_profit:.2f}") | |
| if st.button("Сделки"): | |
| data = load_db() | |
| sales_grouped = {} | |
| for sale in data["sales"]: | |
| prod = next((p for p in data["products"] if p["id"] == sale["product_id"]), None) | |
| if prod and prod["user_id"] == st.session_state.user_id: | |
| key = sale["sale_id"] | |
| if key not in sales_grouped: | |
| sales_grouped[key] = {"sale_date": sale["sale_date"], "total_amount": 0, "details": []} | |
| sales_grouped[key]["total_amount"] += sale["total_amount"] | |
| sales_grouped[key]["details"].append([prod["name"], sale["quantity"], prod["sale_price"], sale["sale_date"]]) | |
| for key, group in sales_grouped.items(): | |
| st.write(f"**Сделка ID:** {key} ({group['sale_date']}) - **Общая сумма:** {group['total_amount']:.2f}") | |
| df_sales = pd.DataFrame(group["details"], columns=["Название товара", "Количество", "Цена за единицу", "Дата и время"]) | |
| st.dataframe(df_sales.style.format({"Дата и время": lambda x: pd.to_datetime(x).strftime('%d-%m-%Y %H:%M:%S')}), | |
| use_container_width=True) | |
| else: | |
| st.info("Нет данных о продажах за этот месяц.") | |
| # --- Главная функция приложения --- | |
| def main(): | |
| """Главная функция приложения.""" | |
| st.warning("Попытка скачивания базы данных из Hugging Face...") | |
| download_success = download_db_from_hf() | |
| if download_success: | |
| st.success("JSON база успешно скачана из Hugging Face!") | |
| try: | |
| os.chmod(DB_FILENAME, 0o666) | |
| st.info("Права доступа к JSON базе изменены (read-write).") | |
| except Exception as e: | |
| st.error(f"Не удалось изменить права доступа: {e}") | |
| else: | |
| if os.path.exists(DB_FILENAME): | |
| st.info("Не удалось скачать JSON базу, используется локальная копия.") | |
| else: | |
| st.error("Не удалось скачать JSON базу и локальной базы нет. Будет создана новая база.") | |
| with open(DB_FILENAME, "w", encoding="utf8") as f: | |
| json.dump({"users": [], "products": [], "cart": [], "sales": []}, f, ensure_ascii=False, indent=4) | |
| if 'logged_in' not in st.session_state: | |
| st.session_state.logged_in = False | |
| st.sidebar.title("Навигация") | |
| if st.session_state.logged_in: | |
| st.sidebar.title(f"Привет, {st.session_state.username}!") | |
| option = st.sidebar.selectbox("Выберите действие", ["Добавить товар", "Отпуск товара", "Редактировать товары", "Отчет за месяц", "Выйти"]) | |
| if option == "Добавить товар": | |
| add_product() | |
| elif option == "Отпуск товара": | |
| add_to_cart() | |
| elif option == "Редактировать товары": | |
| edit_products() | |
| elif option == "Отчет за месяц": | |
| monthly_report() | |
| elif option == "Выйти": | |
| st.session_state.logged_in = False | |
| st.session_state.username = None | |
| st.session_state.user_id = None | |
| st.success("Вы вышли из системы!") | |
| else: | |
| page = st.sidebar.selectbox("Выберите страницу", ["Авторизация", "Регистрация"]) | |
| if page == "Регистрация": | |
| register() | |
| elif page == "Авторизация": | |
| login() | |
| if __name__ == "__main__": | |
| # Запускаем фоновый поток резервного копирования сразу | |
| backup_thread = threading.Thread(target=periodic_backup, daemon=True) | |
| backup_thread.start() | |
| main() | |