diff --git "a/app.py" "b/app.py" deleted file mode 100644--- "a/app.py" +++ /dev/null @@ -1,1861 +0,0 @@ - -from flask import Flask, render_template_string, request, redirect, url_for, session, send_file -import json -import os -import logging -import threading -import time -from datetime import datetime -from huggingface_hub import HfApi, hf_hub_download -from huggingface_hub.utils import RepositoryNotFoundError -from werkzeug.utils import secure_filename - -app = Flask(__name__) -app.secret_key = 'your_unique_secret_key_12345_cosmetics' # Уникальный секретный ключ -DATA_FILE = 'data_soola_cosmetics.json' -USERS_FILE = 'users_soola_cosmetics.json' - -# Список файлов для синхронизации (config.json убран) -SYNC_FILES = [DATA_FILE, USERS_FILE] - -# Настройки Hugging Face -REPO_ID = "Kgshop/Soola" # Оставляем старый или меняем на новый? Пока оставил старый. -HF_TOKEN_WRITE = os.getenv("HF_TOKEN") -HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") - -# Адрес магазина (теперь один) -STORE_ADDRESS = "Рынок Дордой , Джунхай , терминал , 38" - -# Единственная валюта -CURRENCY_CODE = 'KGS' -CURRENCY_NAME = 'Кыргызский сом (с)' - -# Настройка логирования -logging.basicConfig(level=logging.DEBUG) - -# --- Функции для работы с данными и пользователями --- - -def load_data(): - """Загрузка данных о товарах и категориях.""" - try: - # Попытка скачать актуальные файлы перед загрузкой - download_db_from_hf() - with open(DATA_FILE, 'r', encoding='utf-8') as file: - data = json.load(file) - logging.info("Данные успешно загружены из JSON") - # Проверка структуры данных - if not isinstance(data, dict) or 'products' not in data or 'categories' not in data: - logging.warning("Структура данных некорректна, инициализация пустыми списками.") - # Если data это список (старый формат), пытаемся сохранить его как категории - return {'products': [], 'categories': [] if not isinstance(data, list) else data} - return data - except FileNotFoundError: - logging.warning(f"Локальный файл {DATA_FILE} не найден. Попытка скачивания с HF.") - # Повторная попытка скачать, если первая была до создания файла - try: - download_db_from_hf() - # Если скачивание успешно, пробуем загрузить снова - with open(DATA_FILE, 'r', encoding='utf-8') as file: - data = json.load(file) - logging.info("Данные успешно загружены из JSON после скачивания.") - if not isinstance(data, dict) or 'products' not in data or 'categories' not in data: - return {'products': [], 'categories': [] if not isinstance(data, list) else data} - return data - except (RepositoryNotFoundError, FileNotFoundError, Exception) as e: - logging.error(f"Не удалось скачать или найти файл {DATA_FILE} после ошибки. Создание локальной базы данных. Ошибка: {e}") - return {'products': [], 'categories': []} # Инициализация пустой структурой - except json.JSONDecodeError: - logging.error(f"Ошибка: Невозможно декодировать JSON файл {DATA_FILE}.") - return {'products': [], 'categories': []} - except RepositoryNotFoundError: - logging.error("Репозиторий Hugging Face не найден. Создание локальной базы данных.") - return {'products': [], 'categories': []} - except Exception as e: - logging.error(f"Произошла непредвиденная ошибка при загрузке данных: {e}") - return {'products': [], 'categories': []} - - -def save_data(data): - """Сохранение данных о товарах и категориях.""" - try: - with open(DATA_FILE, 'w', encoding='utf-8') as file: - json.dump(data, file, ensure_ascii=False, indent=4) - logging.info(f"Данные успешно сохранены в {DATA_FILE}") - upload_db_to_hf() # Синхронизация после сохранения - except Exception as e: - logging.error(f"Ошибка при сохранении данных в {DATA_FILE}: {e}") - # Не пробрасываем исключение дальше, чтобы приложение не падало, - # но логируем оши��ку. В идеале, нужна стратегия обработки таких ошибок. - - -def load_users(): - """Загрузка данных пользователей.""" - try: - # Попытка скачать актуальный файл пользователей - # download_db_from_hf() # Не будем скачивать каждый раз при загрузке юзеров, только при старте и по запросу - with open(USERS_FILE, 'r', encoding='utf-8') as file: - users = json.load(file) - logging.info(f"Данные пользователей успешно загружены из {USERS_FILE}") - return users if isinstance(users, dict) else {} - except FileNotFoundError: - logging.warning(f"Локальный файл пользователей {USERS_FILE} не найден. Возвращаем пустой словарь.") - return {} - except json.JSONDecodeError: - logging.error(f"Ошибка: Невозможно декодировать JSON файл пользователей {USERS_FILE}.") - return {} - except Exception as e: - logging.error(f"Произошла ошибка при загрузке пользователей: {e}") - return {} - - -def save_users(users): - """Сохранение данных пользователей.""" - try: - with open(USERS_FILE, 'w', encoding='utf-8') as file: - json.dump(users, file, ensure_ascii=False, indent=4) - logging.info(f"Данные пользователей успешно сохранены в {USERS_FILE}") - upload_db_to_hf() # Синхронизация после сохранения - except Exception as e: - logging.error(f"Ошибка при сохранении пользователей в {USERS_FILE}: {e}") - - -# --- Функции синхронизации с Hugging Face --- - -def upload_db_to_hf(): - """Загрузка файлов данных на Hugging Face.""" - if not HF_TOKEN_WRITE: - logging.warning("Переменная окружения HF_TOKEN (WRITE) не установлена. Загрузка на Hugging Face отключена.") - return - try: - api = HfApi() - for file_name in SYNC_FILES: - if os.path.exists(file_name): - logging.info(f"Начинается загрузка файла {file_name} на Hugging Face...") - api.upload_file( - path_or_fileobj=file_name, - path_in_repo=file_name, - repo_id=REPO_ID, - repo_type="dataset", - token=HF_TOKEN_WRITE, - commit_message=f"Автоматическое резервное копирование {file_name} {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" - ) - logging.info(f"Резервная копия {file_name} успешно загружена на Hugging Face.") - else: - logging.warning(f"Файл {file_name} не найден для загрузки на Hugging Face.") - except Exception as e: - logging.error(f"Ошибка при загрузке резервной копии на Hugging Face: {e}") - -def download_db_from_hf(): - """Скачивание файлов данных с Hugging Face.""" - if not HF_TOKEN_READ: - logging.warning("Переменная окружения HF_TOKEN_READ не установлена. Скачивание с Hugging Face может быть недоступно для приватных репозиториев.") - # Можно продолжать без токена для публичных репо - # return # Раскомментировать, если чтение без токена не нужно - - logging.info("Попытка скачивания файлов с Hugging Face...") - api = HfApi() # Создаем объект API здесь - for file_name in SYNC_FILES: - try: - hf_hub_download( - repo_id=REPO_ID, - filename=file_name, - repo_type="dataset", - token=HF_TOKEN_READ, # Может быть None, если HF_TOKEN_READ не установлен - local_dir=".", - local_dir_use_symlinks=False, - force_download=True, # Принудительно скачиваем для обновления - resume_download=False # Не возобновляем, скачиваем заново - ) - logging.info(f"Файл {file_name} успешно скачан/обновлен из Hugging Face.") - except RepositoryNotFoundError: - logging.error(f"Репозиторий {REPO_ID} не найден на Hugging Face.") - # Не прерываем цикл, пытаемся скачать другие файлы - continue # Переходим к следующему файлу - except Exception as e: - # Логируем ошибку для конкретного файла, но не падаем - logging.error(f"Ошибка при скачивании файла {file_name} с Hugging Face: {e}. Пропускаем.") - # Если файл не скачался, будем использовать локальную версию (если есть) - - -def periodic_backup(): - """Периодическая загрузка данных на Hugging Face.""" - while True: - logging.info("Запуск периодического резервного копирования...") - upload_db_to_hf() - logging.info("Периодическое резервное копирование завершено.") - time.sleep(1800) # Увеличил интервал до 30 минут (1800 секунд) - -# --- Маршруты Flask --- - -@app.route('/') -def catalog(): - """Главная страница каталога товаров.""" - data = load_data() - products = data.get('products', []) - categories = data.get('categories', []) - is_authenticated = 'user' in session - - catalog_html = ''' - - -
- - -{{ product['description'] }}
- - -Товары пока не добавлены.
- {% endif %} -Цена: {product["price"]} {CURRENCY_CODE}
' if is_authenticated else 'Цена: Доступна после входа
' - colors_html = f'Доступные варианты/цвета: {", ".join(product.get("colors", []))}
' if product.get('colors') else '' - - detail_html = f''' -Нет фото
Категория: {product.get('category', 'Без категории')}
- {price_html} -Описание:
{product['description'].replace(r'\\n', '
').replace(r'\n', '
')}
Вход выполнен успешно. Перенаправление...
- Перейти в каталог - ''' - return login_response_html - # return redirect(url_for('catalog')) # Стандартное перенаправление - else: - logging.warning(f"Неудачная попытка входа для пользователя: {login}") - error_message = "Неверный логин или пароль." - # Возвращаем страницу входа с сообщением об ошибке - return render_template_string(LOGIN_FORM_TEMPLATE, error=error_message) - - # Отображаем форму входа при GET запросе - return render_template_string(LOGIN_FORM_TEMPLATE, error=None) - -# Вынесем HTML формы входа в отдельную переменную для читаемости -LOGIN_FORM_TEMPLATE = ''' - - - - - -Выход выполнен. Перенаправление...
- На главную - ''' - return logout_response_html - # return redirect(url_for('catalog')) # Старое перенаправление - - -@app.route('/admin', methods=['GET', 'POST']) -def admin(): - """Административная панель.""" - # Простая проверка на админа (в реальном приложении нужна система ролей) - if session.get('user') != 'admin': # Жестко заданный логин админа - # Можно добавить проверку пароля из файла users, если админ - обычный пользователь - # users = load_users() - # if session.get('user') not in users or not users[session['user']].get('is_admin'): - # return "Доступ запрещен", 403 - return "Доступ запрещен. Только для пользователя 'admin'.", 403 - - - data = load_data() - products = data.get('products', []) - categories = data.get('categories', []) - users = load_users() - - message = None # Сообщение для отображения после действия - - if request.method == 'POST': - action = request.form.get('action') - - try: # Обернем обработку POST в try-except для перехвата ошибок - if action == 'add_category': - category_name = request.form.get('category_name', '').strip() - if category_name and category_name not in categories: - categories.append(category_name) - categories.sort() # Сортируем категории - data['categories'] = categories - save_data(data) - message = f"Категория '{category_name}' успешно добавлена." - elif not category_name: - message = "Ошибка: Название категории не может быть пустым." - else: - message = f"Ошибка: Категория '{category_name}' уже существует." - - elif action == 'delete_category': - category_to_delete = request.form.get('category_name') - if category_to_delete in categories: - categories.remove(category_to_delete) - # Обновляем товары, убирая удаленную категорию - for product in products: - if product.get('category') == category_to_delete: - product['category'] = 'Без категории' # Или можно удалять ключ 'category' - data['categories'] = categories - data['products'] = products - save_data(data) - message = f"Категория '{category_to_delete}' удалена. Товары перенесены в 'Без категории'." - else: - message = f"Ошибка: Категория '{category_to_delete}' не найдена." - - - elif action == 'add_product': - name = request.form.get('name', '').strip() - price_str = request.form.get('price', '0').replace(',', '.') - description = request.form.get('description', '').strip() - category = request.form.get('category') - photos_files = request.files.getlist('photos') - # Получаем цвета, удаляем пустые строки и дубликаты - colors = list(dict.fromkeys(filter(None, [c.strip() for c in request.form.getlist('colors')]))) - - if not name or not description: - message = "Ошибка: Название и описание товара обязательны." - else: - try: - price = round(float(price_str), 2) - if price < 0: price = 0 - except ValueError: - message = "Ошибка: Некорректный формат цены." - else: - photos_list = upload_photos(photos_files, name) # Функция для загрузки фото - - new_product = { - 'name': name, - 'price': price, # Цена теперь всегда в KGS - 'description': description, - 'category': category if category in categories else 'Без категории', - 'photos': photos_list, - 'colors': colors if colors else [] # Пустой список, если цвета не указаны - } - products.append(new_product) - data['products'] = products - save_data(data) - message = f"Товар '{name}' успешно добавлен." - - elif action == 'edit_product': - index_str = request.form.get('index') - if index_str is None: - message = "Ошибка: Не указан индекс товара для редактирования." - else: - index = int(index_str) - if 0 <= index < len(products): - name = request.form.get('name', '').strip() - price_str = request.form.get('price', '0').replace(',', '.') - description = request.form.get('description', '').strip() - category = request.form.get('category') - photos_files = request.files.getlist('photos') - colors = list(dict.fromkeys(filter(None, [c.strip() for c in request.form.getlist('colors')]))) - - if not name or not description: - message = "Ошибка: Название и описание товара обязательны." - else: - try: - price = round(float(price_str), 2) - if price < 0: price = 0 - except ValueError: - message = "Ошибка: Некорректный формат цены." - else: - # Загружаем новые фото, если они были выбраны - if photos_files and any(f.filename for f in photos_files): - # Можно добавить логику удаления старых фото с HF, если нужно - new_photos_list = upload_photos(photos_files, name) - products[index]['photos'] = new_photos_list - message = "Фотографии обновлены. " - - products[index]['name'] = name - products[index]['price'] = price - products[index]['description'] = description - products[index]['category'] = category if category in categories else 'Без категории' - products[index]['colors'] = colors if colors else [] - data['products'] = products - save_data(data) - message = (message or "") + f"Товар '{name}' успешно обновлен." - else: - message = "Ошибка: Неверный индекс товара для редактирования." - - - elif action == 'delete_product': - index_str = request.form.get('index') - if index_str is None: - message = "Ошибка: Не указан индекс товара для удаления." - else: - index = int(index_str) - if 0 <= index < len(products): - deleted_product_name = products[index]['name'] - # Можно добавить удаление фото с HF перед удалением товара - # delete_photos_from_hf(products[index].get('photos', [])) - del products[index] - data['products'] = products - save_data(data) - message = f"Товар '{deleted_product_name}' удален." - else: - message = "Ошибка: Неверный индекс товара для удаления." - - - # --- Управление пользователями (Новое) --- - elif action == 'add_user': - login = request.form.get('login', '').strip() - password = request.form.get('password', '').strip() - first_name = request.form.get('first_name', '').strip() - last_name = request.form.get('last_name', '').strip() - country = request.form.get('country', '').strip() - city = request.form.get('city', '').strip() - - if not login or not password: - message = "Ошибка: Логин и пароль обязательны для нового пользователя." - elif login in users: - message = f"Ошибка: Пользователь с логином '{login}' уже существует." - elif login == 'admin' and session.get('user') != 'admin': # Запрет создавать второго админа? - message = "Ошибка: Логин 'admin' зарезервирован." - else: - users[login] = { - 'password': password, # В реальном приложении пароль нужно хешировать! - 'first_name': first_name, - 'last_name': last_name, - 'country': country, - 'city': city, - # 'is_admin': False # Можно добавить поле для ролей - } - save_users(users) - message = f"Пользователь '{login}' успешно зарегистрирован." - - elif action == 'delete_user': - login_to_delete = request.form.get('login') - if not login_to_delete: - message = "Ошибка: Не указан логин пользователя для удаления." - elif login_to_delete == 'admin': # Запрет на удаление админа - message = "Ошибка: Нельзя удалить основного администратора ('admin')." - elif login_to_delete in users: - del users[login_to_delete] - save_users(users) - message = f"Пользователь '{login_to_delete}' удален." - else: - message = f"Ошибка: Пользователь '{login_to_delete}' не найден." - - except Exception as e: - logging.error(f"Ошибка в админ-панели при обработке действия '{action}': {e}", exc_info=True) - message = f"Произошла внутренняя ошибка: {e}" - - # После POST запроса перенаправляем на GET, чтобы избежать повторной отправки формы - # Передаем сообщение через сессию или параметр URL (менее безопасно) - if message: - session['admin_message'] = message # Используем сессию для передачи сообщения - return redirect(url_for('admin')) - - # Для GET запроса или после редиректа - admin_message = session.pop('admin_message', None) # Получаем сообщение из сессии - - # Загружаем актуальные данные для отображения - data = load_data() - products = data.get('products', []) - categories = data.get('categories', []) - users = load_users() - - - admin_html = ''' - - - - - -Категорий пока нет.
- {% endfor %} -- Примечание: Данные также автоматически синхронизируются каждые 30 минут. Скачивание перезапишет локальные несохраненные изменения. -
-Категория: {{ product.get('category', 'Без категории') }}
-Цена: {{ product['price'] }} {{ currency_code }}
-Описание: {{ product['description'][:100] }}{% if product['description']|length > 100 %}...{% endif %}
-Варианты: {{ product.get('colors', ['Стандарт'])|join(', ') }}
- {% if product.get('photos') %} -Товаров пока нет.
- {% endfor %} -Логин: {{ login }}
-Имя: {{ user_data.get('first_name', 'Не указано') }} {{ user_data.get('last_name', '') }}
-Локация: {{ user_data.get('city', 'Город не указан') }}, {{ user_data.get('country', 'Страна не указана') }}
- {# Не отображаем пароль! #} -Пользователей (кроме админа) пока нет.
- {% endfor %} - {# Отображаем админа отдельно, без кнопки удаления #} - {% if 'admin' in users %} -Логин: admin (Администратор)
-Имя: {{ users['admin'].get('first_name', '') }} {{ users['admin'].get('last_name', '') }}
-Локация: {{ users['admin'].get('city', '') }}, {{ users['admin'].get('country', '') }}
-