#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import flask
from flask import Flask, request, Response, render_template_string, jsonify, redirect, url_for, send_file
import hmac
import hashlib
import json
from urllib.parse import unquote, parse_qs, quote
import time
from datetime import datetime
import logging
import threading
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
from huggingface_hub.utils import RepositoryNotFoundError, EntryNotFoundError
import mimetypes
import io
import math
# --- Configuration (all env-overridable where os.getenv is used) ---
# SECURITY(review): a real-looking Telegram bot token is hard-coded as the
# fallback default below. Rotate that token and drop the default so the app
# fails fast when BOT_TOKEN is not provided via the environment.
BOT_TOKEN = os.getenv("BOT_TOKEN", "6750208873:AAE2hvPlJ99dBdhGa_Brre0IIpUdOvXxHt4")
HOST = '0.0.0.0'  # bind on all interfaces (typical for containerized deployments)
PORT = 7860
DATA_FILE = 'data.json'  # local on-disk copy of the per-user metadata store
REPO_ID = os.getenv("HF_REPO_ID", "Eluza133/Z1e1u")  # HF dataset repo used as file storage
HF_DATA_FILE_PATH = "data.json"  # path of the metadata file inside the HF repo
HF_UPLOAD_FOLDER = "uploads"  # repo folder holding per-user upload subfolders
HF_TOKEN_WRITE = os.getenv("HF_TOKEN_WRITE")  # HF token with write access (uploads)
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ")  # HF token with read access (downloads)
MAX_UPLOAD_FILES = 20  # maximum number of files accepted per upload request
AUTH_TIMEOUT = 86400  # max accepted age of Telegram initData, in seconds (24h)
app = Flask(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Random per-process secret: any Flask session state would not survive a
# restart or be shared across workers (no session use is visible in this file).
app.secret_key = os.urandom(24)
# Reentrant lock guarding every read/write of metadata_cache and DATA_FILE.
_data_lock = threading.RLock()
# In-memory mirror of DATA_FILE:
# {user_id_str: {"user_info": {...}, "files": [ {filename, hf_path, ...}, ... ]}}
metadata_cache = {}
def get_hf_api(write=False):
    """Return an HfApi client authenticated with the read or write token.

    Returns None (and logs a warning) when the corresponding token env var
    is not configured.
    """
    if write:
        token, role = HF_TOKEN_WRITE, 'write'
    else:
        token, role = HF_TOKEN_READ, 'read'
    if token:
        return HfApi(token=token)
    logging.warning(f"Hugging Face {role} token not set.")
    return None
def download_metadata_from_hf():
    """Refresh the in-memory metadata cache from data.json in the HF repo.

    Returns True when the cache now reflects the remote state (a missing
    remote file counts as success and resets the cache to {}), False on any
    error (missing read token, repo not found, network failure, ...).
    """
    global metadata_cache

    api = get_hf_api(write=False)
    if api is None:
        logging.warning("HF Read token missing. Cannot download metadata.")
        return False

    def _load_cache_from(path):
        # Replace the module-level cache with the downloaded JSON content;
        # fall back to an empty dict when the file is unreadable or corrupt.
        global metadata_cache
        try:
            with open(path, 'r', encoding='utf-8') as fh:
                metadata_cache = json.load(fh)
            logging.info("Successfully loaded downloaded metadata into cache.")
        except (FileNotFoundError, json.JSONDecodeError) as exc:
            logging.error(f"Error reading downloaded metadata file: {exc}. Resetting cache.")
            metadata_cache = {}

    try:
        logging.info(f"Attempting to download {HF_DATA_FILE_PATH} from {REPO_ID}...")
        download_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=HF_DATA_FILE_PATH,
            repo_type="dataset",
            token=api.token,
            local_dir=".",
            local_dir_use_symlinks=False,
            force_download=True,
            etag_timeout=10,
        )
        logging.info("Metadata file successfully downloaded from Hugging Face.")
        with _data_lock:
            _load_cache_from(download_path)
        return True
    except EntryNotFoundError:
        # The repo exists but has no data.json yet: treat as a fresh start.
        logging.warning(f"Metadata file '{HF_DATA_FILE_PATH}' not found in repo '{REPO_ID}'. Starting fresh.")
        with _data_lock:
            metadata_cache = {}
        return True
    except RepositoryNotFoundError:
        logging.error(f"Hugging Face repository '{REPO_ID}' not found. Cannot download metadata.")
    except Exception as exc:
        logging.error(f"Error downloading metadata from Hugging Face: {exc}", exc_info=True)
    return False
def load_local_metadata():
    """Return the metadata cache, loading it from DATA_FILE on first use.

    A populated cache is returned as-is; otherwise DATA_FILE is read, with
    any read/parse failure resulting in an empty cache.
    """
    global metadata_cache
    with _data_lock:
        if metadata_cache:
            return metadata_cache
        try:
            with open(DATA_FILE, 'r', encoding='utf-8') as fh:
                metadata_cache = json.load(fh)
            logging.info("Metadata loaded from local JSON.")
        except FileNotFoundError:
            logging.warning(f"{DATA_FILE} not found locally. Starting with empty data.")
            metadata_cache = {}
        except json.JSONDecodeError:
            logging.error(f"Error decoding {DATA_FILE}. Starting with empty data.")
            metadata_cache = {}
        except Exception as exc:
            logging.error(f"Unexpected error loading metadata: {exc}")
            metadata_cache = {}
        return metadata_cache
def save_metadata(data_to_update=None):
    """Merge *data_to_update* (if given) into the cache, write the cache to
    DATA_FILE, and kick off an asynchronous upload to Hugging Face.

    Returns True on success, False when the local save fails.
    """
    global metadata_cache
    with _data_lock:
        try:
            if data_to_update:
                metadata_cache.update(data_to_update)
            with open(DATA_FILE, 'w', encoding='utf-8') as fh:
                json.dump(metadata_cache, fh, ensure_ascii=False, indent=4)
            logging.info(f"Metadata successfully saved locally to {DATA_FILE}.")
            upload_metadata_to_hf_async()
        except Exception as exc:
            logging.error(f"Error saving metadata: {exc}", exc_info=True)
            return False
        return True
def update_user_file_metadata(user_id, file_info_list):
    """Append new file records to a user's metadata, skipping duplicates.

    Duplicates are detected by filename. Persists via save_metadata() only
    when something was actually added. Returns False only when that persist
    step fails; True otherwise.
    """
    key = str(user_id)
    with _data_lock:
        entry = metadata_cache.setdefault(key, {"user_info": {}, "files": []})
        files = entry.setdefault("files", [])
        seen = {item['filename'] for item in files}
        added = 0
        for info in file_info_list:
            name = info['filename']
            if name in seen:
                logging.warning(f"File '{name}' already exists for user {user_id}. Skipping add.")
                continue
            files.append(info)
            seen.add(name)
            added += 1
        if added:
            logging.info(f"Added {added} file metadata entries for user {user_id}.")
            if not save_metadata():
                return False
        else:
            logging.info(f"No new file metadata added for user {user_id}.")
    return True
def _upload_metadata_to_hf_task():
    """Background worker: push the local DATA_FILE to the HF dataset repo.

    Silently returns (with a warning) when the write token is missing or the
    local file is absent/empty; any upload failure is logged, not raised.
    """
    api = get_hf_api(write=True)
    if api is None:
        logging.warning("HF Write token missing. Skipping metadata upload.")
        return
    if not os.path.exists(DATA_FILE):
        logging.warning(f"{DATA_FILE} does not exist locally. Skipping upload.")
        return
    try:
        # The lock is held across the upload so a concurrent save_metadata()
        # cannot rewrite the file mid-transfer.
        with _data_lock:
            if os.path.getsize(DATA_FILE) == 0:
                logging.warning(f"{DATA_FILE} is empty. Skipping upload.")
                return
            file_to_upload = DATA_FILE
            logging.info(f"Attempting to upload {file_to_upload} to {REPO_ID}/{HF_DATA_FILE_PATH}...")
            stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            api.upload_file(
                path_or_fileobj=file_to_upload,
                path_in_repo=HF_DATA_FILE_PATH,
                repo_id=REPO_ID,
                repo_type="dataset",
                commit_message=f"Update metadata {stamp}"
            )
            logging.info("Metadata successfully uploaded to Hugging Face.")
    except Exception as exc:
        logging.error(f"Error uploading metadata to Hugging Face: {exc}", exc_info=True)
def upload_metadata_to_hf_async():
    """Run the metadata upload in a fire-and-forget daemon thread."""
    threading.Thread(target=_upload_metadata_to_hf_task, daemon=True).start()
def verify_telegram_data(init_data_str):
    """Validate a Telegram WebApp initData query string.

    Per Telegram's Mini App spec: the 'hash' field is removed, the remaining
    key=value pairs are sorted and joined with newlines, and that string is
    HMAC-SHA256 signed with a secret derived from the bot token; the result
    must match the supplied hash and auth_date must be within AUTH_TIMEOUT.

    Returns a (user_info, is_valid, message) tuple; user_info is the parsed
    'user' dict on success, the raw parsed data on hash/expiry failure, or
    None on earlier failures.
    """
    try:
        parsed_data = parse_qs(init_data_str)
        received_hash = parsed_data.pop('hash', [None])[0]
        if not received_hash:
            logging.warning("Verification failed: Hash missing from initData.")
            return None, False, "Hash missing"
        data_check_list = []
        for key, value in sorted(parsed_data.items()):
            data_check_list.append(f"{key}={value[0]}")
        data_check_string = "\n".join(data_check_list)
        # Secret key = HMAC-SHA256 of the bot token keyed with "WebAppData",
        # as specified by Telegram.
        secret_key = hmac.new("WebAppData".encode(), BOT_TOKEN.encode(), hashlib.sha256).digest()
        calculated_hash = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest()
        # BUGFIX: use a constant-time comparison instead of `!=` so the
        # check does not leak hash-prefix timing information.
        if not hmac.compare_digest(calculated_hash, received_hash):
            logging.warning(f"Verification failed: Hash mismatch. Calculated: {calculated_hash}, Received: {received_hash}")
            return parsed_data, False, "Invalid hash"
        auth_date = int(parsed_data.get('auth_date', [0])[0])
        current_time = int(time.time())
        if current_time - auth_date > AUTH_TIMEOUT:
            logging.warning(f"Verification failed: initData expired. Auth time: {auth_date}, Current time: {current_time}")
            return parsed_data, False, "Data expired"
        user_info_dict = None
        if 'user' in parsed_data:
            try:
                # NOTE(review): parse_qs already URL-decodes values, so this
                # unquote is normally a no-op; kept for compatibility with
                # double-encoded clients.
                user_json_str = unquote(parsed_data['user'][0])
                user_info_dict = json.loads(user_json_str)
            except Exception as e:
                logging.error(f"Could not parse user JSON from initData: {e}")
        logging.info(f"Telegram data verified successfully for user ID: {user_info_dict.get('id') if user_info_dict else 'Unknown'}")
        return user_info_dict, True, "Verified"
    except Exception as e:
        logging.error(f"Error during Telegram data verification: {e}", exc_info=True)
        return None, False, "Verification exception"
def authenticate_and_get_user(init_data_str):
    """Verify Telegram initData and register/refresh the user's record.

    Returns (user_info, status_message); user_info is None when verification
    fails or no user ID is present in the verified data.
    """
    user_info, is_valid, message = verify_telegram_data(init_data_str)
    if not is_valid:
        return None, message
    user_id = user_info.get('id') if user_info else None
    if not user_id:
        logging.warning("Verification successful but user ID is missing in user data.")
        return None, "User ID missing"
    key = str(user_id)
    with _data_lock:
        record = metadata_cache.get(key)
        dirty = False
        if record is None:
            # First time we see this user: create a fresh record.
            metadata_cache[key] = {"user_info": user_info, "files": []}
            logging.info(f"New user registered: {user_id}")
            dirty = True
        elif record.get("user_info") != user_info:
            # Refresh stored profile data when Telegram reports changes.
            record["user_info"] = user_info
            dirty = True
        if dirty and not save_metadata():
            # Auth still succeeds; only the persistence step failed.
            logging.error(f"Failed to save metadata after updating/adding user {user_id}")
    return user_info, "Authenticated"
# Inline Jinja2 templates rendered with render_template_string().
# NOTE(review): these templates read as plain text with Jinja expressions —
# the HTML markup appears to have been stripped or lost at some point;
# verify against the original before deploying.

# User-facing page shown at '/'.
USER_TEMPLATE = """
Zeus Cloud
Ваши файлы
- У вас пока нет загруженных файлов.
"""
# Admin overview of all users, rendered at '/admin'.
# The template itself warns that the admin area has no authentication.
ADMIN_TEMPLATE = """
Admin - Zeus Cloud
Zeus Cloud - Админ Панель
ВНИМАНИЕ: Этот раздел не защищен! Добавьте аутентификацию для реального использования.
Управление метаданными
{% if users %}
{% for user_id, data in users.items() %}
Язык: {{ data.user_info.language_code or 'N/A' }}
Premium: {{ 'Да' if data.user_info and data.user_info.is_premium else 'Нет' }}
Файлов загружено: {{ data.files|length if data.files else 0 }}
Просмотреть файлы
{% endfor %}
{% else %}
Пользователей не найдено.
{% endif %}
"""
# Per-user file listing for the admin area; sorts files newest-first using
# the custom 'filesizeformat' filter registered below for the size column.
ADMIN_USER_FILES_TEMPLATE = """
Файлы пользователя {{ user_info.first_name or user_id }} - Admin
← Назад к списку пользователей
Файлы пользователя
{{ user_info.first_name or '' }} {{ user_info.last_name or '' }} (ID: {{ user_id }})
{% if files %}
| Имя файла |
Размер |
Дата загрузки |
Тип |
Действия |
{% for file in files|sort(attribute='uploaded_at_ts', reverse=true) %}
| {{ file.filename }} |
{{ file.size | filesizeformat if file.size else 'N/A' }} |
{{ file.uploaded_at_str or 'N/A' }} |
{{ file.content_type or 'N/A' }} |
Скачать
|
{% endfor %}
{% else %}
У этого пользователя нет загруженных файлов.
{% endif %}
"""
@app.template_filter('filesizeformat')
def filesizeformat(value):
    """Jinja filter: format a byte count as a human-readable size string.

    Non-numeric input is returned unchanged; 0 -> '0 Bytes'; units cap at TB.
    (This intentionally shadows Jinja's built-in filter of the same name.)
    """
    try:
        bytes_val = int(value)
    except (ValueError, TypeError):
        return value
    if bytes_val == 0:
        return '0 Bytes'
    if bytes_val < 0:
        # Negative sizes are nonsensical; echo the input unchanged (matches
        # the previous behavior, where math.log raised and the raw value
        # fell through the except clause).
        return value
    k = 1024
    sizes = ['Bytes', 'KB', 'MB', 'GB', 'TB']
    # int() truncates toward zero, which equals floor() for a positive log,
    # so the redundant math.floor() wrapper was dropped.
    i = min(int(math.log(bytes_val, k)), len(sizes) - 1)
    return f"{bytes_val / math.pow(k, i):.2f} {sizes[i]}"
@app.route('/')
def index():
    """Render the user-facing mini-app landing page."""
    return render_template_string(
        USER_TEMPLATE,
        theme={},
        max_files=MAX_UPLOAD_FILES,
    )
@app.route('/files', methods=['POST'])
def get_user_files():
    """Return the authenticated user's file list as JSON.

    Expects a JSON body of the form {"initData": "<telegram initData>"}.
    Responds 400 on a missing/malformed body, 403 on failed authentication.
    """
    # BUGFIX: request.get_json() returns None (or raises) on a missing or
    # malformed JSON body, which previously crashed with AttributeError
    # (HTTP 500) instead of returning a clean 400.
    req_data = request.get_json(silent=True) or {}
    init_data_str = req_data.get('initData')
    if not init_data_str:
        return jsonify({"status": "error", "message": "Missing initData"}), 400
    user_info, message = authenticate_and_get_user(init_data_str)
    if not user_info:
        return jsonify({"status": "error", "message": message}), 403
    user_id_str = str(user_info['id'])
    with _data_lock:
        user_data = metadata_cache.get(user_id_str, {})
        files = user_data.get('files', [])
    return jsonify({"status": "ok", "files": files}), 200
@app.route('/upload', methods=['POST'])
def upload_files():
    """Upload posted files into the user's folder in the HF dataset repo.

    Form fields: 'initData' (Telegram auth string) plus one or more 'files'.
    At most MAX_UPLOAD_FILES per request. Responds 200 on full success and
    207 (multi-status) when any upload or the metadata update failed.
    """
    init_data_str = request.form.get('initData')
    if not init_data_str:
        return jsonify({"status": "error", "message": "Missing initData"}), 400
    user_info, message = authenticate_and_get_user(init_data_str)
    if not user_info:
        return jsonify({"status": "error", "message": message}), 403
    user_id = user_info['id']
    user_id_str = str(user_id)
    uploaded_files = request.files.getlist('files')
    if not uploaded_files:
        return jsonify({"status": "error", "message": "No files selected for upload."}), 400
    if len(uploaded_files) > MAX_UPLOAD_FILES:
        return jsonify({"status": "error", "message": f"Cannot upload more than {MAX_UPLOAD_FILES} files at once."}), 400
    api = get_hf_api(write=True)
    if not api:
        return jsonify({"status": "error", "message": "Server error: Cannot connect to storage."}), 500
    successful_uploads_metadata = []
    errors = []
    for file_storage in uploaded_files:
        # SECURITY: strip any client-supplied directory components so a name
        # like "../../other_user/x" cannot escape this user's repo folder.
        raw_name = file_storage.filename or ''
        filename = os.path.basename(raw_name.replace('\\', '/'))
        if not filename:
            errors.append("Received a file without a name.")
            continue
        # BUGFIX: the repo path previously ended in a literal placeholder
        # instead of the filename, so every upload collided on one path.
        path_in_repo = f"{HF_UPLOAD_FOLDER}/{user_id_str}/{filename}"
        file_content = file_storage.read()
        file_size = len(file_content)
        content_type, _ = mimetypes.guess_type(filename)
        try:
            logging.info(f"Uploading '{filename}' for user {user_id} to {path_in_repo}...")
            file_obj = io.BytesIO(file_content)
            api.upload_file(
                path_or_fileobj=file_obj,
                path_in_repo=path_in_repo,
                repo_id=REPO_ID,
                repo_type="dataset",
                commit_message=f"User {user_id} uploaded {filename}"
            )
            logging.info(f"Successfully uploaded '{filename}' for user {user_id}.")
            now = time.time()
            successful_uploads_metadata.append({
                "filename": filename,
                "hf_path": path_in_repo,
                "uploaded_at_ts": now,
                "uploaded_at_str": datetime.fromtimestamp(now).strftime('%Y-%m-%d %H:%M:%S'),
                "size": file_size,
                "content_type": content_type
            })
        except Exception as e:
            logging.error(f"Failed to upload '{filename}' for user {user_id}: {e}", exc_info=True)
            errors.append(f"Ошибка загрузки {filename}: {str(e)}")
    if successful_uploads_metadata:
        if not update_user_file_metadata(user_id, successful_uploads_metadata):
            errors.append("Ошибка обновления списка файлов после загрузки.")
    if not errors:
        return jsonify({"status": "ok", "message": f"Загружено {len(successful_uploads_metadata)} файл(ов)."}), 200
    return jsonify({
        "status": "error" if not successful_uploads_metadata else "partial_success",
        "message": f"Загружено {len(successful_uploads_metadata)} из {len(uploaded_files)}. Ошибки: {'; '.join(errors)}",
        "uploaded_files": [f['filename'] for f in successful_uploads_metadata],
        "errors": errors
    }), 207
@app.route('/download/<path:filename>', methods=['GET'])
def download_file(filename):
    """Stream one of the authenticated user's files from the HF repo.

    Requires '?initData=...' for Telegram auth; only filenames present in
    the user's own metadata may be fetched.

    BUGFIX: the route previously declared no URL variable even though this
    view function requires `filename`, so Flask could never invoke it; the
    fallback repo path also contained a literal placeholder instead of the
    filename.
    """
    init_data_str = request.args.get('initData')
    if not init_data_str:
        return "Authentication required.", 401
    user_info, message = authenticate_and_get_user(init_data_str)
    if not user_info:
        return f"Access denied: {message}", 403
    user_id = user_info['id']
    user_id_str = str(user_id)
    with _data_lock:
        user_data = metadata_cache.get(user_id_str, {})
        user_files = user_data.get('files', [])
        file_metadata = next((f for f in user_files if f['filename'] == filename), None)
    if not file_metadata:
        logging.warning(f"User {user_id} attempted to download unlisted/unowned file: {filename}")
        return "File not found or access denied.", 404
    api = get_hf_api(write=False)
    if not api:
        return "Server error: Cannot connect to storage.", 500
    path_in_repo = file_metadata.get('hf_path', f"{HF_UPLOAD_FOLDER}/{user_id_str}/{filename}")
    try:
        logging.info(f"User {user_id} requesting download of {path_in_repo}")
        local_file_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=path_in_repo,
            repo_type="dataset",
            token=api.token,
            force_download=False,
            etag_timeout=10
        )
        logging.info(f"File {path_in_repo} downloaded to cache: {local_file_path}")
        content_type = file_metadata.get('content_type') or mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        return send_file(
            local_file_path,
            mimetype=content_type,
            as_attachment=False,
            download_name=filename
        )
    except EntryNotFoundError:
        logging.error(f"File not found on Hugging Face: {path_in_repo}")
        return "File not found on storage.", 404
    except RepositoryNotFoundError:
        logging.error(f"Repository not found: {REPO_ID}")
        return "Storage repository not found.", 500
    except Exception as e:
        logging.error(f"Error downloading file {path_in_repo} for user {user_id}: {e}", exc_info=True)
        return "Server error during download.", 500
@app.route('/admin')
def admin_panel():
    """Render the admin overview of all users (UNPROTECTED endpoint)."""
    return render_template_string(ADMIN_TEMPLATE, users=load_local_metadata())
@app.route('/admin/user/<user_id>')
def admin_user_files(user_id):
    """Admin view of one user's uploaded files.

    BUGFIX: the route previously declared no URL variable even though this
    view function requires `user_id`, so Flask could never invoke it.
    WARNING: like every /admin route here, this endpoint is unauthenticated.
    """
    current_data = load_local_metadata()
    user_data = current_data.get(str(user_id))
    if not user_data:
        return "User not found", 404
    user_info = user_data.get("user_info", {"id": user_id})
    files = user_data.get("files", [])
    return render_template_string(ADMIN_USER_FILES_TEMPLATE,
                                  user_id=user_id,
                                  user_info=user_info,
                                  files=files)
@app.route('/admin/download/<user_id>/<path:filename>', methods=['GET'])
def admin_download_file(user_id, filename):
    """Admin download of any user's file from the HF repo (as attachment).

    BUGFIX: the route previously declared no URL variables even though this
    view function requires `user_id` and `filename`; the fallback repo path
    also contained a literal placeholder instead of the filename.
    WARNING: unauthenticated admin endpoint.
    """
    user_id_str = str(user_id)
    logging.info(f"Admin request to download file '{filename}' for user {user_id}")
    api = get_hf_api(write=False)
    if not api:
        return "Server error: Cannot connect to storage.", 500
    # Default path by convention; overridden below when metadata records the
    # exact path the file was uploaded to.
    path_in_repo = f"{HF_UPLOAD_FOLDER}/{user_id_str}/{filename}"
    with _data_lock:
        user_data = metadata_cache.get(user_id_str, {})
        user_files = user_data.get('files', [])
        file_metadata = next((f for f in user_files if f['filename'] == filename), None)
        if file_metadata and 'hf_path' in file_metadata:
            path_in_repo = file_metadata['hf_path']
    try:
        local_file_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=path_in_repo,
            repo_type="dataset",
            token=api.token,
            force_download=False,
            etag_timeout=10
        )
        logging.info(f"Admin download: File {path_in_repo} cached at {local_file_path}")
        content_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
        if file_metadata and 'content_type' in file_metadata:
            content_type = file_metadata['content_type'] or content_type
        return send_file(
            local_file_path,
            mimetype=content_type,
            as_attachment=True,
            download_name=filename
        )
    except EntryNotFoundError:
        logging.error(f"Admin download: File not found on Hugging Face: {path_in_repo}")
        return "File not found on storage.", 404
    except Exception as e:
        logging.error(f"Admin download: Error for file {path_in_repo}: {e}", exc_info=True)
        return "Server error during download.", 500
@app.route('/admin/download_metadata', methods=['POST'])
def admin_trigger_download_metadata():
    """Force a re-download of data.json from Hugging Face (admin action)."""
    if download_metadata_from_hf():
        return jsonify({"status": "ok", "message": "Скачивание data.json с Hugging Face завершено. Обновите страницу."})
    return jsonify({"status": "error", "message": "Ошибка скачивания data.json. Проверьте логи."}), 500
if __name__ == '__main__':
    # Script entry point: print a startup banner, warm the metadata cache
    # from Hugging Face (when tokens are available), then start Flask's
    # built-in development server.
    print("---")
    print("--- ZEUS CLOUD MINI APP SERVER ---")
    print("---")
    print(f"Starting Flask server on http://{HOST}:{PORT}")
    # Only the numeric bot ID (the part before ':') is printed, not the secret.
    print(f"Using Bot Token ID: {BOT_TOKEN.split(':')[0]}")
    print(f"Metadata file (local): {DATA_FILE}")
    print(f"Hugging Face Repo: {REPO_ID}")
    print(f"HF Metadata Path: {HF_DATA_FILE_PATH}")
    print(f"HF Upload Folder: {HF_UPLOAD_FOLDER}//")
    if not HF_TOKEN_READ or not HF_TOKEN_WRITE:
        # Missing tokens: the app still starts, but storage features will fail.
        print("---")
        print("--- WARNING: HUGGING FACE TOKEN(S) NOT SET ---")
        print("--- Storage functionality requires HF_TOKEN_READ and HF_TOKEN_WRITE env vars.")
        print("---")
    else:
        print("--- Hugging Face tokens found.")
        print("--- Attempting initial metadata download from Hugging Face...")
        # Best effort: failures are logged inside and fall back to local data.
        download_metadata_from_hf()
    # Load from the local file if the remote fetch was skipped or failed.
    load_local_metadata()
    print(f"--- Initial metadata cache loaded with {len(metadata_cache)} user(s).")
    print("---")
    print("--- SECURITY WARNING ---")
    print("--- The /admin routes are NOT protected by authentication.")
    print("--- Implement proper auth before any production deployment.")
    print("---")
    print("--- Server Ready ---")
    # threaded=True lets the dev server handle concurrent requests; the
    # RLock around metadata_cache is what makes that safe here.
    app.run(host=HOST, port=PORT, debug=False, threaded=True)