+ {# Use JS to trigger download if direct link causes issues in TMA #}
+ Скачать
+ {% if previewable %}
+
+ {% endif %}
+
+
+ {% endif %}
+
+ {% endfor %}
+ {% if not items %}
Эта папка пуста.
{% endif %}
+
+
+ {# Очистить сессию #}
+ {# Logout doesn't make much sense in TMA context #}
+
+'''
+ template_context = {
+ 'tg_id': tg_id,
+ 'display_name': display_name,
+ 'items': items_in_folder,
+ 'current_folder_id': current_folder_id,
+ 'current_folder': current_folder,
+ 'breadcrumbs': breadcrumbs,
+ 'repo_id': REPO_ID,
+ 'HF_TOKEN_READ': HF_TOKEN_READ,
+ 'hf_file_url': lambda path, download=False: f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{path}{'?download=true' if download else ''}",
+ 'os': os # Keep os if needed by templates, though unlikely now
+ }
+ return render_template_string(dashboard_html, **template_context)
+
+
+# --- File/Folder Operations ---
+
+@app.route('/upload', methods=['POST'])
+def upload_file():
+ if 'tg_id' not in session:
+ return jsonify({'status': 'error', 'message': 'Не авторизован'}), 401
+
+ if not HF_TOKEN_WRITE:
+ # Return JSON for JS handler
+ return jsonify({'status': 'error', 'message': 'Загрузка невозможна: токен для записи не настроен.'}), 403
+
+ tg_id = session['tg_id']
+ tg_id_str = str(tg_id)
+ # Use tg_id for path to avoid issues with changing usernames
+ user_identifier_for_path = tg_id_str
+
+ data = load_data()
+ if tg_id_str not in data.get('users', {}):
+ return jsonify({'status': 'error', 'message': 'Пользователь не найден'}), 404
+ user_data = data['users'][tg_id_str]
+
+ files = request.files.getlist('files')
+ if not files or all(not f.filename for f in files):
+ return jsonify({'status': 'error', 'message': 'Файлы для загрузки не выбраны.'}), 400
+
+ if len(files) > 20:
+ return jsonify({'status': 'error', 'message': 'Максимум 20 файлов за раз!'}), 400
+
+ target_folder_id = request.form.get('current_folder_id', 'root')
+ target_folder_node, _ = find_node_by_id(user_data.get('filesystem'), target_folder_id)
+
+ if not target_folder_node or target_folder_node.get('type') != 'folder':
+ return jsonify({'status': 'error', 'message': 'Целевая папка для загрузки не найдена!'}), 404
+
+ api = HfApi()
+ uploaded_count = 0
+ errors = []
+ save_needed = False
+
+ for file in files:
+ if file and file.filename:
+ original_filename = secure_filename(file.filename)
+ name_part, ext_part = os.path.splitext(original_filename)
+ unique_suffix = uuid.uuid4().hex[:8]
+ # Keep filename relatively simple for HF path
+ unique_filename = f"{secure_filename(name_part)}_{unique_suffix}{ext_part}"
+ file_id = uuid.uuid4().hex
+
+ # Construct path using tg_id and target folder id
+ hf_path = f"cloud_files/{user_identifier_for_path}/{target_folder_id}/{unique_filename}"
+ temp_path = os.path.join(UPLOAD_FOLDER, f"{file_id}_{unique_filename}")
+
+ try:
+ file.save(temp_path)
+
+ api.upload_file(
+ path_or_fileobj=temp_path,
+ path_in_repo=hf_path,
+ repo_id=REPO_ID,
+ repo_type="dataset",
+ token=HF_TOKEN_WRITE,
+ commit_message=f"User {tg_id} uploaded {original_filename} to folder {target_folder_id}"
+ )
+
+ file_info = {
+ 'type': 'file',
+ 'id': file_id,
+ 'original_filename': original_filename,
+ 'unique_filename': unique_filename, # Store for potential reference
+ 'path': hf_path,
'file_type': get_file_type(original_filename),
'upload_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- # 'size': file_size # Add size if needed
}
if add_node(user_data['filesystem'], target_folder_id, file_info):
uploaded_count += 1
save_needed = True
else:
- errors.append(f"Error adding metadata for {original_filename}.")
- logging.error(f"Failed to add node metadata for file {file_id} to folder {target_folder_id} for user {telegram_id_str}")
- # Attempt to clean up the orphaned file on HF
- delete_hf_file(hf_path, telegram_id_str, original_filename)
+ error_msg = f"Ошибка добавления метаданных для {original_filename} в папку {target_folder_id}."
+ errors.append(error_msg)
+ logging.error(f"Failed to add node metadata for file {file_id} to folder {target_folder_id} for user {tg_id}")
+ # Attempt to clean up orphaned file on HF
+ try:
+ api.delete_file(path_in_repo=hf_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN_WRITE)
+ logging.info(f"Cleaned up orphaned HF file: {hf_path}")
+ except Exception as del_err:
+ logging.error(f"Failed to delete orphaned file {hf_path} from HF Hub: {del_err}")
except Exception as e:
- logging.error(f"Error uploading file {original_filename} for {telegram_id_str}: {e}", exc_info=True)
- errors.append(f"Error uploading {original_filename}: {str(e)}")
- # Clean up local temp file if upload fails
+ logging.error(f"Error uploading file {original_filename} for {tg_id}: {e}", exc_info=True)
+ errors.append(f"Ошибка загрузки файла {original_filename}: {e}")
finally:
if os.path.exists(temp_path):
- try:
- os.remove(temp_path)
- except Exception as e_rem:
- logging.error(f"Error removing temp file {temp_path}: {e_rem}")
+ os.remove(temp_path)
if save_needed:
try:
save_data(data)
+ logging.info(f"{uploaded_count} files uploaded successfully for user {tg_id}.")
except Exception as e:
- logging.error(f"Error saving data after upload for {telegram_id_str}: {e}")
- # Return success but with a warning about saving metadata
- return jsonify({
- "status": "warning",
- "message": f"{uploaded_count} file(s) uploaded, but failed to save metadata.",
- "errors": errors
- }), 500
+ errors.append('Файлы загружены, но произошла ошибка сохранения метаданных.')
+ logging.error(f"Error saving data after upload for {tg_id}: {e}", exc_info=True)
+ final_message = ""
+ if uploaded_count > 0:
+ final_message += f'{uploaded_count} файл(ов) успешно загружено! '
if errors:
- return jsonify({
- "status": "warning" if uploaded_count > 0 else "error",
- "message": f"{uploaded_count} file(s) uploaded with {len(errors)} errors.",
- "errors": errors
- }), 207 # Multi-Status or use 500 if critical
- elif uploaded_count > 0:
- return jsonify({"status": "success", "message": f"{uploaded_count} file(s) uploaded successfully."})
- else:
- # This case should ideally be caught earlier (no files selected)
- return jsonify({"status": "error", "message": "Upload failed. No files were processed."}), 400
+ final_message += "Ошибки: " + "; ".join(errors)
+ status_code = 200 if uploaded_count > 0 and not errors else (500 if errors else 200)
+ status_str = "success" if status_code == 200 else "error"
-@app.route('/create_folder', methods=['POST'])
-def create_folder():
- req_data = request.json
- init_data_str = req_data.get('initData')
- parent_folder_id = req_data.get('parentFolderId', 'root')
- folder_name = req_data.get('folderName', '').strip()
+ return jsonify({'status': status_str, 'message': final_message.strip()}), status_code
- user_info, is_valid = validate_telegram_data(init_data_str, BOT_TOKEN)
- if not is_valid or not user_info:
- return jsonify({"status": "error", "message": "Invalid session"}), 403
- telegram_id_str = str(user_info['id'])
-
- if not folder_name:
- return jsonify({"status": "error", "message": "Folder name cannot be empty."}), 400
-
- # Basic validation for folder name (adjust regex as needed)
- # Allow letters, numbers, spaces, underscores, hyphens
- if not all(c.isalnum() or c in ' _-' for c in folder_name):
- return jsonify({"status": "error", "message": "Folder name contains invalid characters."}), 400
+@app.route('/create_folder', methods=['POST'])
+def create_folder():
+ if 'tg_id' not in session:
+ return jsonify({'status': 'error', 'message': 'Не авторизован'}), 401
+ tg_id = session['tg_id']
+ tg_id_str = str(tg_id)
data = load_data()
- user_data = data['users'].get(telegram_id_str)
+ user_data = data['users'].get(tg_id_str)
if not user_data:
- return jsonify({"status": "error", "message": "User data not found."}), 404
+ return jsonify({'status': 'error', 'message': 'Пользователь не найден'}), 404
+
+ parent_folder_id = request.form.get('parent_folder_id', 'root')
+ # Sanitize folder name - allow letters, numbers, spaces, underscore
+ folder_name_raw = request.form.get('folder_name', '').strip()
+ folder_name = "".join(c for c in folder_name_raw if c.isalnum() or c in (' ', '_')).strip()
- # Check if parent folder exists
- parent_node, _ = find_node_by_id(user_data['filesystem'], parent_folder_id)
- if not parent_node or parent_node.get('type') != 'folder':
- logging.error(f"Parent folder {parent_folder_id} not found for folder creation by user {telegram_id_str}")
- return jsonify({"status": "error", "message": "Parent folder not found."}), 404
- # Optional: Check for duplicate folder name within the parent
- if 'children' in parent_node:
- existing_names = {child.get('name', '').lower() for child in parent_node['children'] if child.get('type') == 'folder'}
- if folder_name.lower() in existing_names:
- return jsonify({"status": "error", "message": f"A folder named '{folder_name}' already exists here."}), 409 # Conflict
+ if not folder_name:
+ return jsonify({'status': 'error', 'message': 'Имя папки не может быть пустым или содержать недопустимые символы.'}), 400
+ if len(folder_name) > 50: # Add length limit
+ return jsonify({'status': 'error', 'message': 'Имя папки слишком длинное (макс 50 симв).'}), 400
folder_id = uuid.uuid4().hex
@@ -695,275 +1215,295 @@ def create_folder():
'type': 'folder',
'id': folder_id,
'name': folder_name,
- 'children': []
- # 'created_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Optional
+ 'children': [] # Always initialize children for new folders
}
- if add_node(user_data['filesystem'], parent_folder_id, folder_data):
+ # Ensure filesystem structure is valid before adding
+ if 'filesystem' not in user_data or not isinstance(user_data.get('filesystem'), dict):
+ initialize_user_filesystem(user_data)
+
+
+ if add_node(user_data.get('filesystem'), parent_folder_id, folder_data):
try:
save_data(data)
- logging.info(f"Folder '{folder_name}' (ID: {folder_id}) created by user {telegram_id_str} in parent {parent_folder_id}")
- # Return the newly created folder info if needed by frontend
- return jsonify({"status": "success", "message": f"Folder '{folder_name}' created.", "newFolder": folder_data})
+ return jsonify({'status': 'success', 'message': f'Папка "{folder_name}" успешно создана.'})
except Exception as e:
- logging.error(f"Failed to save data after creating folder '{folder_name}' for user {telegram_id_str}: {e}")
- # Attempt to rollback? Difficult without transactions. Maybe remove the node added?
- # remove_node(user_data['filesystem'], folder_id) # Attempt rollback (might fail if structure changed)
- return jsonify({"status": "error", "message": "Failed to save data after creating folder."}), 500
+ logging.error(f"Create folder save error for user {tg_id}: {e}", exc_info=True)
+ # Attempt to remove the added node if save fails? Maybe too complex.
+ return jsonify({'status': 'error', 'message': 'Ошибка сохранения данных при создании папки.'}), 500
else:
- # This should theoretically be caught by the parent_node check earlier
- logging.error(f"add_node failed for folder '{folder_name}' by user {telegram_id_str} even after parent check passed.")
- return jsonify({"status": "error", "message": "Failed to add folder to filesystem."}), 500
+ return jsonify({'status': 'error', 'message': 'Не удалось найти родительскую папку или добавить узел.'}), 404
+@app.route('/download/')
+def download_file(file_id):
+ tg_id_str = None
+ is_admin_req = is_admin() # Check admin status early
-@app.route('/delete_item', methods=['POST'])
-def delete_item():
- req_data = request.json
- init_data_str = req_data.get('initData')
- item_id = req_data.get('itemId')
- item_type = req_data.get('itemType') # 'file' or 'folder'
+ if 'tg_id' in session:
+ tg_id_str = str(session['tg_id'])
+ elif not is_admin_req:
+ flash('Пожалуйста, авторизуйтесь.') # Flash might not be visible
+ return redirect(url_for('index')) # Redirect to main TMA page
- user_info, is_valid = validate_telegram_data(init_data_str, BOT_TOKEN)
- if not is_valid or not user_info:
- return jsonify({"status": "error", "message": "Invalid session"}), 403
+ data = load_data()
+ file_node = None
+ user_tg_id_of_file = None
+
+ # Try finding file for logged-in user first
+ if tg_id_str and tg_id_str in data.get('users', {}):
+ user_data = data['users'][tg_id_str]
+ file_node, _ = find_node_by_id(user_data.get('filesystem'), file_id)
+ if file_node:
+ user_tg_id_of_file = tg_id_str
+
+ # If not found for current user, admin can search all users
+ if not file_node and is_admin_req:
+ logging.info(f"Admin (TG ID: {session.get('tg_id')}) searching for file ID {file_id} across all users.")
+ for user_id, udata in data.get('users', {}).items():
+ if isinstance(udata, dict):
+ node, _ = find_node_by_id(udata.get('filesystem'), file_id)
+ if node and node.get('type') == 'file':
+ file_node = node
+ user_tg_id_of_file = user_id
+ logging.info(f"Admin found file ID {file_id} belonging to user TG ID {user_tg_id_of_file}")
+ break
+
+ if not file_node or file_node.get('type') != 'file':
+ # Flash might not be seen if redirected immediately
+ # flash('Файл не найден!', 'error')
+ logging.warning(f"File not found (ID: {file_id}) for user {tg_id_str} or admin search.")
+ # Redirect back to wherever they came from, or root dashboard
+ # Using referrer is unreliable; maybe redirect to root always on error?
+ # return redirect(request.referrer or url_for('index'))
+ return Response("Файл не найден", status=404)
- telegram_id_str = str(user_info['id'])
- if not item_id or item_id == 'root':
- return jsonify({"status": "error", "message": "Invalid item ID for deletion."}), 400
- if item_type not in ['file', 'folder']:
- return jsonify({"status": "error", "message": "Invalid item type for deletion."}), 400
+ hf_path = file_node.get('path')
+ original_filename = file_node.get('original_filename', 'downloaded_file')
- data = load_data()
- user_data = data['users'].get(telegram_id_str)
- if not user_data:
- return jsonify({"status": "error", "message": "User data not found."}), 404
+ if not hf_path:
+ logging.error(f"File ID {file_id} (User: {user_tg_id_of_file}) has missing path in metadata.")
+ return Response("Ошибка: Путь к файлу не найден в метаданных.", status=500)
- item_node, parent_node = find_node_by_id(user_data['filesystem'], item_id)
+ # Generate download URL (direct access)
+ file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{hf_path}?download=true"
+ logging.info(f"Attempting download for file ID {file_id}, Path: {hf_path}, URL: {file_url}")
- if not item_node or item_node.get('type') != item_type or not parent_node:
- logging.warning(f"Item {item_id} (type {item_type}) not found or parent missing for deletion by user {telegram_id_str}")
- return jsonify({"status": "error", "message": f"{item_type.capitalize()} not found or cannot be deleted."}), 404
+ try:
+ headers = {}
+ # Use read token if available (necessary for private repos)
+ if HF_TOKEN_READ:
+ headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
- item_name = item_node.get('name', item_node.get('original_filename', 'item'))
+ # Stream the download
+ response = requests.get(file_url, headers=headers, stream=True, timeout=60) # Add timeout
+ response.raise_for_status() # Check for HTTP errors (4xx, 5xx)
- # --- Folder Deletion Specific Logic ---
- if item_type == 'folder':
- if item_node.get('children'): # Check if folder is not empty
- return jsonify({"status": "error", "message": f"Folder '{item_name}' is not empty. Cannot delete."}), 400
- # Folders usually don't have direct HF representation unless we create placeholder files
- # So, just remove from metadata
- if remove_node(user_data['filesystem'], item_id):
- try:
- save_data(data)
- logging.info(f"Empty folder '{item_name}' (ID: {item_id}) deleted by user {telegram_id_str}")
- return jsonify({"status": "success", "message": f"Folder '{item_name}' deleted."})
- except Exception as e:
- logging.error(f"Failed to save data after deleting folder {item_id} for user {telegram_id_str}: {e}")
- return jsonify({"status": "error", "message": "Failed to save changes after deleting folder."}), 500
- else:
- logging.error(f"remove_node failed for folder {item_id} user {telegram_id_str} despite checks.")
- return jsonify({"status": "error", "message": "Internal error deleting folder from structure."}), 500
-
- # --- File Deletion Specific Logic ---
- elif item_type == 'file':
- hf_path = item_node.get('path')
- if not hf_path:
- logging.warning(f"HF path missing for file {item_id} user {telegram_id_str}. Deleting metadata only.")
- # Proceed to delete metadata even if HF path is missing
- else:
- # Attempt to delete from Hugging Face Hub first
- if not delete_hf_file(hf_path, telegram_id_str, item_name):
- # Decide if failure to delete from HF should prevent metadata deletion
- # For now, let's proceed to delete metadata but return a warning/error
- logging.error(f"Failed to delete file {hf_path} from HF Hub for user {telegram_id_str}. Proceeding with metadata removal.")
- # Return error immediately? Or just warn and remove metadata?
- # return jsonify({"status": "error", "message": f"Failed to delete file '{item_name}' from storage. Please try again."}), 500
+ # Stream the content back to the user
+ return Response(
+ stream_with_context(response.iter_content(chunk_size=8192)),
+ headers={
+ 'Content-Disposition': f'attachment; filename="{original_filename}"',
+ 'Content-Type': response.headers.get('Content-Type', 'application/octet-stream'),
+ 'Content-Length': response.headers.get('Content-Length')
+ }
+ )
+ except requests.exceptions.RequestException as e:
+ logging.error(f"Error downloading file from HF ({hf_path}): {e}")
+ status_code = e.response.status_code if e.response is not None else 502
+ return Response(f'Ошибка скачивания файла {original_filename} с сервера ({status_code}).', status=status_code)
+ except Exception as e:
+ logging.error(f"Unexpected error during download ({hf_path}): {e}", exc_info=True)
+ return Response('Произошла непредвиденная ошибка при скачивании файла.', status=500)
- # Remove file node from filesystem metadata
- if remove_node(user_data['filesystem'], item_id):
- try:
- save_data(data)
- logging.info(f"File '{item_name}' (ID: {item_id}) metadata deleted by user {telegram_id_str}")
- # Check if HF deletion failed earlier to adjust message
- hf_delete_failed = hf_path and not delete_hf_file # Re-check or use a flag
- if hf_delete_failed:
- return jsonify({"status": "warning", "message": f"File '{item_name}' metadata deleted, but failed to remove from storage."})
- else:
- return jsonify({"status": "success", "message": f"File '{item_name}' deleted."})
- except Exception as e:
- logging.error(f"Failed to save data after deleting file {item_id} for user {telegram_id_str}: {e}")
- return jsonify({"status": "error", "message": "Failed to save changes after deleting file."}), 500
- else:
- logging.error(f"remove_node failed for file {item_id} user {telegram_id_str} despite checks.")
- return jsonify({"status": "error", "message": "Internal error deleting file from structure."}), 500
- # Should not reach here if item_type is validated
- return jsonify({"status": "error", "message": "Unknown error during deletion."}), 500
+@app.route('/delete_file/', methods=['POST'])
+def delete_file(file_id):
+ if 'tg_id' not in session:
+ return jsonify({'status': 'error', 'message': 'Не авторизован'}), 401
+ tg_id = session['tg_id']
+ tg_id_str = str(tg_id)
+ data = load_data()
+ user_data = data['users'].get(tg_id_str)
+ if not user_data:
+ return jsonify({'status': 'error', 'message': 'Пользователь не найден'}), 404
-@app.route('/download/')
-def download_file(file_id):
- # For TMA, authentication needs to be passed differently, e.g., via query parameter with initData hash
- # Or rely on a server-side session established after initial validation (less ideal for stateless TMA)
- # Simplest (but less secure if link shared): Check access token in query?
- # Better: Require initData in query/header and validate it.
+ file_node, parent_node = find_node_by_id(user_data.get('filesystem'), file_id)
+ current_view_folder_id = request.form.get('current_view_folder_id', 'root') # Keep track for reload
- init_data_str = request.args.get('initData')
- admin_override = request.args.get('admin_token') # Add a way for admin downloads
+ if not file_node or file_node.get('type') != 'file' or not parent_node:
+ return jsonify({'status': 'error', 'message': 'Файл не найден или не может быть удален.'}), 404
- user_info = None
- is_valid_user = False
- admin_access = False
+ hf_path = file_node.get('path')
+ original_filename = file_node.get('original_filename', 'файл')
+ save_needed = False
+ error_occurred = False
+ messages = []
- if init_data_str:
- user_info, is_valid_user = validate_telegram_data(init_data_str, BOT_TOKEN)
+ # Case 1: Path is missing in metadata, just remove from DB
+ if not hf_path:
+ logging.warning(f"File ID {file_id} (User: {tg_id}) has missing path. Deleting only metadata.")
+ if remove_node(user_data['filesystem'], file_id):
+ save_needed = True
+ messages.append(f'Метаданные файла {original_filename} удалены (путь отсутствовал).')
+ else:
+ messages.append('Не удалось удалить метаданные файла (путь отсутствовал).')
+ error_occurred = True
- # Temporary admin access check (replace with a proper mechanism)
- if admin_override and is_admin(admin_override): # Use the ID passed in token as the admin ID to check
- admin_access = True
- logging.info(f"Admin access granted for download of {file_id} by admin ID {admin_override}")
- # We still need to find *which* user the file belongs to if admin
- elif not is_valid_user or not user_info:
- return Response("Authentication required.", status=403)
+ # Case 2: Path exists, attempt HF deletion first
+ else:
+ if not HF_TOKEN_WRITE:
+ return jsonify({'status': 'error', 'message': 'Удаление невозможно: токен для записи не настроен.'}), 403
- data = load_data()
- file_node = None
- user_id_for_file = None
-
- if admin_access:
- # Admin needs to find the file across all users
- logging.info(f"Admin searching for file ID {file_id} across all users.")
- for u_id, u_data in data.get('users', {}).items():
- node, _ = find_node_by_id(u_data.get('filesystem', {}), file_id)
- if node and node.get('type') == 'file':
- file_node = node
- user_id_for_file = u_id
- logging.info(f"Admin found file ID {file_id} belonging to user {user_id_for_file}")
- break
- elif is_valid_user:
- telegram_id_str = str(user_info['id'])
- user_data = data['users'].get(telegram_id_str)
- if user_data:
- file_node, _ = find_node_by_id(user_data['filesystem'], file_id)
- if file_node and file_node.get('type') == 'file':
- user_id_for_file = telegram_id_str
- else:
- logging.warning(f"User {telegram_id_str} tried to download non-existent/invalid file {file_id}")
- else:
- logging.error(f"User data not found for validated user {telegram_id_str} during download attempt.")
+ try:
+ api = HfApi()
+ api.delete_file(
+ path_in_repo=hf_path,
+ repo_id=REPO_ID,
+ repo_type="dataset",
+ token=HF_TOKEN_WRITE,
+ commit_message=f"User {tg_id} deleted file {original_filename} (ID: {file_id})"
+ )
+ logging.info(f"Deleted file {hf_path} from HF Hub for user {tg_id}")
+ messages.append(f'Файл {original_filename} удален с сервера.')
+
+ # Now remove from DB
+ if remove_node(user_data['filesystem'], file_id):
+ save_needed = True
+ messages.append('Метаданные удалены из базы.')
+ else:
+ messages.append('Файл удален с сервера, но не найден в базе для удаления метаданных.')
+ error_occurred = True # Metadata mismatch
+
+ except hf_utils.EntryNotFoundError:
+ logging.warning(f"File {hf_path} not found on HF Hub during delete for user {tg_id}. Removing from DB.")
+ messages.append(f'Файл {original_filename} не найден на сервере.')
+ if remove_node(user_data['filesystem'], file_id):
+ save_needed = True
+ messages.append('Удален из базы.')
+ else:
+ messages.append('Не найден ни на сервере, ни в базе данных.')
+ error_occurred = True
+ except Exception as e:
+ logging.error(f"Error deleting file {hf_path} for {tg_id}: {e}", exc_info=True)
+ messages.append(f'Ошибка удаления файла {original_filename} с сервера: {e}')
+ error_occurred = True # Don't remove metadata if server deletion failed
- if not file_node or not user_id_for_file:
- logging.warning(f"File node {file_id} not found for download (User valid: {is_valid_user}, Admin: {admin_access})")
- return Response("File not found or access denied.", status=404)
+ if save_needed:
+ try:
+ save_data(data)
+ messages.append('База данных обновлена.')
+ except Exception as e:
+ logging.error(f"Delete file DB update error for {tg_id}: {e}", exc_info=True)
+ messages.append('Ошибка сохранения базы данных после удаления.')
+ error_occurred = True
- hf_path = file_node.get('path')
- original_filename = file_node.get('original_filename', 'downloaded_file')
+ final_status = 'error' if error_occurred else 'success'
+ final_message = " ".join(messages)
+ return jsonify({'status': final_status, 'message': final_message}), 200 if final_status == 'success' else 500
- if not hf_path:
- logging.error(f"HF path missing for file {file_id} (User: {user_id_for_file}). Cannot download.")
- return Response("Error: File path missing in metadata.", status=500)
- # Generate the direct download URL for HF
- # Note: If repo is private, this direct link might not work without auth baked in,
- # which is insecure. Streaming through server is safer for private repos.
- file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{hf_path}?download=true"
- logging.info(f"Attempting to stream download for file {file_id} from {file_url} (User: {user_id_for_file}, Admin: {admin_access})")
+@app.route('/delete_folder/', methods=['POST'])
+def delete_folder(folder_id):
+ if 'tg_id' not in session:
+ return jsonify({'status': 'error', 'message': 'Не авторизован'}), 401
- try:
- headers = {}
- if HF_TOKEN_READ:
- headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
+ if folder_id == 'root':
+ return jsonify({'status': 'error', 'message': 'Нельзя удалить корневую папку!'}), 400
- # Use stream=True to avoid loading large files into memory
- response = requests.get(file_url, headers=headers, stream=True, timeout=30) # Add timeout
- response.raise_for_status() # Check for HTTP errors (4xx, 5xx)
+ tg_id = session['tg_id']
+ tg_id_str = str(tg_id)
+ data = load_data()
+ user_data = data['users'].get(tg_id_str)
+ if not user_data:
+ return jsonify({'status': 'error', 'message': 'Пользователь не найден'}), 404
- # Stream the response back to the client
- return Response(
- response.iter_content(chunk_size=8192), # Stream in chunks
- content_type=response.headers.get('Content-Type', 'application/octet-stream'),
- headers={ "Content-Disposition": f"attachment; filename*=UTF-8''{secure_filename(original_filename)}" } # Correct encoding for filename
- )
+ folder_node, parent_node = find_node_by_id(user_data.get('filesystem'), folder_id)
+ current_view_folder_id = request.form.get('current_view_folder_id', 'root')
- except requests.exceptions.RequestException as e:
- logging.error(f"Error downloading file from HF ({hf_path}) for user {user_id_for_file}: {e}")
- status_code = 502 # Bad Gateway if HF fails
- if isinstance(e, requests.exceptions.HTTPError):
- status_code = e.response.status_code if e.response is not None else 500
- if status_code == 404: status_code = 404 # Pass through 404
- return Response(f'Error downloading file: {e}', status=status_code)
- except Exception as e:
- logging.error(f"Unexpected error during download streaming ({hf_path}) for user {user_id_for_file}: {e}", exc_info=True)
- return Response('Internal server error during download.', status=500)
+ if not folder_node or folder_node.get('type') != 'folder' or not parent_node:
+ return jsonify({'status': 'error', 'message': 'Папка не найдена или не может быть удалена.'}), 404
+ folder_name = folder_node.get('name', 'папка')
-@app.route('/get_text_content/')
-def get_text_content(file_id):
- # Similar authentication needed as for download
- init_data_str = request.args.get('initData')
- admin_override = request.args.get('admin_token')
+ # Check if folder is empty
+ if folder_node.get('children'):
+ return jsonify({'status': 'error', 'message': f'Папку "{folder_name}" можно удалить только если она пуста.'}), 400
+
+ # No HF deletion needed for empty folders, just remove metadata
+ if remove_node(user_data['filesystem'], folder_id):
+ try:
+ save_data(data)
+ # Determine redirect target (parent folder)
+ redirect_to_folder_id = parent_node.get('id', 'root')
+ return jsonify({'status': 'success', 'message': f'Пустая папка "{folder_name}" успешно удалена.', 'redirect_folder': redirect_to_folder_id })
+ except Exception as e:
+ logging.error(f"Delete empty folder save error for {tg_id}: {e}", exc_info=True)
+ return jsonify({'status': 'error', 'message': 'Ошибка сохранения данных после удаления папки.'}), 500
+ else:
+ return jsonify({'status': 'error', 'message': 'Не удалось удалить папку из базы данных.'}), 500
- user_info = None
- is_valid_user = False
- admin_access = False
- if init_data_str:
- user_info, is_valid_user = validate_telegram_data(init_data_str, BOT_TOKEN)
+@app.route('/get_text_content/')
+def get_text_content(file_id):
+ tg_id_str = None
+ is_admin_req = is_admin()
- if admin_override and is_admin(admin_override):
- admin_access = True
- elif not is_valid_user or not user_info:
- return Response("Authentication required.", status=403)
+ if 'tg_id' in session:
+ tg_id_str = str(session['tg_id'])
+ elif not is_admin_req:
+ return Response("Не авторизован", status=401)
data = load_data()
file_node = None
- user_id_for_file = None
-
- if admin_access:
- for u_id, u_data in data.get('users', {}).items():
- node, _ = find_node_by_id(u_data.get('filesystem', {}), file_id)
- if node and node.get('type') == 'file' and node.get('file_type') == 'text':
- file_node = node
- user_id_for_file = u_id
- break
- elif is_valid_user:
- telegram_id_str = str(user_info['id'])
- user_data = data['users'].get(telegram_id_str)
- if user_data:
- file_node, _ = find_node_by_id(user_data['filesystem'], file_id)
- if file_node and file_node.get('type') == 'file' and node.get('file_type') == 'text':
- user_id_for_file = telegram_id_str
-
-
- if not file_node or not user_id_for_file or file_node.get('file_type') != 'text':
- return Response("Text file not found or access denied.", status=404)
+ user_tg_id_of_file = None
+
+ # Try finding file for logged-in user first
+ if tg_id_str and tg_id_str in data.get('users', {}):
+ user_data = data['users'][tg_id_str]
+ file_node, _ = find_node_by_id(user_data.get('filesystem'), file_id)
+ if file_node and file_node.get('type') == 'file' and file_node.get('file_type') == 'text':
+ user_tg_id_of_file = tg_id_str
+
+ # If not found for current user, admin can search all users
+ if not file_node and is_admin_req:
+ logging.info(f"Admin (TG ID: {session.get('tg_id')}) searching for text file ID {file_id} across all users.")
+ for user_id, udata in data.get('users', {}).items():
+ if isinstance(udata, dict):
+ node, _ = find_node_by_id(udata.get('filesystem'), file_id)
+ if node and node.get('type') == 'file' and node.get('file_type') == 'text':
+ file_node = node
+ user_tg_id_of_file = user_id
+ logging.info(f"Admin found text file ID {file_id} belonging to user TG ID {user_tg_id_of_file}")
+ break
+
+ if not file_node: # Already checked for type=file and file_type=text in the loops
+ return Response("Текстовый файл не найден", status=404)
hf_path = file_node.get('path')
if not hf_path:
- return Response("Error: File path missing.", status=500)
+ logging.error(f"Text file ID {file_id} (User: {user_tg_id_of_file}) has missing path.")
+ return Response("Ошибка: путь к файлу отсутствует", status=500)
file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{hf_path}?download=true"
- logging.info(f"Fetching text content for {file_id} from {file_url} (User: {user_id_for_file}, Admin: {admin_access})")
try:
headers = {}
if HF_TOKEN_READ:
headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
- response = requests.get(file_url, headers=headers, timeout=15)
+ response = requests.get(file_url, headers=headers, timeout=15) # Timeout for text files
response.raise_for_status()
- # Limit preview size
- MAX_PREVIEW_SIZE = 2 * 1024 * 1024 # 2MB limit for text preview
- if len(response.content) > MAX_PREVIEW_SIZE:
- logging.warning(f"Text file {file_id} too large for preview ({len(response.content)} bytes). User: {user_id_for_file}")
- # Return only the beginning? Or an error?
- # return Response(response.content[:MAX_PREVIEW_SIZE] + b"\n\n--- File truncated ---", mimetype='text/plain')
- return Response("File too large for preview.", status=413)
-
+ # Limit preview size (e.g., 1MB)
+ max_preview_size = 1 * 1024 * 1024
+ if len(response.content) > max_preview_size:
+ # Provide truncated content instead of error?
+ # text_content = response.content[:max_preview_size].decode('utf-8', errors='ignore') + "\n\n[Файл слишком большой, показана только часть]"
+ return Response("Файл слишком большой для предпросмотра (>1MB).", status=413)
# Try decoding with UTF-8 first, then fallback
try:
@@ -971,1498 +1511,492 @@ def get_text_content(file_id):
except UnicodeDecodeError:
try:
# Common fallback for Windows-created files
- text_content = response.content.decode('cp1251') # Or latin-1
- logging.info(f"Decoded text file {file_id} using cp1251 fallback.")
+ text_content = response.content.decode('cp1251')
except Exception:
- logging.error(f"Could not decode text file {file_id} with UTF-8 or fallback encoding.")
- return Response("Error decoding file content. Unsupported encoding?", status=500)
+ # Last resort: latin-1 or ignore errors
+ text_content = response.content.decode('latin-1', errors='ignore')
+ logging.warning(f"Could not determine encoding for {hf_path}. Used latin-1 with errors ignored.")
+
- return Response(text_content, mimetype='text/plain; charset=utf-8') # Specify UTF-8 for browser
+ return Response(text_content, mimetype='text/plain; charset=utf-8') # Specify UTF-8
except requests.exceptions.RequestException as e:
- logging.error(f"Error fetching text content from HF ({hf_path}) for user {user_id_for_file}: {e}")
- status_code = 502
- if isinstance(e, requests.exceptions.HTTPError): status_code = e.response.status_code if e.response is not None else 500
- return Response(f"Error fetching content: {e}", status=status_code)
+ logging.error(f"Error fetching text content from HF ({hf_path}): {e}")
+ status_code = e.response.status_code if e.response is not None else 502
+ return Response(f"Ошибка загрузки содержимого: {status_code}", status=status_code)
except Exception as e:
- logging.error(f"Unexpected error fetching text content ({hf_path}) for user {user_id_for_file}: {e}", exc_info=True)
- return Response("Internal server error.", status=500)
+ logging.error(f"Unexpected error fetching text content ({hf_path}): {e}", exc_info=True)
+ return Response("Внутренняя ошибка сервера", status=500)
-# --- Admin Routes (/admhosto) ---
+@app.route('/logout') # Kept for potential session clearing during testing
+def logout():
+ session.clear()
+ # In TMA context, redirecting to login doesn't make sense.
+ # Maybe redirect to the main page which forces re-auth?
+ return redirect(url_for('index'))
+
+# --- Admin Panel (Separate Access - Not directly part of TMA flow) ---
+
+def is_admin():
+ # Check if the logged-in Telegram user's ID matches the ADMIN_TELEGRAM_ID
+ return 'tg_id' in session and session['tg_id'] == ADMIN_TELEGRAM_ID
@app.route('/admhosto')
def admin_panel():
- auth_header = request.headers.get('X-Telegram-Init-Data')
- admin_token = request.args.get('admin_token') # Allow token via query param for initial access maybe
-
- admin_user_id = None
- if auth_header:
- user_info, is_valid = validate_telegram_data(auth_header, BOT_TOKEN)
- if is_valid and user_info and is_admin(user_info['id']):
- admin_user_id = str(user_info['id'])
- elif admin_token and is_admin(admin_token):
- admin_user_id = admin_token # Use the token itself as the ID for check
-
- if not admin_user_id:
- # Return HTML indicating access denied or redirect logic
- return render_template_string(ADMIN_LOGIN_TEMPLATE)
- # return Response("Access Denied", status=403)
+ # Admin must access this URL in a browser AFTER authenticating via the TMA as the admin user
+ if not is_admin():
+ # flash('Доступ запрещен (Admin).', 'error') # Flash not useful here
+ # Redirect to main page or show an error
+ return Response("Доступ запрещен. Только для администратора.", status=403)
+
data = load_data()
users = data.get('users', {})
+
user_details = []
+ total_files_all_users = 0
+ for tg_id_str, udata in users.items():
+ if not isinstance(udata, dict): continue # Skip malformed data
- for u_id_str, u_data in users.items():
file_count = 0
- folder_count = 0
- total_size = 0 # Calculating size would require iterating and potentially querying HF - skip for now
-
- q = [(u_data.get('filesystem', {}))] # Start with root node
- while q:
- current_node = q.pop(0)
- if not current_node: continue
-
- node_type = current_node.get('type')
- if node_type == 'file':
- file_count += 1
- # size = current_node.get('size', 0) # Add size if stored
- # total_size += size
- elif node_type == 'folder':
- if current_node.get('id') != 'root': # Don't count root as a user folder
- folder_count += 1
- if 'children' in current_node:
- q.extend(current_node.get('children', []))
+ # Simple recursive counter (can be slow for very deep structures)
+ def count_files_recursive(folder):
+ count = 0
+ if not isinstance(folder, dict) or not isinstance(folder.get('children'), list):
+ return 0
+ for item in folder.get('children', []):
+ if isinstance(item, dict):
+ if item.get('type') == 'file':
+ count += 1
+ elif item.get('type') == 'folder':
+ count += count_files_recursive(item)
+ return count
+
+ file_count = count_files_recursive(udata.get('filesystem', {}))
+ total_files_all_users += file_count
user_details.append({
- 'id': u_id_str,
- 'username': u_data.get('username', 'N/A'),
- 'first_name': u_data.get('first_name', 'N/A'),
- 'created_at': u_data.get('created_at', 'N/A'),
- 'file_count': file_count,
- 'folder_count': folder_count,
- # 'total_size_mb': round(total_size / (1024*1024), 2) # Add size if calculated
+ 'tg_id': tg_id_str,
+ 'display_name': udata.get('tg_first_name', f"ID: {tg_id_str}") + (f" (@{udata['tg_username']})" if udata.get('tg_username') else ""),
+ 'created_at': udata.get('created_at', 'N/A'),
+ 'file_count': file_count
})
+ user_details.sort(key=lambda x: x['display_name'].lower())
+
+ admin_html = '''
+
+Админ-панель Cloud
+
+
Всего пользователей: {{ user_details|length }} | Всего файлов: {{ total_files_all_users }}
+{# Add flash message display area if needed for admin actions #}
+{% with messages = get_flashed_messages(with_categories=true) %}
+ {% if messages %}
+ {% for category, message in messages %}
+
- {% if file.file_type == 'image' and file.preview_url %}
-
- {% elif file.file_type == 'video' and file.preview_url %}
- {% set thumb_url = file.preview_url + '#t=0.5' %}
-
- {% endif %}
- {# Icons handled by CSS based on class #}
-
-
{{ file.original_filename | truncate(30) }}
-
-
В папке: {{ file.parent_path_str }}
-
Загружен: {{ file.upload_date }}
-
ID: {{ file.id }}
-
Путь: {{ file.path }}
-
-
-
- Скачать
- {% set previewable = file.file_type in ['image', 'video', 'pdf', 'text', 'audio'] %}
- {% if previewable and file.preview_url %}
-
- {% endif %}
-
-
-
- {% else %}
-
У этого пользователя нет файлов.
- {% endfor %}
-
-
-
-
-
-
- ×
-
Загрузка...
-
-
-
-
-
-
-'''
# --- App Initialization ---
-if __name__ == '__main__':
- if not BOT_TOKEN or len(BOT_TOKEN.split(':')) != 2:
- logging.critical("FATAL: TELEGRAM_BOT_TOKEN is missing or invalid!")
- exit(1)
- if not HF_TOKEN_WRITE:
- logging.warning("HF_TOKEN (write access) is not set. File uploads, deletions, and backups WILL FAIL.")
- if not HF_TOKEN_READ:
- logging.warning("HF_TOKEN_READ is not set. Falling back to HF_TOKEN. File downloads/previews might fail for private repos if HF_TOKEN is also unset.")
- # Perform initial database download before starting app or background tasks
- logging.info("Performing initial database download...")
- download_db_from_hf()
+if __name__ == '__main__':
+ print(f"--- Configuration ---")
+ print(f"Repo ID: {REPO_ID}")
+ print(f"Write Token Set: {'Yes' if HF_TOKEN_WRITE else 'No'}")
+ print(f"Read Token Set: {'Yes' if HF_TOKEN_READ else 'No (using Write Token if set)'}")
+ print(f"Telegram Bot Token Set: {'Yes' if TELEGRAM_BOT_TOKEN else 'No - TELEGRAM AUTH WILL FAIL'}")
+ print(f"Admin TG ID: {ADMIN_TELEGRAM_ID if ADMIN_TELEGRAM_ID else 'Not Set - ADMIN PANEL DISABLED'}")
+ print(f"Data File: {DATA_FILE}")
+ print(f"Upload Folder: {UPLOAD_FOLDER}")
+ print(f"---------------------")
+
+
+ if not TELEGRAM_BOT_TOKEN:
+ logging.error("CRITICAL: TELEGRAM_BOT_TOKEN is not set. Telegram authentication will fail.")
+ if not ADMIN_TELEGRAM_ID:
+ logging.warning("ADMIN_TELEGRAM_ID is not set. Admin panel functionality will be unavailable.")
+
+ # Initial DB download/check
+ if HF_TOKEN_READ or HF_TOKEN_WRITE:
+ logging.info("Performing initial database download/check before starting.")
+ download_db_from_hf()
+ else:
+ logging.warning("No HF read/write token. Database operations with Hugging Face Hub are disabled.")
+ # Ensure local file exists if HF is disabled
+ if not os.path.exists(DATA_FILE):
+ with open(DATA_FILE, 'w', encoding='utf-8') as f:
+ json.dump({'users': {}}, f)
+ logging.info(f"Created empty local database file: {DATA_FILE}")
- # Start periodic backup thread only if write token exists
+ # Start periodic backup thread if write token exists
if HF_TOKEN_WRITE:
backup_thread = threading.Thread(target=periodic_backup, daemon=True)
backup_thread.start()
logging.info("Periodic backup thread started.")
else:
- logging.warning("Periodic backup disabled because HF_TOKEN_WRITE is not set.")
+ logging.warning("Periodic backup disabled because HF_TOKEN (write access) is not set.")
+
+ # Set session lifetime (e.g., 30 days)
+ app.permanent_session_lifetime = timedelta(days=30)
- # Run Flask App (use appropriate server for production, e.g., Gunicorn)
- # For development:
- # Make sure to use host='0.0.0.0' to be accessible on the network
- # Use a proper WSGI server like gunicorn in production:
- # gunicorn --bind 0.0.0.0:7860 your_app_file_name:app
+ # Run the Flask app
+ # Use debug=False for production/TMA context
+ # Host 0.0.0.0 makes it accessible externally (needed for TMA)
app.run(debug=False, host='0.0.0.0', port=7860)
+
+# --- END OF FILE app.py ---
\ No newline at end of file