import json
import logging
import os
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from huggingface_hub import CommitOperationAdd, HfApi, create_commit, hf_hub_url

logger = logging.getLogger(__name__)

USERS_FILE_PATH = "auth/users.json"
REQUESTS_FILE_PATH = "auth/requests.json"

# Timeout (seconds) applied to every HTTP read of a stored JSON file.
_HTTP_TIMEOUT_SECONDS = 10


def build_dataset_resolve_url(repo_id: str, path_in_repo: str, revision: str = "main") -> str:
    """
    Build the download URL for a file stored in a Hugging Face dataset repo.

    Thin wrapper around ``hf_hub_url`` that fixes ``repo_type="dataset"``.
    """
    return hf_hub_url(
        repo_id=repo_id,
        filename=path_in_repo,
        repo_type="dataset",
        revision=revision,
    )


def _utc_timestamp() -> str:
    """
    Return the current UTC time as a naive ISO-8601 string.

    Produces the same format as the deprecated ``datetime.utcnow().isoformat()``
    (no UTC offset suffix), so newly written timestamps stay consistent with
    the ones already stored.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


class AuthStorageClient:
    """
    Helper for managing user authentication data and permission requests
    in a Hugging Face dataset repository.

    Repo format:
      - auth/users.json      -> {"users": [...]}
      - auth/requests.json   -> {"requests": [...]}

    Every mutating method performs load -> modify -> commit. No parent commit
    is pinned, so two concurrent writers can overwrite each other's changes.
    """

    def __init__(self, dataset_repo: str, hf_token: Optional[str] = None, revision: str = "main"):
        """
        Args:
            dataset_repo: Dataset repository id; must be non-empty.
            hf_token: Optional access token passed to ``HfApi``.
            revision: Revision used for both reads and commits.

        Raises:
            ValueError: If ``dataset_repo`` is empty.
        """
        if not dataset_repo:
            raise ValueError("HF_DATASET_REPO is not set. Please configure the dataset repository id.")
        self.dataset_repo = dataset_repo
        self.revision = revision
        self.api = HfApi(token=hf_token) if hf_token else HfApi()

    # ------------------------------------------------------------------
    # Private helpers shared by the users/requests code paths
    # ------------------------------------------------------------------

    def _fetch_records(self, path_in_repo: str, key: str) -> List[Dict[str, Any]]:
        """
        Download ``path_in_repo`` and return the list stored under ``key``.

        A 404 response is treated as "file not created yet" and yields ``[]``.
        Any other failure raises, so that callers about to write back never
        mistake a transient error for an empty store.

        Raises:
            RuntimeError: On a non-200/non-404 status or a malformed payload.
            Exception: Network and JSON decoding errors propagate unchanged.
        """
        # Imported locally (as in the original code) and aliased so it cannot
        # be confused with local variables/parameters named `requests`.
        import requests as http_client

        url = build_dataset_resolve_url(self.dataset_repo, path_in_repo, self.revision)
        headers = {}
        if self.api.token:
            headers["Authorization"] = f"Bearer {self.api.token}"

        resp = http_client.get(url, timeout=_HTTP_TIMEOUT_SECONDS, headers=headers)
        if resp.status_code == 404:
            logger.info(
                "%s file not found at %s (status %s). Initializing empty %s list.",
                key.capitalize(),
                url,
                resp.status_code,
                key,
            )
            return []
        if resp.status_code != 200:
            raise RuntimeError(f"Unexpected status {resp.status_code} while fetching {url}")

        data = resp.json()
        if not isinstance(data, dict):
            raise RuntimeError(f"Malformed content at {url}: expected a JSON object")
        records = data.get(key, [])
        if not isinstance(records, list):
            raise RuntimeError(f"Malformed content at {url}: '{key}' is not a list")
        return records

    def _load_best_effort(self, path_in_repo: str, key: str) -> List[Dict[str, Any]]:
        """
        Read-only variant of ``_fetch_records``: log any failure and return [].
        """
        try:
            return self._fetch_records(path_in_repo, key)
        except Exception as e:  # deliberate best-effort for read-only callers
            logger.error("Failed to load %s from HF: %s", key, str(e))
            return []

    def _commit_records(self, path_in_repo: str, key: str, records: List[Dict[str, Any]]) -> None:
        """
        Serialize ``{key: records}`` as compact UTF-8 JSON and commit it to
        ``path_in_repo``. Failures are logged and re-raised.
        """
        file_name = path_in_repo.rsplit("/", 1)[-1]
        try:
            payload = json.dumps(
                {key: records}, ensure_ascii=False, separators=(",", ":")
            ).encode("utf-8")
            create_commit(
                repo_id=self.dataset_repo,
                repo_type="dataset",
                operations=[
                    CommitOperationAdd(path_in_repo=path_in_repo, path_or_fileobj=payload)
                ],
                commit_message=f"Update {file_name}",
                revision=self.revision,
                token=self.api.token,
            )
        except Exception as e:
            logger.error("Failed to save %s to HF: %s", key, str(e))
            raise

    # ------------------------------------------------------------------
    # Raw load / save
    # ------------------------------------------------------------------

    def load_users(self) -> List[Dict[str, Any]]:
        """
        Download and parse users.json from the dataset.

        Best-effort: returns [] if the file is missing or cannot be read
        (the failure is logged).
        """
        return self._load_best_effort(USERS_FILE_PATH, "users")

    def save_users(self, users: List[Dict[str, Any]]) -> None:
        """
        Commit a new version of users.json to the dataset repo.

        Raises:
            Exception: Whatever the commit raised, after logging it.
        """
        self._commit_records(USERS_FILE_PATH, "users", users)

    def load_requests(self) -> List[Dict[str, Any]]:
        """
        Download and parse requests.json from the dataset.

        Best-effort: returns [] if the file is missing or cannot be read
        (the failure is logged).
        """
        return self._load_best_effort(REQUESTS_FILE_PATH, "requests")

    def save_requests(self, requests: List[Dict[str, Any]]) -> None:
        """
        Commit a new version of requests.json to the dataset repo.

        Raises:
            Exception: Whatever the commit raised, after logging it.
        """
        self._commit_records(REQUESTS_FILE_PATH, "requests", requests)

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    def add_user(self, email: str, password_hash: str) -> None:
        """
        Add a new user to the users list.

        Raises:
            ValueError: If a user with this email already exists.
            RuntimeError: If the current users file cannot be read; nothing is
                written in that case, so existing users are never overwritten.
        """
        # Strict load: a failed read must abort instead of looking like an
        # empty store that then gets committed over the real data.
        users = self._fetch_records(USERS_FILE_PATH, "users")

        if any(user.get("email") == email for user in users):
            raise ValueError(f"User with email {email} already exists")

        users.append({"email": email, "password_hash": password_hash})
        self.save_users(users)

    def get_user(self, email: str) -> Optional[Dict[str, Any]]:
        """
        Get a user by email, or None if no such user is found.
        """
        return next(
            (user for user in self.load_users() if user.get("email") == email),
            None,
        )

    def delete_user(self, email: str) -> None:
        """
        Delete a user by email. No commit is made when no user matches.

        Raises:
            RuntimeError: If the current users file cannot be read.
        """
        users = self._fetch_records(USERS_FILE_PATH, "users")
        remaining = [user for user in users if user.get("email") != email]
        if len(remaining) == len(users):
            return  # nothing to delete; avoid a no-op commit
        self.save_users(remaining)

    # ------------------------------------------------------------------
    # Permission requests
    # ------------------------------------------------------------------

    def add_request(self, name: str, email: str, reason: str) -> str:
        """
        Add a new permission request with status "pending".

        Returns:
            The generated request ID (a UUID4 string).

        Raises:
            RuntimeError: If the current requests file cannot be read.
        """
        pending = self._fetch_records(REQUESTS_FILE_PATH, "requests")
        request_id = str(uuid.uuid4())
        pending.append(
            {
                "id": request_id,
                "name": name,
                "email": email,
                "reason": reason,
                "timestamp": _utc_timestamp(),
                "status": "pending",
                "reviewed_at": None,
                "rejection_reason": None,
            }
        )
        self.save_requests(pending)
        return request_id

    def get_request(self, request_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a request by ID, or None if no such request is found.
        """
        return next(
            (req for req in self.load_requests() if req.get("id") == request_id),
            None,
        )

    def update_request_status(self, request_id: str, status: str, rejection_reason: Optional[str] = None) -> None:
        """
        Update the status of a request and stamp ``reviewed_at``.

        ``rejection_reason`` is only written when it is non-empty. If no
        request has the given ID, a warning is logged and nothing is committed.

        Raises:
            RuntimeError: If the current requests file cannot be read.
        """
        stored = self._fetch_records(REQUESTS_FILE_PATH, "requests")
        target = next((req for req in stored if req.get("id") == request_id), None)
        if target is None:
            logger.warning("Request %s not found; status not updated.", request_id)
            return

        target["status"] = status
        target["reviewed_at"] = _utc_timestamp()
        if rejection_reason:
            target["rejection_reason"] = rejection_reason
        self.save_requests(stored)

    def delete_request(self, request_id: str) -> None:
        """
        Delete a request by ID. No commit is made when no request matches.

        Raises:
            RuntimeError: If the current requests file cannot be read.
        """
        stored = self._fetch_records(REQUESTS_FILE_PATH, "requests")
        remaining = [req for req in stored if req.get("id") != request_id]
        if len(remaining) == len(stored):
            return  # nothing to delete; avoid a no-op commit
        self.save_requests(remaining)