import os import hmac import hashlib import json from urllib.parse import unquote, parse_qs, quote import time from datetime import datetime import logging import threading import random import pytz import uuid from flask import Flask, request, Response, render_template_string, jsonify, redirect, url_for from huggingface_hub import HfApi, hf_hub_download from huggingface_hub.utils import RepositoryNotFoundError from PIL import Image import io BOT_TOKEN = os.getenv("BOT_TOKEN", "7835463659:AAGNePbelZIAOeaglyQi1qulOqnjs4BGQn4") HOST = '0.0.0.0' PORT = 7860 DATA_FILE = 'data.json' REPO_ID = "flpolprojects/examplebonus" HF_DATA_FILE_PATH = "data.json" HF_TOKEN_WRITE = os.getenv("HF_TOKEN_WRITE") HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") BISHKEK_TZ = pytz.timezone('Asia/Bishkek') app = Flask(__name__) logging.basicConfig(level=logging.ERROR) app.secret_key = os.urandom(24) _data_lock = threading.Lock() visitor_data_cache = {} def generate_unique_id(all_data): while True: new_id = str(random.randint(10000, 99999)) if new_id not in all_data: return new_id def download_data_from_hf(): global visitor_data_cache if not HF_TOKEN_READ: return False try: hf_hub_download( repo_id=REPO_ID, filename=HF_DATA_FILE_PATH, repo_type="dataset", token=HF_TOKEN_READ, local_dir=".", local_dir_use_symlinks=False, force_download=True, etag_timeout=10 ) with _data_lock: try: with open(DATA_FILE, 'r', encoding='utf-8') as f: visitor_data_cache = json.load(f) except (FileNotFoundError, json.JSONDecodeError): visitor_data_cache = {} return True except RepositoryNotFoundError: pass except Exception: pass return False def load_visitor_data(): global visitor_data_cache with _data_lock: if not visitor_data_cache: try: with open(DATA_FILE, 'r', encoding='utf-8') as f: visitor_data_cache = json.load(f) except FileNotFoundError: visitor_data_cache = {"organization_details": {}} except json.JSONDecodeError: visitor_data_cache = {"organization_details": {}} except Exception: visitor_data_cache = {"organization_details": {}} if "organization_details" not in visitor_data_cache: visitor_data_cache["organization_details"] = {} return visitor_data_cache def save_visitor_data(data): with _data_lock: try: with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump(visitor_data_cache, f, ensure_ascii=False, indent=4) upload_data_to_hf_async() except Exception: pass def upload_data_to_hf(): if not HF_TOKEN_WRITE: return if not os.path.exists(DATA_FILE): return try: api = HfApi() with _data_lock: file_content_exists = os.path.getsize(DATA_FILE) > 0 if not file_content_exists: return api.upload_file( path_or_fileobj=DATA_FILE, path_in_repo=HF_DATA_FILE_PATH, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE, commit_message=f"Update bonus data {datetime.now(BISHKEK_TZ).strftime('%Y-%m-%d %H:%M:%S')}" ) except Exception: pass def upload_data_to_hf_async(): upload_thread = threading.Thread(target=upload_data_to_hf, daemon=True) upload_thread.start() def periodic_backup(): if not HF_TOKEN_WRITE: return while True: time.sleep(3600) upload_data_to_hf() def verify_telegram_data(init_data_str): try: parsed_data = parse_qs(init_data_str) received_hash = parsed_data.pop('hash', [None])[0] if not received_hash: return None, False data_check_list = [] for key, value in sorted(parsed_data.items()): data_check_list.append(f"{key}={value[0]}") data_check_string = "\n".join(data_check_list) secret_key = hmac.new("WebAppData".encode(), BOT_TOKEN.encode(), hashlib.sha256).digest() calculated_hash = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest() if calculated_hash == received_hash: auth_date = int(parsed_data.get('auth_date', [0])[0]) current_time = int(time.time()) if current_time - auth_date > 86400: pass return parsed_data, True else: pass return parsed_data, False except Exception: return None, False TEMPLATE = """
Добро пожаловать!
Ваши бонусы
{{ "%.2f"|format(user.bonuses|float) }}
Ваш долг
{{ "%.2f"|format(user.debts|float) }}
Ваш ID клиента
{{ user.id }}
Операций пока не было.
{% endif %}Накладных пока нет.
{% endif %}Данные организации не указаны.
{% endif %}Пользователей пока нет.
{% endif %}This is a placeholder for your terms of service.
" @app.route('/privacy') def privacy(): return "This is a placeholder for your privacy policy.
" @app.route('/') def index(): user_id_str = request.args.get('user_id_for_test') all_data = load_visitor_data() user_data = {} if user_id_str and user_id_str in all_data: user_data = all_data[user_id_str] user_data['id'] = user_id_str bonus_history = user_data.get('history', []) for item in bonus_history: item['transaction_type'] = 'bonus' debt_history = user_data.get('debt_history', []) for item in debt_history: item['transaction_type'] = 'debt' combined_history = sorted( bonus_history + debt_history, key=lambda x: x['date'], reverse=True ) user_data['combined_history'] = combined_history user_data['invoices'] = user_data.get('invoices', []) user_data['ton_address'] = user_data.get('ton_address', '') else: user_data = { "id": "N/A", "bonuses": 0, "debts": 0, "history": [], "debt_history": [], "combined_history": [], "invoices": [], "ton_address": "" } org_details = all_data.get('organization_details', {}) return render_template_string(TEMPLATE, user=user_data, org_details=org_details) @app.route('/verify', methods=['POST']) def verify_data(): try: req_data = request.get_json() init_data_str = req_data.get('initData') if not init_data_str: return jsonify({"status": "error", "message": "Missing initData"}), 400 user_data_parsed, is_valid = verify_telegram_data(init_data_str) user_info_dict = {} if user_data_parsed and 'user' in user_data_parsed: try: user_json_str = unquote(user_data_parsed['user'][0]) user_info_dict = json.loads(user_json_str) except Exception: user_info_dict = {} if is_valid: tg_user_id = user_info_dict.get('id') if tg_user_id: now = datetime.now(BISHKEK_TZ) all_data = load_visitor_data() existing_user_key = None for key, user_data_item in all_data.items(): if key == "organization_details": continue if str(user_data_item.get('telegram_id')) == str(tg_user_id): existing_user_key = key break if existing_user_key: user_entry = all_data[existing_user_key] user_entry.update({ 'first_name': user_info_dict.get('first_name'), 'last_name': user_info_dict.get('last_name'), 'username': user_info_dict.get('username'), 'photo_url': user_info_dict.get('photo_url'), 'language_code': user_info_dict.get('language_code'), 'visited_at': now.timestamp(), 'visited_at_str': now.strftime('%Y-%m-%d %H:%M:%S') }) user_id_to_save = existing_user_key else: new_user_id = generate_unique_id(all_data) user_entry = { 'id': new_user_id, 'telegram_id': tg_user_id, 'first_name': user_info_dict.get('first_name'), 'last_name': user_info_dict.get('last_name'), 'username': user_info_dict.get('username'), 'photo_url': user_info_dict.get('photo_url'), 'language_code': user_info_dict.get('language_code'), 'is_premium': user_info_dict.get('is_premium', False), 'phone_number': None, 'visited_at': now.timestamp(), 'visited_at_str': now.strftime('%Y-%m-%d %H:%M:%S'), 'bonuses': 0, 'history': [], 'debts': 0, 'debt_history': [], 'invoices': [], 'ton_address': "" } user_id_to_save = new_user_id all_data[user_id_to_save] = user_entry save_visitor_data(all_data) return jsonify({"status": "ok", "verified": True, "user_id": user_id_to_save}) else: return jsonify({"status": "error", "verified": True, "message": "User ID not found in parsed data"}), 400 else: return jsonify({"status": "error", "verified": False, "message": "Invalid data"}), 403 except Exception: return jsonify({"status": "error", "message": "Internal server error"}), 500 @app.route('/save_ton_address', methods=['POST']) def save_ton_address(): try: req_data = request.get_json() internal_user_id = req_data.get('user_id') ton_address = req_data.get('ton_address') init_data_str = req_data.get('initData') if not internal_user_id: return jsonify({"status": "error", "message": "Internal User ID is required"}), 400 user_data_parsed, is_valid = verify_telegram_data(init_data_str) if not is_valid: return jsonify({"status": "error", "message": "Invalid Telegram InitData"}), 403 user_info_dict = {} if user_data_parsed and 'user' in user_data_parsed: try: user_json_str = unquote(user_data_parsed['user'][0]) user_info_dict = json.loads(user_json_str) except Exception: user_info_dict = {} telegram_user_id = user_info_dict.get('id') all_data = load_visitor_data() user_entry = all_data.get(internal_user_id) if not user_entry or internal_user_id == "organization_details": return jsonify({"status": "error", "message": "User not found"}), 404 if user_entry.get('telegram_id') is not None and str(user_entry.get('telegram_id')) != str(telegram_user_id): return jsonify({"status": "error", "message": "Telegram user ID mismatch or not authorized to update this account"}), 403 user_entry['ton_address'] = ton_address all_data[internal_user_id] = user_entry save_visitor_data(all_data) return jsonify({"status": "ok", "message": "TON address updated successfully"}), 200 except Exception: return jsonify({"status": "error", "message": "Internal server error"}), 500 @app.route('/admin') def admin_panel(): all_data = load_visitor_data() users_list = [] for user_id, user_data in all_data.items(): if user_id == "organization_details": continue user_data['id'] = user_id users_list.append(user_data) total_users = len(users_list) total_bonuses = sum(u.get('bonuses', 0) for u in users_list) total_debts = sum(u.get('debts', 0) for u in users_list) users_with_debt = sum(1 for u in users_list if u.get('debts', 0) > 0) summary_stats = { "total_users": total_users, "total_bonuses": total_bonuses, "total_debts": total_debts, "users_with_debt": users_with_debt } return render_template_string(ADMIN_TEMPLATE, users=users_list, summary=summary_stats) @app.route('/admin/add_client', methods=['POST']) def add_client(): try: data = request.get_json() phone_number = data.get('phone_number') first_name = data.get('first_name') if not phone_number or not first_name: return jsonify({"status": "error", "message": "Имя и номер телефона обязательны."}), 400 all_data = load_visitor_data() for key, user in all_data.items(): if key == "organization_details": continue if user.get('phone_number') == phone_number: return jsonify({"status": "error", "message": "Клиент с таким номером телефона уже существует."}), 409 now = datetime.now(BISHKEK_TZ) new_id = generate_unique_id(all_data) new_client = { 'id': new_id, 'telegram_id': None, 'first_name': first_name, 'last_name': None, 'username': None, 'photo_url': None, 'language_code': 'ru', 'is_premium': False, 'phone_number': phone_number, 'visited_at': now.timestamp(), 'visited_at_str': now.strftime('%Y-%m-%d %H:%M:%S'), 'bonuses': 0, 'history': [], 'debts': 0, 'debt_history': [], 'invoices': [], 'ton_address': "" } all_data[new_id] = new_client save_visitor_data(all_data) return jsonify({"status": "ok", "message": "Client added successfully"}), 201 except Exception: return jsonify({"status": "error", "message": "Internal server error"}), 500 @app.route('/admin/add_transaction', methods=['POST']) def add_transaction(): try: data = request.get_json() user_id = data.get('user_id') purchase_amount = float(data.get('purchase_amount', 0)) deduct_amount = float(data.get('deduct_amount', 0)) add_debt_amount = float(data.get('add_debt_amount', 0)) repay_debt_amount = float(data.get('repay_debt_amount', 0)) if not user_id: return jsonify({"status": "error", "message": "User ID is required"}), 400 user_id_str = str(user_id) all_data = load_visitor_data() if user_id_str not in all_data or user_id_str == "organization_details": return jsonify({"status": "error", "message": "User not found"}), 404 user = all_data[user_id_str] now = datetime.now(BISHKEK_TZ) now_iso = now.isoformat() now_str = now.strftime('%Y-%m-%d %H:%M:%S') if deduct_amount > user.get('bonuses', 0): return jsonify({"status": "error", "message": "Недостаточно бонусов для списания"}), 400 if repay_debt_amount > user.get('debts', 0): return jsonify({"status": "error", "message": "Сумма погашения превышает текущий долг"}), 400 accrual_amount = purchase_amount * 0.02 user['bonuses'] = round(user.get('bonuses', 0) + accrual_amount - deduct_amount, 2) if 'history' not in user or not isinstance(user['history'], list): user['history'] = [] if accrual_amount > 0: user['history'].append({ "type": "accrual", "amount": round(accrual_amount, 2), "description": f"Начисление с покупки {round(purchase_amount, 2)}", "date": now_iso, "date_str": now_str }) if deduct_amount > 0: user['history'].append({ "type": "deduction", "amount": round(deduct_amount, 2), "description": "Списание бонусов", "date": now_iso, "date_str": now_str }) user['debts'] = round(user.get('debts', 0) + add_debt_amount - repay_debt_amount, 2) if 'debt_history' not in user or not isinstance(user['debt_history'], list): user['debt_history'] = [] if add_debt_amount > 0: user['debt_history'].append({ "type": "accrual", "amount": round(add_debt_amount, 2), "description": "Добавление долга", "date": now_iso, "date_str": now_str }) if repay_debt_amount > 0: user['debt_history'].append({ "type": "payment", "amount": round(repay_debt_amount, 2), "description": "Погашение долга", "date": now_iso, "date_str": now_str }) all_data[user_id_str] = user save_visitor_data(all_data) return jsonify({ "status": "ok", "message": "Transaction successful", "new_balance": user['bonuses'], "new_debt": user['debts'] }), 200 except Exception: return jsonify({"status": "error", "message": "Internal server error"}), 500 @app.route('/admin/add_invoice', methods=['POST']) def add_invoice(): try: data = request.get_json() user_id = data.get('user_id') total_amount = float(data.get('total_amount', 0)) items = data.get('items', []) if not user_id: return jsonify({"status": "error", "message": "User ID is required"}), 400 if not items: return jsonify({"status": "error", "message": "Необходимо добавить товары в накладную."}), 400 user_id_str = str(user_id) all_data = load_visitor_data() if user_id_str not in all_data or user_id_str == "organization_details": return jsonify({"status": "error", "message": "User not found"}), 404 user = all_data[user_id_str] now = datetime.now(BISHKEK_TZ) now_iso = now.isoformat() now_str = now.strftime('%Y-%m-%d %H:%M:%S') invoice_id = str(uuid.uuid4().hex[:8]).upper() processed_items = [] for item in items: p_name = item.get('product_name') qty = float(item.get('quantity', 0)) u_price = float(item.get('unit_price', 0)) i_total = round(qty * u_price, 2) processed_items.append({ "product_name": p_name, "quantity": qty, "unit_price": u_price, "item_total": i_total }) new_invoice = { "invoice_id": invoice_id, "date": now_iso, "date_str": now_str, "total_amount": round(total_amount, 2), "items": processed_items } if 'invoices' not in user or not isinstance(user['invoices'], list): user['invoices'] = [] user['invoices'].append(new_invoice) all_data[user_id_str] = user save_visitor_data(all_data) return jsonify({"status": "ok", "message": "Invoice added successfully", "invoice_id": invoice_id}), 200 except Exception: return jsonify({"status": "error", "message": "Internal server error"}), 500 @app.route('/admin/delete_invoice', methods=['POST']) def delete_invoice(): try: data = request.get_json() user_id = data.get('user_id') invoice_id = data.get('invoice_id') if not user_id or not invoice_id: return jsonify({"status": "error", "message": "User ID and Invoice ID are required"}), 400 user_id_str = str(user_id) all_data = load_visitor_data() if user_id_str not in all_data or user_id_str == "organization_details": return jsonify({"status": "error", "message": "User not found"}), 404 user = all_data[user_id_str] if 'invoices' not in user or not isinstance(user['invoices'], list): return jsonify({"status": "error", "message": "User has no invoices"}), 404 original_invoice_count = len(user['invoices']) user['invoices'] = [inv for inv in user['invoices'] if inv.get('invoice_id') != invoice_id] if len(user['invoices']) == original_invoice_count: return jsonify({"status": "error", "message": "Invoice not found for this user"}), 404 all_data[user_id_str] = user save_visitor_data(all_data) return jsonify({"status": "ok", "message": "Invoice deleted successfully"}), 200 except Exception: return jsonify({"status": "error", "message": "Internal server error"}), 500 @app.route('/admin/delete_client', methods=['POST']) def delete_client(): try: data = request.get_json() user_id = data.get('user_id') if not user_id: return jsonify({"status": "error", "message": "User ID is required"}), 400 user_id_str = str(user_id) all_data = load_visitor_data() with _data_lock: if user_id_str not in all_data or user_id_str == "organization_details": return jsonify({"status": "error", "message": "User not found"}), 404 user_to_delete = all_data[user_id_str] if user_to_delete.get('telegram_id') is not None: return jsonify({"status": "error", "message": "Cannot delete a Telegram-linked user"}), 403 del all_data[user_id_str] try: with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump(all_data, f, ensure_ascii=False, indent=4) upload_data_to_hf_async() except Exception: return jsonify({"status": "error", "message": "Failed to save data after deletion"}), 500 return jsonify({"status": "ok", "message": "Client deleted successfully"}), 200 except Exception: return jsonify({"status": "error", "message": "Internal server error"}), 500 @app.route('/admin/organization_details', methods=['GET']) def get_organization_details(): try: all_data = load_visitor_data() org_details = all_data.get('organization_details', {}) return jsonify(org_details), 200 except Exception: return jsonify({"status": "error", "message": "Internal server error"}), 500 @app.route('/admin/organization_details', methods=['POST']) def save_organization_details(): try: data = request.get_json() new_org_details = { "name": data.get("name", ""), "phone_numbers": data.get("phone_numbers", []), "address": data.get("address", ""), "whatsapp_link": data.get("whatsapp_link", ""), "telegram_link": data.get("telegram_link", "") } all_data = load_visitor_data() all_data['organization_details'] = new_org_details save_visitor_data(all_data) return jsonify({"status": "ok", "message": "Organization details saved successfully"}), 200 except Exception: return jsonify({"status": "error", "message": "Internal server error"}), 500 if __name__ == '__main__': if not HF_TOKEN_READ or not HF_TOKEN_WRITE: pass else: download_data_from_hf() load_visitor_data() if HF_TOKEN_WRITE: backup_thread = threading.Thread(target=periodic_backup, daemon=True) backup_thread.start() app.run(host=HOST, port=PORT, debug=False)