#!/usr/bin/env python3 # -*- coding: utf-8 -*- import os import flask from flask import Flask, request, Response, render_template_string, jsonify, redirect, url_for, send_file import hmac import hashlib import json from urllib.parse import unquote, parse_qs, quote import time from datetime import datetime import logging import threading from huggingface_hub import HfApi, hf_hub_download, list_repo_files from huggingface_hub.utils import RepositoryNotFoundError, EntryNotFoundError import mimetypes import io import math BOT_TOKEN = os.getenv("BOT_TOKEN", "6750208873:AAE2hvPlJ99dBdhGa_Brre0IIpUdOvXxHt4") HOST = '0.0.0.0' PORT = 7860 DATA_FILE = 'data.json' REPO_ID = os.getenv("HF_REPO_ID", "Eluza133/Z1e1u") HF_DATA_FILE_PATH = "data.json" HF_UPLOAD_FOLDER = "uploads" HF_TOKEN_WRITE = os.getenv("HF_TOKEN_WRITE") HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") MAX_UPLOAD_FILES = 20 AUTH_TIMEOUT = 86400 app = Flask(__name__) logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') app.secret_key = os.urandom(24) _data_lock = threading.RLock() metadata_cache = {} def get_hf_api(write=False): token = HF_TOKEN_WRITE if write else HF_TOKEN_READ if not token: logging.warning(f"Hugging Face {'write' if write else 'read'} token not set.") return None return HfApi(token=token) def download_metadata_from_hf(): global metadata_cache api = get_hf_api(write=False) if not api: logging.warning("HF Read token missing. Cannot download metadata.") return False try: logging.info(f"Attempting to download {HF_DATA_FILE_PATH} from {REPO_ID}...") download_path = hf_hub_download( repo_id=REPO_ID, filename=HF_DATA_FILE_PATH, repo_type="dataset", token=api.token, local_dir=".", local_dir_use_symlinks=False, force_download=True, etag_timeout=10 ) logging.info("Metadata file successfully downloaded from Hugging Face.") with _data_lock: try: with open(download_path, 'r', encoding='utf-8') as f: metadata_cache = json.load(f) logging.info("Successfully loaded downloaded metadata into cache.") except (FileNotFoundError, json.JSONDecodeError) as e: logging.error(f"Error reading downloaded metadata file: {e}. Resetting cache.") metadata_cache = {} return True except EntryNotFoundError: logging.warning(f"Metadata file '{HF_DATA_FILE_PATH}' not found in repo '{REPO_ID}'. Starting fresh.") with _data_lock: metadata_cache = {} return True except RepositoryNotFoundError: logging.error(f"Hugging Face repository '{REPO_ID}' not found. Cannot download metadata.") except Exception as e: logging.error(f"Error downloading metadata from Hugging Face: {e}", exc_info=True) return False def load_local_metadata(): global metadata_cache with _data_lock: if not metadata_cache: try: with open(DATA_FILE, 'r', encoding='utf-8') as f: metadata_cache = json.load(f) logging.info("Metadata loaded from local JSON.") except FileNotFoundError: logging.warning(f"{DATA_FILE} not found locally. Starting with empty data.") metadata_cache = {} except json.JSONDecodeError: logging.error(f"Error decoding {DATA_FILE}. Starting with empty data.") metadata_cache = {} except Exception as e: logging.error(f"Unexpected error loading metadata: {e}") metadata_cache = {} return metadata_cache def save_metadata(data_to_update=None): global metadata_cache with _data_lock: try: if data_to_update: metadata_cache.update(data_to_update) with open(DATA_FILE, 'w', encoding='utf-8') as f: json.dump(metadata_cache, f, ensure_ascii=False, indent=4) logging.info(f"Metadata successfully saved locally to {DATA_FILE}.") upload_metadata_to_hf_async() return True except Exception as e: logging.error(f"Error saving metadata: {e}", exc_info=True) return False def update_user_file_metadata(user_id, file_info_list): user_id_str = str(user_id) with _data_lock: if user_id_str not in metadata_cache: metadata_cache[user_id_str] = {"user_info": {}, "files": []} if "files" not in metadata_cache[user_id_str]: metadata_cache[user_id_str]["files"] = [] existing_filenames = {f['filename'] for f in metadata_cache[user_id_str]["files"]} new_files_added = 0 for file_info in file_info_list: if file_info['filename'] not in existing_filenames: metadata_cache[user_id_str]["files"].append(file_info) existing_filenames.add(file_info['filename']) new_files_added += 1 else: logging.warning(f"File '{file_info['filename']}' already exists for user {user_id}. Skipping add.") if new_files_added > 0: logging.info(f"Added {new_files_added} file metadata entries for user {user_id}.") if not save_metadata(): return False else: logging.info(f"No new file metadata added for user {user_id}.") return True def _upload_metadata_to_hf_task(): api = get_hf_api(write=True) if not api: logging.warning("HF Write token missing. Skipping metadata upload.") return if not os.path.exists(DATA_FILE): logging.warning(f"{DATA_FILE} does not exist locally. Skipping upload.") return try: with _data_lock: if os.path.getsize(DATA_FILE) == 0: logging.warning(f"{DATA_FILE} is empty. Skipping upload.") return file_to_upload = DATA_FILE logging.info(f"Attempting to upload {file_to_upload} to {REPO_ID}/{HF_DATA_FILE_PATH}...") api.upload_file( path_or_fileobj=file_to_upload, path_in_repo=HF_DATA_FILE_PATH, repo_id=REPO_ID, repo_type="dataset", commit_message=f"Update metadata {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}" ) logging.info("Metadata successfully uploaded to Hugging Face.") except Exception as e: logging.error(f"Error uploading metadata to Hugging Face: {e}", exc_info=True) def upload_metadata_to_hf_async(): upload_thread = threading.Thread(target=_upload_metadata_to_hf_task, daemon=True) upload_thread.start() def verify_telegram_data(init_data_str): try: parsed_data = parse_qs(init_data_str) received_hash = parsed_data.pop('hash', [None])[0] if not received_hash: logging.warning("Verification failed: Hash missing from initData.") return None, False, "Hash missing" data_check_list = [] for key, value in sorted(parsed_data.items()): data_check_list.append(f"{key}={value[0]}") data_check_string = "\n".join(data_check_list) secret_key = hmac.new("WebAppData".encode(), BOT_TOKEN.encode(), hashlib.sha256).digest() calculated_hash = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest() if calculated_hash != received_hash: logging.warning(f"Verification failed: Hash mismatch. Calculated: {calculated_hash}, Received: {received_hash}") return parsed_data, False, "Invalid hash" auth_date = int(parsed_data.get('auth_date', [0])[0]) current_time = int(time.time()) if current_time - auth_date > AUTH_TIMEOUT: logging.warning(f"Verification failed: initData expired. Auth time: {auth_date}, Current time: {current_time}") return parsed_data, False, "Data expired" user_info_dict = None if 'user' in parsed_data: try: user_json_str = unquote(parsed_data['user'][0]) user_info_dict = json.loads(user_json_str) except Exception as e: logging.error(f"Could not parse user JSON from initData: {e}") logging.info(f"Telegram data verified successfully for user ID: {user_info_dict.get('id') if user_info_dict else 'Unknown'}") return user_info_dict, True, "Verified" except Exception as e: logging.error(f"Error during Telegram data verification: {e}", exc_info=True) return None, False, "Verification exception" def authenticate_and_get_user(init_data_str): user_info, is_valid, message = verify_telegram_data(init_data_str) if not is_valid: return None, message user_id = user_info.get('id') if user_info else None if not user_id: logging.warning("Verification successful but user ID is missing in user data.") return None, "User ID missing" user_id_str = str(user_id) with _data_lock: should_save = False if user_id_str not in metadata_cache: metadata_cache[user_id_str] = { "user_info": user_info, "files": [] } logging.info(f"New user registered: {user_id}") should_save = True else: if "user_info" not in metadata_cache[user_id_str] or metadata_cache[user_id_str]["user_info"] != user_info: metadata_cache[user_id_str]["user_info"] = user_info should_save = True if should_save: if not save_metadata(): logging.error(f"Failed to save metadata after updating/adding user {user_id}") return user_info, "Authenticated" USER_TEMPLATE = """ Zeus Cloud

Zeus Cloud

Загрузить файлы

Файлы не выбраны

Ваши файлы

""" ADMIN_TEMPLATE = """ Admin - Zeus Cloud

Zeus Cloud - Админ Панель

ВНИМАНИЕ: Этот раздел не защищен! Добавьте аутентификацию для реального использования.

Управление метаданными

{% if users %}
{% for user_id, data in users.items() %}
User Avatar
Язык: {{ data.user_info.language_code or 'N/A' }}
Premium: {{ 'Да' if data.user_info and data.user_info.is_premium else 'Нет' }}
Файлов загружено: {{ data.files|length if data.files else 0 }}
Просмотреть файлы
{% endfor %}
{% else %}

Пользователей не найдено.

{% endif %}
""" ADMIN_USER_FILES_TEMPLATE = """ Файлы пользователя {{ user_info.first_name or user_id }} - Admin
← Назад к списку пользователей

Файлы пользователя

{{ user_info.first_name or '' }} {{ user_info.last_name or '' }} (ID: {{ user_id }})
{% if files %} {% for file in files|sort(attribute='uploaded_at_ts', reverse=true) %} {% endfor %}
Имя файла Размер Дата загрузки Тип Действия
{{ file.filename }} {{ file.size | filesizeformat if file.size else 'N/A' }} {{ file.uploaded_at_str or 'N/A' }} {{ file.content_type or 'N/A' }} Скачать
{% else %}

У этого пользователя нет загруженных файлов.

{% endif %}
""" @app.template_filter('filesizeformat') def filesizeformat(value): try: bytes_val = int(value) if bytes_val == 0: return '0 Bytes' k = 1024 sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB'] i = min(int(math.floor(math.log(bytes_val) / math.log(k))), len(sizes) - 1) return f"{bytes_val / math.pow(k, i):.2f} {sizes[i]}" except (ValueError, TypeError): return value except Exception: return 'N/A' @app.route('/') def index(): return render_template_string(USER_TEMPLATE, theme={}, max_files=MAX_UPLOAD_FILES) @app.route('/files', methods=['POST']) def get_user_files(): req_data = request.get_json() init_data_str = req_data.get('initData') if not init_data_str: return jsonify({"status": "error", "message": "Missing initData"}), 400 user_info, message = authenticate_and_get_user(init_data_str) if not user_info: return jsonify({"status": "error", "message": message}), 403 user_id_str = str(user_info['id']) with _data_lock: user_data = metadata_cache.get(user_id_str, {}) files = user_data.get('files', []) return jsonify({"status": "ok", "files": files}), 200 @app.route('/upload', methods=['POST']) def upload_files(): init_data_str = request.form.get('initData') if not init_data_str: return jsonify({"status": "error", "message": "Missing initData"}), 400 user_info, message = authenticate_and_get_user(init_data_str) if not user_info: return jsonify({"status": "error", "message": message}), 403 user_id = user_info['id'] user_id_str = str(user_id) uploaded_files = request.files.getlist('files') if not uploaded_files or len(uploaded_files) == 0: return jsonify({"status": "error", "message": "No files selected for upload."}), 400 if len(uploaded_files) > MAX_UPLOAD_FILES: return jsonify({"status": "error", "message": f"Cannot upload more than {MAX_UPLOAD_FILES} files at once."}), 400 api = get_hf_api(write=True) if not api: return jsonify({"status": "error", "message": "Server error: Cannot connect to storage."}), 500 successful_uploads_metadata = [] errors = [] for file_storage in uploaded_files: filename = file_storage.filename if not filename: errors.append("Received a file without a name.") continue path_in_repo = f"{HF_UPLOAD_FOLDER}/{user_id_str}/{filename}" file_content = file_storage.read() file_size = len(file_content) content_type, _ = mimetypes.guess_type(filename) try: logging.info(f"Uploading '{filename}' for user {user_id} to {path_in_repo}...") file_obj = io.BytesIO(file_content) api.upload_file( path_or_fileobj=file_obj, path_in_repo=path_in_repo, repo_id=REPO_ID, repo_type="dataset", commit_message=f"User {user_id} uploaded {filename}" ) logging.info(f"Successfully uploaded '{filename}' for user {user_id}.") now = time.time() successful_uploads_metadata.append({ "filename": filename, "hf_path": path_in_repo, "uploaded_at_ts": now, "uploaded_at_str": datetime.fromtimestamp(now).strftime('%Y-%m-%d %H:%M:%S'), "size": file_size, "content_type": content_type }) except Exception as e: logging.error(f"Failed to upload '{filename}' for user {user_id}: {e}", exc_info=True) errors.append(f"Ошибка загрузки {filename}: {str(e)}") if successful_uploads_metadata: if not update_user_file_metadata(user_id, successful_uploads_metadata): errors.append("Ошибка обновления списка файлов после загрузки.") if not errors: return jsonify({"status": "ok", "message": f"Загружено {len(successful_uploads_metadata)} файл(ов)."}), 200 else: return jsonify({ "status": "error" if not successful_uploads_metadata else "partial_success", "message": f"Загружено {len(successful_uploads_metadata)} из {len(uploaded_files)}. Ошибки: {'; '.join(errors)}", "uploaded_files": [f['filename'] for f in successful_uploads_metadata], "errors": errors }), 207 @app.route('/download/', methods=['GET']) def download_file(filename): init_data_str = request.args.get('initData') if not init_data_str: return "Authentication required.", 401 user_info, message = authenticate_and_get_user(init_data_str) if not user_info: return f"Access denied: {message}", 403 user_id = user_info['id'] user_id_str = str(user_id) with _data_lock: user_data = metadata_cache.get(user_id_str, {}) user_files = user_data.get('files', []) file_metadata = next((f for f in user_files if f['filename'] == filename), None) if not file_metadata: logging.warning(f"User {user_id} attempted to download unlisted/unowned file: {filename}") return "File not found or access denied.", 404 api = get_hf_api(write=False) if not api: return "Server error: Cannot connect to storage.", 500 path_in_repo = file_metadata.get('hf_path', f"{HF_UPLOAD_FOLDER}/{user_id_str}/{filename}") try: logging.info(f"User {user_id} requesting download of {path_in_repo}") local_file_path = hf_hub_download( repo_id=REPO_ID, filename=path_in_repo, repo_type="dataset", token=api.token, force_download=False, etag_timeout=10 ) logging.info(f"File {path_in_repo} downloaded to cache: {local_file_path}") content_type = file_metadata.get('content_type') or mimetypes.guess_type(filename)[0] or 'application/octet-stream' return send_file( local_file_path, mimetype=content_type, as_attachment=False, download_name=filename ) except EntryNotFoundError: logging.error(f"File not found on Hugging Face: {path_in_repo}") return "File not found on storage.", 404 except RepositoryNotFoundError: logging.error(f"Repository not found: {REPO_ID}") return "Storage repository not found.", 500 except Exception as e: logging.error(f"Error downloading file {path_in_repo} for user {user_id}: {e}", exc_info=True) return "Server error during download.", 500 @app.route('/admin') def admin_panel(): current_data = load_local_metadata() return render_template_string(ADMIN_TEMPLATE, users=current_data) @app.route('/admin/user/') def admin_user_files(user_id): current_data = load_local_metadata() user_data = current_data.get(str(user_id)) if not user_data: return "User not found", 404 user_info = user_data.get("user_info", {"id": user_id}) files = user_data.get("files", []) return render_template_string(ADMIN_USER_FILES_TEMPLATE, user_id=user_id, user_info=user_info, files=files) @app.route('/admin/download//', methods=['GET']) def admin_download_file(user_id, filename): user_id_str = str(user_id) logging.info(f"Admin request to download file '{filename}' for user {user_id}") api = get_hf_api(write=False) if not api: return "Server error: Cannot connect to storage.", 500 path_in_repo = f"{HF_UPLOAD_FOLDER}/{user_id_str}/{filename}" with _data_lock: user_data = metadata_cache.get(user_id_str, {}) user_files = user_data.get('files', []) file_metadata = next((f for f in user_files if f['filename'] == filename), None) if file_metadata and 'hf_path' in file_metadata: path_in_repo = file_metadata['hf_path'] try: local_file_path = hf_hub_download( repo_id=REPO_ID, filename=path_in_repo, repo_type="dataset", token=api.token, force_download=False, etag_timeout=10 ) logging.info(f"Admin download: File {path_in_repo} cached at {local_file_path}") content_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream' if file_metadata and 'content_type' in file_metadata: content_type = file_metadata['content_type'] or content_type return send_file( local_file_path, mimetype=content_type, as_attachment=True, download_name=filename ) except EntryNotFoundError: logging.error(f"Admin download: File not found on Hugging Face: {path_in_repo}") return "File not found on storage.", 404 except Exception as e: logging.error(f"Admin download: Error for file {path_in_repo}: {e}", exc_info=True) return "Server error during download.", 500 @app.route('/admin/download_metadata', methods=['POST']) def admin_trigger_download_metadata(): success = download_metadata_from_hf() if success: return jsonify({"status": "ok", "message": "Скачивание data.json с Hugging Face завершено. Обновите страницу."}) else: return jsonify({"status": "error", "message": "Ошибка скачивания data.json. Проверьте логи."}), 500 if __name__ == '__main__': print("---") print("--- ZEUS CLOUD MINI APP SERVER ---") print("---") print(f"Starting Flask server on http://{HOST}:{PORT}") print(f"Using Bot Token ID: {BOT_TOKEN.split(':')[0]}") print(f"Metadata file (local): {DATA_FILE}") print(f"Hugging Face Repo: {REPO_ID}") print(f"HF Metadata Path: {HF_DATA_FILE_PATH}") print(f"HF Upload Folder: {HF_UPLOAD_FOLDER}//") if not HF_TOKEN_READ or not HF_TOKEN_WRITE: print("---") print("--- WARNING: HUGGING FACE TOKEN(S) NOT SET ---") print("--- Storage functionality requires HF_TOKEN_READ and HF_TOKEN_WRITE env vars.") print("---") else: print("--- Hugging Face tokens found.") print("--- Attempting initial metadata download from Hugging Face...") download_metadata_from_hf() load_local_metadata() print(f"--- Initial metadata cache loaded with {len(metadata_cache)} user(s).") print("---") print("--- SECURITY WARNING ---") print("--- The /admin routes are NOT protected by authentication.") print("--- Implement proper auth before any production deployment.") print("---") print("--- Server Ready ---") app.run(host=HOST, port=PORT, debug=False, threaded=True)