Spaces:
Sleeping
Sleeping
File size: 9,382 Bytes
6ad61bb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
"""
Hugging Face Storage Manager
=============================
Handles syncing storage files and uploaded PDFs with HF private repository.
Functions:
- sync_storage_from_hf() → Download storage/ and uploaded_pdfs/ on startup
- push_storage_to_hf() → Upload storage/ and uploaded_pdfs/ after changes
"""
import os
from typing import Optional
from huggingface_hub import HfApi, hf_hub_download, login
class HFStorageManager:
    """Manages persistent storage sync with a Hugging Face repository.

    Per-user data is kept under ``users/{user_id}/storage`` and
    ``users/{user_id}/uploaded_pdfs``. The same relative layout is used for
    the local copy (rooted at the directory containing this module) and for
    the paths inside the repository.

    When no token/repo is configured, or login fails, the manager is
    disabled: ``enabled`` is False and both sync methods return False
    without contacting the Hub.
    """

    # File names fetched from users/{user_id}/storage/ during a sync.
    _STORAGE_FILES = ("faiss.index", "metadata.json", "documents.json")

    def __init__(self, hf_token: Optional[str], hf_repo: str):
        """
        Initialize HF Storage Manager.

        Args:
            hf_token: Hugging Face API token with write access
            hf_repo: HF repository ID (e.g., "username/repo-name")
        """
        self.hf_token = hf_token
        self.hf_repo = hf_repo
        self.enabled = bool(hf_token and hf_repo)
        self.api = None

        if not self.enabled:
            print("⚠️ HF Storage disabled (HF_TOKEN or HF_REPO not set)")
            return

        try:
            login(token=hf_token, add_to_git_credential=True)
            self.api = HfApi()
            print(f"✅ HF Storage Manager initialized: {hf_repo}")
        except Exception as e:
            # A failed login must not crash the app; run without HF sync.
            print(f"⚠️ HF login failed: {e}")
            self.enabled = False

    @staticmethod
    def _user_paths(user_id: str):
        """Return ``(base_dir, storage_dir, uploaded_pdfs_dir)`` for a user.

        ``base_dir`` is the directory containing this module; the two user
        directories are ``base_dir/users/{user_id}/storage`` and
        ``base_dir/users/{user_id}/uploaded_pdfs``.
        """
        base_dir = os.path.dirname(os.path.abspath(__file__))
        user_base = os.path.join(base_dir, "users", user_id)
        return (
            base_dir,
            os.path.join(user_base, "storage"),
            os.path.join(user_base, "uploaded_pdfs"),
        )

    def _download(self, repo_path: str, base_dir: str) -> bool:
        """Download one repo file with ``local_dir=base_dir``; return success.

        Failures are logged and reported as False rather than raised.
        """
        try:
            hf_hub_download(
                repo_id=self.hf_repo,
                filename=repo_path,
                token=self.hf_token,
                repo_type="model",
                local_dir=base_dir,
                local_dir_use_symlinks=False,
            )
        except Exception as e:
            print(f" ⚠️ Could not download {repo_path}: {str(e)[:100]}")
            return False
        print(f" ✓ Downloaded: {repo_path}")
        return True

    def sync_storage_from_hf(self, user_id: str) -> bool:
        """
        Download storage files and uploaded PDFs for a specific user from HF repo.

        Downloads:
        - users/{user_id}/storage/faiss.index
        - users/{user_id}/storage/metadata.json
        - users/{user_id}/storage/documents.json
        - users/{user_id}/uploaded_pdfs/*.pdf (extension matched
          case-insensitively)

        Individual download failures are logged and do not fail the sync.

        Args:
            user_id: User identifier (12-char hash from API key)

        Returns:
            bool: True if the sync ran to completion, False if the manager is
            disabled or an unexpected error aborted the sync.
        """
        if not self.enabled:
            print(f"⚠️ HF Storage sync skipped for user {user_id} (disabled)")
            return False

        try:
            # Setup local directories for this user
            base_dir, storage_dir, uploaded_pdfs_dir = self._user_paths(user_id)
            os.makedirs(storage_dir, exist_ok=True)
            os.makedirs(uploaded_pdfs_dir, exist_ok=True)

            print(f"📥 Syncing storage for user {user_id} from HF repo: {self.hf_repo}")

            # Download storage files (FAISS index and metadata). A missing
            # file is expected on the first run, before anything was pushed.
            downloaded_count = 0
            for filename in self._STORAGE_FILES:
                if self._download(f"users/{user_id}/storage/{filename}", base_dir):
                    downloaded_count += 1

            # Download all uploaded PDF files for this user
            try:
                files_in_repo = self.api.list_repo_files(
                    repo_id=self.hf_repo,
                    repo_type="model",
                    token=self.hf_token,
                )
            except Exception as e:
                print(f" ⚠️ Could not list PDF files for user {user_id}: {str(e)[:100]}")
            else:
                pdf_prefix = f"users/{user_id}/uploaded_pdfs/"
                # Match the extension case-insensitively: push uploads the
                # whole folder, so "X.PDF" must be restorable too.
                pdf_files = [
                    f for f in files_in_repo
                    if f.startswith(pdf_prefix) and f.lower().endswith(".pdf")
                ]
                print(f" Found {len(pdf_files)} PDF files for user {user_id}")
                for pdf_file in pdf_files:
                    self._download(pdf_file, base_dir)

            print(f"✅ HF Storage sync complete for user {user_id} ({downloaded_count} storage files)")
            return True
        except Exception as e:
            print(f"❌ HF Storage sync failed for user {user_id}: {e}")
            return False

    def push_storage_to_hf(self, user_id: str, commit_message: str = "Update storage") -> bool:
        """
        Upload storage files and uploaded PDFs for a specific user to HF repo.

        Uploads:
        - users/{user_id}/storage/ folder (FAISS index and metadata)
        - users/{user_id}/uploaded_pdfs/ folder (PDF files)

        A folder that is missing or empty locally is skipped.

        Args:
            user_id: User identifier (12-char hash from API key)
            commit_message: Commit message for the upload

        Returns:
            bool: True if every attempted upload succeeded (including the case
            where there was nothing to upload); False if the manager is
            disabled, any upload failed, or an unexpected error occurred.
        """
        if not self.enabled:
            print(f"⚠️ HF Storage push skipped for user {user_id} (disabled)")
            return False

        try:
            _, storage_dir, uploaded_pdfs_dir = self._user_paths(user_id)

            print(f"📤 Pushing storage for user {user_id} to HF repo: {self.hf_repo}")

            # (local folder, destination path in repo, label used in error logs)
            targets = (
                (storage_dir, f"users/{user_id}/storage", "storage"),
                (uploaded_pdfs_dir, f"users/{user_id}/uploaded_pdfs", "PDFs"),
            )

            upload_count = 0
            any_failed = False
            for folder, repo_path, label in targets:
                if not (os.path.isdir(folder) and os.listdir(folder)):
                    continue
                try:
                    self.api.upload_folder(
                        folder_path=folder,
                        repo_id=self.hf_repo,
                        path_in_repo=repo_path,
                        token=self.hf_token,
                        repo_type="model",
                        commit_message=f"[User {user_id}] {commit_message}",
                    )
                except Exception as e:
                    # Keep going so one failing folder does not block the
                    # other, but remember the failure for the return value.
                    any_failed = True
                    print(f" ✗ Failed to upload {label} for user {user_id}: {str(e)[:100]}")
                else:
                    upload_count += 1
                    print(f" ✓ Uploaded: {repo_path}/ folder")

            if any_failed:
                # Do not report success when data was not persisted.
                print(f"⚠️ HF Storage push incomplete for user {user_id} ({upload_count} folders uploaded)")
                return False

            print(f"✅ HF Storage push complete for user {user_id} ({upload_count} folders)")
            return True
        except Exception as e:
            print(f"❌ HF Storage push failed for user {user_id}: {e}")
            return False
# ============================================
# CONVENIENCE FUNCTIONS
# ============================================
def create_hf_storage_manager(
    hf_token: Optional[str] = None,
    hf_repo: Optional[str] = None
) -> HFStorageManager:
    """
    Build an HFStorageManager, filling unset arguments from the environment.

    Args:
        hf_token: HF token; when None, the HF_TOKEN environment variable
            is used (which may itself be unset).
        hf_repo: HF repo ID; when None, the HF_REPO environment variable is
            used, falling back to "Hamza4100/multi-pdf-storage".

    Returns:
        HFStorageManager instance
    """
    # Only None triggers the environment lookup; any explicit value
    # (including an empty string) is passed through unchanged.
    token = os.environ.get("HF_TOKEN") if hf_token is None else hf_token
    repo = (
        os.environ.get("HF_REPO", "Hamza4100/multi-pdf-storage")
        if hf_repo is None
        else hf_repo
    )
    return HFStorageManager(hf_token=token, hf_repo=repo)
|