import os
import base64
import json
import threading
import time
from datetime import datetime, timedelta
from uuid import uuid4
import random
import string
import tempfile
import io
from PIL import Image, ImageOps
from flask import Flask, render_template_string, request, redirect, url_for, flash, jsonify, session
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import RepositoryNotFoundError, HfHubHTTPError
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
import requests
# Load variables from a local .env file (if present) into the environment
# before any os.getenv() lookup below.
load_dotenv()

app = Flask(__name__)
# Session-signing key. Prefer the FLASK_SECRET_KEY environment variable so the
# key does not have to live in source control; the literal is kept only as a
# backward-compatible fallback so existing deployments keep their sessions.
app.secret_key = os.getenv("FLASK_SECRET_KEY") or 'super_secret_key_store_app_123_gippo_env'
app.config.update(
    SESSION_COOKIE_SAMESITE='None',
    SESSION_COOKIE_SECURE=True,
    PERMANENT_SESSION_LIFETIME=timedelta(days=30),
)

# Local JSON database and the list of files mirrored to the dataset repo.
DATA_FILE = 'data.json'
SYNC_FILES = [DATA_FILE]

# Hugging Face dataset repo and access tokens (write token doubles as the
# read token when HF_TOKEN_READ is not set; see download_db_from_hf).
REPO_ID = os.getenv("REPO_ID", "Kgshop/fullmeta")
HF_TOKEN_WRITE = os.getenv("HF_TOKEN")
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ")

# Defaults written into an environment's settings when they are missing.
DEFAULT_WHATSAPP_NUMBER = "+77470623684"
DEFAULT_LOGO_URL = "https://huggingface.co/spaces/Metapp/Tech/resolve/main/1776929812446-019db944-b5db-7524-8f44-73942d70a0f8.png"

# Re-entrant on purpose: load_data() holds this lock while it calls
# download_db_from_hf(), whose 404 branch acquires the lock again. With a
# plain threading.Lock that nested acquisition blocks forever.
data_lock = threading.RLock()
def get_almaty_time():
    """Return the current time at a fixed UTC+5 offset as a formatted string.

    Returns:
        str: Timestamp in ``YYYY-MM-DD HH:MM:SS`` form.
    """
    # Local import: the module header only brings in datetime and timedelta.
    from datetime import timezone

    # datetime.utcnow() is deprecated (Python 3.12+). An aware "now" at a
    # fixed +5h offset formats to exactly the same wall-clock string.
    return datetime.now(timezone(timedelta(hours=5))).strftime('%Y-%m-%d %H:%M:%S')
def download_db_from_hf(specific_file=None, retries=3, delay=5):
    """Download sync files from the dataset repo into the current directory.

    Args:
        specific_file: Name of a single file to fetch. When falsy, every
            entry of SYNC_FILES is fetched.
        retries: Number of extra attempts per file after the first one.
        delay: Seconds to sleep between attempts.

    Returns:
        bool: True when every requested file was downloaded. False when any
        file could not be fetched, and immediately False when the repository
        itself does not exist.

    NOTE: on a 404 for DATA_FILE that is also missing locally, an empty JSON
    object is written under data_lock. A caller that already holds data_lock
    (load_data does) therefore needs that lock to be re-entrant.
    """
    token_to_use = HF_TOKEN_READ if HF_TOKEN_READ else HF_TOKEN_WRITE
    files_to_download = [specific_file] if specific_file else SYNC_FILES
    all_successful = True
    for file_name in files_to_download:
        success = False
        for attempt in range(retries + 1):
            try:
                hf_hub_download(
                    repo_id=REPO_ID,
                    filename=file_name,
                    repo_type="dataset",
                    token=token_to_use,
                    local_dir=".",
                    local_dir_use_symlinks=False,
                    force_download=True,
                    resume_download=False
                )
                success = True
                break
            except RepositoryNotFoundError:
                # Nothing to retry against: the repo itself is missing.
                return False
            except HfHubHTTPError as e:
                # The error may carry no response object; reading
                # e.response.status_code directly would raise inside this
                # handler and escape the retry loop entirely.
                response = getattr(e, 'response', None)
                status_code = getattr(response, 'status_code', None)
                if status_code == 404:
                    # The file is absent remotely, so retrying cannot help.
                    if (attempt == 0 and file_name == DATA_FILE
                            and not os.path.exists(file_name)):
                        temp_path = None
                        try:
                            with data_lock:
                                fd, temp_path = tempfile.mkstemp(
                                    dir=os.path.dirname(os.path.abspath(DATA_FILE)) or '.',
                                    text=True,
                                )
                                with os.fdopen(fd, 'w', encoding='utf-8') as f:
                                    json.dump({}, f)
                                os.replace(temp_path, DATA_FILE)
                                temp_path = None  # moved into place, nothing to clean
                        except Exception:
                            # Best effort: the placeholder is a convenience only.
                            pass
                        finally:
                            if temp_path is not None:
                                try:
                                    os.remove(temp_path)
                                except OSError:
                                    pass
                    break
            except Exception:
                # Network errors (including requests' exceptions) and any
                # other failure are treated as transient and retried.
                pass
            if attempt < retries:
                time.sleep(delay)
        if not success:
            all_successful = False
    return all_successful
def upload_db_to_hf(specific_file=None):
    """Push local sync files to the dataset repo, ignoring every failure.

    Does nothing when no write token is configured. Files that do not exist
    locally are skipped, and an error on one file does not stop the others.

    Args:
        specific_file: Single file name to upload; when falsy, all of
            SYNC_FILES are uploaded.
    """
    if not HF_TOKEN_WRITE:
        return
    try:
        client = HfApi()
        targets = SYNC_FILES
        if specific_file:
            targets = [specific_file]
        for file_name in targets:
            if not os.path.exists(file_name):
                continue
            try:
                client.upload_file(
                    path_or_fileobj=file_name,
                    path_in_repo=file_name,
                    repo_id=REPO_ID,
                    repo_type="dataset",
                    token=HF_TOKEN_WRITE,
                    commit_message=f"Sync {file_name} {get_almaty_time()}",
                )
            except Exception:
                # Best effort per file: move on to the next one.
                continue
    except Exception:
        # Best effort overall (e.g. client construction failed).
        return
def process_and_upload_image(file_obj, repo_path, size=(512, 512)):
    """Crop/resize an image to *size*, encode it as JPEG and upload it.

    Args:
        file_obj: Anything PIL's ``Image.open`` accepts.
        repo_path: Folder inside the dataset repo that receives the file.
        size: Target (width, height) passed to ``ImageOps.fit``.

    Returns:
        The generated ``<hex>.jpg`` file name on success; None when no write
        token is configured or when any step raises.
    """
    if not HF_TOKEN_WRITE:
        return None
    try:
        source = Image.open(file_obj)
        fitted = ImageOps.fit(source.convert('RGB'), size, Image.Resampling.LANCZOS)

        # Encode in memory so nothing touches the local disk.
        payload = io.BytesIO()
        fitted.save(payload, format='JPEG', quality=85)
        payload.seek(0)

        filename = f"{uuid4().hex}.jpg"
        HfApi().upload_file(
            path_or_fileobj=payload,
            path_in_repo=f"{repo_path}/{filename}",
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN_WRITE,
        )
    except Exception:
        return None
    return filename
def periodic_backup(interval_seconds=1800):
    """Upload all sync files in an endless loop, sleeping before each run.

    Never returns.

    Args:
        interval_seconds: Pause before every upload. Defaults to the
            previously hard-coded 1800 seconds.
    """
    while True:
        time.sleep(interval_seconds)
        upload_db_to_hf()
def _baseline_env_settings(organization_name):
    """Return a fresh settings dict with every default, in canonical key order."""
    return {
        'organization_name': organization_name,
        'admin_password_enabled': False,
        'admin_password': '',
        'logo_url': DEFAULT_LOGO_URL,
        'whatsapp_number': DEFAULT_WHATSAPP_NUMBER,
        'invoice_contacts': '',
        'currency': 'T',
        'track_inventory': False,
        'use_barcodes': False,
        'business_type': 'mixed',
        'system_mode': 'both',
        'hide_stock_online': False,
        'closed_catalog_enabled': False,
        'theme': 'light',
        'customer_fields': {
            'name': True, 'phone': True, 'city': True, 'address': False, 'zip': False
        },
        'socials': {
            'wa': {'enabled': True, 'url': 'https://wa.me/77011333885'},
            'ig': {'enabled': True, 'url': 'https://instagram.com/14sklad_baisat'},
            'tg': {'enabled': True, 'url': 'https://t.me/posuda15konteiner'}
        }
    }


def _fill_missing_keys(target, defaults):
    """Copy each key of *defaults* absent from *target*; return True if any was added."""
    added = False
    for key, value in defaults.items():
        if key not in target:
            target[key] = value
            added = True
    return added


def _read_data_file_dict():
    """Parse DATA_FILE and return its root object; raise ValueError if it is not a dict."""
    with open(DATA_FILE, 'r', encoding='utf-8') as file:
        loaded = json.load(file)
    if not isinstance(loaded, dict):
        raise ValueError('data file root is not a JSON object')
    return loaded


def _write_data_file_quietly(data):
    """Atomically replace DATA_FILE with *data*; failures are swallowed, temp file removed."""
    temp_path = None
    try:
        fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(os.path.abspath(DATA_FILE)) or '.', text=True)
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            json.dump(data, f)
        os.replace(temp_path, DATA_FILE)
        temp_path = None  # moved into place, nothing left to clean up
    except Exception:
        # Best effort: the caller still gets the in-memory data.
        pass
    finally:
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass


def load_data():
    """Load, migrate and normalize the whole data file.

    Steps, all performed while holding data_lock:
      1. Read DATA_FILE. If it is missing, unreadable, not valid JSON or not
         a JSON object, try to download it again and re-read it; fall back
         to an empty dict.
      2. If the root has 'products' or 'categories' keys (flat single-shop
         layout), wrap it into a single 'default_env' environment.
      3. Fill in every missing key on environments, settings, products,
         variants, orders and cart items.
      4. Rewrite DATA_FILE when a tracked default was added or the file does
         not exist.

    Returns:
        dict: Mapping of environment id to that environment's data.

    NOTE: download_db_from_hf() is called with data_lock held, so other
    threads wait for the duration of its retries.
    """
    with data_lock:
        try:
            data = _read_data_file_dict()
        except (OSError, ValueError):
            # OSError covers a missing/unreadable file; ValueError covers bad
            # JSON, bad encoding and a non-object root.
            data = {}
            if download_db_from_hf(specific_file=DATA_FILE):
                try:
                    # Same type check as the first read: a downloaded list
                    # or scalar must not reach the .items() loop below.
                    data = _read_data_file_dict()
                except Exception:
                    data = {}

        if 'products' in data or 'categories' in data:
            # Flat layout: everything becomes the single default environment.
            data = {
                'default_env': {
                    'products': data.get('products', []),
                    'categories': data.get('categories', []),
                    'category_photos': data.get('category_photos', {}),
                    'orders': data.get('orders', {}),
                    'staff': [],
                    'catalog_users': [],
                    'inventory_history': [],
                    'settings': _baseline_env_settings('Default Shop'),
                }
            }

        changed = False
        for env_id, env_data in data.items():
            if not isinstance(env_data, dict):
                # A malformed entry must not abort loading the other environments.
                continue

            # Container defaults are cheap to re-create on every load, so (as
            # before) adding them does not by itself trigger a rewrite.
            for key, factory in (
                ('products', list),
                ('categories', list),
                ('category_photos', dict),
                ('orders', dict),
                ('staff', list),
                ('catalog_users', list),
                ('inventory_history', list),
            ):
                if key not in env_data:
                    env_data[key] = factory()

            if 'settings' not in env_data:
                env_data['settings'] = {}
                changed = True
            if _fill_missing_keys(env_data['settings'], _baseline_env_settings(f'Shop {env_id}')):
                changed = True

            for product in env_data['products']:
                if 'product_id' not in product:
                    product['product_id'] = uuid4().hex
                    changed = True
                if _fill_missing_keys(product, {
                    'pieces_per_box': "",
                    'box_price': "",
                    'min_order': "",
                    'barcode': "",
                    'variants': [],
                    'has_variant_prices': False,
                    'stock': "",
                    'is_available': True,
                    'wholesale_tiers': [],
                }):
                    changed = True
                for v in product['variants']:
                    if _fill_missing_keys(v, {
                        'stock': "",
                        'box_price': "",
                        'barcode': "",
                        # Variants inherit the product's box size by default.
                        'pieces_per_box': product.get('pieces_per_box', ""),
                        'is_available': True,
                        'wholesale_tiers': [],
                    }):
                        changed = True

            for order in env_data['orders'].values():
                if _fill_missing_keys(order, {
                    'status': 'confirmed',
                    'staff_name': '',
                    'assembled': {},
                    'global_discount': 0,
                }):
                    changed = True
                for item in order.get('cart', []):
                    if _fill_missing_keys(item, {
                        'discount': 0,
                        'category': 'Без категории',
                        'wholesale_tiers': [],
                    }):
                        changed = True

        if changed or not os.path.exists(DATA_FILE):
            _write_data_file_quietly(data)
        return data
def save_data(data):
    """Atomically write *data* to DATA_FILE, then push the file to the Hub.

    Best effort: non-dict input is ignored and every error is swallowed, so
    callers never see an exception from persistence.

    Args:
        data: The complete environment mapping to store.
    """
    if not isinstance(data, dict):
        return
    temp_path = None
    try:
        with data_lock:
            fd, temp_path = tempfile.mkstemp(dir=os.path.dirname(os.path.abspath(DATA_FILE)) or '.', text=True)
            with os.fdopen(fd, 'w', encoding='utf-8') as file:
                json.dump(data, file, ensure_ascii=False, indent=4)
            os.replace(temp_path, DATA_FILE)
            temp_path = None  # moved into place, nothing left to clean up
        # Upload after releasing the lock so other threads are not blocked
        # on network I/O; it is skipped when the write above failed.
        upload_db_to_hf(specific_file=DATA_FILE)
    except Exception:
        pass
    finally:
        # A failed dump (e.g. a non-serializable value) would otherwise
        # leave a stray temp file next to DATA_FILE on every attempt.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass
def get_env_data(env_id):
    """Return the data of environment *env_id*, creating it when absent.

    A missing environment is initialised with empty collections and default
    settings (social links enabled but with empty URLs), the whole data set
    is saved, and the new record is returned.

    Args:
        env_id: Key of the environment in the data file.

    Returns:
        dict: The environment's record as stored in the loaded data.
    """
    store = load_data()
    if env_id in store:
        return store[env_id]

    settings = {
        'organization_name': f'Shop {env_id}',
        'admin_password_enabled': False,
        'admin_password': '',
        'logo_url': DEFAULT_LOGO_URL,
        'whatsapp_number': DEFAULT_WHATSAPP_NUMBER,
        'invoice_contacts': '',
        'currency': 'T',
        'track_inventory': False,
        'use_barcodes': False,
        'business_type': 'mixed',
        'system_mode': 'both',
        'hide_stock_online': False,
        'closed_catalog_enabled': False,
        'theme': 'light',
        'customer_fields': {'name': True, 'phone': True, 'city': True, 'address': False, 'zip': False},
        # One independent dict per network, all enabled with a blank URL.
        'socials': {network: {'enabled': True, 'url': ''} for network in ('wa', 'ig', 'tg')},
    }
    store[env_id] = {
        'products': [],
        'categories': [],
        'category_photos': {},
        'orders': {},
        'staff': [],
        'catalog_users': [],
        'inventory_history': [],
        'settings': settings,
    }
    save_data(store)
    return store[env_id]
def save_env_data(env_id, env_data):
    """Store *env_data* under *env_id* and persist the full data set.

    The data file is re-read first, so only this environment's record is
    replaced; all others are written back as loaded.
    """
    snapshot = load_data()
    snapshot.update({env_id: env_data})
    save_data(snapshot)
def _order_number(value, default, cast):
    """Convert *value* with *cast*; return *default* for None, "" or unparsable input."""
    if value is None or value == "":
        return default
    try:
        return cast(value)
    except (TypeError, ValueError):
        return default


def update_order_totals(order, business_type):
    """Recalculate per-item prices and the order total, in place.

    For every cart item with a positive quantity the unit price is chosen as:
      * 'wholesale' with tiers: the price of the highest tier whose ``qty``
        threshold the quantity reaches (unit price if none is reached);
      * otherwise, for 'mixed'/'wholesale': box price divided by pieces per
        box, when a box price exists, a box holds more than one piece and at
        least one full box is ordered;
      * otherwise the item's own price.
    The item discount is subtracted (never below zero) and the result is
    stored rounded in ``item['calculated_price']``. The order-level discount
    is subtracted from the sum (never below zero) and stored rounded in
    ``order['total_price']``.

    Blank or missing numeric fields fall back to their defaults instead of
    raising, because stored records use "" for "not set".

    Args:
        order: Order dict; its 'cart' list (if any) is updated in place.
        business_type: 'wholesale', 'mixed' or any other value for plain pricing.
    """
    total = 0
    global_discount = _order_number(order.get('global_discount'), 0.0, float)
    for item in order.get('cart') or []:
        qty = _order_number(item.get('quantity'), 0, int)
        if qty <= 0:
            continue
        ppb = _order_number(item.get('pieces_per_box'), 1, int)
        unit_price = _order_number(item.get('price'), 0.0, float)
        box_price = _order_number(item.get('cart_box_price'), 0.0, float)
        item_discount = _order_number(item.get('discount'), 0.0, float)

        base_price = unit_price
        tiers = item.get('wholesale_tiers') or []
        if business_type == 'wholesale' and tiers:
            # One threshold reader for both the filter and the ranking, so a
            # tier without 'qty' is treated as threshold 0 in both places.
            def tier_threshold(tier):
                return _order_number(tier.get('qty'), 0, float)

            reached = [tier for tier in tiers if qty >= tier_threshold(tier)]
            if reached:
                # max() keeps the first of equal thresholds, like the stable
                # descending sort it replaces.
                best = max(reached, key=tier_threshold)
                base_price = _order_number(best.get('price'), unit_price, float)
        elif business_type in ('mixed', 'wholesale') and box_price > 0 and ppb > 1 and qty >= ppb:
            base_price = box_price / ppb

        discounted_price = max(0, base_price - item_discount)
        item['calculated_price'] = round(discounted_price, 2)
        total += discounted_price * qty

    total = max(0, total - global_discount)
    order['total_price'] = round(total, 2)
def is_order_fully_assembled(order):
    """Tell whether every ordered line has been assembled in full.

    Orders whose status is neither 'confirmed' nor 'pos' always count as
    assembled. Otherwise each cart line with a positive quantity must have
    an assembled count (looked up by the line's 'c_key') of at least that
    quantity.

    Args:
        order: Order dict with optional 'status', 'cart' and 'assembled'.

    Returns:
        bool: True when nothing is left to assemble.
    """
    if order.get('status') not in ('confirmed', 'pos'):
        return True

    picked = order.get('assembled', {})

    def line_done(line):
        wanted = int(line.get('quantity', 0))
        if wanted <= 0:
            # Lines without a positive quantity never block completion.
            return True
        return int(picked.get(line.get('c_key'), 0)) >= wanted

    return all(line_done(line) for line in order.get('cart', []))
def deduct_stock(cart_items, products):
    """Subtract each cart line's quantity from the matching stock, in place.

    For every line the first product with the same 'product_id' is used.
    When the line's 'variant_idx' is not -1 and lies below the number of
    variants, the variant's stock is reduced; otherwise the product's own
    stock is. A stock value of "" or None is left untouched.

    Args:
        cart_items: Iterable of cart line dicts.
        products: List of product dicts; modified in place.
    """
    for line in cart_items:
        wanted_id = line.get('product_id')
        variant_pos = line.get('variant_idx', -1)
        amount = int(line.get('quantity', 0))

        # Only the first product carrying this id is affected.
        match = next((entry for entry in products if entry['product_id'] == wanted_id), None)
        if match is None:
            continue

        if variant_pos != -1 and variant_pos < len(match.get('variants', [])):
            holder = match['variants'][variant_pos]
        else:
            holder = match

        on_hand = holder.get('stock')
        if on_hand is None or on_hand == "":
            continue
        holder['stock'] = int(on_hand) - amount
def restore_stock(c_key, pid, vidx, return_qty, products):
    """Add *return_qty* back to the stock of one product or variant, in place.

    The first product whose 'product_id' equals *pid* is used. When *vidx*
    is not -1 and lies below the number of variants, the variant's stock is
    increased; otherwise the product's own stock is. A stock value of "" or
    None is left untouched.

    Args:
        c_key: Accepted but not used by this function.
        pid: Product id to look up.
        vidx: Variant index, or -1 for the product itself.
        return_qty: Amount to add to the stock.
        products: List of product dicts; modified in place.
    """
    match = next((entry for entry in products if entry['product_id'] == pid), None)
    if match is None:
        return

    use_variant = vidx != -1 and vidx < len(match.get('variants', []))
    holder = match['variants'][vidx] if use_variant else match

    on_hand = holder.get('stock')
    if on_hand is None or on_hand == "":
        return
    holder['stock'] = int(on_hand) + return_qty
def get_low_stock_items(products, threshold=100):
    """List products/variants whose integer stock is below *threshold*.

    A product with a non-empty 'variants' list is reported per variant;
    otherwise its own stock is checked. Stock values that are "", None or
    not a plain (optionally negative) whole number are ignored.

    Args:
        products: Iterable of product dicts.
        threshold: Stock level below which an entry is reported. Defaults to
            the previously hard-coded 100.

    Returns:
        list[dict]: Entries with 'name', 'variant', 'stock' (int) and
        'category', in product/variant order.
    """
    def parsed_stock(raw):
        """Return *raw* as int when it is a plain whole number, else None."""
        if raw is None or raw == "":
            return None
        text = str(raw)
        if not text.lstrip('-').isdigit():
            return None
        try:
            return int(text)
        except ValueError:
            # e.g. "--5" or digit-like characters that int() rejects pass
            # the isdigit() pre-check but are not valid integers.
            return None

    low_stock = []
    for p in products:
        variants = p.get('variants')
        if variants:
            for v in variants:
                level = parsed_stock(v.get('stock'))
                if level is not None and level < threshold:
                    low_stock.append({"name": p['name'], "variant": v.get('name'), "stock": level, "category": p.get('category', '')})
        else:
            level = parsed_stock(p.get('stock'))
            if level is not None and level < threshold:
                low_stock.append({"name": p['name'], "variant": "", "stock": level, "category": p.get('category', '')})
    return low_stock
# Landing page template text. As visible here it holds only literal text and
# no template tags. Presumably rendered with render_template_string —
# confirm at the call site.
LANDING_PAGE_TEMPLATE = '''
MetaStore
'''
# Login page template text: heading text followed by a loop over
# get_flashed_messages(with_categories=true) that outputs each message.
LOGIN_TEMPLATE = '''
Вход
Вход
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
{{ message }}
{% endfor %}
{% endif %}
{% endwith %}
'''
# Closed-catalog login template text: the prompt asks for a 6-digit password,
# then flashed messages are output with the same loop as in LOGIN_TEMPLATE.
CATALOG_LOGIN_TEMPLATE = '''
Вход в каталог
Закрытый каталог
Введите 6-значный пароль для доступа
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
{{ message }}
{% endfor %}
{% endif %}
{% endwith %}
'''
ADMHOSTO_TEMPLATE = '''
Управление
Среды
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
{% if mode == 'pos' and staff_id %}
{% endif %}
{% if mode != 'pos' %}
{% if settings.socials.wa.enabled and settings.socials.wa.url %}
{% for link in settings.socials.wa.url.split() %}
{% if link.strip() %}
{% endif %}
{% endfor %}
{% endif %}
{% if settings.socials.ig.enabled and settings.socials.ig.url %}
{% for link in settings.socials.ig.url.split() %}
{% if link.strip() %}
{% endif %}
{% endfor %}
{% endif %}
{% if settings.socials.tg.enabled and settings.socials.tg.url %}
{% for link in settings.socials.tg.url.split() %}
{% if link.strip() %}
{% endif %}
{% endfor %}
{% endif %}
{% endif %}
Сумма заказа:0 {{ currency_code }}
Ваш заказ
{% if mode == 'pos' %}
{% else %}
{% if settings.customer_fields.name %} {% endif %}
{% if settings.customer_fields.phone %} {% endif %}
{% if settings.customer_fields.city %} {% endif %}
{% if settings.customer_fields.address %} {% endif %}
{% if settings.customer_fields.zip %} {% endif %}
{% endif %}