#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os from flask import Flask, request, Response, render_template_string, jsonify, redirect, url_for import hmac import hashlib import json from urllib.parse import unquote, parse_qs, quote import time from datetime import datetime import logging import threading from huggingface_hub import HfApi, hf_hub_download from huggingface_hub.utils import RepositoryNotFoundError BOT_TOKEN = os.getenv("BOT_TOKEN", "7531615056:AAFL7Lp1kc-sAshoiM0tfez5-7wea26GXYU") HOST = '0.0.0.0' PORT = 7860 DATA_FILE = 'data.json' REPO_ID = "flpolprojects/druzhbabase" HF_DATA_FILE_PATH = "data.json" HF_TOKEN_WRITE = os.getenv("HF_TOKEN_WRITE") HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") app = Flask(__name__) logging.basicConfig(level=logging.INFO) app.secret_key = os.urandom(24) _data_lock = threading.Lock() visitor_data_cache = {} def download_data_from_hf(): global visitor_data_cache if not HF_TOKEN_READ: logging.warning("HF_TOKEN_READ not set. Skipping Hugging Face download.") return False try: logging.info(f"Attempting to download {HF_DATA_FILE_PATH} from {REPO_ID}...") hf_hub_download( repo_id=REPO_ID, filename=HF_DATA_FILE_PATH, repo_type="dataset", token=HF_TOKEN_READ, local_dir=".", local_dir_use_symlinks=False, force_download=True, etag_timeout=10 ) logging.info("Data file successfully downloaded from Hugging Face.") with _data_lock: try: with open(DATA_FILE, 'r', encoding='utf-8') as f: visitor_data_cache = json.load(f) logging.info("Successfully loaded downloaded data into cache.") except (FileNotFoundError, json.JSONDecodeError) as e: logging.error(f"Error reading downloaded data file: {e}. Starting with empty cache.") visitor_data_cache = {} return True except RepositoryNotFoundError: logging.error(f"Hugging Face repository '{REPO_ID}' not found. Cannot download data.") except Exception as e: logging.error(f"Error downloading data from Hugging Face: {e}") return False def load_visitor_data(): global visitor_data_cache with _data_lock: if not visitor_data_cache: try: with open(DATA_FILE, 'r', encoding='utf-8') as f: visitor_data_cache = json.load(f) logging.info("Visitor data loaded from local JSON.") except FileNotFoundError: logging.warning(f"{DATA_FILE} not found locally. Starting with empty data.") visitor_data_cache = {} except json.JSONDecodeError: logging.error(f"Error decoding {DATA_FILE}. Starting with empty data.") visitor_data_cache = {} except Exception as e: logging.error(f"Unexpected error loading visitor data: {e}") visitor_data_cache = {} return visitor_data_cache def save_visitor_data(data): with _data_lock: try: visitor_data_cache.update(data) with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump(visitor_data_cache, f, ensure_ascii=False, indent=4) logging.info(f"Visitor data successfully saved to {DATA_FILE}.") upload_data_to_hf_async() except Exception as e: logging.error(f"Error saving visitor data: {e}") def upload_data_to_hf(): if not HF_TOKEN_WRITE: logging.warning("HF_TOKEN_WRITE not set. Skipping Hugging Face upload.") return if not os.path.exists(DATA_FILE): logging.warning(f"{DATA_FILE} does not exist. Skipping upload.") return try: api = HfApi() with _data_lock: file_content_exists = os.path.getsize(DATA_FILE) > 0 if not file_content_exists: logging.warning(f"{DATA_FILE} is empty. Skipping upload.") return logging.info(f"Attempting to upload {DATA_FILE} to {REPO_ID}/{HF_DATA_FILE_PATH}...") api.upload_file( path_or_fileobj=DATA_FILE, path_in_repo=HF_DATA_FILE_PATH, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE, commit_message=f"Update bonus data {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) logging.info("Bonus data successfully uploaded to Hugging Face.") except Exception as e: logging.error(f"Error uploading data to Hugging Face: {e}") def upload_data_to_hf_async(): upload_thread = threading.Thread(target=upload_data_to_hf, daemon=True) upload_thread.start() def periodic_backup(): if not HF_TOKEN_WRITE: logging.info("Periodic backup disabled: HF_TOKEN_WRITE not set.") return while True: time.sleep(3600) logging.info("Initiating periodic backup...") upload_data_to_hf() def verify_telegram_data(init_data_str): try: parsed_data = parse_qs(init_data_str) received_hash = parsed_data.pop('hash', [None])[0] if not received_hash: return None, False data_check_list = [] for key, value in sorted(parsed_data.items()): data_check_list.append(f"{key}={value[0]}") data_check_string = "\n".join(data_check_list) secret_key = hmac.new("WebAppData".encode(), BOT_TOKEN.encode(), hashlib.sha256).digest() calculated_hash = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest() if calculated_hash == received_hash: auth_date = int(parsed_data.get('auth_date', [0])[0]) current_time = int(time.time()) if current_time - auth_date > 86400: logging.warning(f"Telegram InitData is older than 24 hours (Auth Date: {auth_date}, Current: {current_time}).") return parsed_data, True else: logging.warning(f"Data verification failed. Calculated: {calculated_hash}, Received: {received_hash}") return parsed_data, False except Exception as e: logging.error(f"Error verifying Telegram data: {e}") return None, False TEMPLATE = """ Druzhba Бонусы

Добро пожаловать!

Ваши бонусы

{{ "%.2f"|format(user.bonuses|float) }}

История операций

{% if user.history %} {% else %}

Операций пока не было.

{% endif %}
""" ADMIN_TEMPLATE = """ Druzhba Admin

Панель администратора Druzhba

{% if users %}
{% for user in users|sort(attribute='visited_at', reverse=true) %}
Текущие бонусы
{{ "%.2f"|format(user.bonuses|float) }}
{% endfor %}
{% else %}

Пользователей пока нет.

{% endif %}
""" @app.route('/') def index(): user_id_str = request.args.get('user_id_for_test') current_data = load_visitor_data() user_data = {} if user_id_str and user_id_str in current_data: user_data = current_data[user_id_str] else: user_data = { "bonuses": 0, "history": [] } return render_template_string(TEMPLATE, user=user_data) @app.route('/verify', methods=['POST']) def verify_data(): try: req_data = request.get_json() init_data_str = req_data.get('initData') if not init_data_str: return jsonify({"status": "error", "message": "Missing initData"}), 400 user_data_parsed, is_valid = verify_telegram_data(init_data_str) user_info_dict = {} if user_data_parsed and 'user' in user_data_parsed: try: user_json_str = unquote(user_data_parsed['user'][0]) user_info_dict = json.loads(user_json_str) except Exception as e: logging.error(f"Could not parse user JSON: {e}") user_info_dict = {} if is_valid: user_id = user_info_dict.get('id') if user_id: now = datetime.now() user_id_str = str(user_id) all_data = load_visitor_data() if user_id_str in all_data: user_entry = all_data[user_id_str] user_entry.update({ 'first_name': user_info_dict.get('first_name'), 'last_name': user_info_dict.get('last_name'), 'username': user_info_dict.get('username'), 'photo_url': user_info_dict.get('photo_url'), 'language_code': user_info_dict.get('language_code'), 'visited_at': now.timestamp(), 'visited_at_str': now.strftime('%Y-%m-%d %H:%M:%S') }) else: user_entry = { 'id': user_id, 'first_name': user_info_dict.get('first_name'), 'last_name': user_info_dict.get('last_name'), 'username': user_info_dict.get('username'), 'photo_url': user_info_dict.get('photo_url'), 'language_code': user_info_dict.get('language_code'), 'is_premium': user_info_dict.get('is_premium', False), 'visited_at': now.timestamp(), 'visited_at_str': now.strftime('%Y-%m-%d %H:%M:%S'), 'bonuses': 0, 'history': [] } save_visitor_data({user_id_str: user_entry}) return redirect(url_for('index', user_id_for_test=user_id_str)) return jsonify({"status": "ok", "verified": True, "user": user_info_dict}), 200 else: logging.warning(f"Verification failed for user: {user_info_dict.get('id')}") return jsonify({"status": "error", "verified": False, "message": "Invalid data"}), 403 except Exception as e: logging.exception("Error in /verify endpoint") return jsonify({"status": "error", "message": "Internal server error"}), 500 @app.route('/admin') def admin_panel(): current_data = load_visitor_data() users_list = list(current_data.values()) return render_template_string(ADMIN_TEMPLATE, users=users_list) @app.route('/admin/add_transaction', methods=['POST']) def add_transaction(): try: data = request.get_json() user_id = data.get('user_id') purchase_amount = float(data.get('purchase_amount', 0)) deduct_amount = float(data.get('deduct_amount', 0)) if not user_id: return jsonify({"status": "error", "message": "User ID is required"}), 400 user_id_str = str(user_id) all_data = load_visitor_data() if user_id_str not in all_data: return jsonify({"status": "error", "message": "User not found"}), 404 user = all_data[user_id_str] now = datetime.now() now_str = now.strftime('%Y-%m-%d %H:%M:%S') accrual_amount = purchase_amount * 0.02 if deduct_amount > user.get('bonuses', 0): return jsonify({"status": "error", "message": "Not enough bonuses to deduct"}), 400 user['bonuses'] = user.get('bonuses', 0) + accrual_amount - deduct_amount if 'history' not in user or not isinstance(user['history'], list): user['history'] = [] if accrual_amount > 0: user['history'].append({ "type": "accrual", "amount": accrual_amount, "description": f"Начисление с покупки {purchase_amount}", "date": now.isoformat(), "date_str": now_str }) if deduct_amount > 0: user['history'].append({ "type": "deduction", "amount": deduct_amount, "description": "Списание бонусов", "date": now.isoformat(), "date_str": now_str }) save_visitor_data({user_id_str: user}) return jsonify({"status": "ok", "message": "Transaction successful", "new_balance": user['bonuses']}), 200 except Exception as e: logging.exception("Error in /admin/add_transaction endpoint") return jsonify({"status": "error", "message": str(e)}), 500 if __name__ == '__main__': print("--- DRUZHBA BONUS SYSTEM SERVER ---") print(f"Server starting on http://{HOST}:{PORT}") if not HF_TOKEN_READ or not HF_TOKEN_WRITE: print("WARNING: Hugging Face token(s) not set. Backup/restore functionality will be limited.") else: print("Attempting initial data download from Hugging Face...") download_data_from_hf() load_visitor_data() print("WARNING: The /admin route is NOT protected. Implement proper authentication for production.") if HF_TOKEN_WRITE: backup_thread = threading.Thread(target=periodic_backup, daemon=True) backup_thread.start() print("Periodic backup thread started (every hour).") print("--- Server Ready ---") app.run(host=HOST, port=PORT, debug=False)