from flask import Flask, render_template_string, request, redirect, url_for, session, send_from_directory import json import os import logging import threading import time from datetime import datetime from huggingface_hub import HfApi, hf_hub_download from huggingface_hub.utils import RepositoryNotFoundError from werkzeug.utils import secure_filename app = Flask(__name__) app.secret_key = 'your_unique_secret_key_12345' # Уникальный секретный ключ (Лучше сменить на более сложный) DATA_FILE = 'data_soola.json' USERS_FILE = 'users_soola.json' # Список файлов для синхронизации SYNC_FILES = [DATA_FILE, USERS_FILE] # Настройки Hugging Face REPO_ID = "Kgshop/Soola" # Рекомендуется сменить на репозиторий для Soola Cosmetics HF_TOKEN_WRITE = os.getenv("HF_TOKEN") HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") # Адрес магазина STORE_ADDRESS = "Рынок Дордой, Джунхай, терминал, 38" # Валюта (только сом) CURRENCY_SYMBOL = 'с' CURRENCY_CODE = 'KGS' # Настройка логирования logging.basicConfig(level=logging.DEBUG) def load_data(): """Загрузка данных о товарах и категориях.""" try: # Попытка скачать актуальные данные перед загрузкой try: download_db_from_hf() except Exception as e: logging.warning(f"Не удалось скачать данные с Hugging Face при запуске: {e}. Используется локальная копия.") with open(DATA_FILE, 'r', encoding='utf-8') as file: data = json.load(file) logging.info("Данные успешно загружены из JSON") # Проверка структуры данных if not isinstance(data, dict): # Если старый формат (просто список продуктов), преобразуем if isinstance(data, list): return {'products': data, 'categories': []} else: return {'products': [], 'categories': []} if 'products' not in data: data['products'] = [] if 'categories' not in data: data['categories'] = [] return data except FileNotFoundError: logging.warning(f"{DATA_FILE} не найден. Создание пустой структуры данных.") return {'products': [], 'categories': []} except json.JSONDecodeError: logging.error(f"Ошибка: Невозможно декодировать JSON файл {DATA_FILE}.") return {'products': [], 'categories': []} except Exception as e: logging.error(f"Произошла непредвиденная ошибка при загрузке данных: {e}") return {'products': [], 'categories': []} def save_data(data): """Сохранение данных о товарах и категориях.""" try: with open(DATA_FILE, 'w', encoding='utf-8') as file: json.dump(data, file, ensure_ascii=False, indent=4) logging.info(f"Данные успешно сохранены в {DATA_FILE}") # Загружаем на HF после сохранения upload_db_to_hf(DATA_FILE) except Exception as e: logging.error(f"Ошибка при сохранении данных в {DATA_FILE}: {e}") # Не прерываем работу приложения, но логируем ошибку # raise # Можно раскомментировать, если критично прерывать работу при ошибке сохранения def load_users(): """Загрузка данных пользователей.""" try: # Попытка скачать актуальные данные перед загрузкой try: download_db_from_hf() except Exception as e: logging.warning(f"Не удалось скачать данные пользователей с Hugging Face при запуске: {e}. Используется локальная копия.") with open(USERS_FILE, 'r', encoding='utf-8') as file: users = json.load(file) logging.info("Данные пользователей успешно загружены") return users if isinstance(users, dict) else {} except FileNotFoundError: logging.warning(f"{USERS_FILE} не найден. Возвращен пустой словарь пользователей.") return {} except json.JSONDecodeError: logging.error(f"Ошибка: Невозможно декодировать JSON файл {USERS_FILE}.") return {} except Exception as e: logging.error(f"Произошла непредвиденная ошибка при загрузке пользователей: {e}") return {} def save_users(users): """Сохранение данных пользователей.""" try: with open(USERS_FILE, 'w', encoding='utf-8') as file: json.dump(users, file, ensure_ascii=False, indent=4) logging.info(f"Данные пользователей успешно сохранены в {USERS_FILE}") # Загружаем на HF после сохранения upload_db_to_hf(USERS_FILE) except Exception as e: logging.error(f"Ошибка при сохранении данных пользователей в {USERS_FILE}: {e}") def upload_db_to_hf(filename_to_upload=None): """Загрузка файлов на Hugging Face. Если filename_to_upload не указан, загружает все файлы из SYNC_FILES.""" if not HF_TOKEN_WRITE: logging.warning("HF_TOKEN (токен для записи) не установлен. Загрузка на Hugging Face отключена.") return try: api = HfApi() files_to_sync = [filename_to_upload] if filename_to_upload else SYNC_FILES for file_name in files_to_sync: if os.path.exists(file_name): try: api.upload_file( path_or_fileobj=file_name, path_in_repo=file_name, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE, commit_message=f"Автоматическое резервное копирование {file_name} {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) logging.info(f"Резервная копия {file_name} успешно загружена на Hugging Face.") except Exception as e: logging.error(f"Ошибка при загрузке файла {file_name} на Hugging Face: {e}") else: logging.warning(f"Файл {file_name} не найден для загрузки на Hugging Face.") except Exception as e: logging.error(f"Общая ошибка при инициализации или загрузке на Hugging Face: {e}") def download_db_from_hf(): """Скачивание файлов с Hugging Face.""" if not HF_TOKEN_READ and not HF_TOKEN_WRITE: logging.warning("HF_TOKEN_READ и HF_TOKEN (токен для чтения/записи) не установлены. Скачивание с Hugging Face отключено.") return # Используем токен для записи, если токен для чтения не задан token_to_use = HF_TOKEN_READ if HF_TOKEN_READ else HF_TOKEN_WRITE try: api = HfApi() # Api() не используется для скачивания, но можно оставить для единообразия for file_name in SYNC_FILES: try: hf_hub_download( repo_id=REPO_ID, filename=file_name, repo_type="dataset", token=token_to_use, local_dir=".", local_dir_use_symlinks=False, # Важно для корректной перезаписи force_download=True # Принудительно скачивать, чтобы получить свежую версию ) logging.info(f"Файл {file_name} успешно скачан из Hugging Face.") except RepositoryNotFoundError: logging.error(f"Репозиторий {REPO_ID} не найден на Hugging Face. Скачивание {file_name} невозможно.") # Не прерываем скачивание остальных файлов except Exception as e: # Ловим более конкретные ошибки по файлам # Проверяем, является ли ошибка 'Not Found' (файл не существует в репо) if "404 Client Error" in str(e) or "EntryNotFoundError" in str(e): logging.warning(f"Файл {file_name} не найден в репозитории {REPO_ID}. Пропускаем скачивание.") else: logging.error(f"Ошибка при скачивании файла {file_name} из Hugging Face: {e}") except Exception as e: # Общая ошибка на уровне всего процесса скачивания logging.error(f"Общая ошибка при скачивании файлов с Hugging Face: {e}") # Не прерываем работу приложения, но логируем # raise # Можно раскомментировать, если критично прервать работу при ошибке скачивания def periodic_backup(): """Периодическая загрузка всех файлов на Hugging Face.""" while True: time.sleep(800) # 13 минут 20 секунд logging.info("Запуск периодического резервного копирования...") upload_db_to_hf() # Загружаем все файлы @app.route('/') def catalog(): data = load_data() products = data.get('products', []) categories = data.get('categories', []) is_authenticated = 'user' in session catalog_html = ''' Soola Cosmetics - Каталог

Soola Cosmetics

{{ store_address }}
{% for category in categories %} {% endfor %}
{% if products %} {% for product in products %}
{% if product.get('photos') and product['photos']|length > 0 %}
{{ product['name'] }}
{% else %}
No Image Available
{% endif %}

{{ product['name'] }}

{% if is_authenticated %}
{{ "%.2f"|format(product['price']) }} {{ currency_symbol }}
{% else %}
Цена по запросу
{% endif %}

{{ product['description'][:100] }}{% if product['description']|length > 100 %}...{% endif %}

{% if is_authenticated %} {% endif %}
{% endfor %} {% else %}

Товары пока не добавлены.

{% endif %}
''' return render_template_string( catalog_html, products=products, categories=categories, repo_id=REPO_ID, is_authenticated=is_authenticated, store_address=STORE_ADDRESS, session=session, currency_symbol=CURRENCY_SYMBOL # Передаем символ валюты ) @app.route('/product/') def product_detail(index): data = load_data() products = data.get('products', []) is_authenticated = 'user' in session if index < 0 or index >= len(products): return "Товар не найден", 404 product = products[index] detail_html = '''

{{ product['name'] }}

{% if product.get('photos') and product['photos']|length > 0 %} {% for photo in product['photos'] %}
{{ product['name'] }} - фото {{ loop.index }}
{% endfor %} {% else %}
Нет изображения
{% endif %}

Категория: {{ product.get('category', 'Без категории') }}

{% if is_authenticated %}

Цена: {{ "%.2f"|format(product['price']) }} {{ currency_symbol }}

{% else %}

Цена: Доступна после входа

{% endif %}

Описание:
{{ product['description'] | replace('\\n', '
') | safe }}

{% if product.get('colors') and product['colors']|length > 0 %}

Доступные варианты/цвета: {{ product['colors']|join(', ') }}

{% endif %}
''' return render_template_string( detail_html, product=product, repo_id=REPO_ID, is_authenticated=is_authenticated, currency_symbol=CURRENCY_SYMBOL # Передаем символ валюты ) # Маршрут /set_currency убран, так как валюта теперь одна # Маршрут /register убран, регистрация теперь только через админку @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': login = request.form.get('login') password = request.form.get('password') users = load_users() if login in users and users[login].get('password') == password: # Проверяем наличие ключа 'password' session['user'] = login # Дополнительные данные пользователя в сессию больше не кладем (country, city) # Валюта теперь не нужна в сессии logging.info(f"Пользователь {login} успешно вошел в систему.") # Сохраняем логин в localStorage для авто-логина login_response_html = '''

Вход выполнен успешно. Перенаправление...

''' return render_template_string(login_response_html, login=login) # return redirect(url_for('catalog')) # Старый вариант без localStorage else: logging.warning(f"Неудачная попытка входа для пользователя: {login}") error_message = "Неверный логин или пароль." # Возвращаем страницу входа с сообщением об ошибке return render_template_string(get_login_form_html(), error=error_message) # GET запрос - просто показываем форму входа return render_template_string(get_login_form_html()) def get_login_form_html(): """Возвращает HTML-код формы входа.""" return ''' Вход - Soola Cosmetics

Вход в Soola Cosmetics

{% if error %}
{{ error }}
{% endif %}
Вернуться в каталог
''' @app.route('/auto_login', methods=['POST']) def auto_login(): """Попытка автоматического входа на основе данных из localStorage.""" data = request.get_json() login = data.get('login') if not login: return "Логин не предоставлен", 400 users = load_users() if login in users: # Пароль для авто-логина не проверяем (риск безопасности, но удобно) # Если нужна проверка, нужно будет передавать и сохраненный хеш/токен пароля session['user'] = login logging.info(f"Пользователь {login} автоматически вошел в систему.") return "OK", 200 else: logging.warning(f"Авто-логин не удался: пользователь {login} не найден.") # Важно вернуть ошибку, чтобы клиентский JS удалил невалидный 'user' из localStorage return "Пользователь не найден", 404 @app.route('/logout') def logout(): user = session.get('user') session.pop('user', None) # Очистка других данных сессии, если они были session.pop('country', None) # На всякий случай session.pop('city', None) # На всякий случай session.pop('currency', None) # На всякий случай logging.info(f"Пользователь {user or 'Неизвестный'} вышел из системы.") # Очищаем localStorage при выходе logout_response_html = '''

Выход выполнен. Перенаправление...

''' return render_template_string(logout_response_html) # return redirect(url_for('catalog')) # Старый вариант @app.route('/admin', methods=['GET', 'POST']) def admin(): # Простая проверка "админа" - первый зарегистрированный пользователь? # Или лучше задать конкретный логин администратора ADMIN_USER = os.getenv("ADMIN_LOGIN", "admin") # Можно задать через переменную окружения if 'user' not in session or session['user'] != ADMIN_USER: # Если не админ, перенаправляем на страницу входа или главную logging.warning(f"Попытка неавторизованного доступа к /admin пользователем {session.get('user')}") return redirect(url_for('login')) # Или url_for('catalog') data = load_data() products = data.get('products', []) categories = data.get('categories', []) users = load_users() # Удаляем админа из списка пользователей для отображения (чтобы нельзя было случайно удалить) users_display = {login: udata for login, udata in users.items() if login != ADMIN_USER} if request.method == 'POST': action = request.form.get('action') logging.debug(f"Admin POST action: {action}") try: # Обернем обработку POST в try-except для отлова ошибок if action == 'add_category': category_name = request.form.get('category_name', '').strip() if category_name and category_name not in categories: categories.append(category_name) categories.sort() # Сортируем категории save_data(data) logging.info(f"Категория '{category_name}' добавлена администратором {session['user']}.") elif not category_name: logging.warning("Попытка добавить пустую категорию.") else: logging.warning(f"Попытка добавить существующую категорию: {category_name}") return redirect(url_for('admin')) elif action == 'delete_category': category_index_str = request.form.get('category_index') if category_index_str is not None: try: category_index = int(category_index_str) if 0 <= category_index < len(categories): deleted_category = categories.pop(category_index) logging.info(f"Категория '{deleted_category}' удалена администратором {session['user']}.") # Обновляем товары, у которых была эта категория for product in products: if product.get('category') == deleted_category: product['category'] = 'Без категории' save_data(data) else: logging.warning(f"Неверный индекс категории для удаления: {category_index}") except ValueError: logging.warning(f"Неверный формат индекса категории: {category_index_str}") return redirect(url_for('admin')) elif action == 'add': name = request.form.get('name', '').strip() price_str = request.form.get('price', '0').replace(',', '.') description = request.form.get('description', '').strip() category = request.form.get('category') photos_files = request.files.getlist('photos') # Получаем цвета, убираем пустые строки colors = [c.strip() for c in request.form.getlist('colors') if c.strip()] if not name or not description: # Цена может быть 0 logging.warning("Попытка добавить товар без имени или описания.") # Нужно вернуть ошибку пользователю return "Ошибка: Название и Описание обязательны.", 400 try: price = round(float(price_str), 2) if price < 0: price = 0.0 # Цена не может быть отрицательной except ValueError: logging.warning(f"Неверный формат цены: {price_str}. Установлена цена 0.") price = 0.0 photos_list = [] if photos_files and HF_TOKEN_WRITE: # Загрузка фото только если есть токен uploads_dir = 'uploads_temp' # Временная папка os.makedirs(uploads_dir, exist_ok=True) api = HfApi() for photo in photos_files[:10]: # Ограничение на 10 фото if photo and photo.filename: try: # Используем безопасное имя + таймстемп для уникальности base, ext = os.path.splitext(photo.filename) safe_base = secure_filename(base) timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f") photo_filename = f"{safe_base}_{timestamp}{ext}" temp_path = os.path.join(uploads_dir, photo_filename) photo.save(temp_path) logging.debug(f"Фото сохранено временно: {temp_path}") # Загружаем на HF path_in_repo = f"photos/{photo_filename}" api.upload_file( path_or_fileobj=temp_path, path_in_repo=path_in_repo, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE, commit_message=f"Добавлено фото {photo_filename} для товара {name}" ) photos_list.append(photo_filename) logging.info(f"Фото {photo_filename} загружено на HF: {path_in_repo}") # Удаляем временный файл if os.path.exists(temp_path): os.remove(temp_path) logging.debug(f"Временное фото удалено: {temp_path}") except Exception as e: logging.error(f"Ошибка при обработке или загрузке фото {photo.filename}: {e}") # Удаляем временный файл, если он остался после ошибки if 'temp_path' in locals() and os.path.exists(temp_path): try: os.remove(temp_path) except Exception as rem_e: logging.error(f"Ошибка при удалении временного файла после ошибки: {rem_e}") # Очищаем временную папку, если она пуста if not os.listdir(uploads_dir): os.rmdir(uploads_dir) elif photos_files and not HF_TOKEN_WRITE: logging.warning("HF_TOKEN для записи не установлен, фото не будут загружены.") new_product = { 'id': f"prod_{int(time.time() * 1000)}", # Простой уникальный ID на основе времени 'name': name, 'price': price, # Цена в сомах 'description': description, 'category': category if category in categories else 'Без категории', 'photos': photos_list, 'colors': colors if colors else [] # Пустой список, если цвета не указаны } products.append(new_product) save_data(data) logging.info(f"Товар '{name}' добавлен администратором {session['user']}.") return redirect(url_for('admin')) elif action == 'edit': index_str = request.form.get('index') if index_str is None: return redirect(url_for('admin')) try: index = int(index_str) if not (0 <= index < len(products)): logging.warning(f"Попытка редактировать несуществующий товар с индексом {index}.") return redirect(url_for('admin')) name = request.form.get('name', '').strip() price_str = request.form.get('price', '0').replace(',', '.') description = request.form.get('description', '').strip() category = request.form.get('category') photos_files = request.files.getlist('photos') # Новые фото для замены # Получаем цвета, убираем пустые строки colors = [c.strip() for c in request.form.getlist('colors') if c.strip()] if not name or not description: logging.warning("Попытка сохранить товар без имени или описания при редактировании.") return "Ошибка: Название и Описание обязательны.", 400 try: price = round(float(price_str), 2) if price < 0: price = 0.0 except ValueError: logging.warning(f"Неверный формат цены при редактировании: {price_str}. Цена не изменена.") price = products[index]['price'] # Оставляем старую цену # Обработка фото: Если загружены новые фото, они *заменяют* старые new_photos_list = products[index].get('photos', []) # Начинаем со старых фото if photos_files and any(f.filename for f in photos_files) and HF_TOKEN_WRITE: logging.info(f"Загрузка новых фото для товара '{name}' (индекс {index}). Старые будут заменены.") # TODO: В идеале, нужно удалить старые фото с HF, но это сложнее. # Пока просто заменяем список в JSON. new_photos_list = [] # Очищаем список перед добавлением новых uploads_dir = 'uploads_temp' os.makedirs(uploads_dir, exist_ok=True) api = HfApi() for photo in photos_files[:10]: if photo and photo.filename: try: base, ext = os.path.splitext(photo.filename) safe_base = secure_filename(base) timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f") photo_filename = f"{safe_base}_{timestamp}{ext}" temp_path = os.path.join(uploads_dir, photo_filename) photo.save(temp_path) path_in_repo = f"photos/{photo_filename}" api.upload_file( path_or_fileobj=temp_path, path_in_repo=path_in_repo, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE, commit_message=f"Обновлено фото {photo_filename} для товара {name}" ) new_photos_list.append(photo_filename) logging.info(f"Новое фото {photo_filename} загружено на HF для товара {name}.") if os.path.exists(temp_path): os.remove(temp_path) except Exception as e: logging.error(f"Ошибка при обработке/загрузке нового фото {photo.filename}: {e}") if 'temp_path' in locals() and os.path.exists(temp_path): try: os.remove(temp_path) except Exception as rem_e: logging.error(f"Ошибка при удалении временного файла после ошибки: {rem_e}") if not os.listdir(uploads_dir): os.rmdir(uploads_dir) elif photos_files and not HF_TOKEN_WRITE: logging.warning("HF_TOKEN для записи не установлен, новые фото не будут загружены при редактировании.") # new_photos_list остается равным старым фото # Обновляем данные продукта products[index]['name'] = name products[index]['price'] = price products[index]['description'] = description products[index]['category'] = category if category in categories else 'Без категории' products[index]['photos'] = new_photos_list # Обновляем список фото products[index]['colors'] = colors if colors else [] # Обновляем цвета save_data(data) logging.info(f"Товар '{name}' (индекс {index}) обновлен администратором {session['user']}.") except ValueError: logging.warning(f"Неверный формат индекса товара для редактирования: {index_str}") except Exception as e: logging.error(f"Непредвиденная ошибка при редактировании товара: {e}") return redirect(url_for('admin')) elif action == 'delete': index_str = request.form.get('index') if index_str is not None: try: index = int(index_str) if 0 <= index < len(products): deleted_product = products.pop(index) # TODO: В идеале, удалить и фото с HF save_data(data) logging.info(f"Товар '{deleted_product.get('name', 'N/A')}' (индекс {index}) удален администратором {session['user']}.") else: logging.warning(f"Попытка удалить несуществующий товар с индексом {index}.") except ValueError: logging.warning(f"Неверный формат индекса товара для удаления: {index_str}") return redirect(url_for('admin')) # --- Управление пользователями --- elif action == 'add_user': login = request.form.get('login', '').strip() password = request.form.get('password', '').strip() if login and password: if login in users: logging.warning(f"Попытка добавить существующего пользователя: {login}") # Можно вернуть ошибку на страницу админа else: users[login] = {'password': password} # Храним пароль открытым текстом (НЕ РЕКОМЕНДУЕТСЯ!) # В реальном приложении нужно использовать хеширование паролей (например, Werkzeug security) save_users(users) logging.info(f"Пользователь '{login}' добавлен администратором {session['user']}.") else: logging.warning("Попытка добавить пользователя с пустым логином или паролем.") return redirect(url_for('admin')) elif action == 'delete_user': login_to_delete = request.form.get('login') if login_to_delete and login_to_delete != ADMIN_USER: # Нельзя удалить админа if login_to_delete in users: del users[login_to_delete] save_users(users) logging.info(f"Пользователь '{login_to_delete}' удален администратором {session['user']}.") else: logging.warning(f"Попытка удалить несуществующего пользователя: {login_to_delete}") elif login_to_delete == ADMIN_USER: logging.warning(f"Попытка удалить администратора ({ADMIN_USER}) отклонена.") return redirect(url_for('admin')) except Exception as e: logging.error(f"Критическая ошибка при обработке POST запроса в /admin: {e}", exc_info=True) # Можно показать страницу с ошибкой или просто перенаправить return redirect(url_for('admin')) # Перенаправляем в любом случае # GET запрос - отображаем страницу админки admin_html = ''' Админ-панель - Soola Cosmetics

Админ-панель Soola Cosmetics

Перейти в каталог

Синхронизация с Hugging Face

Данные автоматически сохраняются на Hugging Face при изменениях и периодически.

(Перезапишет локальные файлы!)

Управление категориями

Добавить категорию

Список категорий

{% if categories %}
    {% for category in categories %}
  • {{ category }} {% if category != 'Без категории' %}
    {% endif %}
  • {% endfor %}
{% else %}

Категорий пока нет.

{% endif %}

Добавление товара

Список товаров

{% if products %} {% for product in products %}

{{ product['name'] }}

ID: {{ product.get('id', 'N/A') }}

Категория: {{ product.get('category', 'Без категории') }}

Цена: {{ "%.2f"|format(product['price']) }} {{ currency_symbol }}

Описание: {{ product['description'][:150] }}{% if product['description']|length > 150 %}...{% endif %}

Варианты: {{ product.get('colors', ['Стандартный'])|join(', ') }}

{% if product.get('photos') and product['photos']|length > 0 %}
Фото:
{% for photo in product['photos'] %} Фото {{ loop.index }} {% endfor %}
{% else %}

Фото: Нет

{% endif %}
Редактировать
{% if product.get('colors') %} {% for color in product.get('colors', []) %}
{% endfor %} {% endif %}
{% endfor %} {% else %}

Товаров пока нет.

{% endif %}

Управление пользователями

Добавить пользователя

Список пользователей

(Администратор '{{ admin_user }}' не отображается)

{% if users_display %}
    {% for login, user_data in users_display.items() %}
  • {{ login }}
  • {% endfor %}
{% else %}

Других пользователей нет.

{% endif %}
''' return render_template_string( admin_html, products=products, categories=categories, repo_id=REPO_ID, users=users_display, # Отображаем пользователей без админа admin_user=ADMIN_USER, # Передаем логин админа currency_symbol=CURRENCY_SYMBOL ) @app.route('/backup', methods=['POST']) def backup(): """Принудительная загрузка всех данных на Hugging Face.""" # Проверка прав доступа (на всякий случай) ADMIN_USER = os.getenv("ADMIN_LOGIN", "admin") if 'user' not in session or session['user'] != ADMIN_USER: return "Доступ запрещен", 403 logging.info(f"Запрошено принудительное резервное копирование администратором {session['user']}.") upload_db_to_hf() # Загружаем все файлы из SYNC_FILES # Можно добавить сообщение об успехе/ошибке через flash или вернуть статус return redirect(url_for('admin')) # Возвращаемся в админку @app.route('/download', methods=['GET']) def download(): """Принудительное скачивание всех данных с Hugging Face.""" # Проверка прав доступа ADMIN_USER = os.getenv("ADMIN_LOGIN", "admin") if 'user' not in session or session['user'] != ADMIN_USER: return "Доступ запрещен", 403 logging.info(f"Запрошено принудительное скачивание данных с HF администратором {session['user']}.") try: download_db_from_hf() # После скачивания хорошо бы перезагрузить данные в приложении, # но это произойдет при следующем запросе к load_data/load_users. # Можно добавить сообщение об успехе except Exception as e: logging.error(f"Ошибка при принудительном скачивании с HF: {e}") # Можно добавить сообщение об ошибке return redirect(url_for('admin')) # Возвращаемся в админку if __name__ == '__main__': # Загружаем данные при старте для инициализации try: initial_data = load_data() initial_users = load_users() logging.info(f"Начальная загрузка: {len(initial_data.get('products',[]))} товаров, {len(initial_data.get('categories',[]))} категорий, {len(initial_users)} пользователей.") except Exception as e: logging.error(f"Не удалось загрузить данные при запуске приложения: {e}") # Запускаем поток для периодического резервного копирования # Убедимся, что токен для записи существует, иначе поток не нужен if HF_TOKEN_WRITE: backup_thread = threading.Thread(target=periodic_backup, daemon=True) backup_thread.start() logging.info("Поток периодического резервного копирования запущен.") else: logging.warning("Периодическое резервное копирование отключено (нет HF_TOKEN).") # Запуск Flask приложения # Используйте Gunicorn или Waitress для продакшена вместо встроенного сервера Flask app.run(debug=False, host='0.0.0.0', port=7860) # debug=False для продакшена!