# NOTE: removed Hugging Face Spaces UI scrape artifacts ("Spaces:" /
# "Runtime error" banner lines) that preceded the module docstring.
| """ | |
| HuggingFace Persistent Storage Utilities | |
| Provides helper functions for safely reading/writing session data | |
| to HF Spaces persistent directory with atomic operations and locking. | |
| """ | |
| import os | |
| import json | |
| import tempfile | |
| import shutil | |
| from pathlib import Path | |
| from typing import Any, Dict, Optional | |
| import time | |
# Cross-platform locking: fcntl.flock only exists on POSIX (Linux/macOS);
# on Windows the lock helpers below degrade to a best-effort no-op.
try:
    import fcntl  # Unix/Linux/Mac
    HAS_FCNTL = True
except ImportError:
    HAS_FCNTL = False
# HuggingFace Spaces persistent directory
# NOTE(review): /tmp is *ephemeral* on HF Spaces — the persistent volume is
# mounted at /data. Confirm whether /tmp/outputs is intentional here.
HF_PERSISTENT_DIR = Path("/tmp/outputs")
SESSIONS_DIR = HF_PERSISTENT_DIR / "sessions"
def _acquire_lock(file_obj):
    """Take an exclusive advisory lock on *file_obj* (best-effort off-POSIX)."""
    if not HAS_FCNTL:
        # Windows has no flock; a brief pause merely reduces contention.
        time.sleep(0.01)
        return
    fcntl.flock(file_obj.fileno(), fcntl.LOCK_EX)
def _release_lock(file_obj):
    """Drop a previously acquired advisory lock (no-op where fcntl is absent)."""
    if not HAS_FCNTL:
        return
    fcntl.flock(file_obj.fileno(), fcntl.LOCK_UN)
def _acquire_shared_lock(file_obj):
    """Take a shared (read) advisory lock on *file_obj* (best-effort off-POSIX)."""
    if not HAS_FCNTL:
        # No flock available; a short sleep just lowers the race window.
        time.sleep(0.01)
        return
    fcntl.flock(file_obj.fileno(), fcntl.LOCK_SH)
def ensure_session_dir() -> Path:
    """Create the sessions root (and parents) if missing, then return it."""
    root = SESSIONS_DIR
    root.mkdir(parents=True, exist_ok=True)
    return root
def get_user_dir(user_id: str) -> Path:
    """Return the per-user directory under SESSIONS_DIR, creating it if needed.

    Args:
        user_id: Identifier used as a single path component.

    Returns:
        The (created) Path for this user.

    Raises:
        ValueError: if user_id is empty or contains path separators /
            dot components — such ids could escape the sessions root
            (path traversal), since user_id may come from a client.
    """
    # Validate before building any path so a hostile id never touches disk.
    if not user_id or user_id in (".", "..") or "/" in user_id or os.path.sep in user_id:
        raise ValueError(f"invalid user_id: {user_id!r}")
    user_dir = SESSIONS_DIR / user_id
    user_dir.mkdir(parents=True, exist_ok=True)
    return user_dir
def get_session_dir(user_id: str, session_id: str) -> Path:
    """Return the directory for one session of one user, creating it if needed.

    Args:
        user_id: Identifier forwarded to get_user_dir (validated there).
        session_id: Identifier used as a single path component.

    Returns:
        The (created) Path for this session.

    Raises:
        ValueError: if session_id is empty or contains path separators /
            dot components — such ids could escape the user's directory.
    """
    # Validate before building any path so a hostile id never touches disk.
    if not session_id or session_id in (".", "..") or "/" in session_id or os.path.sep in session_id:
        raise ValueError(f"invalid session_id: {session_id!r}")
    session_dir = get_user_dir(user_id) / session_id
    session_dir.mkdir(parents=True, exist_ok=True)
    return session_dir
def read_json_safe(filepath: Path, default: Optional[Any] = None) -> Any:
    """Read and parse a JSON file while holding a shared lock.

    Args:
        filepath: Location of the JSON document.
        default: Value to return when the file is missing or unreadable
            (an empty dict when left as None).

    Returns:
        The decoded JSON payload, or the fallback value.
    """
    fallback = {} if default is None else default
    if not filepath.exists():
        return fallback
    try:
        with open(filepath, 'r', encoding='utf-8') as fh:
            _acquire_shared_lock(fh)
            try:
                return json.load(fh)
            finally:
                # Always release, even if parsing blows up.
                _release_lock(fh)
    except (json.JSONDecodeError, IOError) as e:
        print(f"Warning: Failed to read {filepath}: {e}")
        return fallback
def write_json_safe(filepath: Path, data: Any, indent: int = 2) -> bool:
    """Atomically serialize *data* as JSON to *filepath*.

    The payload is written to a temporary file in the destination
    directory, flushed and fsync'd, then published with os.replace(),
    which atomically overwrites an existing destination on both POSIX
    and Windows. (shutil.move falls back to a non-atomic copy+delete on
    Windows when the destination already exists.)

    No lock is taken on the temp file: it is private to this process
    until the replace makes it visible, so locking it protects nothing.

    Args:
        filepath: Destination path; parent directories are created.
        data: Any JSON-serializable object.
        indent: Indentation level passed to json.dump.

    Returns:
        True on success, False on any failure (the error is printed and
        the temp file, if created, is cleaned up).
    """
    tmp_path = None
    try:
        # Ensure parent directory exists.
        filepath.parent.mkdir(parents=True, exist_ok=True)
        # Write to a temp file in the same directory so the final rename
        # stays on one filesystem (a cross-device move is not atomic).
        with tempfile.NamedTemporaryFile(
            mode='w',
            dir=filepath.parent,
            suffix='.tmp',
            delete=False,
            encoding='utf-8',
        ) as tmp:
            tmp_path = tmp.name
            json.dump(data, tmp, indent=indent, ensure_ascii=False)
            tmp.flush()
            os.fsync(tmp.fileno())
        # Atomic rename: readers see either the old or the new content,
        # never a partially written file.
        os.replace(tmp_path, filepath)
        return True
    except Exception as e:
        print(f"Error: Failed to write {filepath}: {e}")
        if tmp_path is not None and os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return False
def read_json_locked(filepath: Path, default: Optional[Any] = None) -> Any:
    """Read JSON under an exclusive lock (for reads that must not race writers)."""
    fallback = {} if default is None else default
    if not filepath.exists():
        return fallback
    try:
        with open(filepath, 'r', encoding='utf-8') as fh:
            _acquire_lock(fh)
            try:
                return json.load(fh)
            finally:
                # Always release, even if parsing blows up.
                _release_lock(fh)
    except (json.JSONDecodeError, IOError) as e:
        print(f"Warning: Failed to read {filepath}: {e}")
        return fallback
def delete_session_dir(user_id: str, session_id: str) -> bool:
    """Delete a session directory and all of its contents.

    The path is built directly rather than via get_session_dir(), which
    mkdir()s the whole chain — the old code *created* the directory for
    an unknown session just to delete it again.

    Returns:
        True if the directory was removed or did not exist,
        False if removal failed.
    """
    try:
        session_dir = SESSIONS_DIR / user_id / session_id
        if session_dir.exists():
            shutil.rmtree(session_dir)
        return True
    except Exception as e:
        print(f"Error: Failed to delete session {session_id}: {e}")
        return False
def list_user_session_dirs(user_id: str) -> list:
    """List session IDs (subdirectory names) for a user.

    The path is built directly rather than via get_user_dir(), which
    mkdir()s it — the old code created an empty directory for every
    unknown user queried, making its exists() check dead code.
    The return value is unchanged: [] for a user with no sessions.
    """
    user_dir = SESSIONS_DIR / user_id
    if not user_dir.exists():
        return []
    return [d.name for d in user_dir.iterdir() if d.is_dir()]
def cleanup_old_session_files(user_id: str, session_id: str) -> None:
    """Best-effort removal of temp/backup/lock files from a session directory.

    The path is built directly rather than via get_session_dir(), which
    mkdir()s it — cleaning an unknown session should not create it.
    Only OSError is suppressed (the old bare except also swallowed
    KeyboardInterrupt and SystemExit).
    """
    session_dir = SESSIONS_DIR / user_id / session_id
    if not session_dir.exists():
        return
    for pattern in ('*.tmp', '*.bak', '*.lock'):
        for stale in session_dir.glob(pattern):
            try:
                stale.unlink()
            except OSError:
                # Best-effort: a vanished or busy file is not an error here.
                pass