m / app.py
Eluza133's picture
Update app.py
a85fcd2 verified
raw
history blame
76.4 kB
import flask
from flask import Flask, render_template_string, request, redirect, url_for, session, flash, send_file, jsonify, Response
from flask_caching import Cache
import json
import os
import logging
import threading
import time
from datetime import datetime
from huggingface_hub import HfApi, hf_hub_download, utils as hf_utils
from werkzeug.utils import secure_filename
import requests
from io import BytesIO
import uuid
from functools import wraps
# ---------------------------------------------------------------------------
# Application object and module-level configuration.
# Most values are read from environment variables with a literal fallback.
# ---------------------------------------------------------------------------
app = Flask(__name__)
# NOTE(review): the fallback secret is a literal committed to the source;
# Flask signs session cookies with this key, so FLASK_SECRET_KEY should
# always be set in deployment - confirm.
app.secret_key = os.getenv("FLASK_SECRET_KEY", "supersecretkey_folders_unique_tma")
# Local JSON file holding all user records; the same name is used as the
# path inside the Hugging Face dataset repo (see upload_db_to_hf).
DATA_FILE = 'cloudeng_data_tma.json'
# Hugging Face dataset repository used for both the DB file and user files.
REPO_ID = "Eluza133/Z1e1u"
HF_TOKEN_WRITE = os.getenv("HF_TOKEN")
# The read token falls back to the write token when HF_TOKEN_READ is unset.
HF_TOKEN_READ = os.getenv("HF_TOKEN_READ") or HF_TOKEN_WRITE
# NOTE(review): the fallback looks like a real bot token committed to the
# source - presumably it should be revoked and supplied via the environment.
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN", "6750208873:AAE2hvPlS99dBdhGa_Brre0IIpUdOvXxHt4")
# The placeholder value is treated as "no admin configured" by is_admin_tma().
ADMIN_TELEGRAM_ID = os.getenv("ADMIN_TELEGRAM_ID", "YOUR_ADMIN_TELEGRAM_USER_ID_HERE")
BOT_USERNAME = os.getenv("BOT_USERNAME", "ZeusCloudBot")
# NOTE(review): default admin credentials are literals in the source;
# confirm they are overridden in every deployment.
ADMIN_USERNAME = os.getenv("ADMIN_USERNAME", "admin")
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD", "zeusadminpass")
# Scratch directory for uploads before they are pushed to Hugging Face;
# created at import time.
UPLOAD_FOLDER = 'uploads_tma'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Cache backing the memoized load_data().
cache = Cache(app, config={'CACHE_TYPE': 'simple'})
logging.basicConfig(level=logging.INFO)
# Per-user quota defaults.
DEFAULT_STORAGE_LIMIT_GB = 10
BYTES_IN_GB = 1024**3
# Shared CSS for the HTML templates in this module; it is concatenated into
# the <style> element of TMA_DASHBOARD_HTML_TEMPLATE. Defines the colour
# variables, light/dark body styles, buttons, flash messages, the item grid,
# the preview modal, the upload progress bar and two responsive breakpoints.
BASE_STYLE = '''
:root {
--primary: #ff4d6d; --secondary: #00ddeb; --accent: #8b5cf6;
--background-light: #f5f6fa; --background-dark: #1a1625;
--card-bg: rgba(255, 255, 255, 0.95); --card-bg-dark: rgba(40, 35, 60, 0.95);
--text-light: #2a1e5a; --text-dark: #e8e1ff; --shadow: 0 10px 30px rgba(0, 0, 0, 0.2);
--glass-bg: rgba(255, 255, 255, 0.15); --transition: all 0.3s ease; --delete-color: #ff4444;
--folder-color: #ffc107;
}
* { margin: 0; padding: 0; box-sizing: border-box; }
body { font-family: 'Inter', sans-serif; background: var(--background-light); color: var(--text-light); line-height: 1.6; }
body.dark { background: var(--background-dark); color: var(--text-dark); }
.container { margin: 20px auto; max-width: 1200px; padding: 25px; background: var(--card-bg); border-radius: 20px; box-shadow: var(--shadow); overflow-x: hidden; }
body.dark .container { background: var(--card-bg-dark); }
h1 { font-size: 2em; font-weight: 800; text-align: center; margin-bottom: 25px; background: linear-gradient(135deg, var(--primary), var(--accent)); -webkit-background-clip: text; color: transparent; }
h2 { font-size: 1.5em; margin-top: 30px; color: var(--text-light); }
body.dark h2 { color: var(--text-dark); }
h4 { font-size: 1.1em; margin-top: 15px; margin-bottom: 5px; color: var(--accent); }
ol, ul { margin-left: 20px; margin-bottom: 15px; }
li { margin-bottom: 5px; }
input, textarea, input[type="text"], input[type="password"], input[type="file"] { width: 100%; padding: 14px; margin: 12px 0; border: none; border-radius: 14px; background: var(--glass-bg); color: var(--text-light); font-size: 1.1em; box-shadow: inset 0 3px 10px rgba(0, 0, 0, 0.1); }
body.dark input, body.dark textarea, body.dark input[type="text"], body.dark input[type="password"], body.dark input[type="file"] { color: var(--text-dark); background: rgba(255,255,255,0.05); }
input:focus, textarea:focus { outline: none; box-shadow: 0 0 0 4px var(--primary); }
.btn { padding: 14px 28px; background: var(--primary); color: white; border: none; border-radius: 14px; cursor: pointer; font-size: 1.1em; font-weight: 600; transition: var(--transition); box-shadow: var(--shadow); display: inline-block; text-decoration: none; margin-top: 5px; margin-right: 5px; }
.btn:hover { transform: scale(1.05); background: #e6415f; }
.download-btn { background: var(--secondary); }
.download-btn:hover { background: #00b8c5; }
.delete-btn { background: var(--delete-color); }
.delete-btn:hover { background: #cc3333; }
.folder-btn { background: var(--folder-color); }
.folder-btn:hover { background: #e6a000; }
.flash { color: var(--secondary); text-align: center; margin-bottom: 15px; padding: 10px; background: rgba(0, 221, 235, 0.1); border-radius: 10px; }
.flash.error { color: var(--delete-color); background: rgba(255, 68, 68, 0.1); }
.flash.success { color: var(--accent); background: rgba(139, 92, 246, 0.1); }
.flash.info { color: #555; background: rgba(200, 200, 200, 0.1); }
.file-grid { display: grid; grid-template-columns: repeat(auto-fill, minmax(200px, 1fr)); gap: 20px; margin-top: 20px; }
.user-list { margin-top: 20px; }
.user-item { padding: 15px; background: var(--card-bg); border-radius: 16px; margin-bottom: 10px; box-shadow: var(--shadow); transition: var(--transition); }
body.dark .user-item { background: var(--card-bg-dark); }
.user-item:hover { transform: translateY(-5px); }
.user-item a { color: var(--primary); text-decoration: none; font-weight: 600; }
.user-item a:hover { color: var(--accent); }
.item { background: var(--card-bg); padding: 15px; border-radius: 16px; box-shadow: var(--shadow); text-align: center; transition: var(--transition); display: flex; flex-direction: column; justify-content: space-between; }
body.dark .item { background: var(--card-bg-dark); }
.item:hover { transform: translateY(-5px); }
.item-preview { max-width: 100%; height: 130px; object-fit: cover; border-radius: 10px; margin-bottom: 10px; cursor: pointer; display: block; margin-left: auto; margin-right: auto;}
.item.folder .item-preview { object-fit: contain; font-size: 60px; color: var(--folder-color); line-height: 130px; }
.item p { font-size: 0.9em; margin: 5px 0; word-break: break-all; }
.item a { color: var(--primary); text-decoration: none; }
.item a:hover { color: var(--accent); }
.item-actions { margin-top: 10px; display: flex; flex-wrap: wrap; gap: 5px; justify-content: center; }
.item-actions .btn { font-size: 0.9em; padding: 5px 10px; }
.modal { display: none; position: fixed; top: 0; left: 0; width: 100%; height: 100%; background: rgba(0, 0, 0, 0.85); z-index: 2000; justify-content: center; align-items: center; }
.modal-content { max-width: 95%; max-height: 95%; background: #fff; padding: 10px; border-radius: 15px; overflow: auto; position: relative; }
body.dark .modal-content { background: var(--card-bg-dark); }
.modal img, .modal video, .modal iframe, .modal pre { max-width: 100%; max-height: 85vh; display: block; margin: auto; border-radius: 10px; }
.modal iframe { width: 80vw; height: 85vh; border: none; }
.modal pre { background: #eee; color: #333; padding: 15px; border-radius: 8px; white-space: pre-wrap; word-wrap: break-word; text-align: left; max-height: 85vh; overflow-y: auto;}
body.dark .modal pre { background: #2b2a33; color: var(--text-dark); }
.modal-close-btn { position: absolute; top: 15px; right: 25px; font-size: 30px; color: #aaa; cursor: pointer; background: rgba(0,0,0,0.5); border-radius: 50%; width: 30px; height: 30px; line-height: 30px; text-align: center; }
body.dark .modal-close-btn { color: #555; background: rgba(255,255,255,0.2); }
#progress-container { width: 100%; background: var(--glass-bg); border-radius: 10px; margin: 15px 0; display: none; position: relative; height: 20px; }
#progress-bar { width: 0%; height: 100%; background: var(--primary); border-radius: 10px; transition: width 0.3s ease; }
#progress-text { position: absolute; width: 100%; text-align: center; line-height: 20px; color: white; font-size: 0.9em; font-weight: bold; text-shadow: 1px 1px 1px rgba(0,0,0,0.5); }
.breadcrumbs { margin-bottom: 20px; font-size: 1.1em; }
.breadcrumbs a { color: var(--accent); text-decoration: none; }
.breadcrumbs a:hover { text-decoration: underline; }
.breadcrumbs span { margin: 0 5px; color: #aaa; }
.folder-actions { margin-top: 20px; margin-bottom: 10px; display: flex; gap: 10px; align-items: center; flex-wrap: wrap; }
.folder-actions input[type=text] { width: auto; flex-grow: 1; margin: 0; min-width: 150px; }
.folder-actions .btn { margin: 0; flex-shrink: 0;}
@media (max-width: 768px) {
.file-grid { grid-template-columns: repeat(auto-fill, minmax(150px, 1fr)); }
.folder-actions { flex-direction: column; align-items: stretch; }
.folder-actions input[type=text] { width: 100%; }
.item-preview { height: 100px; }
.item.folder .item-preview { font-size: 50px; line-height: 100px; }
h1 { font-size: 1.8em; }
.btn { padding: 12px 24px; font-size: 1em; }
.item-actions .btn { padding: 4px 8px; font-size: 0.8em;}
}
@media (max-width: 480px) {
.container { padding: 15px; }
.file-grid { grid-template-columns: repeat(auto-fill, minmax(120px, 1fr)); gap: 15px; }
.item-preview { height: 80px; }
.item.folder .item-preview { font-size: 40px; line-height: 80px; }
.item p { font-size: 0.8em;}
.breadcrumbs { font-size: 1em; }
.btn { padding: 10px 20px; }
}
'''
def find_node_by_id(filesystem, node_id):
    """Find a node in a filesystem tree by its ``id``.

    The tree is walked breadth-first starting at ``filesystem``; only nodes
    whose ``type`` is ``'folder'`` are descended into.

    Args:
        filesystem: Root node dict (may be ``None`` or empty).
        node_id: Value compared against each node's ``'id'`` entry.

    Returns:
        A ``(node, parent)`` tuple. ``parent`` is ``None`` when the match is
        the root itself. ``(None, None)`` is returned when nothing matches.
    """
    # Local import keeps this block self-contained.
    from collections import deque

    if not filesystem:
        return None, None
    if filesystem.get('id') == node_id:
        return filesystem, None

    # deque gives O(1) pops from the left; list.pop(0) is O(n) per pop.
    pending = deque([filesystem])
    while pending:
        folder = pending.popleft()
        if folder.get('type') != 'folder':
            continue
        # A missing or None "children" entry is treated as an empty folder.
        for child in folder.get('children') or ():
            if child.get('id') == node_id:
                return child, folder
            if child.get('type') == 'folder':
                pending.append(child)
    return None, None
def add_node(filesystem, parent_id, node_data):
    """Append ``node_data`` to the children of the folder with id ``parent_id``.

    Returns:
        ``True`` when the parent exists and is a folder, ``False`` otherwise.
    """
    target, _ = find_node_by_id(filesystem, parent_id)
    if not target or target.get('type') != 'folder':
        return False
    # Create the children list on demand, then append in one step.
    target.setdefault('children', []).append(node_data)
    return True
def remove_node(filesystem, node_id):
    """Detach the node with id ``node_id`` from its parent's children list.

    The root node has no parent and therefore cannot be removed.

    Returns:
        ``True`` when a node was found under a parent with a ``children``
        entry, ``False`` otherwise.
    """
    victim, owner = find_node_by_id(filesystem, node_id)
    if not (victim and owner and 'children' in owner):
        return False
    survivors = []
    for entry in owner['children']:
        if entry.get('id') != node_id:
            survivors.append(entry)
    owner['children'] = survivors
    return True
def get_node_path_string(filesystem, node_id):
    """Return a human-readable ``"a / b / c"`` path for the node ``node_id``.

    The node whose id is ``'root'`` is left out of the path. ``"Root"`` is
    returned when the resulting path is empty.
    """
    segments = []
    cursor = node_id
    while cursor:
        found, owner = find_node_by_id(filesystem, cursor)
        if not found:
            break
        if found.get('id') != 'root':
            # Folders carry "name"; files fall back to "original_filename".
            label = found.get('name', found.get('original_filename', ''))
            segments.insert(0, label)
        if not owner:
            break
        cursor = owner.get('id')
    return " / ".join(segments) or "Root"
def calculate_current_storage_usage(filesystem_node):
    """Return the total ``size_bytes`` of all files at or below a node.

    A file node contributes its ``size_bytes`` (0 when absent). A folder
    contributes the sum over its children. Anything else counts as 0.
    """
    if not filesystem_node:
        return 0
    kind = filesystem_node.get('type')
    if kind == 'file':
        return filesystem_node.get('size_bytes', 0)
    if kind != 'folder' or 'children' not in filesystem_node:
        return 0
    return sum(
        calculate_current_storage_usage(entry)
        for entry in filesystem_node['children']
    )
def initialize_user_filesystem_tma(user_data, tma_user_id_str):
    """Make sure a user record has a filesystem tree and storage fields.

    Mutates ``user_data`` in place:
      * creates an empty root folder when ``'filesystem'`` is missing;
      * fills in default storage-limit / usage fields when missing;
      * migrates a legacy flat ``'files'`` list into the root folder and
        then deletes that list.

    Args:
        user_data: The user's record dict.
        tma_user_id_str: User id as a string; used to build a storage path
            for legacy files that have no ``'path'``.
    """
    if 'filesystem' not in user_data:
        user_data['filesystem'] = {
            "type": "folder", "id": "root", "name": "root", "children": []
        }

    storage_defaults = (
        ('storage_limit_gb', DEFAULT_STORAGE_LIMIT_GB),
        ('unlimited_storage', False),
        ('storage_used_bytes', 0),
    )
    for field_name, default_value in storage_defaults:
        user_data.setdefault(field_name, default_value)

    legacy_files = user_data.get('files')
    if not isinstance(legacy_files, list):
        return

    migrated_bytes = 0
    for legacy in legacy_files:
        node_id = legacy.get('id', uuid.uuid4().hex)
        display_name = legacy.get('filename', 'unknown_file')
        stem, extension = os.path.splitext(display_name)
        stored_name = f"{stem}_{uuid.uuid4().hex[:8]}{extension}"
        # Legacy records carry no size information, so 0 is recorded.
        size_guess = 0
        storage_path = legacy.get('path')
        if not storage_path:
            storage_path = f"cloud_files/{tma_user_id_str}/root/{stored_name}"
        migrated_node = {
            'type': 'file', 'id': node_id, 'original_filename': display_name,
            'unique_filename': stored_name, 'path': storage_path,
            'file_type': get_file_type(display_name),
            'upload_date': legacy.get('upload_date', datetime.now().strftime('%Y-%m-%d %H:%M:%S')),
            'size_bytes': size_guess
        }
        if add_node(user_data['filesystem'], 'root', migrated_node):
            migrated_bytes += size_guess

    user_data['storage_used_bytes'] += migrated_bytes
    del user_data['files']
@cache.memoize(timeout=300)
def load_data():
    """Load the user database, refreshing the local copy from Hugging Face.

    After parsing ``DATA_FILE`` every user record is normalised: storage
    defaults are filled in, the filesystem tree is initialised and
    ``storage_used_bytes`` is recomputed from the tree.

    Returns:
        The database dict, always containing a ``'users'`` mapping. On any
        failure an empty ``{'users': {}}`` is returned.
    """
    try:
        download_db_from_hf()
        with open(DATA_FILE, 'r', encoding='utf-8') as file:
            data = json.load(file)
        if not isinstance(data, dict):
            data = {'users': {}}
        data.setdefault('users', {})
        for tma_user_id_str, user_data_item in data['users'].items():
            user_data_item.setdefault('storage_limit_gb', DEFAULT_STORAGE_LIMIT_GB)
            user_data_item.setdefault('unlimited_storage', False)
            initialize_user_filesystem_tma(user_data_item, tma_user_id_str)
            user_data_item['storage_used_bytes'] = calculate_current_storage_usage(user_data_item['filesystem'])
        return data
    except Exception:
        # Previously swallowed without a trace; log it so an unexpectedly
        # empty database can be diagnosed.
        # NOTE(review): the empty fallback is returned through the memoize
        # decorator and a later save_data() would persist it - confirm this
        # is acceptable.
        logging.exception("Failed to load %s; falling back to an empty user database", DATA_FILE)
        return {'users': {}}
def save_data(data):
    """Persist the database to ``DATA_FILE``, upload it and clear the cache.

    The JSON is written to a temporary sibling file and then moved over
    ``DATA_FILE``, so a serialization error cannot leave a truncated
    database file behind.

    Args:
        data: The database dict to serialise.

    Raises:
        Exception: Any error from writing, uploading or clearing the cache
            is logged and re-raised unchanged.
    """
    tmp_path = f"{DATA_FILE}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False, indent=4)
        os.replace(tmp_path, DATA_FILE)
        upload_db_to_hf()
        cache.clear()
    except Exception:
        logging.exception("Failed to save %s", DATA_FILE)
        # Do not leave a half-written temp file behind.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
def upload_db_to_hf():
    """Upload ``DATA_FILE`` to the Hugging Face dataset repo (best effort).

    Does nothing when no write token is configured. Upload errors are logged
    and not propagated.
    """
    if not HF_TOKEN_WRITE:
        return
    try:
        api = HfApi()
        api.upload_file(
            path_or_fileobj=DATA_FILE, path_in_repo=DATA_FILE, repo_id=REPO_ID,
            repo_type="dataset", token=HF_TOKEN_WRITE,
            commit_message=f"Backup {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )
    except Exception:
        # Still best-effort, but no longer silent.
        logging.exception("Failed to upload %s to %s", DATA_FILE, REPO_ID)
def _ensure_local_db_file():
    """Create an empty database file when ``DATA_FILE`` does not exist yet."""
    if not os.path.exists(DATA_FILE):
        with open(DATA_FILE, 'w', encoding='utf-8') as f:
            json.dump({'users': {}}, f)


def download_db_from_hf():
    """Fetch ``DATA_FILE`` from the Hugging Face dataset repo into the cwd.

    When no read token is configured, or the download fails for any reason,
    the function makes sure a local database file exists (creating an empty
    one if needed) and returns normally.
    """
    if not HF_TOKEN_READ:
        _ensure_local_db_file()
        return
    try:
        hf_hub_download(
            repo_id=REPO_ID, filename=DATA_FILE, repo_type="dataset",
            token=HF_TOKEN_READ, local_dir=".", local_dir_use_symlinks=False
        )
    except (hf_utils.RepositoryNotFoundError, hf_utils.EntryNotFoundError):
        logging.warning("%s not found in %s; using the local copy", DATA_FILE, REPO_ID)
        _ensure_local_db_file()
    except Exception:
        # Previously silent; keep the fallback but record the failure.
        logging.exception("Failed to download %s from %s; using the local copy", DATA_FILE, REPO_ID)
        _ensure_local_db_file()
def periodic_backup():
    """Upload the database forever, pausing 30 minutes between uploads.

    Never returns.
    """
    pause_seconds = 1800
    while True:
        upload_db_to_hf()
        time.sleep(pause_seconds)
def get_file_type(filename):
    """Classify a file name by its extension (case-insensitive).

    Returns:
        One of ``'video'``, ``'image'``, ``'pdf'``, ``'text'`` or ``'other'``.
    """
    lowered = filename.lower()
    categories = (
        ('video', ('.mp4', '.mov', '.avi', '.webm', '.mkv')),
        ('image', ('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg')),
        ('pdf', ('.pdf',)),
        ('text', ('.txt',)),
    )
    for label, suffixes in categories:
        if lowered.endswith(suffixes):
            return label
    return 'other'
def is_admin_tma():
    """Tell whether the Telegram user in the session is the configured admin.

    Always ``False`` when ``ADMIN_TELEGRAM_ID`` is empty or still the
    placeholder value.
    """
    configured_id = ADMIN_TELEGRAM_ID
    if not configured_id or configured_id == "YOUR_ADMIN_TELEGRAM_USER_ID_HERE":
        return False
    if 'telegram_user_id' not in session:
        return False
    # Compare as strings so int and str ids match.
    return str(session['telegram_user_id']) == str(configured_id)
def admin_browser_login_required(f):
    """Decorator: allow the view only for a logged-in browser admin session.

    Requests without ``session['admin_browser_logged_in']`` are redirected
    to ``admin_login`` with the current URL passed as ``next``.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        if session.get('admin_browser_logged_in'):
            return f(*args, **kwargs)
        return redirect(url_for('admin_login', next=request.url))
    return guarded_view
def format_bytes(bytes_value):
    """Format a byte count as a short string using 1024-based units.

    Returns ``"N/A"`` for ``None``. Values below 1024 are shown as ``B``
    without decimals; larger values use ``KB``, ``MB`` or ``GB`` with two
    decimals (``GB`` is the largest unit).
    """
    if bytes_value is None:
        return "N/A"
    if bytes_value < 1024:
        return f"{bytes_value} B"
    for power, unit in ((1, 'KB'), (2, 'MB')):
        if bytes_value < 1024 ** (power + 1):
            return f"{bytes_value / 1024 ** power:.2f} {unit}"
    return f"{bytes_value / 1024 ** 3:.2f} GB"
# Landing page of the Telegram Mini App. Its script reads the WebApp
# initData / initDataUnsafe values, POSTs them as JSON to the
# auth_via_telegram route and, on a "success" status, navigates to the
# redirect_url from the response; otherwise it shows an error message.
TMA_ENTRY_HTML = '''
<!DOCTYPE html><html lang="ru"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Zeus Cloud TMA</title><script src="https://telegram.org/js/telegram-web-app.js"></script>
<style>body { font-family: sans-serif; display: flex; justify-content: center; align-items: center; height: 100vh; margin: 0; background-color: #f0f0f0; } .loading { font-size: 1.5em; }</style>
</head><body><div class="loading">Загрузка приложения...</div>
<script>
window.Telegram.WebApp.ready();
const initData = window.Telegram.WebApp.initData;
const initDataUnsafe = window.Telegram.WebApp.initDataUnsafe;
if (initDataUnsafe && initDataUnsafe.user) {
fetch("{{ url_for('auth_via_telegram') }}", {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ user: initDataUnsafe.user, initData: initData })
})
.then(response => response.json())
.then(data => {
if (data.status === 'success') {
window.location.href = data.redirect_url;
} else {
document.body.innerHTML = `<div class="loading">Ошибка авторизации: ${data.message}</div>`;
Telegram.WebApp.showAlert(data.message || "Ошибка авторизации");
}
})
.catch(error => {
document.body.innerHTML = `<div class="loading">Ошибка сети: ${error}</div>`;
Telegram.WebApp.showAlert("Ошибка сети при авторизации.");
});
} else {
document.body.innerHTML = '<div class="loading">Не удалось получить данные пользователя Telegram. Попробуйте перезапустить приложение.</div>';
Telegram.WebApp.showAlert("Не удалось получить данные пользователя Telegram.");
}
</script></body></html>
'''
@app.route('/tma')
def tma_entry_page():
    """Serve the Mini App landing page that performs the Telegram login."""
    landing_page = render_template_string(TMA_ENTRY_HTML)
    return landing_page
@app.route('/')
def root_redirect():
    """Send visitors of the site root to the Mini App landing page."""
    landing_url = url_for('tma_entry_page')
    return redirect(landing_url)
def _verify_telegram_init_data(init_data, bot_token):
    """Check the signature of Telegram Mini App ``initData``.

    Implements the validation described in the Telegram Mini Apps
    documentation: the received ``hash`` must equal the hex HMAC-SHA256 of
    the sorted ``key=value`` lines, keyed with
    ``HMAC_SHA256(key="WebAppData", msg=bot_token)``.

    Args:
        init_data: Raw query-string style ``initData`` sent by the client.
        bot_token: Token of the bot the Mini App belongs to.

    Returns:
        The decoded ``user`` dict from the signed data, or ``None`` when the
        data is missing, malformed or the signature does not match.
    """
    import hashlib
    import hmac
    from urllib.parse import parse_qsl

    if not isinstance(init_data, str) or not init_data or not bot_token:
        return None
    try:
        fields = dict(parse_qsl(init_data, keep_blank_values=True, strict_parsing=True))
    except ValueError:
        return None
    received_hash = fields.pop('hash', None)
    if not received_hash:
        return None
    data_check_string = '\n'.join(f"{key}={value}" for key, value in sorted(fields.items()))
    secret_key = hmac.new(b"WebAppData", bot_token.encode('utf-8'), hashlib.sha256).digest()
    expected_hash = hmac.new(secret_key, data_check_string.encode('utf-8'), hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest rejects non-ASCII str input.
    if not hmac.compare_digest(expected_hash.encode('utf-8'), received_hash.encode('utf-8')):
        return None
    try:
        user = json.loads(fields.get('user', ''))
    except ValueError:
        return None
    return user if isinstance(user, dict) else None


@app.route('/auth_via_telegram', methods=['POST'])
def auth_via_telegram():
    """Log a Telegram user in and create their record on first visit.

    Expects a JSON body with the Mini App ``initData`` string. The user is
    taken from the signed ``initData`` only; the unsigned ``user`` field of
    the payload is ignored because any client could forge it (including the
    admin's id).

    Returns:
        JSON ``{'status': 'success', 'redirect_url': ...}`` on success, or
        ``{'status': 'error', 'message': ...}`` with status 403 (bad or
        missing signature), 400 (no user id) or 500 (server-side failure).
    """
    try:
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            payload = {}
        tg_user_data = _verify_telegram_init_data(payload.get('initData'), TELEGRAM_BOT_TOKEN)
        if tg_user_data is None:
            return jsonify({'status': 'error', 'message': 'Не удалось подтвердить данные Telegram.'}), 403
        if not tg_user_data.get('id'):
            return jsonify({'status': 'error', 'message': 'Отсутствуют данные пользователя Telegram.'}), 400
        tma_user_id_str = str(tg_user_data['id'])
        data = load_data()
        if tma_user_id_str not in data['users']:
            data['users'][tma_user_id_str] = {
                'telegram_id': tg_user_data['id'],
                'telegram_username': tg_user_data.get('username'),
                'first_name': tg_user_data.get('first_name'),
                'last_name': tg_user_data.get('last_name'),
                'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'filesystem': {"type": "folder", "id": "root", "name": "root", "children": []}
            }
            initialize_user_filesystem_tma(data['users'][tma_user_id_str], tma_user_id_str)
            try:
                save_data(data)
            except Exception:
                logging.exception("Failed to save new user %s", tma_user_id_str)
                return jsonify({'status': 'error', 'message': 'Ошибка сохранения данных нового пользователя.'}), 500
        session['telegram_user_id'] = tma_user_id_str
        display_name = tg_user_data.get('first_name') or tg_user_data.get('username') or f"User {tma_user_id_str}"
        session['telegram_display_name'] = display_name
        return jsonify({'status': 'success', 'redirect_url': url_for('tma_dashboard')})
    except Exception:
        logging.exception("Unexpected error during Telegram authorization")
        return jsonify({'status': 'error', 'message': 'Внутренняя ошибка сервера при авторизации.'}), 500
# Jinja template of the main dashboard: storage summary, breadcrumbs,
# folder creation, upload form with progress bar, item grid and the preview
# modal.
# Fixes relative to the previous version:
#   * the video preview tag was malformed (unterminated onclick, no source);
#   * the Hugging Face read token is no longer rendered into page JavaScript
#     (it was never used there);
#   * text previews are HTML-escaped before being written to innerHTML
#     (the old replace() chain replaced each character with itself);
#   * user-controlled names used inside inline JavaScript go through
#     tojson|forceescape so quotes in a name cannot break out of the string.
TMA_DASHBOARD_HTML_TEMPLATE = '''
<!DOCTYPE html><html lang="ru"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Zeus Cloud</title><link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;600;800&display=swap" rel="stylesheet">
<script src="https://telegram.org/js/telegram-web-app.js"></script>
<style>''' + BASE_STYLE + '''</style></head><body class="dark"><div class="container">
<h1>Zeus Cloud</h1><p>Пользователь: {{ display_name }}</p>
<p>
{% if user_data.unlimited_storage %}
Память: Безлимитно (Использовано: {{ storage_used_formatted }})
{% else %}
Память: {{ storage_used_formatted }} из {{ user_data.storage_limit_gb }} ГБ
{% endif %}
</p>
{% if not user_data.unlimited_storage %}
<button type="button" class="btn" style="background: var(--secondary); margin-left: 0; margin-bottom: 10px;" onclick="buyUnlimitedStorage('{{ tma_user_id }}', '{{ bot_username }}')">Купить безлимит за 2000 ⭐️</button>
{% endif %}
{% with messages = get_flashed_messages(with_categories=true) %}{% if messages %}
{% for category, message in messages %}<div class="flash {{ category }}">{{ message }}</div>{% endfor %}
{% endif %}{% endwith %}
<div class="breadcrumbs">
{% for crumb in breadcrumbs %}
{% if crumb.is_link %}<a href="{{ url_for('tma_dashboard', folder_id=crumb.id) }}">{{ crumb.name if crumb.id != 'root' else 'Главная' }}</a>
{% else %}<span>{{ crumb.name if crumb.id != 'root' else 'Главная' }}</span>{% endif %}
{% if not loop.last %}<span>/</span>{% endif %}
{% endfor %}
</div>
<div class="folder-actions">
<form method="POST" action="{{ url_for('create_folder_tma') }}" style="display: contents;">
<input type="hidden" name="parent_folder_id" value="{{ current_folder_id }}">
<input type="text" name="folder_name" placeholder="Имя новой папки" required>
<button type="submit" class="btn folder-btn">Создать папку</button>
</form>
</div>
<form id="upload-form" method="POST" enctype="multipart/form-data" action="{{ url_for('tma_dashboard') }}">
<input type="hidden" name="current_folder_id" value="{{ current_folder_id }}">
<input type="file" name="files" id="file-input" multiple required>
<button type="submit" class="btn" id="upload-btn">Загрузить файлы сюда</button>
</form>
<div id="progress-container"><div id="progress-bar"></div><div id="progress-text">0%</div></div>
<h2>Содержимое папки: {{ current_folder.name if current_folder_id != 'root' else 'Главная' }}</h2>
<div class="file-grid">
{% for item in items %}
<div class="item {{ item.type }}">
{% if item.type == 'folder' %}
<a href="{{ url_for('tma_dashboard', folder_id=item.id) }}" class="item-preview" title="Перейти в папку {{ item.name }}">📁</a>
<p><b>{{ item.name }}</b></p>
<div class="item-actions">
<a href="{{ url_for('tma_dashboard', folder_id=item.id) }}" class="btn folder-btn">Открыть</a>
<form method="POST" action="{{ url_for('delete_folder_tma', folder_id=item.id) }}" style="display: inline;" onsubmit="return confirm({{ ('Удалить папку ' ~ item.name ~ '? Папку можно удалить только если она пуста.') | tojson | forceescape }});">
<input type="hidden" name="current_view_folder_id" value="{{ current_folder_id }}">
<button type="submit" class="btn delete-btn">Удалить</button>
</form>
</div>
{% elif item.type == 'file' %}
{% set previewable = item.file_type in ['image', 'video', 'pdf', 'text'] %}
{% if item.file_type == 'image' %}<img class="item-preview" src="{{ hf_file_url_jinja(item.path) }}" alt="{{ item.original_filename }}" loading="lazy" onclick="openModal('{{ hf_file_url_jinja(item.path) }}', '{{ item.file_type }}', '{{ item.id }}')">
{% elif item.file_type == 'video' %}<video class="item-preview" preload="metadata" muted loading="lazy" onclick="openModal('{{ hf_file_url_jinja(item.path) }}', '{{ item.file_type }}', '{{ item.id }}')"><source src="{{ hf_file_url_jinja(item.path, True) }}#t=0.5" type="video/mp4"></video>
{% elif item.file_type == 'pdf' %}<div class="item-preview" style="font-size: 60px; line-height: 130px; color: var(--accent); cursor: pointer;" onclick="openModal('{{ hf_file_url_jinja(item.path, True) }}', '{{ item.file_type }}', '{{ item.id }}')">📄</div>
{% elif item.file_type == 'text' %}<div class="item-preview" style="font-size: 60px; line-height: 130px; color: var(--secondary); cursor: pointer;" onclick="openModal('{{ url_for('get_text_content_tma', file_id=item.id) }}', '{{ item.file_type }}', '{{ item.id }}')">📝</div>
{% else %}<div class="item-preview" style="font-size: 60px; line-height: 130px; color: #aaa;">❓</div>{% endif %}
<p title="{{ item.original_filename }}">{{ item.original_filename | truncate(25, True) }}</p>
<p style="font-size: 0.8em; color: #888;">{{ item.upload_date }}</p>
<div class="item-actions">
<button type="button" class="btn download-btn" onclick="tmaDownloadFile('{{ url_for('download_tma', file_id=item.id, _external=True) }}', {{ item.original_filename | tojson | forceescape }})">Скачать</button>
{% if previewable %}<button type="button" class="btn" style="background: var(--accent);" onclick="openModal('{{ hf_file_url_jinja(item.path) if item.file_type != 'text' else url_for('get_text_content_tma', file_id=item.id) }}', '{{ item.file_type }}', '{{ item.id }}')">Просмотр</button>{% endif %}
<form method="POST" action="{{ url_for('delete_file_tma', file_id=item.id) }}" style="display: inline;" onsubmit="return confirm({{ ('Вы уверены, что хотите удалить файл ' ~ item.original_filename ~ '?') | tojson | forceescape }});">
<input type="hidden" name="current_view_folder_id" value="{{ current_folder_id }}"><button type="submit" class="btn delete-btn">Удалить</button>
</form>
</div>
{% endif %}
</div>
{% endfor %}
{% if not items %} <p>Эта папка пуста.</p> {% endif %}
</div>
<a href="{{ url_for('tma_logout') }}" class="btn" style="margin-top: 20px;" id="logout-btn">Выйти (очистить сессию)</a>
{% if is_tma_user_admin_flag %}
<a href="{{ url_for('admin_panel') }}" class="btn" style="margin-top: 20px; background-color: var(--accent);">Админ-панель</a>
{% endif %}
</div>
<div class="modal" id="mediaModal" onclick="closeModal(event)"><div class="modal-content" id="modalContentContainer">
<span onclick="closeModalManual()" class="modal-close-btn">×</span><div id="modalContent"></div></div></div>
<script>
window.Telegram.WebApp.ready();
document.body.classList.add('dark');
const repoId = "{{ repo_id_js }}";
function hfFileUrl(path, download = false) {
let url = `https://huggingface.co/datasets/${repoId}/resolve/main/${path}`;
if (download) url += '?download=true'; return url;
}
async function openModal(srcOrUrl, type, itemId) {
const modal = document.getElementById('mediaModal'); const modalContent = document.getElementById('modalContent');
modalContent.innerHTML = '<p>Загрузка...</p>'; modal.style.display = 'flex';
try {
if (type === 'image') modalContent.innerHTML = `<img src="${srcOrUrl}" alt="Просмотр изображения">`;
else if (type === 'video') modalContent.innerHTML = `<video controls autoplay style='max-width: 95%; max-height: 85vh;'><source src="${srcOrUrl}" type="video/mp4">Ваш браузер не поддерживает видео.</video>`;
else if (type === 'pdf') modalContent.innerHTML = `<iframe src="${srcOrUrl}" title="Просмотр PDF"></iframe>`;
else if (type === 'text') {
const response = await fetch(srcOrUrl); if (!response.ok) throw new Error(`Ошибка загрузки текста: ${response.statusText}`);
const text = await response.text();
const escapedText = text.replace(/&/g, "&amp;").replace(/</g, "&lt;").replace(/>/g, "&gt;");
modalContent.innerHTML = `<pre>${escapedText}</pre>`;
} else modalContent.innerHTML = '<p>Предпросмотр для этого типа файла не поддерживается.</p>';
} catch (error) { modalContent.innerHTML = `<p>Не удалось загрузить содержимое для предпросмотра. ${error.message}</p>`; }
}
function closeModal(event) { if (event.target === document.getElementById('mediaModal')) closeModalManual(); }
function closeModalManual() {
const modal = document.getElementById('mediaModal'); modal.style.display = 'none';
const video = modal.querySelector('video'); if (video) video.pause();
const iframe = modal.querySelector('iframe'); if (iframe) iframe.src = 'about:blank';
document.getElementById('modalContent').innerHTML = '';
}
function tmaDownloadFile(downloadUrl, filename) {
if (window.Telegram && window.Telegram.WebApp && Telegram.WebApp.openLink) {
Telegram.WebApp.openLink(downloadUrl);
} else {
const link = document.createElement('a');
link.href = downloadUrl;
link.setAttribute('download', filename);
document.body.appendChild(link);
link.click();
document.body.removeChild(link);
}
}
const form = document.getElementById('upload-form'); const fileInput = document.getElementById('file-input');
const progressBar = document.getElementById('progress-bar'); const progressText = document.getElementById('progress-text');
const progressContainer = document.getElementById('progress-container'); const uploadBtn = document.getElementById('upload-btn');
if (form) {
form.addEventListener('submit', function(e) {
e.preventDefault(); const files = fileInput.files;
if (files.length === 0) { Telegram.WebApp.showAlert('Пожалуйста, выберите файлы для загрузки.'); return; }
if (files.length > 20) { Telegram.WebApp.showAlert('Максимум 20 файлов за раз!'); return; }
progressContainer.style.display = 'block'; progressBar.style.width = '0%'; progressText.textContent = '0%';
uploadBtn.disabled = true; uploadBtn.textContent = 'Загрузка...';
const formData = new FormData(form); const xhr = new XMLHttpRequest();
xhr.upload.addEventListener('progress', function(event) {
if (event.lengthComputable) {
const percentComplete = Math.round((event.loaded / event.total) * 100);
progressBar.style.width = percentComplete + '%'; progressText.textContent = percentComplete + '%';
}
});
xhr.addEventListener('load', function() { uploadBtn.disabled = false; uploadBtn.textContent = 'Загрузить файлы сюда'; progressContainer.style.display = 'none'; window.location.reload(); });
xhr.addEventListener('error', function() { Telegram.WebApp.showAlert('Произошла ошибка во время загрузки.'); uploadBtn.disabled = false; uploadBtn.textContent = 'Загрузить файлы сюда'; progressContainer.style.display = 'none'; });
xhr.addEventListener('abort', function() { Telegram.WebApp.showAlert('Загрузка отменена.'); uploadBtn.disabled = false; uploadBtn.textContent = 'Загрузить файлы сюда'; progressContainer.style.display = 'none'; });
xhr.open('POST', form.action, true); xhr.send(formData);
});
}
document.getElementById('logout-btn').addEventListener('click', function(e) { e.preventDefault(); window.location.href = "{{ url_for('tma_logout') }}"; });
function buyUnlimitedStorage(userId, botUsername) {
if (!Telegram.WebApp.openLink) {
Telegram.WebApp.showAlert('Telegram Web App Link opening is not available.');
return;
}
const deepLink = `https://t.me/${botUsername}?start=buy_unlimited_stars_${userId}`;
Telegram.WebApp.openLink(deepLink);
Telegram.WebApp.showAlert('Вас перенаправит в чат с ботом для оплаты. После оплаты свяжитесь с администратором для активации безлимитного хранилища.');
}
</script></body></html>
'''
def _safe_upload_name(raw_filename):
    """Return a storage-safe name that keeps a usable stem and extension.

    ``secure_filename`` drops non-ASCII characters, so a name such as
    ``"файл.txt"`` collapses to ``"txt"`` (extension lost) and ``"файл"``
    collapses to ``""``. In those cases the stem and extension are secured
    separately and ``"file"`` is used when the stem ends up empty.
    """
    secured = secure_filename(raw_filename)
    stem, ext = os.path.splitext(secured)
    if stem and (ext or '.' not in raw_filename):
        return secured
    raw_stem, raw_ext = os.path.splitext(raw_filename)
    safe_stem = secure_filename(raw_stem) or 'file'
    safe_ext = secure_filename(raw_ext.lstrip('.'))
    return f"{safe_stem}.{safe_ext}" if safe_ext else safe_stem


def _build_breadcrumbs(filesystem, current_folder_id):
    """Return breadcrumb dicts from the root down to ``current_folder_id``."""
    crumbs = []
    cursor = current_folder_id
    while cursor:
        node, parent = find_node_by_id(filesystem, cursor)
        if not node:
            break
        crumbs.append({
            'id': node['id'],
            'name': node.get('name', 'Root'),
            # The folder being viewed is shown as plain text, not a link.
            'is_link': node['id'] != current_folder_id,
        })
        if not parent:
            break
        cursor = parent.get('id')
    crumbs.reverse()
    return crumbs


def _handle_tma_upload(data, user_data, tma_user_id, current_folder_id):
    """Validate and store the files of an upload POST; return a redirect."""
    if not HF_TOKEN_WRITE:
        flash('Загрузка невозможна: токен для записи не настроен.', 'error')
        return redirect(url_for('tma_dashboard', folder_id=current_folder_id))
    files = request.files.getlist('files')
    if not files or all(not f.filename for f in files):
        flash('Файлы для загрузки не выбраны.', 'error')
        return redirect(url_for('tma_dashboard', folder_id=current_folder_id))
    if len(files) > 20:
        flash('Максимум 20 файлов за раз!', 'error')
        return redirect(url_for('tma_dashboard', folder_id=current_folder_id))
    target_folder_id = request.form.get('current_folder_id', 'root')
    target_folder_node, _ = find_node_by_id(user_data['filesystem'], target_folder_id)
    if not target_folder_node or target_folder_node.get('type') != 'folder':
        flash('Целевая папка для загрузки не найдена!', 'error')
        return redirect(url_for('tma_dashboard'))

    if not user_data['unlimited_storage']:
        # Measure each upload by seeking to its end, then rewind.
        total_new_files_size = 0
        for file_obj in files:
            if file_obj and file_obj.filename:
                file_obj.seek(0, os.SEEK_END)
                total_new_files_size += file_obj.tell()
                file_obj.seek(0)
        if user_data['storage_used_bytes'] + total_new_files_size > user_data['storage_limit_gb'] * BYTES_IN_GB:
            flash(f'Недостаточно места! Ваш лимит {user_data["storage_limit_gb"]} ГБ, использовано {format_bytes(user_data["storage_used_bytes"])}. '
                  f'Эти файлы слишком большие ({format_bytes(total_new_files_size)}).', 'error')
            return redirect(url_for('tma_dashboard', folder_id=current_folder_id))

    api = HfApi()
    uploaded_count = 0
    errors_list = []
    for file_obj in files:
        if not (file_obj and file_obj.filename):
            continue
        original_filename = _safe_upload_name(file_obj.filename)
        name_part, ext_part = os.path.splitext(original_filename)
        unique_suffix = uuid.uuid4().hex[:8]
        unique_filename = f"{name_part}_{unique_suffix}{ext_part}"
        file_id = uuid.uuid4().hex
        hf_path = f"cloud_files/{tma_user_id}/{target_folder_id}/{unique_filename}"
        temp_path = os.path.join(UPLOAD_FOLDER, f"{file_id}_{unique_filename}")
        try:
            # Saving is inside the try so a disk error is reported per file
            # and the temp file is still cleaned up.
            file_obj.save(temp_path)
            file_size = os.path.getsize(temp_path)
            api.upload_file(
                path_or_fileobj=temp_path, path_in_repo=hf_path, repo_id=REPO_ID,
                repo_type="dataset", token=HF_TOKEN_WRITE,
                commit_message=f"UserTMA {tma_user_id} uploaded {original_filename} to folder {target_folder_id}"
            )
            file_info = {
                'type': 'file', 'id': file_id, 'original_filename': original_filename,
                'unique_filename': unique_filename, 'path': hf_path,
                'file_type': get_file_type(original_filename),
                'upload_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'size_bytes': file_size
            }
            if add_node(user_data['filesystem'], target_folder_id, file_info):
                user_data['storage_used_bytes'] += file_size
                uploaded_count += 1
            else:
                errors_list.append(f"Ошибка добавления метаданных для {original_filename}.")
                # Roll back the remote upload; failure here is logged only.
                try:
                    api.delete_file(path_in_repo=hf_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE)
                except Exception:
                    logging.exception("Failed to remove orphaned upload %s", hf_path)
        except Exception as e:
            logging.exception("Upload of %s failed", original_filename)
            errors_list.append(f"Ошибка загрузки файла {original_filename}: {e}")
        finally:
            if os.path.exists(temp_path):
                os.remove(temp_path)

    if uploaded_count > 0:
        try:
            save_data(data)
            flash(f'{uploaded_count} файл(ов) успешно загружено!')
        except Exception:
            flash('Файлы загружены, но ошибка сохранения метаданных.', 'error')
    for error_msg in errors_list:
        flash(error_msg, 'error')
    return redirect(url_for('tma_dashboard', folder_id=target_folder_id))


@app.route('/tma_dashboard', methods=['GET', 'POST'])
def tma_dashboard():
    """Show a folder of the logged-in user's cloud (GET) or upload files (POST).

    GET renders the dashboard for the folder given by the ``folder_id``
    query argument (default ``'root'``). POST stores the uploaded ``files``
    in the folder named by the ``current_folder_id`` form field and
    redirects back to the dashboard.

    Users without a session, or whose record no longer exists, are
    redirected to the Mini App entry page.
    """
    if 'telegram_user_id' not in session:
        flash('Пожалуйста, авторизуйтесь через Telegram.', 'error')
        return redirect(url_for('tma_entry_page'))
    tma_user_id = session['telegram_user_id']
    display_name = session.get('telegram_display_name', 'Пользователь')
    data = load_data()
    if tma_user_id not in data['users']:
        session.clear()
        flash('Пользователь не найден в системе. Пожалуйста, перезапустите приложение.', 'error')
        return redirect(url_for('tma_entry_page'))
    user_data = data['users'][tma_user_id]
    initialize_user_filesystem_tma(user_data, tma_user_id)

    current_folder_id = request.args.get('folder_id', 'root')
    current_folder, _ = find_node_by_id(user_data['filesystem'], current_folder_id)
    if not current_folder or current_folder.get('type') != 'folder':
        # Unknown folder: fall back to the root folder.
        flash('Папка не найдена!', 'error')
        current_folder_id = 'root'
        current_folder, _ = find_node_by_id(user_data['filesystem'], current_folder_id)
        if not current_folder:
            flash('Критическая ошибка: корневая папка не найдена.', 'error')
            session.clear()
            return redirect(url_for('tma_entry_page'))

    if request.method == 'POST':
        return _handle_tma_upload(data, user_data, tma_user_id, current_folder_id)

    # Folders first, then files; each group sorted case-insensitively by name.
    items_in_folder = sorted(
        current_folder.get('children', []),
        key=lambda x: (x['type'] != 'folder', x.get('name', x.get('original_filename', '')).lower()),
    )
    breadcrumbs = _build_breadcrumbs(user_data['filesystem'], current_folder_id)
    storage_used_formatted = format_bytes(user_data['storage_used_bytes'])
    # The Hugging Face read token is deliberately NOT passed to the template:
    # it would be readable by every visitor in the page source.
    return render_template_string(
        TMA_DASHBOARD_HTML_TEMPLATE,
        display_name=display_name, items=items_in_folder,
        current_folder_id=current_folder_id, current_folder=current_folder,
        breadcrumbs=breadcrumbs, repo_id_js=REPO_ID,
        hf_file_url_jinja=lambda path, download=False: f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{path}{'?download=true' if download else ''}",
        is_tma_user_admin_flag=is_admin_tma(),
        user_data=user_data, storage_used_formatted=storage_used_formatted,
        tma_user_id=tma_user_id, bot_username=BOT_USERNAME,
    )
@app.route('/tma_buy_unlimited', methods=['POST'])
def tma_buy_unlimited():
    """Show the instructions for buying unlimited storage.

    Nothing is purchased or changed here: the handler only verifies that
    the session belongs to a known user and then flashes the payment
    instructions before returning to the dashboard. Unknown or anonymous
    visitors are sent back to the entry page with an error message.
    """
    if 'telegram_user_id' in session:
        account = load_data()['users'].get(session['telegram_user_id'])
        if account:
            flash('Для получения безлимитного хранилища оплатите 2000 Telegram Stars через бота. После оплаты свяжитесь с администратором для активации.', 'info')
            return redirect(url_for('tma_dashboard'))
        # The session points at a user that is no longer stored: reset it.
        session.clear()
        failure_text = 'Пользователь не найден.'
    else:
        failure_text = 'Пожалуйста, авторизуйтесь.'

    flash(failure_text, 'error')
    return redirect(url_for('tma_entry_page'))
@app.route('/create_folder_tma', methods=['POST'])
def create_folder_tma():
    """Create an empty folder inside one of the current user's folders.

    Form fields:
        parent_folder_id: id of the folder to create in (defaults to 'root').
        folder_name: display name of the new folder; must not be blank.

    Always redirects: to the entry page when the session or user record is
    missing, otherwise back to the parent folder view with a flash message
    describing the outcome.
    """
    if 'telegram_user_id' not in session:
        flash('Не авторизован', 'error')
        return redirect(url_for('tma_entry_page'))

    storage = load_data()
    owner = storage['users'].get(session['telegram_user_id'])
    if not owner:
        flash('Пользователь не найден', 'error')
        return redirect(url_for('tma_entry_page'))

    form = request.form
    parent_folder_id = form.get('parent_folder_id', 'root')
    folder_name = form.get('folder_name', '').strip()

    if folder_name:
        new_folder = dict(type='folder', id=uuid.uuid4().hex, name=folder_name, children=[])
        attached = add_node(owner['filesystem'], parent_folder_id, new_folder)
        if not attached:
            flash('Не удалось найти родительскую папку.', 'error')
        else:
            try:
                save_data(storage)
                flash(f'Папка "{folder_name}" успешно создана.')
            except Exception:
                flash('Ошибка сохранения данных при создании папки.', 'error')
    else:
        flash('Имя папки не может быть пустым!', 'error')

    return redirect(url_for('tma_dashboard', folder_id=parent_folder_id))
@app.route('/download_tma/<file_id>')
def download_tma(file_id):
    """Stream a stored file from the Hugging Face dataset to the client.

    Access rules:
      * a browser admin session may download any user's file;
      * a Telegram user may download their own files, and, when
        ``is_admin_tma()`` is true, files of any user;
      * anyone else gets a redirect to the entry page (when a referrer is
        present) or a plain 401 response.

    Args:
        file_id: id of the file node to download.

    Returns:
        A streaming ``Response`` with the file contents, or a redirect with
        a flashed error message when the file cannot be served.
    """
    from urllib.parse import quote  # only needed for the download header

    current_tma_user_id = session.get('telegram_user_id')
    is_browser_admin_session = session.get('admin_browser_logged_in', False)
    data = load_data()
    file_node = None

    def find_file_in_any_user():
        # Admin lookup: scan every user's tree for a *file* node with this id.
        for udata_iter in data.get('users', {}).values():
            node, _ = find_node_by_id(udata_iter.get('filesystem', {}), file_id)
            if node and node.get('type') == 'file':
                return node
        return None

    if is_browser_admin_session:
        file_node = find_file_in_any_user()
    elif current_tma_user_id:
        user_data = data['users'].get(current_tma_user_id)
        if user_data:
            file_node, _ = find_node_by_id(user_data['filesystem'], file_id)
        if not file_node and is_admin_tma():
            file_node = find_file_in_any_user()
    else:
        flash('Пожалуйста, авторизуйтесь.', 'error')
        if request.referrer:
            return redirect(url_for('tma_entry_page'))
        return Response("Unauthorized", status=401, mimetype='text/plain')

    if is_browser_admin_session:
        redirect_url_fallback = url_for('admin_panel')
    else:
        redirect_url_fallback = url_for('tma_dashboard')
    redirect_url = request.referrer or redirect_url_fallback

    if not file_node or file_node.get('type') != 'file':
        flash('Файл не найден или доступ запрещен!', 'error')
        return redirect(redirect_url)

    hf_path = file_node.get('path')
    original_filename = file_node.get('original_filename') or 'downloaded_file'
    if not hf_path:
        flash('Ошибка: Путь к файлу не найден.', 'error')
        return redirect(redirect_url)

    file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{hf_path}?download=true"
    try:
        req_headers = {}
        if HF_TOKEN_READ:
            req_headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
        # Without a timeout a stalled upstream would block this worker forever.
        r = requests.get(file_url, headers=req_headers, stream=True, timeout=(10, 60))
        try:
            r.raise_for_status()
        except requests.exceptions.RequestException:
            r.close()
            raise

        def generate_chunks():
            # Errors raised while streaming happen after this view returned,
            # so they cannot be turned into a flash message; the finally
            # clause still releases the upstream connection.
            try:
                for chunk in r.iter_content(chunk_size=8192):
                    yield chunk
            finally:
                r.close()

        mimetype = 'application/octet-stream'
        if '.' in original_filename:
            ext = original_filename.rsplit('.', 1)[1].lower()
            content_types = {
                'pdf': 'application/pdf', 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg',
                'png': 'image/png', 'gif': 'image/gif', 'txt': 'text/plain',
                'mp4': 'video/mp4', 'mov': 'video/quicktime', 'avi': 'video/x-msvideo',
                'zip': 'application/zip', 'rar': 'application/x-rar-compressed',
            }
            mimetype = content_types.get(ext, 'application/octet-stream')

        # HTTP header values cannot carry arbitrary Unicode, quotes or control
        # characters. Send a plain-ASCII fallback name plus the real name in
        # the percent-encoded ``filename*`` form.
        ascii_name = ''.join(
            ch for ch in original_filename
            if 32 <= ord(ch) < 127 and ch not in '"\\'
        ).strip() or 'downloaded_file'
        encoded_name = quote(original_filename, safe='')

        resp = Response(generate_chunks(), mimetype=mimetype)
        resp.headers['Content-Disposition'] = (
            f'attachment; filename="{ascii_name}"; '
            f"filename*=UTF-8''{encoded_name}"
        )
        resp.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
        resp.headers['Pragma'] = 'no-cache'
        resp.headers['Expires'] = '0'
        return resp
    except requests.exceptions.RequestException as e:
        flash(f'Ошибка скачивания файла: {e}', 'error')
    except Exception as e:
        flash(f'Внутренняя ошибка при скачивании: {e}', 'error')
    return redirect(redirect_url)
@app.route('/delete_file_tma/<file_id>', methods=['POST'])
def delete_file_tma(file_id=None):
    """Delete one of the current user's files from the repo and the database.

    Args:
        file_id: id of the file node, supplied by the URL rule. Flask passes
            URL variables as keyword arguments, so the view must accept it;
            the ``None`` default keeps a bare ``delete_file_tma()`` call
            working by falling back to ``request.view_args``.

    Form fields:
        current_view_folder_id: folder to return to (defaults to 'root').

    Returns:
        A redirect to the folder view (or the entry page when the session or
        user record is missing) with a flashed status message.
    """
    if 'telegram_user_id' not in session:
        flash('Пожалуйста, авторизуйтесь.', 'error')
        return redirect(url_for('tma_entry_page'))
    if file_id is None:
        file_id = (request.view_args or {}).get('file_id')
    tma_user_id = session['telegram_user_id']
    data = load_data()
    user_data = data['users'].get(tma_user_id)
    if not user_data:
        session.clear()
        flash('Пользователь не найден.', 'error')
        return redirect(url_for('tma_entry_page'))

    file_node, _ = find_node_by_id(user_data['filesystem'], file_id)
    current_view_folder_id = request.form.get('current_view_folder_id', 'root')
    if not file_node or file_node.get('type') != 'file':
        flash('Файл не найден.', 'error')
        return redirect(url_for('tma_dashboard', folder_id=current_view_folder_id))

    hf_path = file_node.get('path')
    original_filename = file_node.get('original_filename', 'файл')
    file_size_to_deduct = file_node.get('size_bytes', 0)

    if not hf_path:
        # Nothing to delete remotely: only the metadata record exists.
        if remove_node(user_data['filesystem'], file_id):
            user_data['storage_used_bytes'] -= file_size_to_deduct
            try:
                save_data(data)
                flash(f'Метаданные файла {original_filename} удалены (путь отсутствовал).')
            except Exception as e:
                flash('Ошибка сохранения данных после удаления метаданных.', 'error')
        return redirect(url_for('tma_dashboard', folder_id=current_view_folder_id))

    if not HF_TOKEN_WRITE:
        flash('Удаление невозможно: токен для записи не настроен.', 'error')
        return redirect(url_for('tma_dashboard', folder_id=current_view_folder_id))

    try:
        api = HfApi()
        api.delete_file(path_in_repo=hf_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE)
        if remove_node(user_data['filesystem'], file_id):
            user_data['storage_used_bytes'] -= file_size_to_deduct
            try:
                save_data(data)
                flash(f'Файл {original_filename} успешно удален!')
            except Exception as e:
                flash('Файл удален с сервера, но ошибка обновления базы.', 'error')
        else:
            flash('Файл удален с сервера, но не найден в базе для удаления.', 'error')
    except hf_utils.EntryNotFoundError:
        # The file is already gone remotely; still drop the stale record.
        if remove_node(user_data['filesystem'], file_id):
            user_data['storage_used_bytes'] -= file_size_to_deduct
            try:
                save_data(data)
                flash(f'Файл {original_filename} не найден на сервере, удален из базы.')
            except Exception as e:
                flash('Ошибка сохранения (файл не на сервере).', 'error')
    except Exception as e:
        flash(f'Ошибка удаления файла {original_filename}: {e}', 'error')
    return redirect(url_for('tma_dashboard', folder_id=current_view_folder_id))
@app.route('/delete_folder_tma/<folder_id>', methods=['POST'])
def delete_folder_tma(folder_id):
    """Delete an empty, non-root folder owned by the current user.

    Args:
        folder_id: id of the folder node to remove.

    Form fields:
        current_view_folder_id: folder to return to when the request is
            rejected (defaults to 'root').

    Returns:
        A redirect with a flashed status message. After a removal attempt the
        redirect targets the folder's parent.
    """
    if 'telegram_user_id' not in session:
        flash('Пожалуйста, авторизуйтесь.', 'error')
        return redirect(url_for('tma_entry_page'))
    if folder_id == 'root':
        flash('Нельзя удалить корневую папку!', 'error')
        return redirect(url_for('tma_dashboard'))

    store = load_data()
    owner = store['users'].get(session['telegram_user_id'])
    if not owner:
        session.clear()
        flash('Пользователь не найден.', 'error')
        return redirect(url_for('tma_entry_page'))

    folder_node, parent_node = find_node_by_id(owner['filesystem'], folder_id)
    current_view_folder_id = request.form.get('current_view_folder_id', 'root')

    # Work out why the request must be rejected, if at all.
    problem = None
    if not (folder_node and folder_node.get('type') == 'folder' and parent_node):
        problem = 'Папка не найдена.'
    elif folder_node.get('children'):
        label = folder_node.get("name")
        problem = f'Папку "{label}" можно удалить только если она пуста.'
    if problem is not None:
        flash(problem, 'error')
        return redirect(url_for('tma_dashboard', folder_id=current_view_folder_id))

    if not remove_node(owner['filesystem'], folder_id):
        flash('Не удалось удалить папку.', 'error')
    else:
        try:
            save_data(store)
            removed_label = folder_node.get("name")
            flash(f'Папка "{removed_label}" удалена.')
        except Exception:
            flash('Ошибка сохранения после удаления папки.', 'error')

    parent_id = parent_node.get('id', 'root')
    return redirect(url_for('tma_dashboard', folder_id=parent_id))
@app.route('/get_text_content_tma/<file_id>')
def get_text_content_tma(file_id):
    """Return the contents of a stored text file as ``text/plain``.

    Access rules match the download view: a browser admin session or a
    Telegram admin may read any user's text file, a regular Telegram user
    only their own.

    Args:
        file_id: id of the file node to read.

    Returns:
        200 with the decoded text, 401 when not authenticated, 404 when no
        matching text file exists, 413 when the file exceeds 1 MiB, 500 when
        the node has no path, 502 when the upstream fetch fails.
    """
    max_bytes = 1 * 1024 * 1024

    current_tma_user_id = session.get('telegram_user_id')
    is_browser_admin_session = session.get('admin_browser_logged_in', False)
    data = load_data()
    file_node = None

    def find_text_file_in_any_user():
        # Admin lookup: scan every user's tree for a text file with this id.
        for udata_iter in data.get('users', {}).values():
            node, _ = find_node_by_id(udata_iter.get('filesystem', {}), file_id)
            if node and node.get('type') == 'file' and node.get('file_type') == 'text':
                return node
        return None

    if is_browser_admin_session:
        file_node = find_text_file_in_any_user()
    elif current_tma_user_id:
        user_data = data['users'].get(current_tma_user_id)
        if user_data:
            file_node, _ = find_node_by_id(user_data['filesystem'], file_id)
        if not file_node and is_admin_tma():
            file_node = find_text_file_in_any_user()
    else:
        return Response("Не авторизован", status=401)

    if not file_node or file_node.get('type') != 'file' or file_node.get('file_type') != 'text':
        return Response("Текстовый файл не найден", status=404)
    hf_path = file_node.get('path')
    if not hf_path:
        return Response("Ошибка: путь к файлу отсутствует", status=500)

    file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{hf_path}?download=true"
    try:
        req_headers = {}
        if HF_TOKEN_READ:
            req_headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
        # Stream the body so an oversized file is rejected as soon as the
        # limit is crossed instead of being loaded into memory first.
        with requests.get(file_url, headers=req_headers, stream=True, timeout=(10, 30)) as response:
            response.raise_for_status()
            received = bytearray()
            for chunk in response.iter_content(chunk_size=8192):
                received.extend(chunk)
                if len(received) > max_bytes:
                    return Response("Файл слишком большой.", status=413)
        raw_bytes = bytes(received)
        try:
            text_content = raw_bytes.decode('utf-8')
        except UnicodeDecodeError:
            # latin-1 maps every byte, so this fallback cannot fail.
            text_content = raw_bytes.decode('latin-1', errors='ignore')
        return Response(text_content, mimetype='text/plain')
    except Exception as e:
        return Response(f"Ошибка загрузки: {e}", status=502)
@app.route('/tma_logout')
def tma_logout():
    """Forget the Telegram identity stored in the session and go to the entry page."""
    # Only the Telegram-related keys are dropped; other session keys are kept.
    for session_key in ('telegram_user_id', 'telegram_display_name'):
        session.pop(session_key, None)
    flash('Вы вышли из сессии приложения.')
    entry_url = url_for('tma_entry_page')
    return redirect(entry_url)
# Jinja template for the browser admin login form.
# Expects `admin_base_style_css` (the shared stylesheet text) in the render
# context. The stylesheet is a trusted constant and is emitted with `| safe`:
# string templates are autoescaped, and escaped quotes inside <style> would
# not be decoded by the browser, breaking rules such as the font-family one.
# The hidden `next` field carries the `next` query parameter through the POST.
ADMIN_LOGIN_HTML_TEMPLATE = '''
<!DOCTYPE html><html lang="ru"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Admin Login</title><link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;600;800&display=swap" rel="stylesheet">
<style>
{{ admin_base_style_css | safe }}
body { display: flex; justify-content: center; align-items: center; height: 100vh; background: var(--background-dark); }
.login-container { text-align: center; padding: 40px; background: var(--card-bg-dark); border-radius: 20px; box-shadow: var(--shadow); width: 100%; max-width: 400px; }
h1 { color: var(--primary); margin-bottom: 20px; }
input[type="text"], input[type="password"] { background: rgba(255,255,255,0.1); color: var(--text-dark); margin-bottom: 20px; }
.btn { background: var(--accent); width: 100%; }
</style>
</head><body class="dark">
<div class="login-container">
<h1>Admin Login</h1>
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="flash {{ category }}">{{ message }}</div>
{% endfor %}
{% endif %}
{% endwith %}
<form method="POST" action="{{ url_for('admin_login') }}">
<input type="hidden" name="next" value="{{ request.args.get('next', '') }}">
<input type="text" name="username" placeholder="Username" required>
<input type="password" name="password" placeholder="Password" required>
<button type="submit" class="btn">Login</button>
</form>
</div>
</body></html>
'''
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
    """Render the admin login form and process submitted credentials.

    On a successful POST the session is flagged as an admin browser session
    and the visitor is redirected to the posted ``next`` target when it
    points back at this site, otherwise to the admin panel. On failure (or
    on GET) the login form is rendered.
    """
    import hmac
    from urllib.parse import urlparse

    def is_safe_redirect_target(target):
        # Only follow targets that stay on this host; anything else would
        # let a crafted login link bounce the admin to a foreign site.
        if any(ord(ch) < 32 for ch in target) or '\\' in target:
            return False
        parsed = urlparse(target)
        if parsed.scheme or parsed.netloc:
            return parsed.scheme in ('http', 'https') and parsed.netloc == request.host
        return target.startswith('/') and not target.startswith('//')

    if request.method == 'POST':
        username = request.form.get('username') or ''
        password = request.form.get('password') or ''
        next_url = request.form.get('next')
        # Compare as bytes in constant time; both checks always run so the
        # response time does not reveal which field was wrong.
        username_ok = hmac.compare_digest(username.encode('utf-8'), ADMIN_USERNAME.encode('utf-8'))
        password_ok = hmac.compare_digest(password.encode('utf-8'), ADMIN_PASSWORD.encode('utf-8'))
        if username_ok and password_ok:
            session['admin_browser_logged_in'] = True
            flash('Успешный вход в админ-панель.', 'success')
            if next_url and is_safe_redirect_target(next_url):
                return redirect(next_url)
            return redirect(url_for('admin_panel'))
        else:
            flash('Неверное имя пользователя или пароль.', 'error')
    return render_template_string(ADMIN_LOGIN_HTML_TEMPLATE, admin_base_style_css=BASE_STYLE)
@app.route('/admin/logout')
def admin_logout():
    """End the browser admin session and return to the admin login form."""
    # Dropping the flag is harmless when the visitor was not logged in.
    session.pop('admin_browser_logged_in', None)
    farewell, category = 'Вы вышли из админ-панели.', 'success'
    flash(farewell, category)
    login_page = url_for('admin_login')
    return redirect(login_page)
# Jinja template for the admin user list.
# Expects `user_details`: a list of per-user summaries with the attributes
# used below (id_key, display_name, created_at, file_count,
# storage_used_formatted, storage_limit_gb, unlimited_storage).
# The delete confirmation text is passed through `tojson` inside a
# single-quoted attribute: the display name is user-supplied, and HTML
# escaping alone does not stop it from terminating a JavaScript string.
ADMIN_PANEL_HTML_TEMPLATE = '''
<!DOCTYPE html><html lang="ru"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Админ-панель</title><link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;600;800&display=swap" rel="stylesheet">
<style>''' + BASE_STYLE + '''</style></head><body class="dark"><div class="container"><h1>Админ-панель</h1>
<a href="{{ url_for('admin_logout') }}" class="btn" style="margin-bottom:20px; background-color: var(--accent);">Выйти из админ-панели</a>
<a href="{{ url_for('tma_dashboard') }}" class="btn" style="margin-bottom:20px;">В приложение (если TMA сессия есть)</a>
{% with messages = get_flashed_messages(with_categories=true) %}{% if messages %}{% for category, message in messages %}<div class="flash {{ category }}">{{ message }}</div>{% endfor %}{% endif %}{% endwith %}
<h2>Пользователи</h2><div class="user-list">
{% for user in user_details %}
<div class="user-item">
<a href="{{ url_for('admin_user_files', tma_user_id_str=user.id_key) }}">{{ user.display_name }} (ID: {{user.id_key}})</a>
<p>Зарегистрирован: {{ user.created_at }}</p><p>Файлов: {{ user.file_count }}</p>
<p>Память: {% if user.unlimited_storage %}Безлимитно ({{ user.storage_used_formatted }}){% else %}{{ user.storage_used_formatted }} из {{ user.storage_limit_gb }} ГБ{% endif %}</p>
<form method="POST" action="{{ url_for('admin_delete_user', tma_user_id_str=user.id_key) }}" style="display: inline; margin-left: 10px;" onsubmit='return confirm({{ ("УДАЛИТЬ пользователя " ~ user.display_name ~ " и ВСЕ его файлы? НЕОБРАТИМО!") | tojson }});'>
<button type="submit" class="btn delete-btn" style="padding: 5px 10px; font-size: 0.9em;">Удалить</button>
</form>
</div>
{% else %}<p>Пользователей нет.</p>{% endfor %}</div></div></body></html>'''
@app.route('/admhosto')
@admin_browser_login_required
def admin_panel():
    """Render the admin overview: one summary row per stored user, newest first."""

    def summarize(user_key, record):
        # Prefer the first name, then the Telegram username, then a generic label.
        fallback_name = record.get('telegram_username', f"User {user_key}")
        files_total = 0
        for _, entry in get_all_nodes(record.get('filesystem', {})):
            if entry.get('type') == 'file':
                files_total += 1
        return {
            'id_key': user_key,
            'display_name': record.get('first_name', fallback_name),
            'created_at': record.get('created_at', 'N/A'),
            'file_count': files_total,
            'storage_limit_gb': record.get('storage_limit_gb', DEFAULT_STORAGE_LIMIT_GB),
            'storage_used_formatted': format_bytes(record.get('storage_used_bytes', 0)),
            'unlimited_storage': record.get('unlimited_storage', False),
        }

    registry = load_data().get('users', {})
    user_details = sorted(
        (summarize(user_key, record) for user_key, record in registry.items()),
        key=lambda row: row.get('created_at', ''),
        reverse=True,
    )
    return render_template_string(ADMIN_PANEL_HTML_TEMPLATE, user_details=user_details)
def get_all_nodes(filesystem):
    """Yield ``(node_id, node)`` for every node of a filesystem tree, breadth-first.

    Args:
        filesystem: root node dict. Folder nodes carry their children in a
            ``'children'`` list. An empty or missing tree yields nothing.

    Yields:
        Tuples of the node's ``'id'`` (``None`` when the node has no id) and
        the node dict itself, parents before their children.
    """
    from collections import deque

    # Callers pass ``{}`` when a user has no filesystem yet; there is
    # nothing to walk in that case.
    if not filesystem:
        return

    pending = deque([filesystem])  # deque gives O(1) pops from the left
    while pending:
        current_node = pending.popleft()
        if not current_node:
            continue
        yield current_node.get('id'), current_node
        if current_node.get('type') == 'folder':
            pending.extend(current_node.get('children') or [])
# Jinja template for the admin view of a single user's files.
# Context used below: display_name_admin_view, tma_user_id_str_admin_view,
# user_data_admin_view, storage_used_formatted_admin_view, files,
# hf_file_url_jinja(path, download) and repo_id_js_admin.
# Two places handle user-controlled text with care:
#   * the text preview is inserted through `textContent`, so file contents
#     are never parsed as HTML;
#   * the delete confirmation is built with `tojson` inside a single-quoted
#     attribute, so a file name cannot terminate the JavaScript string.
ADMIN_USER_FILES_HTML_TEMPLATE = '''
<!DOCTYPE html><html lang="ru"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Файлы {{ display_name_admin_view }}</title><link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;600;800&display=swap" rel="stylesheet">
<style>''' + BASE_STYLE + '''
.file-item { background: var(--card-bg-dark); padding: 15px; border-radius: 16px; box-shadow: var(--shadow); transition: var(--transition); display: flex; flex-direction: column; justify-content: space-between; }
.file-preview { max-width: 100%; height: 100px; object-fit: contain; border-radius: 10px; margin-bottom: 10px; display: block; margin-left: auto; margin-right: auto; }
.admin-file-actions { margin-top: 10px; display: flex; flex-wrap: wrap; gap: 5px; justify-content: center; }
.admin-file-actions .btn { font-size: 0.8em; padding: 4px 8px; margin: 0; }
</style></head><body class="dark"><div class="container"><h1>Файлы пользователя: {{ display_name_admin_view }} (ID: {{ tma_user_id_str_admin_view }})</h1>
<a href="{{ url_for('admin_panel') }}" class="btn" style="margin-bottom: 20px;">Назад к пользователям</a>
<p>Лимит памяти:
{% if user_data_admin_view.unlimited_storage %}Безлимитно (Использовано: {{ storage_used_formatted_admin_view }})
{% else %}{{ storage_used_formatted_admin_view }} из {{ user_data_admin_view.storage_limit_gb }} ГБ
{% endif %}
</p>
<form method="POST" action="{{ url_for('admin_toggle_unlimited_storage', tma_user_id_str=tma_user_id_str_admin_view) }}" style="display: inline-block;">
<button type="submit" class="btn" style="background: var(--accent);">
{{ 'Отключить безлимит' if user_data_admin_view.unlimited_storage else 'Включить безлимит' }}
</button>
</form>
{% with messages = get_flashed_messages(with_categories=true) %}{% if messages %}{% for category, message in messages %}<div class="flash {{ category }}">{{ message }}</div>{% endfor %}{% endif %}{% endwith %}
<div style="display: grid; grid-template-columns: repeat(auto-fill, minmax(220px, 1fr)); gap: 20px;">
{% for file_item in files %}
<div class="file-item"><div>
{% if file_item.file_type == 'image' %} <img class="file-preview" src="{{ hf_file_url_jinja(file_item.path) }}" loading="lazy" onclick="openModalAdmin('{{ hf_file_url_jinja(file_item.path) }}', '{{ file_item.file_type }}', '{{ file_item.id }}')" style="cursor: pointer;">
{% elif file_item.file_type == 'video' %} <video class="file-preview" preload="metadata" muted onclick="openModalAdmin('{{ hf_file_url_jinja(file_item.path) }}', '{{ file_item.file_type }}', '{{ file_item.id }}')" style="cursor: pointer;"><source src="{{ hf_file_url_jinja(file_item.path, True) }}#t=0.5"></video>
{% elif file_item.file_type == 'pdf' %} <div class="file-preview" style="font-size: 40px; line-height: 100px; text-align: center; color: var(--accent); cursor: pointer;" onclick="openModalAdmin('{{ hf_file_url_jinja(file_item.path, True) }}', '{{ file_item.file_type }}', '{{ file_item.id }}')">📄</div>
{% elif file_item.file_type == 'text' %} <div class="file-preview" style="font-size: 40px; line-height: 100px; text-align: center; color: var(--secondary); cursor: pointer;" onclick="openModalAdmin('{{ url_for('get_text_content_tma', file_id=file_item.id) }}', '{{ file_item.file_type }}', '{{ file_item.id }}')">📝</div>
{% else %} <div class="file-preview" style="font-size: 40px; line-height: 100px; text-align: center; color: #aaa;">❓</div> {% endif %}
<p title="{{ file_item.original_filename }}"><b>{{ file_item.original_filename | truncate(30) }}</b></p>
<p style="font-size: 0.8em; color: #888;">В папке: {{ file_item.parent_path_str }}</p>
<p style="font-size: 0.8em; color: #888;">Загружен: {{ file_item.upload_date }}</p>
<p style="font-size: 0.8em; color: #888;">Размер: {{ file_item.size_formatted }}</p>
<p style="font-size: 0.7em; color: #ccc;">ID: {{ file_item.id }}</p>
<p style="font-size: 0.7em; color: #ccc; word-break: break-all;">Path: {{ file_item.path }}</p>
</div><div class="admin-file-actions">
<a href="{{ url_for('download_tma', file_id=file_item.id) }}" class="btn download-btn" download="{{ file_item.original_filename }}">Скачать</a>
{% set previewable = file_item.file_type in ['image', 'video', 'pdf', 'text'] %}
{% if previewable %}<button type="button" class="btn" style="background: var(--accent);" onclick="openModalAdmin('{{ hf_file_url_jinja(file_item.path) if file_item.file_type != 'text' else url_for('get_text_content_tma', file_id=file_item.id) }}', '{{ file_item.file_type }}', '{{ file_item.id }}')">Просмотр</button>{% endif %}
<form method="POST" action="{{ url_for('admin_delete_file', tma_user_id_str_form=tma_user_id_str_admin_view, file_id=file_item.id) }}" style="display: inline-block;" onsubmit='return confirm({{ ("Удалить файл " ~ file_item.original_filename ~ "?") | tojson }});'>
<button type="submit" class="btn delete-btn">Удалить</button></form>
</div></div>{% else %} <p>У пользователя нет файлов.</p> {% endfor %}</div></div>
<div class="modal" id="mediaModalAdmin" onclick="closeModalAdminEv(event)"><div class="modal-content" id="modalContentContainerAdmin">
<span onclick="closeModalAdminManual()" class="modal-close-btn">×</span><div id="modalContentAdmin"></div></div></div>
<script>
const repoIdJs = "{{ repo_id_js_admin }}";
function hfFileUrlAdmin(path, download = false) { let url = `https://huggingface.co/datasets/${repoIdJs}/resolve/main/${path}`; if (download) url += '?download=true'; return url; }
async function openModalAdmin(srcOrUrl, type, itemId) {
const modal = document.getElementById('mediaModalAdmin'); const modalContent = document.getElementById('modalContentAdmin');
modalContent.innerHTML = '<p>Загрузка...</p>'; modal.style.display = 'flex';
try {
if (type === 'image') modalContent.innerHTML = `<img src="${srcOrUrl}" alt="Просмотр изображения">`;
else if (type === 'video') modalContent.innerHTML = `<video controls autoplay style='max-width: 95%; max-height: 85vh;'><source src="${srcOrUrl}" type="video/mp4"></video>`;
else if (type === 'pdf') modalContent.innerHTML = `<iframe src="${srcOrUrl}" title="Просмотр PDF"></iframe>`;
else if (type === 'text') { const response = await fetch(srcOrUrl); if (!response.ok) throw new Error('Network response was not ok for text file.'); const text = await response.text(); const pre = document.createElement('pre'); pre.textContent = text; modalContent.innerHTML = ''; modalContent.appendChild(pre); }
else modalContent.innerHTML = '<p>Предпросмотр не поддерживается.</p>';
} catch (error) { modalContent.innerHTML = `<p>Не удалось загрузить содержимое для предпросмотра. ${error.message}</p>`;}
}
function closeModalAdminEv(event) { if (event.target === document.getElementById('mediaModalAdmin')) closeModalAdminManual(); }
function closeModalAdminManual() {
const modal = document.getElementById('mediaModalAdmin'); modal.style.display = 'none';
const video = modal.querySelector('video'); if (video) video.pause();
const iframe = modal.querySelector('iframe'); if (iframe) iframe.src = 'about:blank';
document.getElementById('modalContentAdmin').innerHTML = '';
}
</script></body></html>'''
@app.route('/admhosto/user/<tma_user_id_str>')
@admin_browser_login_required
def admin_user_files(tma_user_id_str):
    """Render the admin view listing every file of one user, newest first.

    Args:
        tma_user_id_str: key of the user in the stored ``users`` mapping.

    Returns:
        The rendered file list, or a redirect to the admin panel with a
        flashed error when the user does not exist.
    """
    data = load_data()
    user_data = data.get('users', {}).get(tma_user_id_str)
    if not user_data:
        flash(f'Пользователь ID {tma_user_id_str} не найден.', 'error')
        return redirect(url_for('admin_panel'))

    # Tolerate records without a filesystem or usage counter, matching the
    # defaults the admin overview applies to the same records.
    filesystem = user_data.get('filesystem') or {}
    all_files = []

    def collect_files_admin(folder, current_path_id='root'):
        parent_path_str = get_node_path_string(filesystem, current_path_id)
        for item in folder.get('children', []):
            item_type = item.get('type')
            if item_type == 'file':
                # Build a display copy so the presentation-only fields are
                # not written into the loaded data structure.
                all_files.append(dict(
                    item,
                    parent_path_str=parent_path_str,
                    size_formatted=format_bytes(item.get('size_bytes')),
                ))
            elif item_type == 'folder':
                collect_files_admin(item, item.get('id'))

    if filesystem:
        collect_files_admin(filesystem)
    all_files.sort(key=lambda x: x.get('upload_date', ''), reverse=True)

    display_name_admin_view = user_data.get('first_name', user_data.get('telegram_username', f"User {tma_user_id_str}"))
    storage_used_formatted_admin_view = format_bytes(user_data.get('storage_used_bytes', 0))
    return render_template_string(
        ADMIN_USER_FILES_HTML_TEMPLATE,
        tma_user_id_str_admin_view=tma_user_id_str,
        display_name_admin_view=display_name_admin_view,
        files=all_files,
        repo_id_js_admin=REPO_ID,
        hf_file_url_jinja=lambda path, download=False: f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{path}{'?download=true' if download else ''}",
        user_data_admin_view=user_data,
        storage_used_formatted_admin_view=storage_used_formatted_admin_view,
    )
@app.route('/admhosto/toggle_unlimited_storage/<tma_user_id_str>', methods=['POST'])
@admin_browser_login_required
def admin_toggle_unlimited_storage(tma_user_id_str):
    """Flip the unlimited-storage flag of one user and persist the change.

    Args:
        tma_user_id_str: key of the user in the stored ``users`` mapping.

    Returns:
        A redirect to the user's file list, or to the admin panel when the
        user does not exist; the outcome is reported via a flash message.
    """
    store = load_data()
    account = store.get('users', {}).get(tma_user_id_str)
    if not account:
        flash(f'Пользователь {tma_user_id_str} не найден.', 'error')
        return redirect(url_for('admin_panel'))

    # A missing flag counts as "off", so the first toggle switches it on.
    enabled_now = not account.get('unlimited_storage', False)
    account['unlimited_storage'] = enabled_now
    try:
        save_data(store)
        status = {True: "включено", False: "отключено"}[bool(account['unlimited_storage'])]
        flash(f'Безлимитное хранилище для пользователя {tma_user_id_str} {status}.', 'success')
    except Exception as save_error:
        flash(f'Ошибка при изменении статуса безлимитного хранилища: {save_error}', 'error')

    files_page = url_for('admin_user_files', tma_user_id_str=tma_user_id_str)
    return redirect(files_page)
@app.route('/admhosto/delete_user/<tma_user_id_str>', methods=['POST'])
@admin_browser_login_required
def admin_delete_user(tma_user_id_str):
    """Delete a user's uploaded files from the repo and remove the user record.

    The remote folder ``cloud_files/<user id>`` is deleted first. The
    database record is removed only when that succeeds or the folder does
    not exist (HTTP 404); any other failure leaves the record untouched.

    Args:
        tma_user_id_str: key of the user in the stored ``users`` mapping.

    Returns:
        A redirect to the admin panel with a flashed status message.
    """
    if not HF_TOKEN_WRITE:
        flash('Удаление невозможно: токен для записи не настроен.', 'error')
        return redirect(url_for('admin_panel'))
    data = load_data()
    if tma_user_id_str not in data['users']:
        flash('Пользователь не найден!', 'error')
        return redirect(url_for('admin_panel'))

    try:
        api = HfApi()
        user_folder_path_on_hf = f"cloud_files/{tma_user_id_str}"
        # delete_folder takes the repo path as `path_in_repo`; it has no
        # `folder_path` or `ignore_patterns` arguments (those belong to
        # upload_folder), and passing them raised TypeError on every call.
        api.delete_folder(path_in_repo=user_folder_path_on_hf, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE)
    except hf_utils.HfHubHTTPError as e:
        # 404 means the user never uploaded anything; continue with the record.
        status_code = getattr(getattr(e, 'response', None), 'status_code', None)
        if status_code != 404:
            flash(f'Ошибка удаления файлов пользователя {tma_user_id_str} с сервера: {e}. Пользователь из базы не удален.', 'error')
            return redirect(url_for('admin_panel'))
    except Exception as e:
        flash(f'Ошибка удаления файлов пользователя {tma_user_id_str} с сервера: {e}. Пользователь из базы не удален.', 'error')
        return redirect(url_for('admin_panel'))

    try:
        del data['users'][tma_user_id_str]
        save_data(data)
        flash(f'Пользователь {tma_user_id_str} и его файлы (если были) удалены.')
    except Exception as e:
        flash(f'Файлы на сервере могли быть удалены, но произошла ошибка удаления пользователя из базы данных: {e}', 'error')
    return redirect(url_for('admin_panel'))
@app.route('/admhosto/delete_file/<tma_user_id_str_form>/<file_id>', methods=['POST'])
@admin_browser_login_required
def admin_delete_file(tma_user_id_str_form, file_id):
    """Delete one file of the given user from the repo and from the database.

    Args:
        tma_user_id_str_form: key of the owning user in the ``users`` mapping.
        file_id: id of the file node to delete.

    Returns:
        A redirect to the user's file list (or to the admin panel when the
        user is unknown) with flashed status messages.
    """

    def back_to_user_files():
        return redirect(url_for('admin_user_files', tma_user_id_str=tma_user_id_str_form))

    if not HF_TOKEN_WRITE:
        flash('Удаление невозможно: токен для записи не настроен.', 'error')
        return back_to_user_files()

    store = load_data()
    owner = store.get('users', {}).get(tma_user_id_str_form)
    if not owner:
        flash(f'Пользователь {tma_user_id_str_form} не найден.', 'error')
        return redirect(url_for('admin_panel'))

    target, _ = find_node_by_id(owner['filesystem'], file_id)
    if not target or target.get('type') != 'file':
        flash('Файл не найден в базе данных этого пользователя.', 'error')
        return back_to_user_files()

    hf_path = target.get('path')
    original_filename = target.get('original_filename', 'файл')
    file_size_to_deduct = target.get('size_bytes', 0)

    def drop_record(success_text, failure_text):
        # Remove the node, give the bytes back to the quota and persist.
        # Returns False when the node was not present in the tree.
        if not remove_node(owner['filesystem'], file_id):
            return False
        owner['storage_used_bytes'] -= file_size_to_deduct
        try:
            save_data(store)
            flash(success_text)
        except Exception:
            flash(failure_text, 'error')
        return True

    if not hf_path:
        # Only metadata exists; there is nothing to delete remotely.
        drop_record(
            f'Метаданные файла {original_filename} удалены из базы (путь к файлу на сервере отсутствовал).',
            'Ошибка сохранения данных после удаления метаданных (путь отсутствовал).',
        )
        return back_to_user_files()

    try:
        HfApi().delete_file(path_in_repo=hf_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE)
        removed = drop_record(
            f'Файл {original_filename} успешно удален с сервера и из базы данных!',
            'Файл удален с сервера, но произошла ошибка при обновлении базы данных.',
        )
        if not removed:
            flash(f'Файл {original_filename} удален с сервера, но не найден в структуре папок пользователя для удаления из базы.', 'error')
    except hf_utils.EntryNotFoundError:
        flash(f'Файл {original_filename} не найден на сервере. Удаляем из базы данных.')
        drop_record(
            f'Запись о файле {original_filename} удалена из базы.',
            'Ошибка сохранения данных после удаления записи о файле (файл не на сервере).',
        )
    except Exception as failure:
        flash(f'Ошибка при удалении файла {original_filename}: {failure}', 'error')
    return back_to_user_files()
if __name__ == '__main__':
    # Startup configuration checks. They compare the resolved module-level
    # settings (which already include the built-in defaults) and log a
    # warning for each value that is missing or still at its default.
    if not HF_TOKEN_WRITE:
        logging.warning("HF_TOKEN is not set: file deletion and periodic backups are disabled.")
    if os.getenv("HF_TOKEN_READ") is None:
        logging.warning("HF_TOKEN_READ is not set: falling back to HF_TOKEN for reads.")
    if ADMIN_TELEGRAM_ID == "YOUR_ADMIN_TELEGRAM_USER_ID_HERE":
        logging.warning("ADMIN_TELEGRAM_ID is still the placeholder value.")
    if ADMIN_USERNAME == "admin" and ADMIN_PASSWORD == "zeusadminpass":
        logging.warning("Admin login uses the default credentials; set ADMIN_USERNAME and ADMIN_PASSWORD.")
    if BOT_USERNAME == "ZeusCloudBot":
        logging.warning("BOT_USERNAME is the default value 'ZeusCloudBot'.")

    if HF_TOKEN_WRITE:
        # With a write token the database is pulled once and then backed up
        # by a background daemon thread.
        download_db_from_hf()
        threading.Thread(target=periodic_backup, daemon=True).start()
    elif HF_TOKEN_READ:
        download_db_from_hf()
    else:
        # No tokens at all: start from an empty local database if none exists.
        if not os.path.exists(DATA_FILE):
            with open(DATA_FILE, 'w', encoding='utf-8') as f:
                json.dump({'users': {}}, f)
    app.run(debug=False, host='0.0.0.0', port=7860)