+ files = request.files.getlist('files')
+ if not files or all(not f.filename for f in files):
+ flash('Файлы для загрузки не выбраны.', 'warning')
+ return redirect(url_for('dashboard', current_folder_path=current_path))
- {% with messages = get_flashed_messages(with_categories=true) %}
- {% if messages %}
- {% for category, message in messages %}
-
+ target_hf_path_prefix = target_node.get("path", f"cloud_files/{username}/")
+ if not target_hf_path_prefix.endswith('/'):
+ target_hf_path_prefix += '/'
-
+ # Use a temporary directory for uploads if needed, or upload directly
+ # os.makedirs('temp_uploads', exist_ok=True) # Optional temp storage
-
-
-
-
- Просмотр файла
-
-
-
-
Загрузка...
-
-
-
-
+ api = get_hf_api(write=True)
+ uploaded_count = 0
+ errors = []
-
-
-
-'''
- # Pass helper functions/variables to template
- from urllib.parse import quote, unquote
-
- def encode_path_for_url(path):
- # Simple base64 encoding for path to avoid issues with slashes in URL parameters
- # Not true security, just for URL routing convenience.
- import base64
- return base64.urlsafe_b64encode(path.encode()).decode()
-
- def get_hf_file_url_template(file_path):
- # Use hf_hub_url which handles repo type and endpoint correctly
- # Note: This doesn't automatically add tokens. Do that in JS if needed.
- try:
- return hf_hub_url(repo_id=REPO_ID, filename=file_path, repo_type="dataset")
- except Exception as e:
- logging.error(f"Error generating HF URL for {file_path}: {e}")
- # Fallback or placeholder
- return f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{file_path}" # Older method
+ data = load_data()
+ user_data = data['users'].get(username)
+ if not user_data:
+ flash('Ошибка: пользователь не найден.', 'error')
+ return redirect(url_for('login'))
+ root_node = user_data['root_folder']
+ parent_node = find_node_by_path(root_node, path_segments)
- return render_template_string(
- html,
- username=username,
- user_files=sorted_files,
- sorted_folders=sorted_folders,
- path=path,
- breadcrumbs=breadcrumbs,
- REPO_ID=REPO_ID,
- encode_path=encode_path_for_url, # Pass encoding function
- get_hf_file_url=get_hf_file_url_template # Pass URL generation function
- )
+ if not parent_node or parent_node.get("type") != "folder":
+ flash('Родительская папка не найдена.', 'error')
+ return redirect(url_for('dashboard'))
-@app.route('/create_folder', methods=['POST'])
-@login_required
-def create_folder():
- username = session['username']
- current_path_str = request.form.get('current_path', '')
- new_folder_name = request.form.get('new_folder_name', '').strip()
+ # Check if folder already exists (case-insensitive)
+ if any(c.get("type") == "folder" and c.get("name", '').lower() == safe_folder_name.lower() for c in parent_node.get("children", [])):
+ flash(f'Папка "{safe_folder_name}" уже существует.', 'warning')
+ return redirect(url_for('dashboard', current_folder_path=current_path))
- if not is_safe_name(new_folder_name, MAX_FOLDER_NAME_LENGTH):
- flash(f'Недопустимое имя папки. Имя не должно содержать символы {INVALID_CHARS} и быть длиннее {MAX_FOLDER_NAME_LENGTH} символов.', 'error')
- return redirect(url_for('dashboard', path=current_path_str))
+ # Construct HF path for the new folder
+ parent_hf_path = parent_node.get("path", f"cloud_files/{username}/")
+ if not parent_hf_path.endswith('/'):
+ parent_hf_path += '/'
+ new_folder_hf_path = parent_hf_path + safe_folder_name + "/" # Folders end with /
- data = load_data()
- user_data = data['users'][username]
- current_path_list = parse_path(current_path_str)
+ folder_node = create_folder_node(safe_folder_name, new_folder_hf_path)
- if current_path_list is None:
- flash('Недопустимый текущий путь!', 'error')
- return redirect(url_for('dashboard', path=''))
+ if add_node_to_path(root_node, path_segments, folder_node):
+ try:
+ # HF Hub doesn't strictly need empty folders created,
+ # but we could upload a placeholder if desired. Let's skip for now.
+ # api = get_hf_api(write=True)
+ # placeholder_path = new_folder_hf_path + ".keep"
+ # api.upload_file(path_or_fileobj=BytesIO(b''), path_in_repo=placeholder_path, ...)
- parent_folder_obj = navigate_to_path(user_data['root'], current_path_list)
+ save_data(data)
+ flash(f'Папка "{safe_folder_name}" успешно создана.', 'success')
+ except Exception as e:
+ flash(f'Ошибка при сохранении данных после создания папки: {e}', 'error')
+ # Attempt to rollback the change in memory?
+ remove_node_by_path(root_node, path_segments, safe_folder_name) # Try removing added node
+ else:
+ # This case (collision after check) should be rare
+ flash(f'Не удалось добавить папку "{safe_folder_name}". Возможно, она уже существует.', 'error')
- if parent_folder_obj is None:
- flash('Родитель��кая папка не найдена!', 'error')
- return redirect(url_for('dashboard', path=''))
- parent_folder_obj.setdefault('folders', {}) # Ensure 'folders' key exists
+ return redirect(url_for('dashboard', current_folder_path=current_path))
- if new_folder_name in parent_folder_obj['folders']:
- flash(f'Папка с именем "{new_folder_name}" уже существует здесь.', 'warning')
- else:
- parent_folder_obj['folders'][new_folder_name] = initialize_user_root() # Create new empty folder structure
- try:
- save_data(data)
- flash(f'Папка "{new_folder_name}" успешно создана.', 'success')
- logging.info(f"User {username} created folder '{new_folder_name}' at path '{current_path_str}'")
- except Exception as e:
- flash('Ошибка при сохранении данных. Не удалось создать папку.', 'error')
- logging.error(f"Failed to save data after creating folder for {username}: {e}")
- # Rollback the change in memory if save failed
- if new_folder_name in parent_folder_obj['folders']:
- del parent_folder_obj['folders'][new_folder_name]
- return redirect(url_for('dashboard', path=current_path_str))
+@app.route('/delete_item', methods=['POST'])
+def delete_item():
+ if 'username' not in session:
+ return jsonify({'status': 'error', 'message': 'Не авторизован'}), 401
-@app.route('/delete_folder', methods=['POST'])
-@login_required
-def delete_folder():
username = session['username']
- folder_path_str = request.form.get('folder_path', '') # The path to the folder to delete
+ item_path = request.form.get('item_path') # This should be the full HF path
+ item_name = request.form.get('item_name') # Name displayed (original filename or folder name)
+ item_type = request.form.get('item_type') # 'file' or 'folder'
+ current_view_path = request.form.get('current_view_path', '') # Path of the dashboard view to redirect back to
- if not folder_path_str:
- flash('Не указан путь к папке для удаления.', 'error')
- return redirect(url_for('dashboard', path=''))
+ if not item_path or not item_name or not item_type:
+ flash('Недостаточно информации для удаления.', 'error')
+ return redirect(url_for('dashboard', current_folder_path=current_view_path))
data = load_data()
- user_data = data['users'][username]
- folder_path_list = parse_path(folder_path_str)
+ user_data = data['users'].get(username)
+ if not user_data:
+ flash('Ошибка: пользователь не найден.', 'error')
+ return redirect(url_for('login'))
- if folder_path_list is None:
- flash('Недопустимый путь к папке.', 'error')
- return redirect(url_for('dashboard', path=''))
+ root_node = user_data['root_folder']
- # Find the parent folder and the name of the folder to delete
- if not folder_path_list: # Cannot delete root
- flash('Нельзя удалить корневую папку.', 'error')
- return redirect(url_for('dashboard', path=''))
+ # Determine the parent path segments in the JSON structure based on the item's HF path
+ # Example: item_path = "cloud_files/user/folderA/file.txt" -> segments = ["folderA"]
+ # Example: item_path = "cloud_files/user/folderA/subB/" -> segments = ["folderA", "subB"]
+ base_user_path = f"cloud_files/{username}/"
+ relative_path = item_path.replace(base_user_path, '', 1)
+ path_segments_full = [seg for seg in relative_path.strip('/').split('/') if seg]
- folder_name_to_delete = folder_path_list[-1]
- parent_path_list = folder_path_list[:-1]
- parent_folder_obj = navigate_to_path(user_data['root'], parent_path_list)
+ # If deleting 'file.txt' in 'folderA', parent segments are ['folderA']
+ # If deleting folder 'subB' in 'folderA', parent segments are ['folderA']
+ # Need the name of the item being deleted to find it in the parent's children
+ # The item_name passed from the form should be correct
+ parent_segments = path_segments_full[:-1] if item_type == 'file' else path_segments_full[:-1] # For folders path includes its own name ending in /
- if parent_folder_obj is None or folder_name_to_delete not in parent_folder_obj.get('folders', {}):
- flash('Папка не найдена или не существует.', 'error')
- # Determine redirect path (parent path or root)
- parent_redirect_path = '/'.join(parent_path_list)
- return redirect(url_for('dashboard', path=parent_redirect_path))
+ logging.info(f"Attempting to delete '{item_name}' (type: {item_type}) with HF path '{item_path}'. Parent segments: {parent_segments}")
- # --- Deletion Logic ---
- api = _get_hf_api()
- if not api:
- flash('Ошибка конфигурации сервера: не удается по��учить доступ к хранилищу.', 'error')
- parent_redirect_path = '/'.join(parent_path_list)
- return redirect(url_for('dashboard', path=parent_redirect_path))
+ removed_node = remove_node_by_path(root_node, parent_segments, item_name)
- hf_folder_path = f"cloud_files/{username}/{folder_path_str}"
-
- try:
- logging.info(f"Attempting to delete HF folder: {hf_folder_path}")
- # Use HfApi.delete_folder - This should handle non-empty folders
- api.delete_folder(
- folder_path=hf_folder_path,
- repo_id=REPO_ID,
- repo_type="dataset",
- token=HF_TOKEN_WRITE,
- commit_message=f"User {username} deleted folder {folder_path_str}"
- )
- logging.info(f"Successfully deleted HF folder: {hf_folder_path}")
+ if removed_node:
+ try:
+ # Delete from Hugging Face Hub
+ is_folder = removed_node.get("type") == "folder"
+ hf_object_path = removed_node.get("path") # Get the exact path from the removed node
+ if hf_object_path:
+ delete_hf_object(hf_object_path, is_folder=is_folder)
+ # If folder, HF deletion might handle recursion, or we'd need to list and delete contents first if API requires it. Assume delete_folder handles it.
+ else:
+ raise ValueError("Removed node is missing HF path information.")
- # If HF deletion is successful (or if it didn't exist - check API behavior), remove from JSON
- del parent_folder_obj['folders'][folder_name_to_delete]
- save_data(data)
- flash(f'Папка "{folder_name_to_delete}" и её содержимое успешно удалены.', 'success')
- logging.info(f"User {username} successfully deleted folder '{folder_path_str}'")
+ # Save the updated JSON data
+ save_data(data)
+ flash(f'{"Папка" if is_folder else "Файл"} "{item_name}" успешно удален(а).', 'success')
+ logging.info(f"Successfully deleted item '{item_name}' and saved state.")
- except Exception as e:
- # More specific error handling (e.g., huggingface_hub.utils.RepositoryNotFoundError)
- logging.error(f"Error deleting folder '{folder_path_str}' for user {username}: {e}", exc_info=True)
- # Check if it's a 'not found' error - maybe already deleted?
- if "404" in str(e) or "not found" in str(e).lower():
- logging.warning(f"Folder {hf_folder_path} likely already deleted on HF. Removing from local DB.")
- # Proceed to remove from JSON if it exists there
- if folder_name_to_delete in parent_folder_obj.get('folders', {}):
- del parent_folder_obj['folders'][folder_name_to_delete]
- try:
- save_data(data)
- flash(f'Папка "{folder_name_to_delete}" удалена (возможно, уже была удалена из хранилища).', 'warning')
- except Exception as save_e:
- flash('Ошибка при обновлении списка папок после удаления.', 'error')
- logging.error(f"Failed to save data after attempting local delete of folder {folder_name_to_delete}: {save_e}")
- else:
- flash(f'Папка "{folder_name_to_delete}" не найдена.', 'error')
+ except Exception as e:
+ flash(f'Ошибка при удалении {"папки" if is_folder else "файла"} "{item_name}" из хранилища или при сохранении: {e}', 'error')
+ logging.error(f"Error during deletion process for {item_name}: {e}")
+ # Attempt to rollback JSON change if HF deletion failed? Very tricky.
+ # Best effort: Log error, user might need manual cleanup on HF.
+ # Re-add node to keep JSON consistent with potential HF state?
+ # add_node_to_path(root_node, parent_segments, removed_node) # This might fail if parent was also affected etc. Risky.
+ # For now, leave JSON modified and report error.
+ else:
+ # remove_node_by_path already logged a warning/error
+ flash(f'Не удалось найти или удалить метаданные для "{item_name}".', 'error')
- else:
- # Other HF API error or save_data error
- flash(f'Ошибка при удалении папки "{folder_name_to_delete}". {e}', 'error')
+ return redirect(url_for('dashboard', current_folder_path=current_view_path))
- parent_redirect_path = '/'.join(parent_path_list)
- return redirect(url_for('dashboard', path=parent_redirect_path))
+@app.route('/download/')
+def download_file(item_hf_path):
+ if 'username' not in session:
+ flash('Пожалуйста, войдите для скачивания файлов.', 'warning')
+ # Store intended download path in session? Or just redirect to login.
+ return redirect(url_for('login'))
-@app.route('/download/')
-@login_required
-def download_file(file_path_encoded):
username = session['username']
- # Decode the path
- import base64
- try:
- file_path = base64.urlsafe_b64decode(file_path_encoded.encode()).decode()
- except Exception:
- flash('Неверная ссылка на файл.', 'error')
- return redirect(request.referrer or url_for('dashboard'))
-
- # Basic validation of the decoded path
- if not file_path.startswith(f"cloud_files/{username}/"):
- # Check if it's an admin request coming from the admin panel
- # This check is basic; a proper role/permission system is better
- is_admin_request = request.referrer and 'admhosto' in request.referrer
- if not is_admin_request:
- flash('Доступ запрещен.', 'error')
- logging.warning(f"User {username} attempted to access unauthorized path: {file_path}")
- # Redirect back to where they came from, or their root dashboard
- return redirect(request.referrer or url_for('dashboard'))
- else:
- # Admin access allowed (assuming admin is logged in, though no explicit check here)
- logging.info(f"Admin access granted for download: {file_path}")
- pass # Allow admin download
+ # The item_hf_path from the URL *is* the full HF path
+ # We need the original filename for the 'download_name' attribute.
+ # We have to find the file node in the user's data using the hf_path.
- # Find the file in the data structure to get the original filename
data = load_data()
- original_filename = "downloaded_file" # Default
- file_found = False
-
- # Need to search through the structure
- path_parts = file_path.split('/')[2:] # Get parts after cloud_files/username/
- unique_filename = path_parts[-1]
- folder_path_list = path_parts[:-1]
-
- # Navigate to the folder containing the file
- target_folder = navigate_to_path(data['users'].get(username, {}).get('root', {}), folder_path_list)
-
- if target_folder:
- for file_info in target_folder.get('files', []):
- if file_info.get('path') == file_path:
- original_filename = file_info.get('original_filename', unique_filename)
- file_found = True
- break
-
- # Admin check - if file wasn't found under session user, check the actual user in path
- if not file_found and 'is_admin_request' in locals() and is_admin_request:
- path_user = file_path.split('/')[1]
- if path_user != username and path_user in data['users']:
- target_folder_admin = navigate_to_path(data['users'][path_user]['root'], folder_path_list)
- if target_folder_admin:
- for file_info in target_folder_admin.get('files', []):
- if file_info.get('path') == file_path:
- original_filename = file_info.get('original_filename', unique_filename)
- file_found = True
- break
-
- if not file_found:
- # Even if file not in DB, maybe it exists on HF? Try downloading anyway?
- # Or just fail here. Let's fail for consistency.
- flash('Файл не найден в базе данных.', 'error')
- logging.warning(f"File metadata not found in DB for path: {file_path}")
- return redirect(request.referrer or url_for('dashboard'))
-
- # --- Proceed with HF Download ---
- token = _get_hf_token_read()
- headers = {}
- if token:
- headers["authorization"] = f"Bearer {token}"
-
- # Use hf_hub_url to get the correct download URL
- try:
- file_url = hf_hub_url(repo_id=REPO_ID, filename=file_path, repo_type="dataset")
- # Append download=true manually if hf_hub_url doesn't add it
- if '?' in file_url:
- file_url += '&download=true'
- else:
- file_url += '?download=true'
+ user_data = data['users'].get(username)
+ if not user_data:
+ flash('Ошибка: пользователь не найден.', 'error')
+ return redirect(url_for('login'))
+
+ # Function to search the tree for a file node by its HF path
+ def find_file_by_hf_path(node, target_hf_path):
+ if node.get("type") == "file" and node.get("path") == target_hf_path:
+ return node
+ if node.get("type") == "folder":
+ for child in node.get("children", []):
+ found = find_file_by_hf_path(child, target_hf_path)
+ if found:
+ return found
+ return None
- logging.info(f"Attempting download from URL: {file_url.split('?')[0]}?...") # Log URL without query params
- response = requests.get(file_url, headers=headers, stream=True)
- response.raise_for_status() # Check for HTTP errors (4xx, 5xx)
+ file_node = find_file_by_hf_path(user_data.get('root_folder'), item_hf_path)
- # Get content type if possible
- content_type = response.headers.get('Content-Type', 'application/octet-stream')
+ if not file_node:
+ flash('Файл не найден в ваших записях или у вас нет доступа.', 'error')
+ # Redirect back to dashboard or a relevant path? Hard to know original location.
+ return redirect(url_for('dashboard'))
- # Stream the download
- return send_file(
- BytesIO(response.content), # Consider streaming for large files if needed
- mimetype=content_type,
- as_attachment=True,
- download_name=original_filename # Use the original filename for the user
- )
+ original_filename = file_node.get('original_filename', 'downloaded_file')
+ # Use the direct HF download URL construction
+ file_url = get_hf_download_url(file_node)
+
+ if not file_url:
+ flash('Не удалось создать URL для скачивания.', 'error')
+ return redirect(url_for('dashboard'))
+
+ try:
+ # Use requests to stream the download
+ headers = {}
+ if HF_TOKEN_READ:
+ headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
+
+ response = requests.get(file_url, headers=headers, stream=True, timeout=30) # Add timeout
+ response.raise_for_status() # Check for HTTP errors (4xx, 5xx)
+ # Stream the response
+ return Response(response.iter_content(chunk_size=8192),
+ content_type=response.headers.get('Content-Type', 'application/octet-stream'),
+ headers={"Content-Disposition": f"attachment; filename=\"{original_filename}\""})
+
+ except requests.exceptions.Timeout:
+ logging.error(f"Timeout downloading file from HF: {file_url}")
+ flash('Ошибка скачивания: превышено время ожидания от сервера.', 'error')
+ except requests.exceptions.HTTPError as e:
+ logging.error(f"HTTP error downloading file from HF: {e.response.status_code} for {file_url}")
+ if e.response.status_code == 404:
+ flash('Ошибка скачивания: файл не найден в хранилище.', 'error')
+ # Consider removing the inconsistent node from JSON data here
+ elif e.response.status_code == 401 or e.response.status_code == 403:
+ flash('Ошибка скачивания: недостаточно прав для доступа к файлу (проверьте токен).', 'error')
+ else:
+ flash(f'Ошибка скачивания: {e.response.status_code}', 'error')
except requests.exceptions.RequestException as e:
- logging.error(f"Error downloading file '{file_path}' from HF: {e}")
- flash(f'Ошибка скачивания файла с сервера ({e}).', 'error')
- return redirect(request.referrer or url_for('dashboard'))
+ logging.error(f"Network error downloading file from HF: {e}")
+ flash('Ошибка сети при скачивании файла.', 'error')
except Exception as e:
- logging.error(f"Unexpected error during download of '{file_path}': {e}", exc_info=True)
+ logging.error(f"Unexpected error during download preparation for {original_filename}: {e}")
flash('Произошла непредвиденная ошибка при скачивании файла.', 'error')
- return redirect(request.referrer or url_for('dashboard'))
+ # If any error occurred, redirect back
+ # Redirecting back requires knowing the original context (current_folder_path) which isn't available here easily
+ # Redirect to root dashboard as a fallback
+ return redirect(url_for('dashboard'))
+
+@app.route('/view_text/')
+def view_text_file(item_hf_path):
+ # Similar auth and node finding logic as download
+ if 'username' not in session:
+ return Response("Unauthorized", status=401)
-@app.route('/delete_file', methods=['POST'])
-@login_required
-def delete_file():
username = session['username']
- # Decode the path
- import base64
- file_path_encoded = request.form.get('file_path_encoded')
- if not file_path_encoded:
- flash('Не указан файл для удаления.', 'error')
- return redirect(request.referrer or url_for('dashboard'))
+ data = load_data()
+ user_data = data['users'].get(username)
+ if not user_data:
+ return Response("User not found", status=403)
+
+ # Reuse find_file_by_hf_path from download logic
+ def find_file_by_hf_path(node, target_hf_path):
+ if node.get("type") == "file" and node.get("path") == target_hf_path:
+ return node
+ if node.get("type") == "folder":
+ for child in node.get("children", []):
+ found = find_file_by_hf_path(child, target_hf_path)
+ if found:
+ return found
+ return None
- try:
- file_path = base64.urlsafe_b64decode(file_path_encoded.encode()).decode()
- except Exception:
- flash('Неверная ссылка на файл.', 'error')
- return redirect(request.referrer or url_for('dashboard'))
+ file_node = find_file_by_hf_path(user_data.get('root_folder'), item_hf_path)
- # Validate ownership (user must own the file path)
- if not file_path.startswith(f"cloud_files/{username}/"):
- flash('Доступ запрещен.', 'error')
- logging.warning(f"User {username} attempted unauthorized delete on path: {file_path}")
- return redirect(request.referrer or url_for('dashboard'))
+ if not file_node or file_node.get("file_type") != 'text':
+ return Response("File not found or not a text file", status=404)
- data = load_data()
- user_data = data['users'][username]
+ file_url = get_hf_download_url(file_node)
+ if not file_url:
+ return Response("Could not generate download URL", status=500)
- # Find the file's parent folder in the data structure
- path_parts = file_path.split('/')[2:] # Get parts after cloud_files/username/
- unique_filename = path_parts[-1]
- folder_path_list = path_parts[:-1]
+ try:
+ headers = {}
+ if HF_TOKEN_READ:
+ headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
+
+ # Limit download size for text preview
+ response = requests.get(file_url, headers=headers, stream=True, timeout=15)
+ response.raise_for_status()
+
+ # Read a limited amount of content
+ max_preview_size = 1 * 1024 * 1024 # 1MB limit for preview
+ content = b''
+ for chunk in response.iter_content(chunk_size=8192):
+ content += chunk
+ if len(content) > max_preview_size:
+ content = content[:max_preview_size] + b'\n... (file truncated for preview)'
+ break
- target_folder_obj = navigate_to_path(user_data['root'], folder_path_list)
- file_info_index = -1
- original_filename = unique_filename # Fallback name
+ # Decode assuming UTF-8, fallback to latin-1
+ try:
+ text_content = content.decode('utf-8')
+ except UnicodeDecodeError:
+ text_content = content.decode('latin-1', errors='replace')
- if target_folder_obj and 'files' in target_folder_obj:
- for i, file_info in enumerate(target_folder_obj['files']):
- if file_info.get('path') == file_path:
- file_info_index = i
- original_filename = file_info.get('original_filename', unique_filename)
- break
+ # Return as plain text
+ return Response(text_content, mimetype='text/plain; charset=utf-8')
- # --- Deletion Logic ---
- api = _get_hf_api()
- if not api:
- flash('Ошибка конфигурации сервера: не удается получить доступ к хранилищу.', 'error')
- return redirect(request.referrer or url_for('dashboard', path='/'.join(folder_path_list)))
+ except Exception as e:
+ logging.error(f"Error fetching text content for {item_hf_path}: {e}")
+ return Response(f"Error fetching file content: {e}", status=500)
- try:
- logging.info(f"Attempting to delete HF file: {file_path}")
- api.delete_file(
- path_in_repo=file_path,
- repo_id=REPO_ID,
- repo_type="dataset",
- token=HF_TOKEN_WRITE,
- commit_message=f"User {username} deleted file {original_filename}"
- )
- logging.info(f"Successfully deleted HF file: {file_path}")
- # If HF deletion successful (or file not found on HF), remove from JSON
- if file_info_index != -1:
- del target_folder_obj['files'][file_info_index]
- save_data(data)
- flash(f'Файл "{original_filename}" успешно удален.', 'success')
- logging.info(f"User {username} successfully deleted file record: {file_path}")
- else:
- # File was deleted from HF, but not found in local DB (maybe inconsistent state?)
- flash(f'Файл "{original_filename}" удален из хранилища, но не найден в локальном списке.', 'warning')
- logging.warning(f"File record not found in DB for deleted path: {file_path}")
- # Optionally, still save data if other changes occurred? For now, no save needed.
+# --- Admin Routes (Simplified) ---
- except Exception as e:
- logging.error(f"Error deleting file '{file_path}' for user {username}: {e}", exc_info=True)
- # Check if it's a 'not found' error - maybe already deleted?
- if "404" in str(e) or "not found" in str(e).lower():
- logging.warning(f"File {file_path} likely already deleted on HF. Removing from local DB if exists.")
- if file_info_index != -1:
- del target_folder_obj['files'][file_info_index]
- try:
- save_data(data)
- flash(f'Файл "{original_filename}" удален (возможно, уже был удален из хранилища).', 'warning')
- except Exception as save_e:
- flash('Ошибка при обновлении списка файлов после удаления.', 'error')
- logging.error(f"Failed to save data after attempting local delete of file {file_path}: {save_e}")
- else:
- flash(f'Файл "{original_filename}" не найден.', 'error')
- else:
- flash(f'Ошибка при удалении файла "{original_filename}". {e}', 'error')
+@app.route('/admhosto')
+def admin_panel():
+ # IMPORTANT: Add robust admin authentication here!
+ # Example: Check if session['username'] is in a predefined admin list
+ # if session.get('username') not in ['admin_user1', 'admin_user2']:
+ # flash('Доступ запрещен.', 'error')
+ # return redirect(url_for('login'))
- return redirect(url_for('dashboard', path='/'.join(folder_path_list)))
+ data = load_data()
+ users_list = []
+ for username, user_data in data.get('users', {}).items():
+ root_folder = user_data.get('root_folder', {})
+ # Count items recursively (simple count for now)
+ def count_items(node):
+ count = 0
+ if node.get("type") == "file":
+ return 1
+ if node.get("type") == "folder":
+ count += sum(count_items(child) for child in node.get("children", []))
+ return count
-@app.route('/logout')
-def logout():
- username = session.pop('username', None)
- # No need for client-side local storage clearing here anymore,
- # the login page JS handles clearing on failed auto-login or successful manual login.
- # Manual logout on server is sufficient.
- if username:
- flash('Вы успешно вышли из системы.', 'success')
- logging.info(f"User {username} logged out.")
- return redirect(url_for('login'))
+ total_files = count_items(root_folder) # This counts only files now
+ users_list.append({
+ 'username': username,
+ 'created_at': user_data.get('created_at', 'N/A'),
+ 'file_count': total_files # Or adjust counting logic as needed
+ })
-# --- Admin Routes (Keep basic for now) ---
-# WARNING: These routes lack proper admin authentication! Add checks!
-def admin_required(f):
- @wraps(f)
- def decorated_function(*args, **kwargs):
- # FIXME: Implement actual admin check here!
- # Example: if session.get('is_admin') != True:
- if session.get('username') != 'admin': # Example: only user 'admin' is admin
- flash('Доступ запрещен. Требуются права администратора.', 'error')
- return redirect(url_for('login'))
- return f(*args, **kwargs)
- return decorated_function
+ users_list.sort(key=lambda x: x['username'].lower())
+ return render_template_string(ADMIN_PANEL_TEMPLATE, users=users_list, BASE_STYLE=BASE_STYLE)
-@app.route('/admhosto')
-@admin_required
-def admin_panel():
- data = load_data()
- users = data.get('users', {})
-
- # Calculate total files per user (recursive helper needed)
- def count_files_recursive(folder_obj):
- count = len(folder_obj.get('files', []))
- for sub_folder in folder_obj.get('folders', {}).values():
- count += count_files_recursive(sub_folder)
- return count
-
- users_with_counts = {}
- for uname, udata in users.items():
- file_count = count_files_recursive(udata.get('root', initialize_user_root()))
- users_with_counts[uname] = {
- 'created_at': udata.get('created_at', 'N/A'),
- 'file_count': file_count
- }
- html = '''
-
-
-
-
- Админ-панель - Zeus Cloud V2
-
-
-
-
-
-
-
Админ-панель
- Назад в Дашборд
- {% with messages = get_flashed_messages(with_categories=true) %}
- {% if messages %}
- {% for category, message in messages %}
{{ message }}
{% endfor %}
-
{% endif %}
- {% endwith %}
-
Список пользователей
-
- {% for username, user_info in users_with_counts.items() %}
-
+ Назад к пользователям
+
+ {% with messages = get_flashed_messages(with_categories=true) %}
+ {% if messages %}
+ {% for category, message in messages %}
+
- # Initial check/creation of data file and uploads dir
- if not os.path.exists(DATA_FILE):
- logging.info(f"{DATA_FILE} not found, attempting initial download or creation.")
- # Try downloading first, it will create empty if download fails/no token
- download_db_from_hf()
- if not os.path.exists(DATA_FILE): # Still doesn't exist
- with open(DATA_FILE, 'w', encoding='utf-8') as f:
- json.dump({'users': {}}, f)
- logging.info(f"Created empty {DATA_FILE}.")
+
+ {% if not items %}
+
У этого пользователя пока нет файлов или папок.
+
+ {% if item.type == 'folder' %}
+ {# Admin folder navigation would go here #}
+ {# Placeholder link #}
+