-'''
- template_context = {
- 'current_user_display_name': get_current_user_display_name(),
- 'items': items_in_folder, 'current_folder_id': current_folder_id,
- 'current_folder': current_folder, 'breadcrumbs': breadcrumbs,
- 'repo_id': REPO_ID, 'HF_TOKEN_READ': HF_TOKEN_READ,
- 'hf_file_url': lambda path, download=False: f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{path}{'?download=true' if download else ''}",
- 'os': os
- }
- return render_template_string(html, **template_context)
+ return render_template_string(TMA_DASHBOARD_HTML_TEMPLATE,
+ display_name=display_name, items=items_in_folder,
+ current_folder_id=current_folder_id, current_folder=current_folder,
+ breadcrumbs=breadcrumbs, repo_id_js=REPO_ID, HF_TOKEN_READ_js=HF_TOKEN_READ,
+ hf_file_url_jinja=lambda path, download=False: f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{path}{'?download=true' if download else ''}",
+ is_admin=is_admin_tma())
+
-@app.route('/api/create_folder', methods=['POST'])
-def create_folder_api():
- if 'telegram_user_id' not in session: return jsonify({'status': 'error', 'message': 'Не авторизован'}), 401
- user_telegram_id_str = str(session['telegram_user_id'])
+@app.route('/create_folder_tma', methods=['POST'])
+def create_folder_tma():
+ if 'telegram_user_id' not in session:
+ return jsonify({'status': 'error', 'message': 'Не авторизован'}), 401
+ tma_user_id = session['telegram_user_id']
data = load_data()
- user_data = data['users'].get(user_telegram_id_str)
+ user_data = data['users'].get(tma_user_id)
if not user_data: return jsonify({'status': 'error', 'message': 'Пользователь не найден'}), 404
parent_folder_id = request.form.get('parent_folder_id', 'root')
folder_name = request.form.get('folder_name', '').strip()
-
if not folder_name:
flash('Имя папки не может быть пустым!', 'error')
- return redirect(url_for('main_app_view_and_upload', folder_id=parent_folder_id))
- if not all(c.isalnum() or c in [' ', '_', '-'] for c in folder_name):
- flash('Имя папки может содержать буквы, цифры, пробелы, дефисы и подчеркивания.', 'error')
- return redirect(url_for('main_app_view_and_upload', folder_id=parent_folder_id))
+ return redirect(url_for('tma_dashboard', folder_id=parent_folder_id))
folder_id = uuid.uuid4().hex
- folder_data = {'type': 'folder', 'id': folder_id, 'name': folder_name, 'children': [] }
+ folder_data = {'type': 'folder', 'id': folder_id, 'name': folder_name, 'children': []}
if add_node(user_data['filesystem'], parent_folder_id, folder_data):
- try: save_data(data); flash(f'Папка "{folder_name}" успешно создана.')
- except Exception as e: flash('Ошибка сохранения данных при создании папки.', 'error'); logging.error(f"Create folder save error: {e}")
- else: flash('Не удалось найти родительскую папку.', 'error')
- return redirect(url_for('main_app_view_and_upload', folder_id=parent_folder_id))
-
-@app.route('/download/')
-def download_file(file_id):
- is_admin_access = is_telegram_admin() and (request.referrer and 'admhosto' in request.referrer)
-
- if 'telegram_user_id' not in session and not is_admin_access:
- flash('Пожалуйста, пройдите аутентификацию.')
- return redirect(url_for('root_path'))
+ try:
+ save_data(data)
+ flash(f'Папка "{folder_name}" успешно создана.')
+ except Exception as e: flash('Ошибка сохранения данных при создании папки.', 'error')
+ else:
+ flash('Не удалось найти родительскую папку.', 'error')
+ return redirect(url_for('tma_dashboard', folder_id=parent_folder_id))
+
+@app.route('/download_tma/')
+def download_tma(file_id):
+ current_tma_user_id = session.get('telegram_user_id')
+ if not current_tma_user_id:
+ flash('Пожалуйста, авторизуйтесь.', 'error')
+ return redirect(url_for('tma_entry_page'))
data = load_data()
file_node = None
- user_context_id_str = str(session.get('telegram_user_id')) if 'telegram_user_id' in session else None
-
- if user_context_id_str:
- user_data = data['users'].get(user_context_id_str)
- if user_data: file_node, _ = find_node_by_id(user_data['filesystem'], file_id)
+
+ user_data = data['users'].get(current_tma_user_id)
+ if user_data:
+ file_node, _ = find_node_by_id(user_data['filesystem'], file_id)
- if not file_node and is_telegram_admin(): # Admin can download any file
- for uid_str, udata_val in data.get('users', {}).items():
- node, _ = find_node_by_id(udata_val.get('filesystem', {}), file_id)
+ if not file_node and is_admin_tma():
+ for uid_str, udata_iter in data.get('users', {}).items():
+ node, _ = find_node_by_id(udata_iter.get('filesystem', {}), file_id)
if node and node.get('type') == 'file':
- file_node = node; user_context_id_str = uid_str; break
+ file_node = node
+ break
if not file_node or file_node.get('type') != 'file':
- flash('Файл не найден!', 'error')
- return redirect(request.referrer or url_for('main_app_view_and_upload' if 'telegram_user_id' in session else 'root_path'))
+ flash('Файл не найден или доступ запрещен!', 'error')
+ return redirect(request.referrer or url_for('tma_dashboard'))
hf_path = file_node.get('path')
original_filename = file_node.get('original_filename', 'downloaded_file')
if not hf_path:
flash('Ошибка: Путь к файлу не найден.', 'error')
- return redirect(request.referrer or url_for('main_app_view_and_upload' if 'telegram_user_id' in session else 'root_path'))
+ return redirect(request.referrer or url_for('tma_dashboard'))
file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{hf_path}?download=true"
try:
- headers = {};
+ headers = {}
if HF_TOKEN_READ: headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
- response = requests.get(file_url, headers=headers, stream=True); response.raise_for_status()
- return send_file(BytesIO(response.content), as_attachment=True, download_name=original_filename, mimetype='application/octet-stream')
+ response = requests.get(file_url, headers=headers, stream=True)
+ response.raise_for_status()
+ return send_file(BytesIO(response.content), as_attachment=True, download_name=original_filename)
except Exception as e:
- logging.error(f"Error downloading file from HF ({hf_path}): {e}")
- flash(f'Ошибка скачивания файла {original_filename}! ({e})', 'error')
- return redirect(request.referrer or url_for('main_app_view_and_upload' if 'telegram_user_id' in session else 'root_path'))
-
-@app.route('/api/delete_file/', methods=['POST'])
-def delete_file_api(file_id):
- if 'telegram_user_id' not in session: return jsonify({'status': 'error', 'message': 'Не авторизован'}), 401
- user_telegram_id_str = str(session['telegram_user_id'])
+ flash(f'Ошибка скачивания файла: {e}', 'error')
+ return redirect(request.referrer or url_for('tma_dashboard'))
+
+@app.route('/delete_file_tma/', methods=['POST'])
+def delete_file_tma(file_id):
+ if 'telegram_user_id' not in session:
+ flash('Пожалуйста, авторизуйтесь.', 'error')
+ return redirect(url_for('tma_entry_page'))
+ tma_user_id = session['telegram_user_id']
data = load_data()
- user_data = data['users'].get(user_telegram_id_str)
- if not user_data: flash('Пользователь не найден!', 'error'); session.clear(); return redirect(url_for('root_path'))
+ user_data = data['users'].get(tma_user_id)
+ if not user_data:
+ session.clear(); flash('Пользователь не найден.', 'error'); return redirect(url_for('tma_entry_page'))
- file_node, parent_node = find_node_by_id(user_data['filesystem'], file_id)
+ file_node, _ = find_node_by_id(user_data['filesystem'], file_id)
current_view_folder_id = request.form.get('current_view_folder_id', 'root')
+ if not file_node or file_node.get('type') != 'file':
+ flash('Файл не найден.', 'error')
+ return redirect(url_for('tma_dashboard', folder_id=current_view_folder_id))
- if not file_node or file_node.get('type') != 'file' or not parent_node:
- flash('Файл не найден или не может быть удален.', 'error')
- return redirect(url_for('main_app_view_and_upload', folder_id=current_view_folder_id))
-
- hf_path = file_node.get('path'); original_filename = file_node.get('original_filename', 'файл')
+ hf_path = file_node.get('path')
+ original_filename = file_node.get('original_filename', 'файл')
if not hf_path:
- flash(f'Ошибка: Путь к файлу {original_filename} не найден. Удаление только из базы.', 'error')
if remove_node(user_data['filesystem'], file_id):
- try: save_data(data); flash(f'Метаданные файла {original_filename} удалены.')
- except Exception as e: flash('Ошибка сохранения данных.', 'error'); logging.error(f"Delete file metadata save error: {e}")
- return redirect(url_for('main_app_view_and_upload', folder_id=current_view_folder_id))
+ try: save_data(data); flash(f'Метаданные файла {original_filename} удалены (путь отсутствовал).')
+ except Exception as e: flash('Ошибка сохранения данных после удаления метаданных.', 'error')
+ return redirect(url_for('tma_dashboard', folder_id=current_view_folder_id))
- if not HF_TOKEN_WRITE: flash('Удаление невозможно: токен не настроен.', 'error'); return redirect(url_for('main_app_view_and_upload', folder_id=current_view_folder_id))
+ if not HF_TOKEN_WRITE:
+ flash('Удаление невозможно: токен для записи не настроен.', 'error')
+ return redirect(url_for('tma_dashboard', folder_id=current_view_folder_id))
try:
api = HfApi()
- api.delete_file(path_in_repo=hf_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE, commit_message=f"User {user_telegram_id_str} deleted {file_id}")
+ api.delete_file(path_in_repo=hf_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE)
if remove_node(user_data['filesystem'], file_id):
try: save_data(data); flash(f'Файл {original_filename} успешно удален!')
- except Exception as e: flash('Файл удален, ошибка обновления базы.', 'error'); logging.error(f"Delete file DB update error: {e}")
- else: flash('Файл удален, но не найден в базе.', 'error')
+ except Exception as e: flash('Файл удален с сервера, но ошибка обновления базы.', 'error')
+ else: flash('Файл удален с сервера, но не найден в базе для удаления.', 'error')
except hf_utils.EntryNotFoundError:
if remove_node(user_data['filesystem'], file_id):
try: save_data(data); flash(f'Файл {original_filename} не найден на сервере, удален из базы.')
- except Exception as e: flash('Ошибка сохранения (файл не на сервере).', 'error'); logging.error(f"Delete file metadata save error (HF not found): {e}")
- else: flash('Файл не найден нигде.', 'error')
- except Exception as e: logging.error(f"Error deleting file {hf_path} for {user_telegram_id_str}: {e}"); flash(f'Ошибка удаления {original_filename}: {e}', 'error')
- return redirect(url_for('main_app_view_and_upload', folder_id=current_view_folder_id))
-
-@app.route('/api/delete_folder/', methods=['POST'])
-def delete_folder_api(folder_id):
- if 'telegram_user_id' not in session: return jsonify({'status': 'error', 'message': 'Не авторизован'}), 401
- if folder_id == 'root': flash('Нельзя удалить корневую папку!', 'error'); return redirect(url_for('main_app_view_and_upload'))
- user_telegram_id_str = str(session['telegram_user_id'])
+ except Exception as e: flash('Ошибка сохранения (файл не на сервере).', 'error')
+ except Exception as e:
+ flash(f'Ошибка удаления файла {original_filename}: {e}', 'error')
+ return redirect(url_for('tma_dashboard', folder_id=current_view_folder_id))
+
+@app.route('/delete_folder_tma/', methods=['POST'])
+def delete_folder_tma(folder_id):
+ if 'telegram_user_id' not in session:
+ flash('Пожалуйста, авторизуйтесь.', 'error'); return redirect(url_for('tma_entry_page'))
+ if folder_id == 'root':
+ flash('Нельзя удалить корневую папку!', 'error'); return redirect(url_for('tma_dashboard'))
+ tma_user_id = session['telegram_user_id']
data = load_data()
- user_data = data['users'].get(user_telegram_id_str)
- if not user_data: flash('Пользователь не найден!', 'error'); session.clear(); return redirect(url_for('root_path'))
-
+ user_data = data['users'].get(tma_user_id)
+ if not user_data:
+ session.clear(); flash('Пользователь не найден.', 'error'); return redirect(url_for('tma_entry_page'))
+
folder_node, parent_node = find_node_by_id(user_data['filesystem'], folder_id)
current_view_folder_id = request.form.get('current_view_folder_id', 'root')
-
if not folder_node or folder_node.get('type') != 'folder' or not parent_node:
- flash('Папка не найдена или не может быть удалена.', 'error')
- return redirect(url_for('main_app_view_and_upload', folder_id=current_view_folder_id))
- folder_name = folder_node.get('name', 'папка')
+ flash('Папка не найдена.', 'error')
+ return redirect(url_for('tma_dashboard', folder_id=current_view_folder_id))
if folder_node.get('children'):
- flash(f'Папку "{folder_name}" можно удалить только если она пуста.', 'error')
- return redirect(url_for('main_app_view_and_upload', folder_id=current_view_folder_id))
+ flash(f'Папку "{folder_node.get("name")}" можно удалить только если она пуста.', 'error')
+ return redirect(url_for('tma_dashboard', folder_id=current_view_folder_id))
if remove_node(user_data['filesystem'], folder_id):
- try: save_data(data); flash(f'Папка "{folder_name}" удалена.')
- except Exception as e: flash('Ошибка сохранения данных.', 'error'); logging.error(f"Delete empty folder save error: {e}")
+ try: save_data(data); flash(f'Папка "{folder_node.get("name")}" удалена.')
+ except Exception as e: flash('Ошибка сохранения после удаления папки.', 'error')
else: flash('Не удалось удалить папку.', 'error')
- return redirect(url_for('main_app_view_and_upload', folder_id=parent_node.get('id', 'root')))
-
-@app.route('/get_text_content/')
-def get_text_content(file_id):
- is_admin_access = is_telegram_admin() and (request.referrer and 'admhosto' in request.referrer)
- if 'telegram_user_id' not in session and not is_admin_access: return Response("Не авторизован", status=401)
+ return redirect(url_for('tma_dashboard', folder_id=parent_node.get('id', 'root')))
+@app.route('/get_text_content_tma/')
+def get_text_content_tma(file_id):
+ current_tma_user_id = session.get('telegram_user_id')
+ if not current_tma_user_id: return Response("Не авторизован", status=401)
+
data = load_data()
file_node = None
- user_context_id_str = str(session.get('telegram_user_id')) if 'telegram_user_id' in session else None
+ user_data = data['users'].get(current_tma_user_id)
+ if user_data:
+ file_node, _ = find_node_by_id(user_data['filesystem'], file_id)
- if user_context_id_str:
- user_data = data['users'].get(user_context_id_str)
- if user_data: file_node, _ = find_node_by_id(user_data['filesystem'], file_id)
-
- if not file_node and is_telegram_admin():
- for uid_str, udata_val in data.get('users', {}).items():
- node, _ = find_node_by_id(udata_val.get('filesystem', {}), file_id)
+ if not file_node and is_admin_tma():
+ for uid_str, udata_iter in data.get('users', {}).items():
+ node, _ = find_node_by_id(udata_iter.get('filesystem', {}), file_id)
if node and node.get('type') == 'file' and node.get('file_type') == 'text':
file_node = node; break
- if not file_node or file_node.get('type') != 'file' or file_node.get('file_type') != 'text': return Response("Файл не найден", status=404)
+ if not file_node or file_node.get('type') != 'file' or file_node.get('file_type') != 'text':
+ return Response("Текстовый файл не найден", status=404)
hf_path = file_node.get('path')
- if not hf_path: return Response("Путь к файлу отсутствует", status=500)
+ if not hf_path: return Response("Ошибка: путь к файлу отсутствует", status=500)
file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{hf_path}?download=true"
try:
- headers = {};
+ headers = {};
if HF_TOKEN_READ: headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
- response = requests.get(file_url, headers=headers); response.raise_for_status()
+ response = requests.get(file_url, headers=headers)
+ response.raise_for_status()
if len(response.content) > 1 * 1024 * 1024: return Response("Файл слишком большой.", status=413)
try: text_content = response.content.decode('utf-8')
- except UnicodeDecodeError: text_content = response.content.decode('latin-1')
+ except UnicodeDecodeError: text_content = response.content.decode('latin-1', errors='ignore')
return Response(text_content, mimetype='text/plain')
- except Exception as e: logging.error(f"Error fetching text from HF ({hf_path}): {e}"); return Response(f"Ошибка: {e}", status=502)
+ except Exception as e: return Response(f"Ошибка загрузки: {e}", status=502)
+@app.route('/tma_logout')
+def tma_logout():
+ session.clear()
+ flash('Вы вышли из сессии приложения.')
+ return redirect(url_for('tma_entry_page'))
-@app.route('/admhosto')
-def admin_panel():
- if not is_telegram_admin(): flash('Доступ запрещен.', 'error'); return redirect(url_for('root_path'))
- data = load_data(); users = data.get('users', {}); user_details = []
- for uid_str, udata_val in users.items():
- file_count = 0; q = [udata_val.get('filesystem', {}).get('children', [])]
- while q:
- current_level = q.pop(0)
- for item_val in current_level:
- if item_val.get('type') == 'file': file_count += 1
- elif item_val.get('type') == 'folder' and 'children' in item_val: q.append(item_val.get('children', []))
- user_display = udata_val.get('telegram_first_name', '') or udata_val.get('telegram_username', uid_str)
- user_details.append({'id_str': uid_str, 'display_name': user_display, 'created_at': udata_val.get('created_at', 'N/A'), 'file_count': file_count})
- html = '''
+ADMIN_PANEL_HTML_TMA_TEMPLATE = '''
Админ-панель
Админ-панель
-Вернуться в приложение
+Назад в приложение
{% with messages = get_flashed_messages(with_categories=true) %}{% if messages %}{% for category, message in messages %}