Spaces:
Sleeping
Sleeping
| """ | |
| Hugging Face Storage Manager | |
| ============================= | |
| Handles syncing storage files and uploaded PDFs with HF private repository. | |
| Functions: | |
| - sync_storage_from_hf() → Download storage/ and uploaded_pdfs/ on startup | |
| - push_storage_to_hf() → Upload storage/ and uploaded_pdfs/ after changes | |
| """ | |
| import os | |
| from typing import Optional | |
| from huggingface_hub import HfApi, hf_hub_download, login | |
class HFStorageManager:
    """Sync per-user storage files and uploaded PDFs with a Hugging Face repo.

    Local data lives next to this module under ``users/{user_id}/storage`` and
    ``users/{user_id}/uploaded_pdfs``; the same relative layout is used inside
    the repository.

    Attributes:
        hf_token: Token passed to every Hub call (may be ``None``).
        hf_repo: Repository ID, e.g. ``"username/repo-name"``.
        enabled: ``True`` only when both token and repo are set and login
            succeeded; every public method is a no-op returning ``False``
            otherwise.
        api: ``HfApi`` instance, or ``None`` while disabled.
    """

    # File names fetched from users/{user_id}/storage/ on sync.
    _STORAGE_FILES = ("faiss.index", "metadata.json", "documents.json")
    # All Hub calls in this class address the repository as this type.
    _REPO_TYPE = "model"

    def __init__(self, hf_token: Optional[str], hf_repo: str):
        """Store the credentials and try to log in to the Hub.

        Args:
            hf_token: Hugging Face API token with write access.
            hf_repo: HF repository ID (e.g., "username/repo-name").

        A failed login does not raise: the manager is left disabled instead.
        """
        self.hf_token = hf_token
        self.hf_repo = hf_repo
        self.enabled = bool(hf_token and hf_repo)
        self.api = None

        if not self.enabled:
            print("β οΈ HF Storage disabled (HF_TOKEN or HF_REPO not set)")
            return

        try:
            login(token=hf_token, add_to_git_credential=True)
            self.api = HfApi()
            print(f"β HF Storage Manager initialized: {hf_repo}")
        except Exception as e:
            print(f"β οΈ HF login failed: {e}")
            self.enabled = False

    @staticmethod
    def _local_paths(user_id: str):
        """Return ``(base_dir, storage_dir, uploaded_pdfs_dir)`` for a user."""
        base_dir = os.path.dirname(os.path.abspath(__file__))
        user_base = os.path.join(base_dir, "users", user_id)
        return (
            base_dir,
            os.path.join(user_base, "storage"),
            os.path.join(user_base, "uploaded_pdfs"),
        )

    def _download_file(self, repo_path: str, base_dir: str) -> bool:
        """Download one repo file under ``base_dir``; return whether it worked.

        Failures are logged and swallowed on purpose: a file that is not in
        the repo yet (first run) must not abort the whole sync.
        """
        try:
            hf_hub_download(
                repo_id=self.hf_repo,
                filename=repo_path,
                token=self.hf_token,
                repo_type=self._REPO_TYPE,
                # local_dir + repo-relative filename reproduces the repo
                # layout (users/{user_id}/...) directly under base_dir.
                local_dir=base_dir,
                # NOTE(review): recent huggingface_hub releases are reported
                # to deprecate this argument - confirm against the pinned
                # version before removing it.
                local_dir_use_symlinks=False
            )
        except Exception as e:
            print(f" β οΈ Could not download {repo_path}: {str(e)[:100]}")
            return False
        print(f" β Downloaded: {repo_path}")
        return True

    def _upload_folder(self, local_dir: str, path_in_repo: str, label: str,
                       user_id: str, commit_message: str) -> Optional[bool]:
        """Upload one local folder to the repo.

        Returns:
            ``None`` when the folder is missing or empty (nothing attempted),
            ``True`` when the upload succeeded, ``False`` when it raised.
        """
        if not (os.path.isdir(local_dir) and os.listdir(local_dir)):
            return None
        try:
            self.api.upload_folder(
                folder_path=local_dir,
                repo_id=self.hf_repo,
                path_in_repo=path_in_repo,
                token=self.hf_token,
                repo_type=self._REPO_TYPE,
                commit_message=f"[User {user_id}] {commit_message}"
            )
        except Exception as e:
            print(f" β Failed to upload {label} for user {user_id}: {str(e)[:100]}")
            return False
        print(f" β Uploaded: {path_in_repo}/ folder")
        return True

    def sync_storage_from_hf(self, user_id: str) -> bool:
        """Download storage files and uploaded PDFs for one user from the repo.

        Downloads:
            - users/{user_id}/storage/faiss.index
            - users/{user_id}/storage/metadata.json
            - users/{user_id}/storage/documents.json
            - users/{user_id}/uploaded_pdfs/*.pdf (extension matched
              case-insensitively)

        Individual download failures are logged and tolerated, because the
        files do not exist in the repo before the first push.

        Args:
            user_id: User identifier used as the folder name.

        Returns:
            bool: ``False`` when the manager is disabled or an unexpected
            error aborts the sync, ``True`` otherwise.
        """
        if not self.enabled:
            print(f"β οΈ HF Storage sync skipped for user {user_id} (disabled)")
            return False

        try:
            base_dir, storage_dir, uploaded_pdfs_dir = self._local_paths(user_id)
            os.makedirs(storage_dir, exist_ok=True)
            os.makedirs(uploaded_pdfs_dir, exist_ok=True)

            print(f"π₯ Syncing storage for user {user_id} from HF repo: {self.hf_repo}")

            # Storage files (FAISS index and metadata).
            downloaded_count = 0
            for filename in self._STORAGE_FILES:
                if self._download_file(f"users/{user_id}/storage/{filename}", base_dir):
                    downloaded_count += 1

            # Uploaded PDFs: list the repo once, then fetch this user's files.
            try:
                files_in_repo = self.api.list_repo_files(
                    repo_id=self.hf_repo,
                    repo_type=self._REPO_TYPE,
                    token=self.hf_token
                )
            except Exception as e:
                print(f" β οΈ Could not list PDF files for user {user_id}: {str(e)[:100]}")
            else:
                pdf_prefix = f"users/{user_id}/uploaded_pdfs/"
                # Case-insensitive so files pushed as ".PDF" are restored too.
                pdf_files = [
                    f for f in files_in_repo
                    if f.startswith(pdf_prefix) and f.lower().endswith(".pdf")
                ]
                print(f" Found {len(pdf_files)} PDF files for user {user_id}")
                for pdf_file in pdf_files:
                    self._download_file(pdf_file, base_dir)

            print(f"β HF Storage sync complete for user {user_id} ({downloaded_count} storage files)")
            return True
        except Exception as e:
            print(f"β HF Storage sync failed for user {user_id}: {e}")
            return False

    def push_storage_to_hf(self, user_id: str, commit_message: str = "Update storage") -> bool:
        """Upload storage files and uploaded PDFs for one user to the repo.

        Uploads (each only when the local folder exists and is non-empty):
            - users/{user_id}/storage/ folder (FAISS index and metadata)
            - users/{user_id}/uploaded_pdfs/ folder (PDF files)

        Args:
            user_id: User identifier used as the folder name.
            commit_message: Commit message for the upload; it is prefixed
                with ``[User {user_id}]``.

        Returns:
            bool: ``True`` when every attempted upload succeeded (including
            the case where there was nothing to upload); ``False`` when the
            manager is disabled or any upload failed.
        """
        if not self.enabled:
            print(f"β οΈ HF Storage push skipped for user {user_id} (disabled)")
            return False

        try:
            _, storage_dir, uploaded_pdfs_dir = self._local_paths(user_id)

            print(f"π€ Pushing storage for user {user_id} to HF repo: {self.hf_repo}")

            targets = (
                (storage_dir, f"users/{user_id}/storage", "storage"),
                (uploaded_pdfs_dir, f"users/{user_id}/uploaded_pdfs", "PDFs"),
            )
            upload_count = 0
            failed_count = 0
            for local_dir, path_in_repo, label in targets:
                outcome = self._upload_folder(
                    local_dir, path_in_repo, label, user_id, commit_message
                )
                if outcome is True:
                    upload_count += 1
                elif outcome is False:
                    failed_count += 1

            # A swallowed upload error must not be reported as success:
            # callers rely on the return value to know the data is persisted.
            if failed_count:
                print(f"β HF Storage push failed for user {user_id}: {failed_count} folder upload(s) failed")
                return False

            print(f"β HF Storage push complete for user {user_id} ({upload_count} folders)")
            return True
        except Exception as e:
            print(f"β HF Storage push failed for user {user_id}: {e}")
            return False
| # ============================================ | |
| # CONVENIENCE FUNCTIONS | |
| # ============================================ | |
def create_hf_storage_manager(
    hf_token: Optional[str] = None,
    hf_repo: Optional[str] = None
) -> HFStorageManager:
    """
    Build an HFStorageManager, filling missing arguments from the environment.

    Args:
        hf_token: HF token; when None, the HF_TOKEN environment variable is
            used (which may itself be unset).
        hf_repo: HF repo ID; when None, the HF_REPO environment variable is
            used, falling back to "Hamza4100/multi-pdf-storage".

    Returns:
        HFStorageManager instance
    """
    # Only None triggers the environment lookup; an explicit empty string is
    # passed through unchanged.
    token = os.environ.get("HF_TOKEN") if hf_token is None else hf_token
    repo = (
        os.environ.get("HF_REPO", "Hamza4100/multi-pdf-storage")
        if hf_repo is None
        else hf_repo
    )
    return HFStorageManager(hf_token=token, hf_repo=repo)