# multi-pdf-rag-api / hf_storage.py
# Hamza4100's picture
# Upload 7 files
# 6ad61bb verified
"""
Hugging Face Storage Manager
=============================
Handles syncing storage files and uploaded PDFs with HF private repository.
Functions:
- sync_storage_from_hf() → Download storage/ and uploaded_pdfs/ on startup
- push_storage_to_hf() → Upload storage/ and uploaded_pdfs/ after changes
"""
import os
from typing import Optional
from huggingface_hub import HfApi, hf_hub_download, login
class HFStorageManager:
    """Manage persistent per-user storage sync with a Hugging Face repository.

    Local data lives next to this module under ``users/{user_id}/storage`` and
    ``users/{user_id}/uploaded_pdfs``; the same relative layout is used inside
    the HF repo. When no token or repo is configured, or login fails, the
    manager is disabled and both sync methods return False without doing
    any work.
    """

    # Index/metadata files expected under users/{user_id}/storage/ in the repo.
    _STORAGE_FILES = ("faiss.index", "metadata.json", "documents.json")

    def __init__(self, hf_token: Optional[str], hf_repo: str):
        """
        Initialize HF Storage Manager.

        Args:
            hf_token: Hugging Face API token with write access
            hf_repo: HF repository ID (e.g., "username/repo-name")
        """
        self.hf_token = hf_token
        self.hf_repo = hf_repo
        self.enabled = bool(hf_token and hf_repo)
        self.api = None

        if not self.enabled:
            print("⚠️ HF Storage disabled (HF_TOKEN or HF_REPO not set)")
            return

        try:
            login(token=hf_token, add_to_git_credential=True)
            self.api = HfApi()
            print(f"✅ HF Storage Manager initialized: {hf_repo}")
        except Exception as e:
            # Best effort: a failed login disables syncing instead of
            # crashing the application at startup.
            print(f"⚠️ HF login failed: {e}")
            self.enabled = False

    @staticmethod
    def _is_safe_user_id(user_id: str) -> bool:
        """Return True if user_id is a single, non-empty path component."""
        if not isinstance(user_id, str) or not user_id or user_id in (".", ".."):
            return False
        return "/" not in user_id and "\\" not in user_id

    @staticmethod
    def _user_dirs(user_id: str):
        """Return (base_dir, storage_dir, uploaded_pdfs_dir) for a user."""
        base_dir = os.path.dirname(os.path.abspath(__file__))
        user_base = os.path.join(base_dir, "users", user_id)
        return (
            base_dir,
            os.path.join(user_base, "storage"),
            os.path.join(user_base, "uploaded_pdfs"),
        )

    def _download_file(self, repo_path: str, base_dir: str) -> bool:
        """Download one repo file below base_dir; return True on success."""
        try:
            hf_hub_download(
                repo_id=self.hf_repo,
                filename=repo_path,
                token=self.hf_token,
                repo_type="model",
                local_dir=base_dir,
                local_dir_use_symlinks=False
            )
        except Exception as e:
            # A missing file is expected on first run, so only report it.
            print(f" ⚠️ Could not download {repo_path}: {str(e)[:100]}")
            return False
        print(f" ✓ Downloaded: {repo_path}")
        return True

    def _upload_dir(self, local_dir: str, path_in_repo: str,
                    user_id: str, commit_message: str, label: str) -> bool:
        """Upload one local folder to path_in_repo; return True on success."""
        try:
            self.api.upload_folder(
                folder_path=local_dir,
                repo_id=self.hf_repo,
                path_in_repo=path_in_repo,
                token=self.hf_token,
                repo_type="model",
                commit_message=f"[User {user_id}] {commit_message}"
            )
        except Exception as e:
            print(f" ❌ Failed to upload {label} for user {user_id}: {str(e)[:100]}")
            return False
        print(f" ✓ Uploaded: {path_in_repo}/ folder")
        return True

    def sync_storage_from_hf(self, user_id: str) -> bool:
        """
        Download storage files and uploaded PDFs for a specific user from HF repo.

        Downloads:
            - users/{user_id}/storage/faiss.index
            - users/{user_id}/storage/metadata.json
            - users/{user_id}/storage/documents.json
            - users/{user_id}/uploaded_pdfs/*.pdf

        Individual files that cannot be downloaded are reported and skipped;
        they do not make the sync fail (a fresh user has nothing in the repo).

        Args:
            user_id: User identifier (12-char hash from API key)

        Returns:
            bool: True if the sync ran, False if the manager is disabled,
            user_id is not a safe path component, or an unexpected error
            occurred.
        """
        if not self.enabled:
            print(f"⚠️ HF Storage sync skipped for user {user_id} (disabled)")
            return False

        # user_id becomes part of local and repo paths; refuse anything that
        # could address a location outside users/<user_id>/.
        if not self._is_safe_user_id(user_id):
            print(f"❌ HF Storage sync failed for user {user_id!r}: invalid user_id")
            return False

        try:
            # Setup local directories for this user
            base_dir, storage_dir, uploaded_pdfs_dir = self._user_dirs(user_id)
            os.makedirs(storage_dir, exist_ok=True)
            os.makedirs(uploaded_pdfs_dir, exist_ok=True)
            print(f"📥 Syncing storage for user {user_id} from HF repo: {self.hf_repo}")

            # Download storage files (FAISS index and metadata)
            downloaded_count = 0
            for filename in self._STORAGE_FILES:
                if self._download_file(f"users/{user_id}/storage/{filename}", base_dir):
                    downloaded_count += 1

            # Download all uploaded PDF files for this user
            try:
                files_in_repo = self.api.list_repo_files(
                    repo_id=self.hf_repo,
                    repo_type="model",
                    token=self.hf_token
                )
                pdf_prefix = f"users/{user_id}/uploaded_pdfs/"
                pdf_files = [
                    f for f in files_in_repo
                    if f.startswith(pdf_prefix) and f.endswith(".pdf")
                ]
                print(f" Found {len(pdf_files)} PDF files for user {user_id}")
                for pdf_file in pdf_files:
                    self._download_file(pdf_file, base_dir)
            except Exception as e:
                # Listing can fail (e.g. repo unreachable); keep what we have.
                print(f" ⚠️ Could not list PDF files for user {user_id}: {str(e)[:100]}")

            print(f"✅ HF Storage sync complete for user {user_id} ({downloaded_count} storage files)")
            return True
        except Exception as e:
            print(f"❌ HF Storage sync failed for user {user_id}: {e}")
            return False

    def push_storage_to_hf(self, user_id: str, commit_message: str = "Update storage") -> bool:
        """
        Upload storage files and uploaded PDFs for a specific user to HF repo.

        Uploads:
            - users/{user_id}/storage/ folder (FAISS index and metadata)
            - users/{user_id}/uploaded_pdfs/ folder (PDF files)

        Folders that are missing or empty locally are skipped and do not
        count as failures.

        Args:
            user_id: User identifier (12-char hash from API key)
            commit_message: Commit message for the upload

        Returns:
            bool: True if every attempted upload succeeded (including the
            case where there was nothing to upload), False if the manager is
            disabled, user_id is not a safe path component, or any upload
            failed.
        """
        if not self.enabled:
            print(f"⚠️ HF Storage push skipped for user {user_id} (disabled)")
            return False

        if not self._is_safe_user_id(user_id):
            print(f"❌ HF Storage push failed for user {user_id!r}: invalid user_id")
            return False

        try:
            _, storage_dir, uploaded_pdfs_dir = self._user_dirs(user_id)
            print(f"📤 Pushing storage for user {user_id} to HF repo: {self.hf_repo}")

            # (local folder, destination in repo, label used in error output)
            targets = (
                (storage_dir, f"users/{user_id}/storage", "storage"),
                (uploaded_pdfs_dir, f"users/{user_id}/uploaded_pdfs", "PDFs"),
            )

            upload_count = 0
            failed_count = 0
            for local_dir, path_in_repo, label in targets:
                if not (os.path.exists(local_dir) and os.listdir(local_dir)):
                    continue  # nothing to upload for this folder
                if self._upload_dir(local_dir, path_in_repo, user_id, commit_message, label):
                    upload_count += 1
                else:
                    failed_count += 1

            if failed_count:
                # Report partial/total failure instead of claiming success.
                print(
                    f"❌ HF Storage push incomplete for user {user_id} "
                    f"({upload_count} folders uploaded, {failed_count} failed)"
                )
                return False

            print(f"✅ HF Storage push complete for user {user_id} ({upload_count} folders)")
            return True
        except Exception as e:
            print(f"❌ HF Storage push failed for user {user_id}: {e}")
            return False
# ============================================
# CONVENIENCE FUNCTIONS
# ============================================
def create_hf_storage_manager(
    hf_token: Optional[str] = None,
    hf_repo: Optional[str] = None
) -> HFStorageManager:
    """
    Build an HFStorageManager, filling unset arguments from the environment.

    Args:
        hf_token: HF token; when None, the HF_TOKEN environment variable is used
        hf_repo: HF repo ID; when None, the HF_REPO environment variable is
            used, falling back to "Hamza4100/multi-pdf-storage"

    Returns:
        HFStorageManager instance
    """
    # Only None triggers the environment lookup; an explicit empty string
    # is passed through unchanged.
    resolved_token = os.environ.get("HF_TOKEN") if hf_token is None else hf_token
    resolved_repo = (
        os.environ.get("HF_REPO", "Hamza4100/multi-pdf-storage")
        if hf_repo is None
        else hf_repo
    )
    return HFStorageManager(hf_token=resolved_token, hf_repo=resolved_repo)