Spaces:
Sleeping
Sleeping
| """Hugging Face Hub storage backend for DocVault.""" | |
| from __future__ import annotations | |
| import os | |
| from datetime import datetime, timezone | |
| from typing import Any, Dict, List | |
| from huggingface_hub import ( | |
| CommitOperationAdd, | |
| CommitOperationCopy, | |
| CommitOperationDelete, | |
| HfApi, | |
| hf_hub_url, | |
| ) | |
| from server import config | |
| from server.storage.interface import StorageInterface | |
| from server.utils.logger import setup_logger | |
| from server.utils.validators import PathValidator, sanitize_filename | |
| logger = setup_logger(__name__) | |
| class HuggingFaceStorageManager(StorageInterface): | |
| """Stores all files and folders in a Hugging Face dataset repository.""" | |
| def __init__(self) -> None: | |
| self.api = HfApi(token=config.HF_TOKEN) if config.HF_TOKEN else HfApi() | |
| self._repo_ready = False | |
| if config.HF_TOKEN: | |
| self._ensure_repo_exists() | |
| def _ensure_token(self) -> None: | |
| if not config.HF_TOKEN: | |
| raise RuntimeError( | |
| "HF_TOKEN is required for write operations. Set HF_TOKEN or HUGGING_FACE_HUB_TOKEN in the Space secrets." | |
| ) | |
| def _ensure_repo_exists(self) -> None: | |
| self._ensure_token() | |
| try: | |
| self.api.repo_info(repo_id=config.HF_REPO_ID, repo_type=config.HF_REPO_TYPE) | |
| self._repo_ready = True | |
| except Exception: | |
| self.api.create_repo( | |
| repo_id=config.HF_REPO_ID, | |
| repo_type=config.HF_REPO_TYPE, | |
| private=True, | |
| exist_ok=True, | |
| ) | |
| self._repo_ready = True | |
| def _timestamp(self) -> str: | |
| return datetime.now(timezone.utc).isoformat() | |
| def _validate_relative_path(self, path: str, label: str = "path") -> str: | |
| normalized = PathValidator._normalize_relative_path(path) | |
| if not PathValidator.is_valid_path(normalized): | |
| raise ValueError(f"Invalid {label}: {path}") | |
| return normalized | |
| def _user_repo_path(self, user_id: str, path: str = "") -> str: | |
| normalized = self._validate_relative_path(path) | |
| base = self._validate_relative_path(user_id, "user_id") | |
| return "/".join(part for part in [base, normalized] if part) | |
| def _folder_marker_path(self, user_id: str, folder_path: str) -> str: | |
| repo_folder = self._user_repo_path(user_id, folder_path) | |
| return "/".join([repo_folder, config.FOLDER_MARKER]) if repo_folder else config.FOLDER_MARKER | |
| def _list_repo_files(self) -> List[str]: | |
| try: | |
| return self.api.list_repo_files( | |
| repo_id=config.HF_REPO_ID, | |
| repo_type=config.HF_REPO_TYPE, | |
| ) | |
| except Exception: | |
| return [] | |
| def _file_exists(self, repo_path: str, repo_files: List[str] | None = None) -> bool: | |
| repo_files = repo_files if repo_files is not None else self._list_repo_files() | |
| return repo_path in repo_files | |
| def _folder_exists(self, repo_folder_path: str, repo_files: List[str] | None = None) -> bool: | |
| repo_files = repo_files if repo_files is not None else self._list_repo_files() | |
| marker = f"{repo_folder_path}/{config.FOLDER_MARKER}" | |
| prefix = f"{repo_folder_path}/" | |
| return marker in repo_files or any(item.startswith(prefix) for item in repo_files) | |
| def _split_parent(self, path: str) -> tuple[str, str]: | |
| normalized = self._validate_relative_path(path) | |
| parent = os.path.dirname(normalized).replace("\\", "/").strip(".") | |
| parent = parent.strip("/") | |
| return parent, os.path.basename(normalized) | |
| def get_item_type(self, user_id: str, path: str) -> str | None: | |
| normalized = self._validate_relative_path(path) | |
| repo_path = self._user_repo_path(user_id, normalized) | |
| repo_files = self._list_repo_files() | |
| if repo_path in repo_files: | |
| return "file" | |
| if any(item.startswith(f"{repo_path}/") for item in repo_files): | |
| return "folder" | |
| return None | |
| def _next_available_file_path( | |
| self, user_id: str, folder_path: str, filename: str | |
| ) -> tuple[str, str]: | |
| folder_path = self._validate_relative_path(folder_path) | |
| filename = sanitize_filename(filename) | |
| if not PathValidator.is_valid_filename(filename): | |
| raise ValueError("Invalid filename.") | |
| repo_files = self._list_repo_files() | |
| name, ext = os.path.splitext(filename) | |
| counter = 0 | |
| while True: | |
| candidate_name = filename if counter == 0 else f"{name}_{counter}{ext}" | |
| candidate_repo_path = self._user_repo_path( | |
| user_id, | |
| "/".join(part for part in [folder_path, candidate_name] if part), | |
| ) | |
| if candidate_repo_path not in repo_files: | |
| relative_path = "/".join(part for part in [folder_path, candidate_name] if part) | |
| return relative_path, candidate_repo_path | |
| counter += 1 | |
| def create_folder(self, user_id: str, folder_path: str) -> Dict[str, Any]: | |
| self._ensure_token() | |
| folder_path = self._validate_relative_path(folder_path, "folder_path") | |
| if not folder_path: | |
| return {"success": False, "error": "folder_path is required", "code": "INVALID_FOLDER"} | |
| repo_folder_path = self._user_repo_path(user_id, folder_path) | |
| repo_files = self._list_repo_files() | |
| if self._folder_exists(repo_folder_path, repo_files): | |
| return {"success": False, "error": "Folder already exists", "code": "FOLDER_EXISTS"} | |
| marker_path = self._folder_marker_path(user_id, folder_path) | |
| self.api.create_commit( | |
| repo_id=config.HF_REPO_ID, | |
| repo_type=config.HF_REPO_TYPE, | |
| operations=[CommitOperationAdd(path_in_repo=marker_path, path_or_fileobj=b"")], | |
| commit_message=f"Create folder {repo_folder_path}", | |
| ) | |
| folder_name = folder_path.split("/")[-1] | |
| return { | |
| "success": True, | |
| "message": f"Folder created: {folder_path}", | |
| "folder": self.standardize_folder( | |
| name=folder_name, | |
| path=folder_path, | |
| created_at=self._timestamp(), | |
| storage_type="hf", | |
| ), | |
| } | |
| def upload_file( | |
| self, user_id: str, folder_path: str, filename: str, file_obj: Any | |
| ) -> Dict[str, Any]: | |
| self._ensure_token() | |
| folder_path = self._validate_relative_path(folder_path, "folder_path") | |
| relative_path, repo_path = self._next_available_file_path(user_id, folder_path, filename) | |
| final_filename = relative_path.split("/")[-1] | |
| file_data = file_obj.read() if hasattr(file_obj, "read") else file_obj | |
| if not isinstance(file_data, (bytes, bytearray)): | |
| raise TypeError("Uploaded content must be bytes.") | |
| operations = [] | |
| if folder_path: | |
| marker_path = self._folder_marker_path(user_id, folder_path) | |
| if not self._file_exists(marker_path): | |
| operations.append( | |
| CommitOperationAdd(path_in_repo=marker_path, path_or_fileobj=b"") | |
| ) | |
| operations.append(CommitOperationAdd(path_in_repo=repo_path, path_or_fileobj=file_data)) | |
| self.api.create_commit( | |
| repo_id=config.HF_REPO_ID, | |
| repo_type=config.HF_REPO_TYPE, | |
| operations=operations, | |
| commit_message=f"Upload {repo_path}", | |
| ) | |
| return { | |
| "success": True, | |
| "message": f"Uploaded file: {final_filename}", | |
| "file": self.standardize_file( | |
| name=final_filename, | |
| path=relative_path, | |
| size=len(file_data), | |
| created_at=self._timestamp(), | |
| storage_type="hf", | |
| ), | |
| } | |
| def delete_file(self, user_id: str, file_path: str) -> Dict[str, Any]: | |
| self._ensure_token() | |
| file_path = self._validate_relative_path(file_path, "file_path") | |
| repo_path = self._user_repo_path(user_id, file_path) | |
| if not self._file_exists(repo_path): | |
| return {"success": False, "error": "File not found", "code": "FILE_NOT_FOUND"} | |
| self.api.delete_file( | |
| path_in_repo=repo_path, | |
| repo_id=config.HF_REPO_ID, | |
| repo_type=config.HF_REPO_TYPE, | |
| commit_message=f"Delete file {repo_path}", | |
| ) | |
| return {"success": True, "message": f"Deleted file: {file_path}"} | |
| def rename_file(self, user_id: str, file_path: str, new_name: str) -> Dict[str, Any]: | |
| self._ensure_token() | |
| file_path = self._validate_relative_path(file_path, "file_path") | |
| new_name = sanitize_filename(new_name) | |
| if not PathValidator.is_valid_filename(new_name): | |
| return {"success": False, "error": "Invalid characters in name", "code": "INVALID_NAME"} | |
| parent, _ = self._split_parent(file_path) | |
| new_relative_path = "/".join(part for part in [parent, new_name] if part) | |
| source_repo_path = self._user_repo_path(user_id, file_path) | |
| target_repo_path = self._user_repo_path(user_id, new_relative_path) | |
| repo_files = self._list_repo_files() | |
| if source_repo_path not in repo_files: | |
| return {"success": False, "error": "File not found", "code": "NOT_FOUND"} | |
| if target_repo_path in repo_files: | |
| return {"success": False, "error": "An item with this name already exists", "code": "CONFLICT"} | |
| self.api.create_commit( | |
| repo_id=config.HF_REPO_ID, | |
| repo_type=config.HF_REPO_TYPE, | |
| operations=[ | |
| CommitOperationCopy( | |
| src_path_in_repo=source_repo_path, path_in_repo=target_repo_path | |
| ), | |
| CommitOperationDelete(path_in_repo=source_repo_path), | |
| ], | |
| commit_message=f"Rename file {source_repo_path} -> {target_repo_path}", | |
| ) | |
| return { | |
| "success": True, | |
| "message": f"Renamed to {new_name}", | |
| "item": {"name": new_name, "type": "file", "path": new_relative_path}, | |
| } | |
| def delete_folder(self, user_id: str, folder_path: str) -> Dict[str, Any]: | |
| self._ensure_token() | |
| folder_path = self._validate_relative_path(folder_path, "folder_path") | |
| repo_folder_path = self._user_repo_path(user_id, folder_path) | |
| repo_files = self._list_repo_files() | |
| prefix = f"{repo_folder_path}/" | |
| matches = [item for item in repo_files if item.startswith(prefix)] | |
| if not matches: | |
| return { | |
| "success": False, | |
| "error": "Folder not found", | |
| "code": "FOLDER_NOT_FOUND", | |
| } | |
| self.api.create_commit( | |
| repo_id=config.HF_REPO_ID, | |
| repo_type=config.HF_REPO_TYPE, | |
| operations=[CommitOperationDelete(path_in_repo=item) for item in matches], | |
| commit_message=f"Delete folder {repo_folder_path}", | |
| ) | |
| return {"success": True, "message": f"Deleted folder: {folder_path}"} | |
| def rename_folder( | |
| self, user_id: str, folder_path: str, new_name: str | |
| ) -> Dict[str, Any]: | |
| self._ensure_token() | |
| folder_path = self._validate_relative_path(folder_path, "folder_path") | |
| new_name = sanitize_filename(new_name) | |
| if not PathValidator.is_valid_filename(new_name): | |
| return {"success": False, "error": "Invalid characters in name", "code": "INVALID_NAME"} | |
| parent, _ = self._split_parent(folder_path) | |
| new_relative_path = "/".join(part for part in [parent, new_name] if part) | |
| source_repo_path = self._user_repo_path(user_id, folder_path) | |
| target_repo_path = self._user_repo_path(user_id, new_relative_path) | |
| repo_files = self._list_repo_files() | |
| source_prefix = f"{source_repo_path}/" | |
| target_prefix = f"{target_repo_path}/" | |
| matches = [item for item in repo_files if item.startswith(source_prefix)] | |
| if not matches: | |
| return {"success": False, "error": "Folder not found", "code": "NOT_FOUND"} | |
| if any(item.startswith(target_prefix) for item in repo_files): | |
| return {"success": False, "error": "An item with this name already exists", "code": "CONFLICT"} | |
| operations = [] | |
| for item in matches: | |
| suffix = item[len(source_prefix) :] | |
| operations.append( | |
| CommitOperationCopy( | |
| src_path_in_repo=item, path_in_repo=f"{target_prefix}{suffix}" | |
| ) | |
| ) | |
| operations.append(CommitOperationDelete(path_in_repo=item)) | |
| self.api.create_commit( | |
| repo_id=config.HF_REPO_ID, | |
| repo_type=config.HF_REPO_TYPE, | |
| operations=operations, | |
| commit_message=f"Rename folder {source_repo_path} -> {target_repo_path}", | |
| ) | |
| return { | |
| "success": True, | |
| "message": f"Renamed to {new_name}", | |
| "item": {"name": new_name, "type": "folder", "path": new_relative_path}, | |
| } | |
| def download(self, user_id: str, file_path: str) -> Dict[str, Any]: | |
| file_path = self._validate_relative_path(file_path, "file_path") | |
| repo_path = self._user_repo_path(user_id, file_path) | |
| if not self._file_exists(repo_path): | |
| raise FileNotFoundError(f"File not found: {file_path}") | |
| return { | |
| "url": hf_hub_url( | |
| repo_id=config.HF_REPO_ID, | |
| filename=repo_path, | |
| repo_type=config.HF_REPO_TYPE, | |
| ), | |
| "path": file_path, | |
| } | |
| def list(self, user_id: str, prefix: str = "") -> Dict[str, List[Dict[str, Any]]]: | |
| prefix = self._validate_relative_path(prefix, "folder_path") | |
| repo_prefix = self._user_repo_path(user_id, prefix) | |
| search_prefix = f"{repo_prefix}/" if repo_prefix else f"{self._user_repo_path(user_id)}/" | |
| repo_files = self._list_repo_files() | |
| folders_map: Dict[str, Dict[str, Any]] = {} | |
| files: List[Dict[str, Any]] = [] | |
| for repo_item in repo_files: | |
| if not repo_item.startswith(search_prefix): | |
| continue | |
| relative = repo_item[len(search_prefix) :] | |
| if not relative: | |
| continue | |
| parts = relative.split("/") | |
| if len(parts) > 1: | |
| folder_name = parts[0] | |
| folder_path = "/".join(part for part in [prefix, folder_name] if part) | |
| folders_map.setdefault( | |
| folder_name, | |
| self.standardize_folder( | |
| name=folder_name, | |
| path=folder_path, | |
| created_at=self._timestamp(), | |
| storage_type="hf", | |
| ), | |
| ) | |
| continue | |
| if parts[0] == config.FOLDER_MARKER: | |
| continue | |
| file_path = "/".join(part for part in [prefix, parts[0]] if part) | |
| files.append( | |
| self.standardize_file( | |
| name=parts[0], | |
| path=file_path, | |
| size=0, | |
| created_at=self._timestamp(), | |
| storage_type="hf", | |
| ) | |
| ) | |
| folders = sorted(folders_map.values(), key=lambda item: item["name"].lower()) | |
| files.sort(key=lambda item: item["name"].lower()) | |
| return { | |
| "success": True, | |
| "folders": folders, | |
| "files": files, | |
| "total_folders": len(folders), | |
| "total_files": len(files), | |
| } | |
| def exists(self, user_id: str, path: str) -> bool: | |
| try: | |
| normalized = self._validate_relative_path(path) | |
| repo_path = self._user_repo_path(user_id, normalized) | |
| repo_files = self._list_repo_files() | |
| return repo_path in repo_files or any( | |
| item.startswith(f"{repo_path}/") for item in repo_files | |
| ) | |
| except Exception: | |
| return False | |
| def get_stats(self, user_id: str) -> Dict[str, Any]: | |
| repo_prefix = f"{self._user_repo_path(user_id)}/" | |
| repo_files = self._list_repo_files() | |
| file_count = 0 | |
| folder_names = set() | |
| for repo_item in repo_files: | |
| if not repo_item.startswith(repo_prefix): | |
| continue | |
| relative = repo_item[len(repo_prefix) :] | |
| if relative.endswith(f"/{config.FOLDER_MARKER}"): | |
| folder_names.add(relative[: -len(f'/{config.FOLDER_MARKER}')]) | |
| continue | |
| if relative == config.FOLDER_MARKER: | |
| continue | |
| if "/" in relative: | |
| parts = relative.split("/")[:-1] | |
| for index in range(1, len(parts) + 1): | |
| folder_names.add("/".join(parts[:index])) | |
| file_count += 1 | |
| return { | |
| "success": True, | |
| "total_size": 0, | |
| "total_size_formatted": "0 B", | |
| "total_files": file_count, | |
| "total_folders": len(folder_names), | |
| "storage_type": "hf", | |
| "repo_id": config.HF_REPO_ID, | |
| } | |
| def get_history(self, user_id: str, path: str) -> List[Dict[str, Any]]: | |
| self._ensure_token() | |
| repo_path = self._user_repo_path(user_id, path) | |
| commits = self.api.list_repo_commits( | |
| repo_id=config.HF_REPO_ID, | |
| repo_type=config.HF_REPO_TYPE, | |
| ) | |
| history = [] | |
| for commit in commits: | |
| title = getattr(commit, "title", "") or getattr(commit, "message", "") | |
| if repo_path not in title: | |
| continue | |
| history.append( | |
| { | |
| "id": commit.commit_id, | |
| "message": title, | |
| "timestamp": commit.created_at.isoformat(), | |
| "author": commit.authors[0] if getattr(commit, "authors", None) else "unknown", | |
| } | |
| ) | |
| return history | |
| def restore( | |
| self, user_id: str, path: str, revision: str, as_copy: bool = False | |
| ) -> Dict[str, Any]: | |
| self._ensure_token() | |
| path = self._validate_relative_path(path) | |
| source_repo_path = self._user_repo_path(user_id, path) | |
| destination_repo_path = source_repo_path | |
| destination_relative_path = path | |
| if as_copy: | |
| parent, filename = self._split_parent(path) | |
| stem, ext = os.path.splitext(filename) | |
| timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") | |
| copy_name = f"{stem}_{timestamp}{ext}" | |
| destination_relative_path = "/".join( | |
| part for part in [parent, copy_name] if part | |
| ) | |
| destination_repo_path = self._user_repo_path(user_id, destination_relative_path) | |
| self.api.create_commit( | |
| repo_id=config.HF_REPO_ID, | |
| repo_type=config.HF_REPO_TYPE, | |
| operations=[ | |
| CommitOperationCopy( | |
| src_path_in_repo=source_repo_path, | |
| path_in_repo=destination_repo_path, | |
| src_revision=revision, | |
| ) | |
| ], | |
| commit_message=f"Restore {source_repo_path} from {revision}", | |
| ) | |
| return { | |
| "success": True, | |
| "message": f"Restored {path}", | |
| "item": { | |
| "name": destination_relative_path.split("/")[-1], | |
| "path": destination_relative_path, | |
| }, | |
| } | |