+
'''
- return render_template_string(html, username=username, user_files=user_files, user_folders=user_folders, repo_id=REPO_ID, current_folder=current_folder, parent_folder=parent_folder)
+ # Pass helper functions/variables to template
+ from urllib.parse import quote, unquote
+
+ def encode_path_for_url(path):
+ # Simple base64 encoding for path to avoid issues with slashes in URL parameters
+ # Not true security, just for URL routing convenience.
+ import base64
+ return base64.urlsafe_b64encode(path.encode()).decode()
+
+ def get_hf_file_url_template(file_path):
+ # Use hf_hub_url which handles repo type and endpoint correctly
+ # Note: This doesn't automatically add tokens. Do that in JS if needed.
+ try:
+ return hf_hub_url(repo_id=REPO_ID, filename=file_path, repo_type="dataset")
+ except Exception as e:
+ logging.error(f"Error generating HF URL for {file_path}: {e}")
+ # Fallback or placeholder
+ return f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{file_path}" # Older method
+
+
+ return render_template_string(
+ html,
+ username=username,
+ user_files=sorted_files,
+ sorted_folders=sorted_folders,
+ path=path,
+ breadcrumbs=breadcrumbs,
+ REPO_ID=REPO_ID,
+ encode_path=encode_path_for_url, # Pass encoding function
+ get_hf_file_url=get_hf_file_url_template # Pass URL generation function
+ )
+
+@app.route('/create_folder', methods=['POST'])
+@login_required
+def create_folder():
+ username = session['username']
+ current_path_str = request.form.get('current_path', '')
+ new_folder_name = request.form.get('new_folder_name', '').strip()
+
+ if not is_safe_name(new_folder_name, MAX_FOLDER_NAME_LENGTH):
+ flash(f'Недопустимое имя папки. Имя не должно содержать символы {INVALID_CHARS} и быть длиннее {MAX_FOLDER_NAME_LENGTH} символов.', 'error')
+ return redirect(url_for('dashboard', path=current_path_str))
+
+ data = load_data()
+ user_data = data['users'][username]
+ current_path_list = parse_path(current_path_str)
+
+ if current_path_list is None:
+ flash('Недопустимый текущий путь!', 'error')
+ return redirect(url_for('dashboard', path=''))
+
+ parent_folder_obj = navigate_to_path(user_data['root'], current_path_list)
+
+ if parent_folder_obj is None:
+ flash('Родительская папка не найдена!', 'error')
+ return redirect(url_for('dashboard', path=''))
+
+ parent_folder_obj.setdefault('folders', {}) # Ensure 'folders' key exists
+
+ if new_folder_name in parent_folder_obj['folders']:
+ flash(f'Папка с именем "{new_folder_name}" уже существует здесь.', 'warning')
+ else:
+ parent_folder_obj['folders'][new_folder_name] = initialize_user_root() # Create new empty folder structure
+ try:
+ save_data(data)
+ flash(f'Папка "{new_folder_name}" успешно создана.', 'success')
+ logging.info(f"User {username} created folder '{new_folder_name}' at path '{current_path_str}'")
+ except Exception as e:
+ flash('Ошибка при сохранении данных. Не удалось создать папку.', 'error')
+ logging.error(f"Failed to save data after creating folder for {username}: {e}")
+ # Rollback the change in memory if save failed
+ if new_folder_name in parent_folder_obj['folders']:
+ del parent_folder_obj['folders'][new_folder_name]
+
+ return redirect(url_for('dashboard', path=current_path_str))
+
+@app.route('/delete_folder', methods=['POST'])
+@login_required
+def delete_folder():
+ username = session['username']
+ folder_path_str = request.form.get('folder_path', '') # The path to the folder to delete
+
+ if not folder_path_str:
+ flash('Не указан путь к папке для удаления.', 'error')
+ return redirect(url_for('dashboard', path=''))
+
+ data = load_data()
+ user_data = data['users'][username]
+ folder_path_list = parse_path(folder_path_str)
+
+ if folder_path_list is None:
+ flash('Недопустимый путь к папке.', 'error')
+ return redirect(url_for('dashboard', path=''))
+
+ # Find the parent folder and the name of the folder to delete
+ if not folder_path_list: # Cannot delete root
+ flash('Нельзя удалить корневую папку.', 'error')
+ return redirect(url_for('dashboard', path=''))
+
+ folder_name_to_delete = folder_path_list[-1]
+ parent_path_list = folder_path_list[:-1]
+ parent_folder_obj = navigate_to_path(user_data['root'], parent_path_list)
+
+ if parent_folder_obj is None or folder_name_to_delete not in parent_folder_obj.get('folders', {}):
+ flash('Папка не найдена или не существует.', 'error')
+ # Determine redirect path (parent path or root)
+ parent_redirect_path = '/'.join(parent_path_list)
+ return redirect(url_for('dashboard', path=parent_redirect_path))
+
+ # --- Deletion Logic ---
+ api = _get_hf_api()
+ if not api:
+ flash('Ошибка конфигурации сервера: не удается получить доступ к хранилищу.', 'error')
+ parent_redirect_path = '/'.join(parent_path_list)
+ return redirect(url_for('dashboard', path=parent_redirect_path))
+
+ hf_folder_path = f"cloud_files/{username}/{folder_path_str}"
-def get_text_file_content(file_path):
try:
- file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{file_path}?download=true"
- headers = {}
- if HF_TOKEN_READ:
- headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
- response = requests.get(file_url, headers=headers)
- response.raise_for_status()
- content = response.text
- return b64encode(content.encode('utf-8')).decode('utf-8')
+ logging.info(f"Attempting to delete HF folder: {hf_folder_path}")
+ # Use HfApi.delete_folder - This should handle non-empty folders
+ api.delete_folder(
+ folder_path=hf_folder_path,
+ repo_id=REPO_ID,
+ repo_type="dataset",
+ token=HF_TOKEN_WRITE,
+ commit_message=f"User {username} deleted folder {folder_path_str}"
+ )
+ logging.info(f"Successfully deleted HF folder: {hf_folder_path}")
+
+ # If HF deletion is successful (or if it didn't exist - check API behavior), remove from JSON
+ del parent_folder_obj['folders'][folder_name_to_delete]
+ save_data(data)
+ flash(f'Папка "{folder_name_to_delete}" и её содержимое успешно удалены.', 'success')
+ logging.info(f"User {username} successfully deleted folder '{folder_path_str}'")
+
except Exception as e:
- logging.error(f"Error fetching text file content: {e}")
- return b64encode("Ошибка загрузки содержимого файла.".encode('utf-8')).decode('utf-8')
-
-@app.route('/download//')
-def download_file(unique_filename, filename):
- if 'username' not in session:
- flash('Пожалуйста, войдите в систему!')
- return redirect(url_for('login'))
+ # More specific error handling (e.g., huggingface_hub.utils.RepositoryNotFoundError)
+ logging.error(f"Error deleting folder '{folder_path_str}' for user {username}: {e}", exc_info=True)
+ # Check if it's a 'not found' error - maybe already deleted?
+ if "404" in str(e) or "not found" in str(e).lower():
+ logging.warning(f"Folder {hf_folder_path} likely already deleted on HF. Removing from local DB.")
+ # Proceed to remove from JSON if it exists there
+ if folder_name_to_delete in parent_folder_obj.get('folders', {}):
+ del parent_folder_obj['folders'][folder_name_to_delete]
+ try:
+ save_data(data)
+ flash(f'Папка "{folder_name_to_delete}" удалена (возможно, уже была удалена из хранилища).', 'warning')
+ except Exception as save_e:
+ flash('Ошибка при обновлении списка папок после удаления.', 'error')
+ logging.error(f"Failed to save data after attempting local delete of folder {folder_name_to_delete}: {save_e}")
+ else:
+ flash(f'Папка "{folder_name_to_delete}" не найдена.', 'error')
+
+ else:
+ # Other HF API error or save_data error
+ flash(f'Ошибка при удалении папки "{folder_name_to_delete}". {e}', 'error')
+
+
+ parent_redirect_path = '/'.join(parent_path_list)
+ return redirect(url_for('dashboard', path=parent_redirect_path))
+
+
+@app.route('/download/')
+@login_required
+def download_file(file_path_encoded):
username = session['username']
+ # Decode the path
+ import base64
+ try:
+ file_path = base64.urlsafe_b64decode(file_path_encoded.encode()).decode()
+ except Exception:
+ flash('Неверная ссылка на файл.', 'error')
+ return redirect(request.referrer or url_for('dashboard'))
+
+ # Basic validation of the decoded path
+ if not file_path.startswith(f"cloud_files/{username}/"):
+ # Check if it's an admin request coming from the admin panel
+ # This check is basic; a proper role/permission system is better
+ is_admin_request = request.referrer and 'admhosto' in request.referrer
+ if not is_admin_request:
+ flash('Доступ запрещен.', 'error')
+ logging.warning(f"User {username} attempted to access unauthorized path: {file_path}")
+ # Redirect back to where they came from, or their root dashboard
+ return redirect(request.referrer or url_for('dashboard'))
+ else:
+ # Admin access allowed (assuming admin is logged in, though no explicit check here)
+ logging.info(f"Admin access granted for download: {file_path}")
+ pass # Allow admin download
+
+ # Find the file in the data structure to get the original filename
data = load_data()
- if username not in data['users']:
- session.pop('username', None)
- flash('Пользователь не найден!')
- return redirect(url_for('login'))
- user_files = data['users'][username]['files']
- file_info = next((file for file in user_files if file['unique_filename'] == unique_filename), None)
- if not file_info:
- is_admin_route = request.referrer and 'admhosto' in request.referrer
- if not is_admin_route:
- flash('У вас нет доступа к этому файлу!')
- return redirect(url_for('dashboard'))
- file_path = file_info['path'] if file_info else f"cloud_files/{username}/{unique_filename}"
- file_url = f"https://huggingface.co/datasets/{REPO_ID}/resolve/main/{file_path}?download=true"
+ original_filename = "downloaded_file" # Default
+ file_found = False
+
+ # Need to search through the structure
+ path_parts = file_path.split('/')[2:] # Get parts after cloud_files/username/
+ unique_filename = path_parts[-1]
+ folder_path_list = path_parts[:-1]
+
+ # Navigate to the folder containing the file
+ target_folder = navigate_to_path(data['users'].get(username, {}).get('root', {}), folder_path_list)
+
+ if target_folder:
+ for file_info in target_folder.get('files', []):
+ if file_info.get('path') == file_path:
+ original_filename = file_info.get('original_filename', unique_filename)
+ file_found = True
+ break
+
+ # Admin check - if file wasn't found under session user, check the actual user in path
+ if not file_found and 'is_admin_request' in locals() and is_admin_request:
+ path_user = file_path.split('/')[1]
+ if path_user != username and path_user in data['users']:
+ target_folder_admin = navigate_to_path(data['users'][path_user]['root'], folder_path_list)
+ if target_folder_admin:
+ for file_info in target_folder_admin.get('files', []):
+ if file_info.get('path') == file_path:
+ original_filename = file_info.get('original_filename', unique_filename)
+ file_found = True
+ break
+
+ if not file_found:
+ # Even if file not in DB, maybe it exists on HF? Try downloading anyway?
+ # Or just fail here. Let's fail for consistency.
+ flash('Файл не найден в базе данных.', 'error')
+ logging.warning(f"File metadata not found in DB for path: {file_path}")
+ return redirect(request.referrer or url_for('dashboard'))
+
+ # --- Proceed with HF Download ---
+ token = _get_hf_token_read()
+ headers = {}
+ if token:
+ headers["authorization"] = f"Bearer {token}"
+
+ # Use hf_hub_url to get the correct download URL
try:
- api = HfApi()
- headers = {}
- if HF_TOKEN_READ:
- headers["authorization"] = f"Bearer {HF_TOKEN_READ}"
+ file_url = hf_hub_url(repo_id=REPO_ID, filename=file_path, repo_type="dataset")
+ # Append download=true manually if hf_hub_url doesn't add it
+ if '?' in file_url:
+ file_url += '&download=true'
+ else:
+ file_url += '?download=true'
+
+ logging.info(f"Attempting download from URL: {file_url.split('?')[0]}?...") # Log URL without query params
response = requests.get(file_url, headers=headers, stream=True)
- response.raise_for_status()
- file_content = BytesIO(response.content)
+ response.raise_for_status() # Check for HTTP errors (4xx, 5xx)
+
+ # Get content type if possible
+ content_type = response.headers.get('Content-Type', 'application/octet-stream')
+
+ # Stream the download
return send_file(
- file_content,
- as_attachment=True,
- download_name=filename,
- mimetype='application/octet-stream'
+ BytesIO(response.content), # Consider streaming for large files if needed
+ mimetype=content_type,
+ as_attachment=True,
+ download_name=original_filename # Use the original filename for the user
)
+
except requests.exceptions.RequestException as e:
- logging.error(f"Error downloading file from HF: {e}")
- flash('Ошибка скачивания файла!')
- if request.referrer and 'admhosto' in request.referrer:
- return redirect(url_for('admin_user_files', username=username))
- else:
- return redirect(url_for('dashboard'))
+ logging.error(f"Error downloading file '{file_path}' from HF: {e}")
+ flash(f'Ошибка скачивания файла с сервера ({e}).', 'error')
+ return redirect(request.referrer or url_for('dashboard'))
except Exception as e:
- logging.error(f"Unexpected error during download: {e}")
- flash('Произошла непредвиденная ошибка при скачивании файла.')
- if request.referrer and 'admhosto' in request.referrer:
- return redirect(url_for('admin_user_files', username=username))
- else:
- return redirect(url_for('dashboard'))
+ logging.error(f"Unexpected error during download of '{file_path}': {e}", exc_info=True)
+ flash('Произошла непредвиденная ошибка при скачивании файла.', 'error')
+ return redirect(request.referrer or url_for('dashboard'))
+
-@app.route('/delete/')
-def delete_file(unique_filename):
- if 'username' not in session:
- flash('Пожалуйста, войдите в систему!')
- return redirect(url_for('login'))
+@app.route('/delete_file', methods=['POST'])
+@login_required
+def delete_file():
username = session['username']
+ # Decode the path
+ import base64
+ file_path_encoded = request.form.get('file_path_encoded')
+ if not file_path_encoded:
+ flash('Не указан файл для удаления.', 'error')
+ return redirect(request.referrer or url_for('dashboard'))
+
+ try:
+ file_path = base64.urlsafe_b64decode(file_path_encoded.encode()).decode()
+ except Exception:
+ flash('Неверная ссылка на файл.', 'error')
+ return redirect(request.referrer or url_for('dashboard'))
+
+ # Validate ownership (user must own the file path)
+ if not file_path.startswith(f"cloud_files/{username}/"):
+ flash('Доступ запрещен.', 'error')
+ logging.warning(f"User {username} attempted unauthorized delete on path: {file_path}")
+ return redirect(request.referrer or url_for('dashboard'))
+
data = load_data()
- if username not in data['users']:
- session.pop('username', None)
- flash('Пользователь не найден!')
- return redirect(url_for('login'))
- user_files = data['users'][username]['files']
- file_to_delete = next((file for file in user_files if file['unique_filename'] == unique_filename), None)
- if not file_to_delete:
- flash('Файл не найден!')
- return redirect(url_for('dashboard'))
+ user_data = data['users'][username]
+
+ # Find the file's parent folder in the data structure
+ path_parts = file_path.split('/')[2:] # Get parts after cloud_files/username/
+ unique_filename = path_parts[-1]
+ folder_path_list = path_parts[:-1]
+
+ target_folder_obj = navigate_to_path(user_data['root'], folder_path_list)
+ file_info_index = -1
+ original_filename = unique_filename # Fallback name
+
+ if target_folder_obj and 'files' in target_folder_obj:
+ for i, file_info in enumerate(target_folder_obj['files']):
+ if file_info.get('path') == file_path:
+ file_info_index = i
+ original_filename = file_info.get('original_filename', unique_filename)
+ break
+
+ # --- Deletion Logic ---
+ api = _get_hf_api()
+ if not api:
+ flash('Ошибка конфигурации сервера: не удается получить доступ к хранилищу.', 'error')
+ return redirect(request.referrer or url_for('dashboard', path='/'.join(folder_path_list)))
+
try:
- api = HfApi()
+ logging.info(f"Attempting to delete HF file: {file_path}")
api.delete_file(
- path_in_repo=file_to_delete['path'],
+ path_in_repo=file_path,
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN_WRITE,
- commit_message=f"Deleted file {file_to_delete['path']} for {username}"
+ commit_message=f"User {username} deleted file {original_filename}"
)
- data['users'][username]['files'] = [f for f in user_files if f['unique_filename'] != unique_filename]
- save_data(data)
- flash('Файл успешно удален!')
- except Exception as e:
- logging.error(f"Error deleting file: {e}")
- flash('Ошибка удаления файла!')
- return redirect(url_for('dashboard', folder_path=file_to_delete['folder']))
-
-@app.route('/delete_folder/')
-def delete_folder(folder_id):
- if 'username' not in session:
- flash('Пожалуйста, войдите в систему!')
- return redirect(url_for('login'))
- username = session['username']
- data = load_data()
- if username not in data['users']:
- session.pop('username', None)
- flash('Пользователь не найден!')
- return redirect(url_for('login'))
- user_folders = data['users'][username].get('folders', [])
- folder_to_delete = next((folder for folder in user_folders if folder['id'] == folder_id), None)
- if not folder_to_delete:
- flash('Папка не найдена!')
- return redirect(url_for('dashboard'))
- try:
- api = HfApi()
- files_to_delete = [f for f in data['users'][username]['files'] if f['folder'].startswith(folder_to_delete['path'])]
- for file in files_to_delete:
- api.delete_file(
- path_in_repo=file['path'],
- repo_id=REPO_ID,
- repo_type="dataset",
- token=HF_TOKEN_WRITE,
- commit_message=f"Deleted file {file['path']} for {username} during folder deletion"
- )
- data['users'][username]['files'] = [f for f in data['users'][username]['files'] if not f['folder'].startswith(folder_to_delete['path'])]
- data['users'][username]['folders'] = [f for f in user_folders if f['id'] != folder_id]
- save_data(data)
- flash('Папка и все её содержимое успешно удалены!')
+ logging.info(f"Successfully deleted HF file: {file_path}")
+
+ # If HF deletion successful (or file not found on HF), remove from JSON
+ if file_info_index != -1:
+ del target_folder_obj['files'][file_info_index]
+ save_data(data)
+ flash(f'Файл "{original_filename}" успешно удален.', 'success')
+ logging.info(f"User {username} successfully deleted file record: {file_path}")
+ else:
+ # File was deleted from HF, but not found in local DB (maybe inconsistent state?)
+ flash(f'Файл "{original_filename}" удален из хранилища, но не найден в локальном списке.', 'warning')
+ logging.warning(f"File record not found in DB for deleted path: {file_path}")
+ # Optionally, still save data if other changes occurred? For now, no save needed.
+
except Exception as e:
- logging.error(f"Error deleting folder: {e}")
- flash('Ошибка удаления папки!')
- return redirect(url_for('dashboard', folder_path=folder_to_delete['parent']))
+ logging.error(f"Error deleting file '{file_path}' for user {username}: {e}", exc_info=True)
+ # Check if it's a 'not found' error - maybe already deleted?
+ if "404" in str(e) or "not found" in str(e).lower():
+ logging.warning(f"File {file_path} likely already deleted on HF. Removing from local DB if exists.")
+ if file_info_index != -1:
+ del target_folder_obj['files'][file_info_index]
+ try:
+ save_data(data)
+ flash(f'Файл "{original_filename}" удален (возможно, уже был удален из хранилища).', 'warning')
+ except Exception as save_e:
+ flash('Ошибка при обновлении списка файлов после удаления.', 'error')
+ logging.error(f"Failed to save data after attempting local delete of file {file_path}: {save_e}")
+ else:
+ flash(f'Файл "{original_filename}" не найден.', 'error')
+ else:
+ flash(f'Ошибка при удалении файла "{original_filename}". {e}', 'error')
+
+ return redirect(url_for('dashboard', path='/'.join(folder_path_list)))
@app.route('/logout')
def logout():
- session.pop('username', None)
+ username = session.pop('username', None)
+ # No need for client-side local storage clearing here anymore,
+ # the login page JS handles clearing on failed auto-login or successful manual login.
+ # Manual logout on server is sufficient.
+ if username:
+ flash('Вы успешно вышли из системы.', 'success')
+ logging.info(f"User {username} logged out.")
return redirect(url_for('login'))
+
+# --- Admin Routes (Keep basic for now) ---
+# WARNING: These routes lack proper admin authentication! Add checks!
+def admin_required(f):
+ @wraps(f)
+ def decorated_function(*args, **kwargs):
+ # FIXME: Implement actual admin check here!
+ # Example: if session.get('is_admin') != True:
+ if session.get('username') != 'admin': # Example: only user 'admin' is admin
+ flash('Доступ запрещен. Требуются права администратора.', 'error')
+ return redirect(url_for('login'))
+ return f(*args, **kwargs)
+ return decorated_function
+
+
@app.route('/admhosto')
+@admin_required
def admin_panel():
data = load_data()
- users = data['users']
+ users = data.get('users', {})
+
+ # Calculate total files per user (recursive helper needed)
+ def count_files_recursive(folder_obj):
+ count = len(folder_obj.get('files', []))
+ for sub_folder in folder_obj.get('folders', {}).values():
+ count += count_files_recursive(sub_folder)
+ return count
+
+ users_with_counts = {}
+ for uname, udata in users.items():
+ file_count = count_files_recursive(udata.get('root', initialize_user_root()))
+ users_with_counts[uname] = {
+ 'created_at': udata.get('created_at', 'N/A'),
+ 'file_count': file_count
+ }
+
html = '''
-
-
- Админ-панель - Zeus Cloud
-
+
+ Админ-панель - Zeus Cloud V2
+
+
-
Админ-панель Zeus Cloud
+
Админ-панель
+ Назад в Дашборд
+ {% with messages = get_flashed_messages(with_categories=true) %}
+ {% if messages %}
+ {% for category, message in messages %}
{{ message }}
{% endfor %}
+
{% endif %}
+ {% endwith %}
Список пользователей
- {% for username, user_data in users.items() %}
+ {% for username, user_info in users_with_counts.items() %}
-
-
-
-'''
- return render_template_string(html, username=username, user_files=user_files, user_folders=user_folders, repo_id=REPO_ID, current_folder=current_folder, parent_folder=parent_folder, os=os)
+ Назад к списку пользователей
+
+ '''
+ return render_template_string(html, username=username, path=path, sorted_files=sorted_files, sorted_folders=sorted_folders, admin_breadcrumbs=admin_breadcrumbs, REPO_ID=REPO_ID, encode_path=encode_path_for_url, get_hf_file_url=get_hf_file_url_template)
+
@app.route('/admhosto/delete_user/', methods=['POST'])
+@admin_required
def admin_delete_user(username):
+ admin_username = session['username'] # For logging
+ if username == admin_username: # Prevent admin from deleting themselves easily
+ flash('Администратор не может удалить сам себя через эту форму.', 'error')
+ return redirect(url_for('admin_panel'))
+
data = load_data()
- if username not in data['users']:
- flash('Пользователь не найден!')
+ if username not in data.get('users', {}):
+ flash(f'Пользователь "{username}" не найден!', 'error')
return redirect(url_for('admin_panel'))
- user_files_to_delete = data['users'][username].get('files', [])
- user_folders_to_delete = data['users'][username].get('folders', [])
+
+ api = _get_hf_api()
+ if not api:
+ flash('Ошибка конфигурации сервера: не удается получить доступ к хранилищу.', 'error')
+ return redirect(url_for('admin_panel'))
+
+ hf_user_folder_path = f"cloud_files/{username}"
+
+ try:
+ logging.warning(f"Admin '{admin_username}' initiating deletion of user '{username}' and folder '{hf_user_folder_path}'")
+ # Attempt to delete the entire user folder on Hugging Face
+ api.delete_folder(
+ folder_path=hf_user_folder_path,
+ repo_id=REPO_ID,
+ repo_type="dataset",
+ token=HF_TOKEN_WRITE,
+ commit_message=f"Admin {admin_username} deleted user {username} and all files"
+ # ignore_patterns=["*.keep"] # Example if needed
+ )
+ logging.info(f"Successfully deleted HF folder '{hf_user_folder_path}' for user '{username}'.")
+
+ except Exception as e:
+ # Check if folder not found (maybe already deleted or never existed)
+ if "404" in str(e) or "not found" in str(e).lower():
+ logging.warning(f"HF folder '{hf_user_folder_path}' not found during deletion for user '{username}'. Proceeding with DB removal.")
+ else:
+ # Log the error but proceed with DB deletion - admin action
+ logging.error(f"Error deleting HF folder '{hf_user_folder_path}' for user '{username}': {e}. Proceeding with DB removal anyway.", exc_info=True)
+ flash(f'Ошибка при удалении файлов пользователя {username} из хранилища, но пользователь будет удален из базы данных.', 'warning')
+
+ # Regardless of HF deletion result (unless it was a critical config error handled earlier), remove user from DB
try:
- api = HfApi()
- paths_to_delete = [file['path'] for file in user_files_to_delete]
- if paths_to_delete:
- try:
- api.delete_folder(
- folder_path=f"cloud_files/{username}",
- repo_id=REPO_ID,
- repo_type="dataset",
- token=HF_TOKEN_WRITE,
- commit_message=f"Deleted all files for user {username} by admin"
- )
- logging.info(f"Successfully deleted folder for user {username}")
- except Exception as folder_delete_error:
- logging.warning(f"Could not delete folder for {username}, attempting individual file deletion: {folder_delete_error}")
- for file_path in paths_to_delete:
- try:
- api.delete_file(
- path_in_repo=file_path,
- repo_id=REPO_ID,
- repo_type="dataset",
- token=HF_TOKEN_WRITE
- )
- except Exception as file_delete_error:
- logging.error(f"Error deleting file {file_path} for user {username}: {file_delete_error}")
del data['users'][username]
save_data(data)
- flash(f'Пользователь {username} и его файлы и папки успешно удалены!')
- logging.info(f"Admin deleted user {username} and their files and folders.")
+ flash(f'Пользователь "{username}" и его файлы (если были) успешно удалены.', 'success')
+ logging.info(f"Admin '{admin_username}' successfully deleted user '{username}' from database.")
except Exception as e:
- logging.error(f"Error deleting user {username}: {e}")
- flash(f'Ошибка при удалении пользователя {username}!')
+ flash(f'Ошибка при удалении пользователя "{username}" из базы данных.', 'error')
+ logging.error(f"Failed to save data after deleting user '{username}' from DB: {e}")
+
+
return redirect(url_for('admin_panel'))
-@app.route('/admhosto/delete_file//', methods=['POST'])
-def admin_delete_file(username, unique_filename):
+@app.route('/admhosto/delete_file/', methods=['POST'])
+@admin_required
+def admin_delete_file(username):
+ admin_username = session['username'] # For logging
+ import base64
+ file_path_encoded = request.form.get('file_path_encoded')
+ if not file_path_encoded:
+ flash('Не указан файл для удаления.', 'error')
+ return redirect(request.referrer or url_for('admin_user_files', username=username))
+
+ try:
+ file_path = base64.urlsafe_b64decode(file_path_encoded.encode()).decode()
+ except Exception:
+ flash('Неверная ссылка на файл.', 'error')
+ return redirect(request.referrer or url_for('admin_user_files', username=username))
+
+ # Validate the path structure belongs to the specified user
+ if not file_path.startswith(f"cloud_files/{username}/"):
+ flash('Путь к файлу не соответствует пользователю.', 'error')
+ logging.warning(f"Admin '{admin_username}' attempted delete with mismatched path/user: {file_path} for {username}")
+ return redirect(request.referrer or url_for('admin_user_files', username=username))
+
data = load_data()
- if username not in data['users']:
- flash('Пользователь не найден!')
- return redirect(url_for('admin_panel'))
- user_files = data['users'][username].get('files', [])
- file_to_delete = next((f for f in user_files if f['unique_filename'] == unique_filename), None)
- if not file_to_delete:
- flash('Файл не найден!')
- return redirect(url_for('admin_user_files', username=username))
+ if username not in data.get('users', {}):
+ flash(f'Пользователь "{username}" не найден!', 'error')
+ return redirect(url_for('admin_panel'))
+
+ user_data = data['users'][username]
+
+ # Find the file in the data structure
+ path_parts = file_path.split('/')[2:]
+ unique_filename = path_parts[-1]
+ folder_path_list = path_parts[:-1]
+ target_folder_obj = navigate_to_path(user_data.get('root', initialize_user_root()), folder_path_list)
+
+ file_info_index = -1
+ original_filename = unique_filename
+ if target_folder_obj and 'files' in target_folder_obj:
+ for i, file_info in enumerate(target_folder_obj['files']):
+ if file_info.get('path') == file_path:
+ file_info_index = i
+ original_filename = file_info.get('original_filename', unique_filename)
+ break
+
+ # --- Deletion Logic ---
+ api = _get_hf_api()
+ if not api:
+ flash('Ошибка конфигурации сервера: не удается получить доступ к хранилищу.', 'error')
+ return redirect(request.referrer or url_for('admin_user_files', username=username, path='/'.join(folder_path_list)))
+
try:
- api = HfApi()
+ logging.info(f"Admin '{admin_username}' attempting to delete HF file: {file_path} for user '{username}'")
api.delete_file(
- path_in_repo=file_to_delete['path'],
+ path_in_repo=file_path,
repo_id=REPO_ID,
repo_type="dataset",
token=HF_TOKEN_WRITE,
- commit_message=f"Admin deleted file {file_to_delete['path']} for user {username}"
+ commit_message=f"Admin {admin_username} deleted file {original_filename} for user {username}"
)
- data['users'][username]['files'] = [f for f in user_files if f['unique_filename'] != unique_filename]
- save_data(data)
- flash('Файл успешно удален!')
- logging.info(f"Admin deleted file {unique_filename} for user {username}")
- except Exception as e:
- logging.error(f"Error deleting file {unique_filename} by admin: {e}")
- flash('Ошибка удаления файла!')
- return redirect(url_for('admin_user_files', username=username))
+ logging.info(f"Admin '{admin_username}' successfully deleted HF file: {file_path}")
+
+ # Remove from DB if found
+ if file_info_index != -1:
+ del target_folder_obj['files'][file_info_index]
+ save_data(data)
+ flash(f'Файл "{original_filename}" пользователя "{username}" успешно удален.', 'success')
+ logging.info(f"Admin '{admin_username}' successfully deleted file record: {file_path}")
+ else:
+ flash(f'Файл "{original_filename}" удален из хранилища, но не найден в базе данных пользователя "{username}".', 'warning')
+ logging.warning(f"Admin '{admin_username}' deleted file {file_path} from HF, but record not found in DB.")
-@app.route('/admhosto/delete_folder//', methods=['POST'])
-def admin_delete_folder(username, folder_id):
- data = load_data()
- if username not in data['users']:
- flash('Пользователь не найден!')
- return redirect(url_for('admin_panel'))
- user_folders = data['users'][username].get('folders', [])
- folder_to_delete = next((folder for folder in user_folders if folder['id'] == folder_id), None)
- if not folder_to_delete:
- flash('Папка не найдена!')
- return redirect(url_for('admin_user_files', username=username))
- try:
- api = HfApi()
- files_to_delete = [f for f in data['users'][username]['files'] if f['folder'].startswith(folder_to_delete['path'])]
- for file in files_to_delete:
- api.delete_file(
- path_in_repo=file['path'],
- repo_id=REPO_ID,
- repo_type="dataset",
- token=HF_TOKEN_WRITE,
- commit_message=f"Deleted file {file['path']} for {username} during folder deletion"
- )
- data['users'][username]['files'] = [f for f in data['users'][username]['files'] if not f['folder'].startswith(folder_to_delete['path'])]
- data['users'][username]['folders'] = [f for f in user_folders if f['id'] != folder_id]
- save_data(data)
- flash('Папка и все её содержимое успешно удалены!')
- logging.info(f"Admin deleted folder {folder_to_delete['name']} for user {username}")
except Exception as e:
- logging.error(f"Error deleting folder {folder_id} by admin: {e}")
- flash('Ошибка удаления папки!')
- return redirect(url_for('admin_user_files', username=username))
+ logging.error(f"Admin '{admin_username}' error deleting file '{file_path}' for user '{username}': {e}", exc_info=True)
+ if "404" in str(e) or "not found" in str(e).lower():
+ logging.warning(f"File {file_path} already deleted on HF (Admin action). Removing from local DB if exists.")
+ if file_info_index != -1:
+ del target_folder_obj['files'][file_info_index]
+ try:
+ save_data(data)
+ flash(f'Файл "{original_filename}" удален (был удален из хранилища).', 'warning')
+ except Exception as save_e:
+ flash('Ошибка при обновлении списка файлов после удаления.', 'error')
+ logging.error(f"Failed to save data after admin attempted local delete of file {file_path}: {save_e}")
+ else:
+ flash(f'Файл "{original_filename}" не найден.', 'error')
+ else:
+ flash(f'Ошибка при удалении файла "{original_filename}" пользователя "{username}". {e}', 'error')
+
+ # Redirect back to the user's file view in admin panel
+ return redirect(url_for('admin_user_files', username=username, path='/'.join(folder_path_list)))
+
+
+# --- App Initialization ---
+from datetime import timedelta
if __name__ == '__main__':
+ app.permanent_session_lifetime = timedelta(days=30) # Set session timeout
+
+ # Initial check/creation of data file and uploads dir
+ if not os.path.exists(DATA_FILE):
+ logging.info(f"{DATA_FILE} not found, attempting initial download or creation.")
+ # Try downloading first, it will create empty if download fails/no token
+ download_db_from_hf()
+ if not os.path.exists(DATA_FILE): # Still doesn't exist
+ with open(DATA_FILE, 'w', encoding='utf-8') as f:
+ json.dump({'users': {}}, f)
+ logging.info(f"Created empty {DATA_FILE}.")
+
+ os.makedirs('uploads', exist_ok=True)
+
+ # Token checks
if not HF_TOKEN_WRITE:
- logging.warning("HF_TOKEN (write access) is not set. File uploads and deletions will fail.")
+ logging.warning("!!! HF_TOKEN (write access) is not set. File uploads, deletions, and backups WILL FAIL.")
if not HF_TOKEN_READ:
- logging.warning("HF_TOKEN_READ is not set. Falling back to HF_TOKEN. File downloads might fail for private repos if HF_TOKEN is not set.")
+ logging.warning("!!! HF_TOKEN_READ is not set. Falling back to HF_TOKEN. File downloads/previews might fail for private repos if HF_TOKEN is also not set.")
+
+ # Start periodic backup thread only if write token exists
if HF_TOKEN_WRITE:
- threading.Thread(target=periodic_backup, daemon=True).start()
+ backup_thread = threading.Thread(target=periodic_backup, daemon=True)
+ backup_thread.start()
else:
- logging.warning("Periodic backup disabled because HF_TOKEN (write access) is not set.")
- app.run(debug=False, host='0.0.0.0', port=7860)
\ No newline at end of file
+ logging.warning("Periodic backup thread NOT started because HF_TOKEN (write access) is not set.")
+
+ # Run Flask app
+ # Use Waitress or Gunicorn in production instead of Flask's built-in server
+ logging.info("Starting Flask application server...")
+ app.run(debug=False, host='0.0.0.0', port=7860)
+
+# --- END OF FILE app.py ---
\ No newline at end of file