#!/usr/bin/env python3
import os
from flask import Flask, request, Response, render_template_string, jsonify, redirect, url_for
import hmac
import hashlib
import json
from urllib.parse import unquote, parse_qs, quote
import time
from datetime import datetime, timezone, timedelta
import logging
import threading
import random
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import RepositoryNotFoundError
# Telegram bot token; verify_telegram_data() derives the initData HMAC key from it.
# SECURITY NOTE(review): the fallback below looks like a real bot token committed to
# source. When the BOT_TOKEN environment variable is unset, this literal is what signs
# and validates initData -- consider rotating it and requiring the environment variable.
BOT_TOKEN = os.getenv("BOT_TOKEN", "7835463659:AAGNePbelZIAOeaglyQi1qulOqnjs4BGQn4")
# Bind address and port passed to app.run() in the __main__ block.
HOST = '0.0.0.0'
PORT = 7860
# Local JSON file that backs the in-memory cache.
DATA_FILE = 'data.json'
# Hugging Face dataset repository and in-repo path used for download/upload of DATA_FILE.
REPO_ID = "flpolprojects/examplebonus"
HF_DATA_FILE_PATH = "data.json"
# Upload / download tokens; each is None when its environment variable is unset.
HF_TOKEN_WRITE = os.getenv("HF_TOKEN_WRITE")
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ")
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
# A new random secret key is generated on every process start.
app.secret_key = os.urandom(24)
# Held by the load/save/upload helpers while they touch visitor_data_cache or DATA_FILE.
_data_lock = threading.Lock()
# In-memory copy of DATA_FILE. Keys are client ids, plus the reserved keys
# 'company_info' and 'invoice_id_counter' written by the admin endpoints.
visitor_data_cache = {}
def get_bishkek_time():
    """Return the current time as a timezone-aware datetime at a fixed UTC+6 offset."""
    utc_plus_six = timezone(timedelta(hours=6))
    return datetime.now(tz=utc_plus_six)
def generate_unique_id(all_data):
    """Return a random five-digit id (as a string) that is not a key of *all_data*.

    Candidates are drawn uniformly from 10000..99999 and redrawn until one is
    not contained in *all_data*.
    """
    candidate = str(random.randint(10000, 99999))
    while candidate in all_data:
        candidate = str(random.randint(10000, 99999))
    return candidate
def download_data_from_hf():
    """Download the data file from the Hugging Face dataset repo and reload the cache.

    Returns:
        True when the download call completed. If the downloaded file then cannot
        be read or parsed, the cache is reset to an empty dict and True is still
        returned. False when HF_TOKEN_READ is unset or the download raised.
    """
    global visitor_data_cache

    if not HF_TOKEN_READ:
        logging.warning("HF_TOKEN_READ not set. Skipping Hugging Face download.")
        return False

    download_options = dict(
        repo_id=REPO_ID,
        filename=HF_DATA_FILE_PATH,
        repo_type="dataset",
        token=HF_TOKEN_READ,
        local_dir=".",
        local_dir_use_symlinks=False,
        force_download=True,
        etag_timeout=10,
    )

    try:
        logging.info(f"Attempting to download {HF_DATA_FILE_PATH} from {REPO_ID}...")
        hf_hub_download(**download_options)
        logging.info("Data file successfully downloaded from Hugging Face.")

        # Replace the cache with whatever was just written to disk.
        with _data_lock:
            try:
                with open(DATA_FILE, 'r', encoding='utf-8') as downloaded:
                    visitor_data_cache = json.load(downloaded)
                logging.info("Successfully loaded downloaded data into cache.")
            except (FileNotFoundError, json.JSONDecodeError) as read_error:
                logging.error(f"Error reading downloaded data file: {read_error}. Starting with empty cache.")
                visitor_data_cache = {}
        return True
    except RepositoryNotFoundError:
        logging.error(f"Hugging Face repository '{REPO_ID}' not found. Cannot download data.")
    except Exception as download_error:
        logging.error(f"Error downloading data from Hugging Face: {download_error}")
    return False
def load_visitor_data():
    """Return the shared data cache, filling it from DATA_FILE when it is empty.

    The returned object is the module-level cache itself, not a copy. Any
    failure to read or parse DATA_FILE is logged and leaves the cache as an
    empty dict.
    """
    global visitor_data_cache

    with _data_lock:
        if visitor_data_cache:
            return visitor_data_cache

        try:
            with open(DATA_FILE, 'r', encoding='utf-8') as source:
                visitor_data_cache = json.load(source)
            logging.info("Visitor data loaded from local JSON.")
        except FileNotFoundError:
            logging.warning(f"{DATA_FILE} not found locally. Starting with empty data.")
            visitor_data_cache = {}
        except json.JSONDecodeError:
            logging.error(f"Error decoding {DATA_FILE}. Starting with empty data.")
            visitor_data_cache = {}
        except Exception as load_error:
            logging.error(f"Unexpected error loading visitor data: {load_error}")
            visitor_data_cache = {}

    return visitor_data_cache
def save_visitor_data(data):
    """Merge *data* into the shared cache, persist it to DATA_FILE and schedule an upload.

    Args:
        data: mapping of top-level keys to merge into the cache; pass ``{}`` to
            persist the cache as it currently is.

    The JSON is written to a sibling temporary file and then moved over
    DATA_FILE with ``os.replace``. A crash in the middle of the dump, or the
    background uploader reading the file concurrently, therefore never sees a
    truncated document. Errors are logged, not raised.
    """
    with _data_lock:
        try:
            visitor_data_cache.update(data)
            tmp_path = f"{DATA_FILE}.tmp"
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(visitor_data_cache, f, ensure_ascii=False, indent=4)
            # Replace the target only after the full document has been written.
            os.replace(tmp_path, DATA_FILE)
            logging.info(f"Visitor data successfully saved to {DATA_FILE}.")
            upload_data_to_hf_async()
        except Exception as e:
            logging.error(f"Error saving visitor data: {e}")
def upload_data_to_hf():
    """Upload DATA_FILE to the Hugging Face dataset repository.

    The upload is skipped (with a warning) when HF_TOKEN_WRITE is unset, when
    DATA_FILE does not exist, or when DATA_FILE is empty. Any exception raised
    while uploading is logged and swallowed.
    """
    if not HF_TOKEN_WRITE:
        logging.warning("HF_TOKEN_WRITE not set. Skipping Hugging Face upload.")
        return
    if not os.path.exists(DATA_FILE):
        logging.warning(f"{DATA_FILE} does not exist. Skipping upload.")
        return

    try:
        hub_client = HfApi()
        # Only the size probe is done under the lock; the upload itself is not.
        with _data_lock:
            has_content = os.path.getsize(DATA_FILE) > 0
        if not has_content:
            logging.warning(f"{DATA_FILE} is empty. Skipping upload.")
            return

        logging.info(f"Attempting to upload {DATA_FILE} to {REPO_ID}/{HF_DATA_FILE_PATH}...")
        commit_message = f"Update bonus data {get_bishkek_time().strftime('%Y-%m-%d %H:%M:%S')}"
        hub_client.upload_file(
            path_or_fileobj=DATA_FILE,
            path_in_repo=HF_DATA_FILE_PATH,
            repo_id=REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN_WRITE,
            commit_message=commit_message,
        )
        logging.info("Bonus data successfully uploaded to Hugging Face.")
    except Exception as upload_error:
        logging.error(f"Error uploading data to Hugging Face: {upload_error}")
def upload_data_to_hf_async():
    """Run upload_data_to_hf() on a daemon thread and return immediately."""
    threading.Thread(target=upload_data_to_hf, daemon=True).start()
def periodic_backup():
    """Upload the data file once per hour, forever.

    Returns immediately (after logging) when HF_TOKEN_WRITE is unset;
    otherwise this function never returns.
    """
    if HF_TOKEN_WRITE:
        backup_interval_seconds = 3600
        while True:
            time.sleep(backup_interval_seconds)
            logging.info("Initiating periodic backup...")
            upload_data_to_hf()
    else:
        logging.info("Periodic backup disabled: HF_TOKEN_WRITE not set.")
        return
def verify_telegram_data(init_data_str):
    """Check the HMAC signature of a Telegram WebApp ``initData`` query string.

    The data-check string is built from every field except ``hash``, sorted by
    key and joined with newlines, then signed with a key derived from
    BOT_TOKEN (HMAC-SHA256 keyed with ``"WebAppData"``).

    Args:
        init_data_str: raw URL-encoded initData string.

    Returns:
        ``(parsed, is_valid)``. ``parsed`` is the ``parse_qs`` result without
        the ``hash`` field (each value is a list), or ``None`` when the hash is
        missing or an exception occurred. ``is_valid`` is True only when the
        signature matches. Data older than 24 hours is logged but still
        accepted.
    """
    try:
        # Blank fields are part of what was signed, so they must be kept.
        parsed_data = parse_qs(init_data_str, keep_blank_values=True)
        received_hash = parsed_data.pop('hash', [None])[0]
        if not received_hash:
            return None, False

        data_check_string = "\n".join(
            f"{key}={values[0]}" for key, values in sorted(parsed_data.items())
        )
        secret_key = hmac.new("WebAppData".encode(), BOT_TOKEN.encode(), hashlib.sha256).digest()
        calculated_hash = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest()

        # Constant-time comparison on bytes (str inputs must be ASCII-only).
        if not hmac.compare_digest(calculated_hash.encode(), received_hash.encode()):
            # Never log the expected hash: it is a valid signature for the
            # caller-supplied data and would let a log reader forge initData.
            logging.warning("Data verification failed: initData hash mismatch.")
            return parsed_data, False

        auth_date = int(parsed_data.get('auth_date', [0])[0])
        current_time = int(time.time())
        if current_time - auth_date > 86400:
            logging.warning(f"Telegram InitData is older than 24 hours (Auth Date: {auth_date}, Current: {current_time}).")
        return parsed_data, True
    except Exception as e:
        logging.error(f"Error verifying Telegram data: {e}")
        return None, False
# Jinja template for the client page rendered by index().
# Reads from the ``user`` context variable: bonuses, debts, id and
# combined_history; each history item supplies description, date_str,
# transaction_type ('bonus' or 'debt'), type and amount.
# NOTE(review): the surrounding HTML markup appears to be missing from this
# literal -- confirm against the deployed template.
TEMPLATE = """
Bonus
Ваши бонусы
{{ "%.2f"|format(user.bonuses|float) }}
Ваш долг
{{ "%.2f"|format(user.debts|float) }}
Ваш ID клиента
{{ user.id }}
История операций
{% if user.combined_history %}
{% for item in user.combined_history %}
-
{{ item.description }}
{{ item.date_str }}
{% if item.transaction_type == 'bonus' %}
{{ '+' if item.type == 'accrual' else '-' }}{{ "%.2f"|format(item.amount|float) }}
{% elif item.transaction_type == 'debt' %}
{{ '+' if item.type == 'accrual' else '-' }}{{ "%.2f"|format(item.amount|float) }}
{% endif %}
{% endfor %}
{% else %}
Операций пока не было.
{% endif %}
"""
# Jinja template for the company card rendered by view_company_info().
# Tests the website, facebook, instagram and tiktok fields of ``company_info``;
# the bodies of those conditionals are empty in this literal.
# NOTE(review): markup appears to have been stripped -- confirm against the
# deployed template.
COMPANY_INFO_TEMPLATE = """
Визитка Компании
{% if company_info.website %}
{% endif %}
{% if company_info.facebook %}
{% endif %}
{% if company_info.instagram %}
{% endif %}
{% if company_info.tiktok %}
{% endif %}
Назад
"""
# Jinja template for the invoice list rendered by view_invoice().
# Expects ``invoices``: a list of dicts, each with an 'items' list (product_name,
# quantity, unit_price, total_price) and a total_amount.
# The item list is read with subscript syntax on purpose: Jinja resolves
# ``invoice.items`` to the dict's ``items`` *method* before it tries the key,
# and iterating that method raises a TypeError.
INVOICE_LIST_TEMPLATE = """
Мои накладные
{% if invoices %}
{% for invoice in invoices %}
-
{% for item in invoice['items'] %}
{{ item.product_name }} - {{ item.quantity }} шт. x {{ "%.2f"|format(item.unit_price) }} = {{ "%.2f"|format(item.total_price) }}
{% endfor %}
Итого: {{ "%.2f"|format(invoice.total_amount) }}
{% endfor %}
{% else %}
Накладных пока нет.
{% endif %}
Назад
"""
# Jinja template for the admin dashboard rendered by admin_panel().
# Reads ``summary`` (total_users, total_bonuses, total_debts, users_with_debt)
# and ``users``, which it sorts by visited_at in descending order; per user it
# shows first_name, last_name, username (falling back to phone_number), id,
# bonuses and debts.
# NOTE(review): markup and client-side script appear to have been stripped from
# this literal -- confirm against the deployed template.
ADMIN_TEMPLATE = """
Bonus Admin
Панель администратора Bonus
{{ summary.total_users }}
Всего клиентов
{{ "%.2f"|format(summary.total_bonuses|float) }}
Всего бонусов
{{ "%.2f"|format(summary.total_debts|float) }}
Всего долгов
{{ summary.users_with_debt }}
Клиенты с долгом
Информация о компании
Создание накладной
Всего товаров: 0
Общая сумма: 0.00
{% if users %}
{% for user in users|sort(attribute='visited_at', reverse=true) %}
{{ user.first_name or '' }} {{ user.last_name or '' }}
@{{ user.username if user.username else user.phone_number }} | ID: {{ user.id }}
Бонусы
{{ "%.2f"|format(user.bonuses|float) }}
Долг
{{ "%.2f"|format(user.debts|float if user.debts else 0) }}
{% if user.telegram_id == None %}
{% endif %}
{% endfor %}
{% else %}
Пользователей пока нет.
{% endif %}
"""
@app.route('/')
def index():
    """Render the client page for the id given in ``?user_id_for_test=``.

    A view-model copy of the stored record is built for rendering. The cached
    record is left untouched, so derived fields (``combined_history`` and the
    per-item ``transaction_type``) are not written to disk by a later save.
    Unknown ids, the reserved ``company_info`` key and non-dict entries (such
    as the invoice counter) all render the empty "N/A" profile.
    """
    user_id_str = request.args.get('user_id_for_test')
    current_data = load_visitor_data()
    stored_user = current_data.get(user_id_str) if user_id_str else None

    if isinstance(stored_user, dict) and user_id_str != 'company_info':
        bonus_history = [
            {**item, 'transaction_type': 'bonus'}
            for item in stored_user.get('history') or []
        ]
        debt_history = [
            {**item, 'transaction_type': 'debt'}
            for item in stored_user.get('debt_history') or []
        ]
        user_data = {
            **stored_user,
            'id': user_id_str,
            # Newest first; entries without a date sort last instead of raising.
            'combined_history': sorted(
                bonus_history + debt_history,
                key=lambda entry: entry.get('date') or '',
                reverse=True,
            ),
        }
    else:
        user_data = {
            "id": "N/A",
            "bonuses": 0,
            "debts": 0,
            "history": [],
            "debt_history": [],
            "combined_history": []
        }
    return render_template_string(TEMPLATE, user=user_data, user_id=user_id_str)
@app.route('/company_info')
def view_company_info():
    """Render the company card.

    ``?user_id_for_test=`` is passed through to the template unchanged as
    ``user_id``; ``company_info`` is the stored dict, or ``{}`` when none has
    been saved yet.
    """
    user_id_str = request.args.get('user_id_for_test')
    company_info = load_visitor_data().get('company_info', {})
    return render_template_string(COMPANY_INFO_TEMPLATE, company_info=company_info, user_id=user_id_str)
@app.route('/invoices/<user_id>')
def view_invoice(user_id):
    """Render the invoice list of the client identified by the URL segment.

    The route declares the ``user_id`` path variable that this view requires.
    Invoices and their items are copied before display fields
    (``created_at_str``, ``total_price``, ``total_amount``) are computed, so
    the cached data is never modified by a page view. Unknown ids and non-dict
    entries render an empty list.
    """
    current_data = load_visitor_data()
    user_data = current_data.get(user_id)
    stored_invoices = (user_data.get('invoices') or []) if isinstance(user_data, dict) else []

    invoices = []
    for stored_invoice in stored_invoices:
        invoice = dict(stored_invoice)

        invoice_date = invoice.get('date')
        if invoice_date:
            try:
                dt_object = datetime.fromisoformat(invoice_date)
                invoice['created_at_str'] = dt_object.strftime('%Y-%m-%d %H:%M:%S')
            except (TypeError, ValueError):
                # Not an ISO string: show the stored value as-is.
                invoice['created_at_str'] = invoice_date

        items = []
        for stored_item in invoice.get('items') or []:
            item = dict(stored_item)
            item['total_price'] = item.get('unit_price', 0) * item.get('quantity', 0)
            items.append(item)
        invoice['items'] = items
        invoice['total_amount'] = sum(item['total_price'] for item in items)
        invoices.append(invoice)

    return render_template_string(INVOICE_LIST_TEMPLATE, invoices=invoices, user_id=user_id)
@app.route('/verify', methods=['POST'])
def verify_data():
    """Validate Telegram initData and create or refresh the matching client record.

    Request body: JSON object with an ``initData`` string.

    Responses:
        200 ``{"status": "ok", "verified": true, "user_id": ...}`` on success;
        400 when ``initData`` is missing or the signed data carries no user id;
        403 when the signature check fails;
        500 on an unexpected error.
    """
    try:
        # silent=True: a missing or malformed JSON body becomes None (400 below)
        # instead of raising and ending up as a 500.
        req_data = request.get_json(silent=True)
        init_data_str = req_data.get('initData') if isinstance(req_data, dict) else None
        if not init_data_str:
            return jsonify({"status": "error", "message": "Missing initData"}), 400

        user_data_parsed, is_valid = verify_telegram_data(init_data_str)

        user_info_dict = {}
        if user_data_parsed and 'user' in user_data_parsed:
            try:
                # parse_qs has already URL-decoded the value; decoding it again
                # would corrupt any literal '%' inside the JSON.
                decoded_user = json.loads(user_data_parsed['user'][0])
                if isinstance(decoded_user, dict):
                    user_info_dict = decoded_user
            except Exception as e:
                logging.error(f"Could not parse user JSON: {e}")

        if not is_valid:
            logging.warning(f"Verification failed for user: {user_info_dict.get('id')}")
            return jsonify({"status": "error", "verified": False, "message": "Invalid data"}), 403

        tg_user_id = user_info_dict.get('id')
        if not tg_user_id:
            return jsonify({"status": "error", "verified": True, "message": "User ID not found in parsed data"}), 400

        now = get_bishkek_time()
        profile_fields = {
            'first_name': user_info_dict.get('first_name'),
            'last_name': user_info_dict.get('last_name'),
            'username': user_info_dict.get('username'),
            'photo_url': user_info_dict.get('photo_url'),
            'language_code': user_info_dict.get('language_code'),
        }
        visit_fields = {
            'visited_at': now.timestamp(),
            'visited_at_str': now.strftime('%Y-%m-%d %H:%M:%S'),
        }

        all_data = load_visitor_data()
        # Non-dict entries (e.g. the integer invoice counter) are not clients.
        existing_user_key = next(
            (
                key
                for key, entry in all_data.items()
                if isinstance(entry, dict)
                and str(entry.get('telegram_id')) == str(tg_user_id)
            ),
            None,
        )

        if existing_user_key:
            user_id_to_save = existing_user_key
            user_entry = {**all_data[existing_user_key], **profile_fields, **visit_fields}
        else:
            user_id_to_save = generate_unique_id(all_data)
            user_entry = {
                'id': user_id_to_save,
                'telegram_id': tg_user_id,
                **profile_fields,
                'is_premium': user_info_dict.get('is_premium', False),
                'phone_number': None,
                **visit_fields,
                'bonuses': 0,
                'history': [],
                'debts': 0,
                'debt_history': [],
                'invoices': []
            }

        save_visitor_data({user_id_to_save: user_entry})
        return jsonify({"status": "ok", "verified": True, "user_id": user_id_to_save})
    except Exception:
        logging.exception("Error in /verify endpoint")
        return jsonify({"status": "error", "message": "Internal server error"}), 500
@app.route('/admin')
def admin_panel():
    """Render the admin dashboard with the client list and aggregate totals.

    Only dict-valued entries other than ``company_info`` are treated as
    clients; the integer ``invoice_id_counter`` stored next to them is
    skipped, since it would otherwise raise. Each client is shallow-copied
    with its key as ``id`` so the cached records are not modified.
    """
    current_data = load_visitor_data()
    users_list = [
        {**user_data, 'id': user_id}
        for user_id, user_data in current_data.items()
        if user_id != 'company_info' and isinstance(user_data, dict)
    ]

    # ``or 0`` covers records where the field is present but null.
    summary_stats = {
        "total_users": len(users_list),
        "total_bonuses": sum(u.get('bonuses') or 0 for u in users_list),
        "total_debts": sum(u.get('debts') or 0 for u in users_list),
        "users_with_debt": sum(1 for u in users_list if (u.get('debts') or 0) > 0)
    }
    company_info = current_data.get('company_info', {})
    return render_template_string(ADMIN_TEMPLATE, users=users_list, summary=summary_stats, company_info=company_info)
@app.route('/admin/save_company_info', methods=['POST'])
def save_company_info():
    """Store the company card fields sent as a JSON object.

    Fields absent from the request are stored as ``None``. The record is
    handed to save_visitor_data(), which merges it into the cache while
    holding the data lock.

    Responses: 200 on success, 400 when the body is not a JSON object,
    500 on an unexpected error.
    """
    company_fields = (
        'name', 'phone1', 'phone2', 'whatsapp', 'telegram',
        'address', 'website', 'facebook', 'instagram', 'tiktok',
    )
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"status": "error", "message": "JSON body is required"}), 400

        company_info = {field: data.get(field) for field in company_fields}
        save_visitor_data({'company_info': company_info})
        return jsonify({"status": "ok", "message": "Company info saved successfully"}), 200
    except Exception as e:
        logging.exception("Error in /admin/save_company_info endpoint")
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/admin/lookup_client', methods=['GET'])
def lookup_client():
    """Return up to five clients matching ``?q=``.

    The query is matched case-insensitively as a substring of the client id,
    first name, last name or username, and case-sensitively as a substring of
    the phone number. Non-client entries (``company_info`` and non-dict values
    such as the invoice counter) are skipped.
    """
    query = request.args.get('q', '').strip()
    if not query:
        return jsonify({"users": []})

    needle = query.lower()
    results = []
    for user_id, user_data in load_visitor_data().items():
        if user_id == 'company_info' or not isinstance(user_data, dict):
            continue

        text_fields = (
            user_id,
            user_data.get('first_name'),
            user_data.get('last_name'),
            user_data.get('username'),
        )
        phone_number = user_data.get('phone_number')
        # str() guards against records whose fields are not strings.
        matched = any(value and needle in str(value).lower() for value in text_fields) or bool(
            phone_number and query in str(phone_number)
        )
        if not matched:
            continue

        results.append({
            "id": user_id,
            "first_name": user_data.get('first_name'),
            "last_name": user_data.get('last_name'),
            "username": user_data.get('username'),
            "phone_number": phone_number
        })
        if len(results) >= 5:
            break
    return jsonify({"users": results})
@app.route('/admin/create_invoice', methods=['POST'])
def create_invoice():
    """Create one invoice for one client from a JSON list of items.

    Request body: ``{"items": [{"client_id", "product_name", "quantity",
    "unit_price"}, ...]}``. All items must name the same client. Previously the
    whole invoice was silently attached to whichever client the last item
    named.

    Responses:
        201 with the new ``invoice_id`` on success;
        400 for an empty invoice, malformed/non-positive numbers, or items that
            reference different clients;
        404 when the client does not exist;
        500 on an unexpected error.
    """
    import math  # local import: only needed for the finiteness check below

    try:
        data = request.get_json(silent=True)
        items_data = data.get('items') if isinstance(data, dict) else None
        if not items_data or not isinstance(items_data, list):
            return jsonify({"status": "error", "message": "Накладная не может быть пустой."}), 400

        all_data = load_visitor_data()
        processed_items = []
        total_amount = 0
        invoice_client_id = None

        for item_data in items_data:
            if not isinstance(item_data, dict):
                return jsonify({"status": "error", "message": "Некорректные данные для товара: Неизвестно"}), 400

            client_id = item_data.get('client_id')
            product_name = item_data.get('product_name')
            try:
                quantity = float(item_data.get('quantity') or 0)
                unit_price = float(item_data.get('unit_price') or 0)
            except (TypeError, ValueError):
                # Unparseable numbers are a client error, not a server error.
                return jsonify({"status": "error", "message": f"Некорректные данные для товара: {product_name or 'Неизвестно'}"}), 400

            numbers_ok = math.isfinite(quantity) and math.isfinite(unit_price)
            if not client_id or not product_name or not numbers_ok or quantity <= 0 or unit_price < 0:
                return jsonify({"status": "error", "message": f"Некорректные данные для товара: {product_name or 'Неизвестно'}"}), 400

            client_id = str(client_id)
            # Reserved keys and non-dict entries (the invoice counter) are not clients.
            if client_id == 'company_info' or not isinstance(all_data.get(client_id), dict):
                return jsonify({"status": "error", "message": f"Клиент с ID '{client_id}' не найден."}), 404

            if invoice_client_id is None:
                invoice_client_id = client_id
            elif client_id != invoice_client_id:
                return jsonify({"status": "error", "message": "Все позиции накладной должны относиться к одному клиенту."}), 400

            item_total_price = quantity * unit_price
            processed_items.append({
                "product_name": product_name,
                "quantity": quantity,
                "unit_price": unit_price,
                "total_price": item_total_price
            })
            total_amount += item_total_price

        new_invoice_id = all_data.get('invoice_id_counter', 0) + 1
        new_invoice = {
            "id": str(new_invoice_id),
            "date": get_bishkek_time().isoformat(),
            "items": processed_items,
            "total_amount": total_amount
        }

        # Build the updated record and let save_visitor_data() merge it under
        # the data lock instead of mutating the shared cache from this thread.
        client_entry = all_data[invoice_client_id]
        updated_client = {
            **client_entry,
            'invoices': [*(client_entry.get('invoices') or []), new_invoice],
        }
        save_visitor_data({
            invoice_client_id: updated_client,
            'invoice_id_counter': new_invoice_id,
        })
        return jsonify({"status": "ok", "message": "Накладная успешно создана.", "invoice_id": new_invoice_id}), 201
    except Exception as e:
        logging.exception("Error in /admin/create_invoice endpoint")
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/admin/add_client', methods=['POST'])
def add_client():
    """Create a client that is not linked to a Telegram account.

    Request body: JSON object with ``first_name`` and ``phone_number``.

    Responses:
        201 on success;
        400 when either field is missing;
        409 when another client already has that phone number;
        500 on an unexpected error.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        phone_number = data.get('phone_number')
        first_name = data.get('first_name')
        if not phone_number or not first_name:
            return jsonify({"status": "error", "message": "Имя и номер телефона обязательны."}), 400

        all_data = load_visitor_data()
        # Iterate items(): the key is needed to skip reserved entries. The
        # previous loop over values() referenced an undefined ``user_id`` and
        # raised NameError as soon as any data existed.
        for user_id, user in all_data.items():
            if user_id == 'company_info' or not isinstance(user, dict):
                continue
            if user.get('phone_number') == phone_number:
                return jsonify({"status": "error", "message": "Клиент с таким номером телефона уже существует."}), 409

        now = get_bishkek_time()
        new_id = generate_unique_id(all_data)
        new_client = {
            'id': new_id,
            'telegram_id': None,
            'first_name': first_name,
            'last_name': None,
            'username': None,
            'photo_url': None,
            'language_code': 'ru',
            'is_premium': False,
            'phone_number': phone_number,
            'visited_at': now.timestamp(),
            'visited_at_str': now.strftime('%Y-%m-%d %H:%M:%S'),
            'bonuses': 0,
            'history': [],
            'debts': 0,
            'debt_history': [],
            'invoices': []
        }
        save_visitor_data({new_id: new_client})
        return jsonify({"status": "ok", "message": "Client added successfully"}), 201
    except Exception as e:
        logging.exception("Error in /admin/add_client endpoint")
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/admin/add_transaction', methods=['POST'])
def add_transaction():
    """Apply bonus and debt changes to one client and record them in the history.

    Request body: JSON object with ``user_id`` and any of ``purchase_amount``
    (accrues 2% as bonuses), ``deduct_amount``, ``add_debt_amount``,
    ``repay_debt_amount``. Missing or null amounts count as 0.

    Amounts must be finite and non-negative. A negative or NaN amount used to
    change the balance without leaving any history entry.

    Responses:
        200 with ``new_balance`` and ``new_debt`` on success;
        400 for a missing user id, invalid amounts, or a deduction/repayment
            larger than the current balance/debt;
        404 when the client does not exist;
        500 on an unexpected error.
    """
    import math  # local import: only needed for the finiteness check below

    amount_fields = ('purchase_amount', 'deduct_amount', 'add_debt_amount', 'repay_debt_amount')
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        user_id = data.get('user_id')
        if not user_id:
            return jsonify({"status": "error", "message": "User ID is required"}), 400

        try:
            amounts = [float(data.get(field) or 0) for field in amount_fields]
        except (TypeError, ValueError):
            return jsonify({"status": "error", "message": "Суммы должны быть неотрицательными числами"}), 400
        if not all(math.isfinite(amount) and amount >= 0 for amount in amounts):
            return jsonify({"status": "error", "message": "Суммы должны быть неотрицательными числами"}), 400
        purchase_amount, deduct_amount, add_debt_amount, repay_debt_amount = amounts

        user_id_str = str(user_id)
        all_data = load_visitor_data()
        stored_user = all_data.get(user_id_str)
        # Reserved keys and non-dict entries (the invoice counter) are not clients.
        if user_id_str == 'company_info' or not isinstance(stored_user, dict):
            return jsonify({"status": "error", "message": "User not found"}), 404

        current_bonuses = stored_user.get('bonuses') or 0
        current_debts = stored_user.get('debts') or 0
        if deduct_amount > current_bonuses:
            return jsonify({"status": "error", "message": "Недостаточно бонусов для списания"}), 400
        if repay_debt_amount > current_debts:
            return jsonify({"status": "error", "message": "Сумма погашения превышает текущий долг"}), 400

        now = get_bishkek_time()
        stamp = {"date": now.isoformat(), "date_str": now.strftime('%Y-%m-%d %H:%M:%S')}
        accrual_amount = purchase_amount * 0.02

        # Work on a copy; save_visitor_data() installs it under the data lock.
        user = dict(stored_user)

        history = list(user['history']) if isinstance(user.get('history'), list) else []
        if accrual_amount > 0:
            history.append({
                "type": "accrual", "amount": accrual_amount,
                "description": f"Начисление с покупки {purchase_amount:.2f}",
                **stamp
            })
        if deduct_amount > 0:
            history.append({
                "type": "deduction", "amount": deduct_amount,
                "description": "Списание бонусов",
                **stamp
            })
        user['bonuses'] = current_bonuses + accrual_amount - deduct_amount
        user['history'] = history

        debt_history = list(user['debt_history']) if isinstance(user.get('debt_history'), list) else []
        if add_debt_amount > 0:
            debt_history.append({
                "type": "accrual", "amount": add_debt_amount,
                "description": "Добавление долга",
                **stamp
            })
        if repay_debt_amount > 0:
            debt_history.append({
                "type": "payment", "amount": repay_debt_amount,
                "description": "Погашение долга",
                **stamp
            })
        user['debts'] = current_debts + add_debt_amount - repay_debt_amount
        user['debt_history'] = debt_history

        save_visitor_data({user_id_str: user})
        return jsonify({
            "status": "ok", "message": "Transaction successful",
            "new_balance": user['bonuses'], "new_debt": user['debts']
        }), 200
    except Exception as e:
        logging.exception("Error in /admin/add_transaction endpoint")
        return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/admin/delete_client', methods=['POST'])
def delete_client():
    """Delete a client that is not linked to a Telegram account.

    Request body: JSON object with ``user_id``.

    Responses:
        200 on success;
        400 when ``user_id`` is missing;
        403 when the client has a ``telegram_id``;
        404 when the client does not exist (reserved and non-dict entries count
            as not found);
        500 when the data file could not be written (the in-memory deletion is
            rolled back so the cache still matches the file) or on an
            unexpected error.
    """
    try:
        data = request.get_json(silent=True)
        user_id = data.get('user_id') if isinstance(data, dict) else None
        if not user_id:
            return jsonify({"status": "error", "message": "User ID is required"}), 400

        user_id_str = str(user_id)
        load_visitor_data()
        with _data_lock:
            user_to_delete = visitor_data_cache.get(user_id_str)
            if user_id_str == 'company_info' or not isinstance(user_to_delete, dict):
                return jsonify({"status": "error", "message": "User not found"}), 404
            if user_to_delete.get('telegram_id') is not None:
                return jsonify({"status": "error", "message": "Cannot delete a Telegram-linked user"}), 403

            del visitor_data_cache[user_id_str]
            try:
                # Write to a temp file first so DATA_FILE is never left truncated.
                tmp_path = f"{DATA_FILE}.tmp"
                with open(tmp_path, 'w', encoding='utf-8') as f:
                    json.dump(visitor_data_cache, f, ensure_ascii=False, indent=4)
                os.replace(tmp_path, DATA_FILE)
                logging.info(f"User {user_id_str} deleted. Data saved to {DATA_FILE}.")
                upload_data_to_hf_async()
            except Exception as e:
                # Undo the deletion: the file on disk still contains the client.
                visitor_data_cache[user_id_str] = user_to_delete
                logging.error(f"Error saving data after deletion: {e}")
                return jsonify({"status": "error", "message": "Failed to save data after deletion"}), 500
        return jsonify({"status": "ok", "message": "Client deleted successfully"}), 200
    except Exception as e:
        logging.exception("Error in /admin/delete_client endpoint")
        return jsonify({"status": "error", "message": str(e)}), 500
if __name__ == '__main__':
    # Script entry point: restore data, start the hourly backup thread, then serve.
    print("--- BONUS SYSTEM SERVER ---")
    print(f"Server starting on http://{HOST}:{PORT}")

    # The initial download only runs when BOTH tokens are configured.
    both_tokens_set = bool(HF_TOKEN_READ and HF_TOKEN_WRITE)
    if both_tokens_set:
        print("Attempting initial data download from Hugging Face...")
        download_data_from_hf()
    else:
        print("WARNING: Hugging Face token(s) not set. Backup/restore functionality will be limited.")

    # Fill the cache from the local file if the download did not already do so.
    load_visitor_data()
    print("WARNING: The /admin route is NOT protected. Implement proper authentication for production.")

    if HF_TOKEN_WRITE:
        threading.Thread(target=periodic_backup, daemon=True).start()
        print("Periodic backup thread started (every hour).")

    print("--- Server Ready ---")
    app.run(host=HOST, port=PORT, debug=False)