import json
import logging
import math
import os
import threading
import time
from datetime import datetime

from flask import Flask, render_template_string, request, redirect, url_for, send_from_directory
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import RepositoryNotFoundError
from werkzeug.utils import secure_filename
# Flask application object; the route handlers in this file are registered on it.
app = Flask(__name__)
# Local JSON file holding the shop data (products and categories).
DATA_FILE = 'data.json'
# Directory where uploaded photos are written before being pushed to Hugging Face.
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
# Create the upload directory at import time so later saves cannot fail on a missing path.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Hugging Face settings
REPO_ID = "Kgshop/glasman"
# NOTE: the write token is read from the HF_TOKEN environment variable, even
# though log messages elsewhere refer to it as HF_TOKEN_WRITE.
HF_TOKEN_WRITE = os.getenv("HF_TOKEN")
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ")
# Logo URL
LOGO_URL = "https://huggingface.co/spaces/GlasmanGL/shop/resolve/main/gl_glasman-20250423-0001.jpg"
# Logging configuration
logging.basicConfig(level=logging.INFO) # Changed level to INFO for production clarity
def load_data(_allow_redownload=True):
    """Load the shop database from DATA_FILE.

    If the local file is missing, a download from Hugging Face is attempted
    first. Every failure is logged and results in an empty structure instead
    of an exception, so callers always receive a usable dict.

    Args:
        _allow_redownload: Internal flag. When the local file cannot be
            decoded it is renamed and re-downloaded once; the retry call
            passes False so a corrupted remote copy cannot cause an endless
            rename/download/recursion cycle.

    Returns:
        dict: a mapping whose 'products' and 'categories' values are lists.
    """
    try:
        # Ensure local file exists before attempting to load
        if not os.path.exists(DATA_FILE):
            logging.info(f"{DATA_FILE} not found locally, attempting download from HF.")
            download_db_from_hf()
            # The download is skipped silently when no read token is configured.
            if not os.path.exists(DATA_FILE):
                logging.warning("Local database file still not found after download attempt. Starting with empty data.")
                return {'products': [], 'categories': []}

        with open(DATA_FILE, 'r', encoding='utf-8') as file:
            data = json.load(file)
        logging.info("Data successfully loaded from JSON")

        if isinstance(data, list):
            # A bare list is treated as an old-format product list.
            logging.warning("JSON structure is invalid or incomplete. Resetting to default structure.")
            return {'products': data, 'categories': []}
        if not isinstance(data, dict):
            logging.warning("JSON structure is invalid or incomplete. Resetting to default structure.")
            return {'products': [], 'categories': []}

        if 'products' not in data or 'categories' not in data:
            # Keep whatever is present: discarding the existing key here would
            # wipe that data from disk on the next save.
            logging.warning("JSON structure is incomplete. Filling in missing keys.")
        for key in ('products', 'categories'):
            if not isinstance(data.get(key), list):
                data[key] = []
        return data
    except FileNotFoundError:
        # Normally prevented by the existence check above; covers the file
        # disappearing between the check and the open.
        logging.warning("Local database file not found. Starting with empty data.")
        return {'products': [], 'categories': []}
    except json.JSONDecodeError:
        logging.error("Error: Unable to decode JSON file. File might be corrupted. Starting with empty data.")
        if not _allow_redownload:
            # The freshly downloaded copy is corrupted as well: stop retrying.
            logging.error("Re-downloaded database could not be decoded either. Starting with empty data.")
            return {'products': [], 'categories': []}
        try:
            # Keep the broken file for inspection, then fetch a fresh copy.
            os.rename(DATA_FILE, f"{DATA_FILE}.corrupted_{datetime.now().strftime('%Y%m%d%H%M%S')}")
            logging.info("Renamed corrupted data file.")
            download_db_from_hf()
            # Single retry only.
            return load_data(_allow_redownload=False)
        except Exception as e_rename:
            logging.error(f"Could not rename corrupted file or re-download: {e_rename}")
            return {'products': [], 'categories': []}
    except RepositoryNotFoundError:
        logging.error("Hugging Face repository not found. Cannot download initial database. Starting with empty local data.")
        return {'products': [], 'categories': []}
    except Exception as e:
        logging.error(f"An unexpected error occurred during data loading: {e}")
        return {'products': [], 'categories': []}
def save_data(data):
    """Persist *data* to DATA_FILE and start a background upload to Hugging Face.

    The JSON is written to a sibling ".tmp" file and then moved over the real
    file with os.replace, so readers do not see a half-written database.

    Args:
        data: JSON-serialisable object to store.

    Raises:
        Exception: whatever went wrong while writing, replacing or starting
            the upload thread is logged and re-raised after the temporary
            file has been cleaned up.
    """
    staging_path = f"{DATA_FILE}.tmp"
    try:
        with open(staging_path, 'w', encoding='utf-8') as handle:
            json.dump(data, handle, ensure_ascii=False, indent=4)
        # Swap the finished file into place in one step.
        os.replace(staging_path, DATA_FILE)
        logging.info("Data successfully saved to JSON")
        # The upload runs in its own thread so the caller is not blocked by it.
        uploader = threading.Thread(target=upload_db_to_hf)
        uploader.start()
    except Exception as save_error:
        logging.error(f"Error saving data: {save_error}")
        # Do not leave a stale staging file behind.
        if os.path.exists(staging_path):
            try:
                os.remove(staging_path)
            except OSError as cleanup_error:
                logging.error(f"Could not remove temporary save file {staging_path}: {cleanup_error}")
        raise
def upload_db_to_hf():
    """Upload DATA_FILE to the Hugging Face dataset repository as a backup.

    Does nothing (beyond logging a warning) when the write token is not
    configured or the local file does not exist. Upload errors are logged
    and not propagated.
    """
    if not HF_TOKEN_WRITE:
        logging.warning("HF_TOKEN_WRITE not set. Skipping Hugging Face upload.")
        return
    if not os.path.exists(DATA_FILE):
        logging.warning(f"Data file {DATA_FILE} not found. Skipping Hugging Face upload.")
        return

    try:
        hub = HfApi()
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        hub.upload_file(
            path_or_fileobj=DATA_FILE,
            path_in_repo=DATA_FILE,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN_WRITE,
            commit_message=f"Automated database backup {stamp}",
        )
        logging.info("JSON database backup successfully uploaded to Hugging Face.")
    except RepositoryNotFoundError:
        logging.error(f"Failed to upload backup: Repository '{REPO_ID}' not found.")
    except Exception as upload_error:
        logging.error(f"Error uploading backup: {upload_error}")
def download_db_from_hf():
    """Download DATA_FILE from the Hugging Face dataset repo into the current directory.

    Returns without downloading (after a warning) when no read token is
    configured, leaving any local file as the fallback.

    Raises:
        RepositoryNotFoundError: the repository does not exist.
        Exception: any other download failure; both are logged and re-raised
            so the caller can decide how to react.
    """
    if not HF_TOKEN_READ:
        logging.warning("HF_TOKEN_READ not set. Skipping Hugging Face download.")
        return

    try:
        hf_hub_download(
            repo_id=REPO_ID,
            filename=DATA_FILE,
            repo_type="dataset",
            token=HF_TOKEN_READ,
            local_dir=".",
            local_dir_use_symlinks=False,
            force_download=True,  # replace an existing local copy
        )
        logging.info("JSON database successfully downloaded from Hugging Face.")
    except Exception as download_error:
        # A missing repository gets its own message; everything else is
        # reported with the error text. Either way the error is passed on.
        if isinstance(download_error, RepositoryNotFoundError):
            logging.error(f"Repository '{REPO_ID}' not found on Hugging Face.")
        else:
            logging.error(f"Error downloading JSON database from Hugging Face: {download_error}")
        raise
def periodic_backup():
    """Upload the database to Hugging Face every 800 seconds, forever.

    Returns immediately when no write token is configured. Intended to be
    run as the target of a background thread: it never returns otherwise.
    """
    if not HF_TOKEN_WRITE:
        logging.info("Periodic backup disabled: HF_TOKEN_WRITE not set.")
        return

    pause_seconds = 800  # 13 minutes and 20 seconds between backups
    logging.info("Starting periodic backup thread.")
    while True:
        time.sleep(pause_seconds)
        logging.info("Initiating periodic backup...")
        try:
            # Uploads whatever was last saved to disk.
            upload_db_to_hf()
        except Exception as backup_error:
            logging.error(f"Error during periodic backup: {backup_error}")
@app.route('/')
def catalog():
    """Render the public catalog page.

    Loads the database on every request and renders the inline template with
    the product list, the category list and the Hugging Face repo id.
    """
    data = load_data()
    products = data.get('products', []) # Use .get for safety
    categories = data.get('categories', [])
    # Inline Jinja template. Each product card shows the name, the price and
    # the first 80 characters of the description (followed by "..." when the
    # description is longer).
    catalog_html = '''
GL Glasman
Каталог Товаров
{% for category in categories %}
{% endfor %}
{% if products %}
{% for product in products %}
{% if product.get('photos') and product['photos']|length > 0 %}
{% else %}
{% endif %}
{{ product['name']|e }}
{{ product['price'] }} ₸
{{ product['description'][:80]|e }}{% if product['description']|length > 80 %}...{% endif %}
{% endfor %}
{% else %}
Нет товаров для отображения.
{% endif %}
×
Детали Продукта
Загрузка...
×
Выберите опции
×
Корзина
Ваша корзина пуста.
Итого: 0 ₸
'''
    return render_template_string(catalog_html, products=products, categories=categories, repo_id=REPO_ID)
# The view takes the product position from the URL, so the rule must declare
# it; without the <int:index> variable Flask calls the view with no arguments.
@app.route('/product/<int:index>')
def product_detail(index):
    """Return the HTML fragment shown in the product-details modal.

    Args:
        index: zero-based position of the product in the stored product list.

    Returns:
        The rendered fragment, or a ("not found" message, 404) tuple when
        *index* does not refer to an existing product.
    """
    data = load_data()
    products = data.get('products', [])
    if index < 0 or index >= len(products):
        return "Продукт не найден.", 404
    product = products[index]
    # Only the inner content of the modal is generated here. The photo
    # if/for blocks are closed explicitly so the template parses; the markup
    # that belongs inside them is not present in this file.
    detail_html = '''
{% if product.get('photos') and product['photos']|length > 0 %}
{% for photo in product['photos'] %}
{% endfor %}
{% endif %}
{# Use pre-wrap to respect newlines #}
{% set colors = product.get('colors') %}
{% if colors and colors|length > 0 %}
Доступные цвета: {{ colors|join(', ')|e }}
{% else %}
Доступные цвета: Цвет не указан
{% endif %}
{% set sizes = product.get('sizes') %}
{% if sizes and sizes|length > 0 %}
Доступные размеры: {{ sizes|join(', ')|e }}
{% else %}
Доступные размеры: Размер не указан
{% endif %}
{% set patterns = product.get('patterns') %}
{% if patterns and patterns|length > 0 %}
Доступные узоры: {{ patterns|join(', ')|e }}
{% else %}
Доступные узоры: Узор не указан
{% endif %}
'''
    # repo_id is passed so the template can build image URLs.
    return render_template_string(detail_html, product=product, repo_id=REPO_ID)
def upload_photo_to_hf(local_path, repo_filename, product_name):
    """Upload one photo to the Hugging Face dataset repo and delete the local copy.

    The file is stored under "photos/<repo_filename>" in the repository. The
    local file at *local_path* is treated as temporary and is removed in every
    case, including when the upload is skipped because no write token is
    configured, so skipped uploads do not pile up in the upload folder.

    Args:
        local_path: path of the temporary local file to upload.
        repo_filename: file name to use inside the repository's photos/ folder.
        product_name: product name, used only in the commit message.

    Returns:
        bool: True when the upload succeeded, False otherwise.
    """
    try:
        # Checked inside the try block so the cleanup below also runs when
        # the upload is skipped.
        if not HF_TOKEN_WRITE:
            logging.warning("HF_TOKEN_WRITE not set. Cannot upload photo.")
            return False
        api = HfApi()
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=f"photos/{repo_filename}",
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN_WRITE,
            commit_message=f"Upload photo {repo_filename} for product {product_name}"
        )
        logging.info(f"Successfully uploaded photo {repo_filename} to HF.")
        return True
    except Exception as e:
        logging.error(f"Failed to upload photo {repo_filename} to HF: {e}")
        return False
    finally:
        # Clean up the temporary local file
        if os.path.exists(local_path):
            try:
                os.remove(local_path)
            except OSError as e_remove:
                logging.error(f"Could not remove temporary photo file {local_path}: {e_remove}")
def _unique_in_order(values):
    """Return the stripped, non-empty entries of *values* without duplicates, in first-seen order."""
    stripped = (value.strip() for value in values)
    return list(dict.fromkeys(value for value in stripped if value))


def _parse_price(raw_price):
    """Parse a price form field; return (price, None) on success or (None, error_message)."""
    try:
        price = float(raw_price.replace(',', '.'))
    except ValueError:
        return None, "Ошибка: Неверный формат цены."
    # float() accepts "nan" and "inf"; neither is a usable price.
    if not math.isfinite(price):
        return None, "Ошибка: Неверный формат цены."
    if price < 0:
        return None, "Ошибка: Цена не может быть отрицательной."
    return price, None


def _parse_product_index(raw_index, products):
    """Return *raw_index* as a valid position in *products*, or None when it is not one."""
    try:
        index = int(raw_index)
    except (TypeError, ValueError):
        return None
    return index if 0 <= index < len(products) else None


def _store_uploaded_photos(photo_files, product_name):
    """Save each uploaded photo locally, push it to Hugging Face, and return the stored file names."""
    stored = []
    for photo in photo_files:
        if not (photo and photo.filename):
            continue
        original_filename = secure_filename(photo.filename)
        # A timestamp prefix keeps names unique across uploads.
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f")
        unique_filename = f"{timestamp}_{original_filename}"
        temp_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
        try:
            photo.save(temp_path)
            logging.debug(f"Photo saved locally to {temp_path}")
            if upload_photo_to_hf(temp_path, unique_filename, product_name):
                stored.append(unique_filename)
            else:
                # A failed upload is not fatal: the product is saved without this photo.
                logging.warning(f"Failed to upload photo {unique_filename} to HF. It will not be added to the product.")
        except Exception as e_save:
            logging.error(f"Error saving uploaded file {original_filename}: {e_save}")
            if os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError as e_remove:
                    logging.error(f"Could not remove temporary photo file {temp_path}: {e_remove}")
    return stored


@app.route('/admin', methods=['GET', 'POST'])
def admin():
    """Admin panel: render the management page (GET) or apply a form action (POST).

    Supported values of the "action" form field:
        add_category / delete_category: manage the category list; products of
            a deleted category are reassigned to 'Без категории'.
        add / edit: create a product or update the one at form field "index".
        delete: remove the product at form field "index".

    Successful actions save the database and redirect back to this page.
    Validation problems return a message with a 4xx status; unexpected errors
    are logged with a traceback and return a generic message with status 500.

    SECURITY: this endpoint has no authentication. Anyone who can reach it can
    modify the shop data. Put a real auth layer (for example Flask-Login or a
    secret checked against an environment variable) in front of it before
    exposing it publicly.
    """
    data = load_data()
    products = data.setdefault('products', [])
    categories = data.setdefault('categories', [])

    if request.method == 'POST':
        action = request.form.get('action')
        logging.debug(f"Admin action received: {action}")
        try:
            if action == 'add_category':
                category_name = request.form.get('category_name', '').strip()
                if not category_name:
                    return "Ошибка: Название категории не может быть пустым.", 400
                if category_name in categories:
                    return "Ошибка: Категория с таким названием уже существует.", 400
                categories.append(category_name)
                categories.sort()  # Keep categories sorted
                save_data(data)
                logging.info(f"Category '{category_name}' added.")
                return redirect(url_for('admin'))

            elif action == 'delete_category':
                category_to_delete = request.form.get('category_name')
                if not category_to_delete:
                    return "Ошибка: Не выбрана категория для удаления.", 400
                if category_to_delete not in categories:
                    return "Ошибка: Категория не найдена.", 404
                categories.remove(category_to_delete)
                # Products of the removed category fall back to the default one.
                for product in products:
                    if product.get('category') == category_to_delete:
                        product['category'] = 'Без категории'
                save_data(data)
                logging.info(f"Category '{category_to_delete}' deleted and products updated.")
                return redirect(url_for('admin'))

            elif action in ('add', 'edit'):
                name = request.form.get('name', '').strip()
                description = request.form.get('description', '').strip()
                category = request.form.get('category', 'Без категории')
                # getlist handles several inputs sharing one name; duplicates
                # are dropped while the order entered in the form is kept.
                colors = _unique_in_order(request.form.getlist('colors'))
                sizes = _unique_in_order(request.form.getlist('sizes'))
                patterns = _unique_in_order(request.form.getlist('patterns'))

                if not name or not description:  # Price can be 0
                    return "Ошибка: Название и описание товара обязательны.", 400
                price, price_error = _parse_price(request.form.get('price', '0'))
                if price_error:
                    return price_error, 400

                # Validate the target of an edit before uploading anything, so
                # a bad index does not leave orphaned photos in the repository.
                index = None
                if action == 'edit':
                    index_str = request.form.get('index')
                    if index_str is None:
                        return "Ошибка: Индекс товара не предоставлен для редактирования.", 400
                    index = _parse_product_index(index_str, products)
                    if index is None:
                        return "Ошибка: Неверный индекс товара для редактирования.", 400

                uploaded_photo_filenames = _store_uploaded_photos(request.files.getlist('photos'), name)
                # Unknown categories fall back to the default one.
                stored_category = category if category in categories else 'Без категории'

                if action == 'add':
                    products.append({
                        'name': name,
                        'price': price,
                        'description': description,
                        'category': stored_category,
                        'photos': uploaded_photo_filenames,
                        'colors': colors,
                        'sizes': sizes,
                        'patterns': patterns,
                    })
                    logging.info(f"Product '{name}' added.")
                else:
                    product = products[index]
                    # New uploads replace the photo list; without uploads the
                    # existing photos are kept.
                    existing_photos = product.get('photos', [])
                    product['photos'] = uploaded_photo_filenames if uploaded_photo_filenames else existing_photos
                    product.update({
                        'name': name,
                        'price': price,
                        'description': description,
                        'category': stored_category,
                        'colors': colors,
                        'sizes': sizes,
                        'patterns': patterns,
                    })
                    logging.info(f"Product at index {index} ('{name}') updated.")

                save_data(data)
                return redirect(url_for('admin'))

            elif action == 'delete':
                index_str = request.form.get('index')
                if index_str is None:
                    return "Ошибка: Индекс товара не предоставлен для удаления.", 400
                index = _parse_product_index(index_str, products)
                if index is None:
                    return "Ошибка: Неверный индекс товара для удаления.", 400
                # Only the product record is removed; its photos stay in the repository.
                deleted_product_name = products[index].get('name', 'Unknown')
                del products[index]
                save_data(data)
                logging.info(f"Product at index {index} ('{deleted_product_name}') deleted.")
                return redirect(url_for('admin'))

            else:
                logging.warning(f"Unknown admin action received: {action}")
                return "Неизвестное действие.", 400
        except Exception as e:
            # Full traceback goes to the log; the user gets a generic message.
            logging.exception(f"An error occurred in admin POST action '{action}': {e}")
            return f"Произошла внутренняя ошибка сервера при обработке действия '{action}'. Подробности см. в логах сервера.", 500

    # --- Render Admin Page (GET request) ---
    # The product section uses Jinja's for/else: the else branch is rendered
    # when there are no products. The opening "for" tag is required for the
    # else/endfor pair further down to parse.
    admin_html = '''
Админ-панель
Админ-панель
Добавить новый товар
Управление категориями
Добавить категорию
Существующие категории
{% if categories %}
{% for category in categories %}
{{ category|e }}
{% endfor %}
{% else %}
Нет созданных категорий.
{% endif %}
Управление базой данных
Резервное копирование JSON файла базы данных на Hugging Face.
{% for product in products %}
Цвета: {{ (product.get('colors')|join(', '))|e if product.get('colors') else 'Нет' }}
Размеры: {{ (product.get('sizes')|join(', '))|e if product.get('sizes') else 'Нет' }}
Узоры: {{ (product.get('patterns')|join(', '))|e if product.get('patterns') else 'Нет' }}
{% if product.get('photos') %}
{% for photo in product['photos'] %}
{% endfor %}
{% endif %}
Редактировать
{% else %}
Нет товаров для отображения. Добавьте новый товар, используя форму выше.
{% endfor %}
'''
    # Categories are passed for the dropdowns in the add/edit forms.
    return render_template_string(admin_html, products=products, categories=categories, repo_id=REPO_ID)
@app.route('/admin/download_local_db')
def download_local_db():
    """Send the local database file to the client as an attachment.

    Returns:
        The file response, a 404 message when DATA_FILE does not exist, or a
        500 message on any other error.

    NOTE: no authentication is performed here; the whole database is served
    to any caller.
    """
    try:
        # DATA_FILE is served from the current working directory.
        safe_directory = os.path.abspath(".")
        # send_from_directory signals a missing file with an HTTP NotFound
        # exception rather than FileNotFoundError, which would otherwise land
        # in the generic handler below and produce a 500. Check explicitly so
        # the intended 404 message is returned.
        if not os.path.isfile(os.path.join(safe_directory, DATA_FILE)):
            logging.error(f"Attempted to download local DB, but {DATA_FILE} not found.")
            return "Локальный файл базы данных не найден.", 404
        return send_from_directory(directory=safe_directory, path=DATA_FILE, as_attachment=True)
    except FileNotFoundError:
        logging.error(f"Attempted to download local DB, but {DATA_FILE} not found.")
        return "Локальный файл базы данных не найден.", 404
    except Exception as e:
        logging.error(f"Error during local DB download: {e}")
        return "Произошла ошибка при скачивании файла.", 500
@app.route('/backup', methods=['POST'])
def backup():
    """Trigger an immediate database upload to Hugging Face, then return to the admin page.

    No authentication is performed and no success/failure banner is shown;
    an exception raised while handling the request yields a 500 response
    containing the error text.
    """
    logging.info("Manual backup requested.")
    try:
        upload_db_to_hf()
        admin_page = url_for('admin')
        return redirect(admin_page)
    except Exception as backup_error:
        logging.error(f"Manual backup failed: {backup_error}")
        failure_text = f"Ошибка при создании резервной копии: {backup_error}"
        return failure_text, 500
@app.route('/download', methods=['GET'])
def download():
    """Fetch the database from Hugging Face on request, then return to the admin page.

    No authentication is performed. A missing repository yields a 404
    message; any other failure is logged and yields a 500 message that
    includes the error text.
    """
    logging.info("Manual download from HF requested.")
    try:
        download_db_from_hf()
        admin_page = url_for('admin')
        return redirect(admin_page)
    except Exception as download_error:
        # A missing repository is reported to the client without extra logging here.
        if isinstance(download_error, RepositoryNotFoundError):
            return "Ошибка: Репозиторий Hugging Face не найден.", 404
        logging.error(f"Manual download from HF failed: {download_error}")
        failure_text = f"Ошибка при скачивании базы данных: {download_error}"
        return failure_text, 500
if __name__ == '__main__':
    # Background backups need the write token; the thread is a daemon so it
    # does not keep the process alive after the server stops.
    if not HF_TOKEN_WRITE:
        logging.warning("Periodic backup thread not started: HF_TOKEN_WRITE is not set.")
    else:
        backup_thread = threading.Thread(target=periodic_backup, daemon=True)
        backup_thread.start()

    # Warm-up load: fetches the database from HF when no local copy exists.
    # A failure here is logged but does not prevent the server from starting.
    try:
        load_data()
    except Exception as e:
        logging.critical(f"Failed initial data load: {e}. The application might not function correctly.")

    # Port and debug mode come from the environment (PORT, FLASK_DEBUG).
    # This is Flask's built-in server; use waitress or gunicorn in production.
    port = int(os.getenv("PORT", 7860))
    debug_mode = os.getenv("FLASK_DEBUG", "false").lower() == "true"
    logging.info(f"Starting Flask app on 0.0.0.0:{port} with debug={debug_mode}")
    app.run(debug=debug_mode, host='0.0.0.0', port=port)