dummyQuantum / hf_storage.py
Apurva Tiwari
feature: sessions, init
ca961b4
"""
HuggingFace Persistent Storage Utilities
Provides helper functions for safely reading/writing session data
to HF Spaces persistent directory with atomic operations and locking.
"""
import os
import json
import tempfile
import shutil
from pathlib import Path
from typing import Any, Dict, Optional
import time
# Advisory file locking is built on fcntl, which is only importable on
# Unix-like platforms (Linux/macOS). HAS_FCNTL records whether it is usable.
try:
    import fcntl
except ImportError:
    HAS_FCNTL = False
else:
    HAS_FCNTL = True

# Root directory under which all session data is stored.
# NOTE(review): the file calls this the HF Spaces persistent directory, but
# /tmp is normally ephemeral and HF documents persistent storage as mounted at
# /data -- confirm this path is intentional.
HF_PERSISTENT_DIR = Path("/tmp/outputs")
SESSIONS_DIR = HF_PERSISTENT_DIR.joinpath("sessions")
def _acquire_lock(file_obj):
    """Take an exclusive lock on ``file_obj`` where the platform supports it.

    With fcntl available this blocks in ``flock(LOCK_EX)`` on the file's
    descriptor. Without fcntl no lock is taken at all; the function only
    sleeps for 10 ms to reduce contention.
    """
    if not HAS_FCNTL:
        # No fcntl (e.g. Windows): a short pause is the only mitigation.
        time.sleep(0.01)
        return
    fcntl.flock(file_obj.fileno(), fcntl.LOCK_EX)
def _release_lock(file_obj):
    """Drop any flock held on ``file_obj``; a no-op when fcntl is unavailable."""
    if not HAS_FCNTL:
        return
    descriptor = file_obj.fileno()
    fcntl.flock(descriptor, fcntl.LOCK_UN)
def _acquire_shared_lock(file_obj):
    """Take a shared (reader) lock on ``file_obj`` where the platform supports it.

    With fcntl available this blocks in ``flock(LOCK_SH)`` on the file's
    descriptor. Without fcntl it only sleeps for 10 ms and takes no lock.
    """
    if not HAS_FCNTL:
        time.sleep(0.01)
        return
    fcntl.flock(file_obj.fileno(), fcntl.LOCK_SH)
def ensure_session_dir() -> Path:
    """Create the sessions root (and any missing parents) and return its path."""
    sessions_root = SESSIONS_DIR
    sessions_root.mkdir(exist_ok=True, parents=True)
    return sessions_root
def get_user_dir(user_id: str) -> Path:
    """Return the directory for ``user_id`` under the sessions root.

    The directory (and any missing parents) is created as a side effect.

    NOTE(review): ``user_id`` is joined onto the path without validation --
    confirm callers never pass values containing path separators or "..".
    """
    target = SESSIONS_DIR.joinpath(user_id)
    target.mkdir(exist_ok=True, parents=True)
    return target
def get_session_dir(user_id: str, session_id: str) -> Path:
    """Return the directory for ``session_id`` inside the user's directory.

    Both the user directory and the session directory are created if they
    do not exist yet.
    """
    parent = get_user_dir(user_id)
    target = parent.joinpath(session_id)
    target.mkdir(exist_ok=True, parents=True)
    return target
def read_json_safe(filepath: Path, default: Optional[Any] = None) -> Any:
    """
    Read and parse a JSON file while holding a shared lock.

    Any read or decode failure is reported with a printed warning and the
    fallback value is returned instead of raising.

    Args:
        filepath: Path to JSON file
        default: Value returned when the file is missing or unreadable.
            ``None`` (the default) means "return an empty dict".

    Returns:
        Parsed JSON data, or the fallback value
    """
    fallback = default if default is not None else {}

    if not filepath.exists():
        return fallback

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            _acquire_shared_lock(f)
            try:
                data = json.load(f)
            finally:
                _release_lock(f)
        return data
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError *and* UnicodeDecodeError
        # (file is not valid UTF-8), which previously escaped this handler.
        print(f"Warning: Failed to read {filepath}: {e}")
        return fallback
def write_json_safe(filepath: Path, data: Any, indent: int = 2) -> bool:
    """
    Write ``data`` as JSON to ``filepath`` using a temp file and atomic replace.

    The payload is serialized into a temporary file created in the target's
    own directory, flushed and fsynced, and then renamed over ``filepath``.
    If any step fails, the error is printed, the temporary file is removed
    and ``False`` is returned.

    Args:
        filepath: Path to JSON file
        data: Data to write (must be JSON-serializable)
        indent: JSON indentation level

    Returns:
        True if successful, False otherwise
    """
    tmp_path = None
    try:
        # Ensure parent directory exists
        filepath.parent.mkdir(parents=True, exist_ok=True)

        # Write to a temporary file in the same directory so the final
        # rename never crosses a filesystem boundary.
        with tempfile.NamedTemporaryFile(
            mode='w',
            dir=filepath.parent,
            suffix='.tmp',
            delete=False,
            encoding='utf-8',
        ) as tmp:
            # Record the name first: if serialization fails below, the
            # except-branch must still be able to delete the temp file.
            tmp_path = tmp.name
            _acquire_lock(tmp)
            try:
                json.dump(data, tmp, indent=indent, ensure_ascii=False)
                tmp.flush()
                os.fsync(tmp.fileno())
            finally:
                _release_lock(tmp)

        # os.replace overwrites an existing destination atomically on both
        # POSIX and Windows; shutil.move degrades to copy+delete on Windows
        # when the destination already exists.
        os.replace(tmp_path, filepath)
        return True

    except Exception as e:
        print(f"Error: Failed to write {filepath}: {e}")
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                # Best-effort cleanup; the temp file may already be gone.
                pass
        return False
def read_json_locked(filepath: Path, default: Optional[Any] = None) -> Any:
    """
    Read JSON with exclusive lock (for critical reads).

    Any read or decode failure is reported with a printed warning and the
    fallback value is returned instead of raising.

    Args:
        filepath: Path to JSON file
        default: Value returned when the file is missing or unreadable.
            ``None`` (the default) means "return an empty dict".

    Returns:
        Parsed JSON data, or the fallback value
    """
    fallback = default if default is not None else {}

    if not filepath.exists():
        return fallback

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            _acquire_lock(f)
            try:
                data = json.load(f)
            finally:
                _release_lock(f)
        return data
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError *and* UnicodeDecodeError
        # (file is not valid UTF-8), which previously escaped this handler.
        print(f"Warning: Failed to read {filepath}: {e}")
        return fallback
def delete_session_dir(user_id: str, session_id: str) -> bool:
    """Delete a session directory and everything inside it.

    The path is built without creating it, so deleting a session that does
    not exist leaves no empty user/session directories behind.

    Args:
        user_id: Owner of the session
        session_id: Session whose directory should be removed

    Returns:
        True if the directory was removed or did not exist, False if removal
        failed (the error is printed).
    """
    try:
        session_dir = SESSIONS_DIR / user_id / session_id
        if session_dir.exists():
            shutil.rmtree(session_dir)
        return True
    except Exception as e:
        # Callers rely on the boolean result, so every failure is reported
        # through the return value rather than propagated.
        print(f"Error: Failed to delete session {session_id}: {e}")
        return False
def list_user_session_dirs(user_id: str) -> list:
    """List all session IDs for a user.

    A session ID is the name of each sub-directory of the user's directory.
    This is a read-only query: an unknown user yields an empty list and no
    directory is created for them.

    Args:
        user_id: User whose sessions should be listed

    Returns:
        Names of the user's session directories, in directory-iteration order
    """
    user_dir = SESSIONS_DIR / user_id
    if not user_dir.is_dir():
        return []
    return [entry.name for entry in user_dir.iterdir() if entry.is_dir()]
def cleanup_old_session_files(user_id: str, session_id: str) -> None:
    """Remove temporary/backup files from a session directory.

    Deletes every entry directly inside the session directory whose name
    matches ``*.tmp``, ``*.bak`` or ``*.lock``. Removal is best-effort:
    entries that cannot be deleted are skipped silently.

    Args:
        user_id: Owner of the session
        session_id: Session whose directory should be cleaned
    """
    # get_session_dir creates the directory if it is missing.
    session_dir = get_session_dir(user_id, session_id)
    for pattern in ('*.tmp', '*.bak', '*.lock'):
        for stale in session_dir.glob(pattern):
            try:
                stale.unlink()
            except OSError:
                # Only filesystem errors are ignored; KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                continue