File size: 7,315 Bytes
c4a0359
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import os
import json
import logging
from typing import Any, Dict, List, Optional
from datetime import datetime
import uuid

from huggingface_hub import HfApi, CommitOperationAdd, create_commit, hf_hub_url

logger = logging.getLogger(__name__)

USERS_FILE_PATH = "auth/users.json"
REQUESTS_FILE_PATH = "auth/requests.json"


def build_dataset_resolve_url(repo_id: str, path_in_repo: str, revision: str = "main") -> str:
    """
    Build a CDN-resolved URL for a file stored in a Hugging Face dataset repo.
    """
    return hf_hub_url(repo_id=repo_id, filename=path_in_repo, repo_type="dataset", revision=revision)


class AuthStorageClient:
    """
    Helper for managing user authentication data and permission requests
    in a Hugging Face dataset repository.
    
    Repo format:
      - auth/users.json
      - auth/requests.json
    """

    def __init__(self, dataset_repo: str, hf_token: Optional[str] = None, revision: str = "main"):
        if not dataset_repo:
            raise ValueError("HF_DATASET_REPO is not set. Please configure the dataset repository id.")
        self.dataset_repo = dataset_repo
        self.revision = revision
        self.api = HfApi(token=hf_token) if hf_token else HfApi()

    def load_users(self) -> List[Dict[str, Any]]:
        """
        Download and parse users.json from the dataset. If missing, return [].
        """
        try:
            url = build_dataset_resolve_url(self.dataset_repo, USERS_FILE_PATH, self.revision)
            import requests
            headers = {}
            if self.api.token:
                headers["Authorization"] = f"Bearer {self.api.token}"
            resp = requests.get(url, timeout=10, headers=headers)
            if resp.status_code == 200:
                data = resp.json()
                return data.get("users", [])
            logger.info("Users file not found at %s (status %s). Initializing empty users list.", url, resp.status_code)
            return []
        except Exception as e:
            logger.error("Failed to load users from HF: %s", str(e))
            return []

    def save_users(self, users: List[Dict[str, Any]]) -> None:
        """
        Commit a new version of users.json to the dataset repo.
        """
        try:
            payload = json.dumps({"users": users}, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
            operations = [
                CommitOperationAdd(path_in_repo=USERS_FILE_PATH, path_or_fileobj=payload)
            ]
            create_commit(
                repo_id=self.dataset_repo,
                repo_type="dataset",
                operations=operations,
                commit_message="Update users.json",
                revision=self.revision,
                token=self.api.token,
            )
        except Exception as e:
            logger.error("Failed to save users to HF: %s", str(e))
            raise

    def load_requests(self) -> List[Dict[str, Any]]:
        """
        Download and parse requests.json from the dataset. If missing, return [].
        """
        try:
            url = build_dataset_resolve_url(self.dataset_repo, REQUESTS_FILE_PATH, self.revision)
            import requests
            headers = {}
            if self.api.token:
                headers["Authorization"] = f"Bearer {self.api.token}"
            resp = requests.get(url, timeout=10, headers=headers)
            if resp.status_code == 200:
                data = resp.json()
                return data.get("requests", [])
            logger.info("Requests file not found at %s (status %s). Initializing empty requests list.", url, resp.status_code)
            return []
        except Exception as e:
            logger.error("Failed to load requests from HF: %s", str(e))
            return []

    def save_requests(self, requests: List[Dict[str, Any]]) -> None:
        """
        Commit a new version of requests.json to the dataset repo.
        """
        try:
            payload = json.dumps({"requests": requests}, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
            operations = [
                CommitOperationAdd(path_in_repo=REQUESTS_FILE_PATH, path_or_fileobj=payload)
            ]
            create_commit(
                repo_id=self.dataset_repo,
                repo_type="dataset",
                operations=operations,
                commit_message="Update requests.json",
                revision=self.revision,
                token=self.api.token,
            )
        except Exception as e:
            logger.error("Failed to save requests to HF: %s", str(e))
            raise

    def add_user(self, email: str, password_hash: str) -> None:
        """
        Add a new user to the users list.
        """
        users = self.load_users()
        # Check if user already exists
        if any(user.get("email") == email for user in users):
            raise ValueError(f"User with email {email} already exists")
        
        users.append({
            "email": email,
            "password_hash": password_hash
        })
        self.save_users(users)

    def get_user(self, email: str) -> Optional[Dict[str, Any]]:
        """
        Get a user by email.
        """
        users = self.load_users()
        return next((user for user in users if user.get("email") == email), None)

    def delete_user(self, email: str) -> None:
        """
        Delete a user by email.
        """
        users = self.load_users()
        users = [user for user in users if user.get("email") != email]
        self.save_users(users)

    def add_request(self, name: str, email: str, reason: str) -> str:
        """
        Add a new permission request. Returns the request ID.
        """
        requests = self.load_requests()
        request_id = str(uuid.uuid4())
        
        new_request = {
            "id": request_id,
            "name": name,
            "email": email,
            "reason": reason,
            "timestamp": datetime.utcnow().isoformat(),
            "status": "pending",
            "reviewed_at": None,
            "rejection_reason": None
        }
        
        requests.append(new_request)
        self.save_requests(requests)
        return request_id

    def get_request(self, request_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a request by ID.
        """
        requests = self.load_requests()
        return next((req for req in requests if req.get("id") == request_id), None)

    def update_request_status(self, request_id: str, status: str, rejection_reason: Optional[str] = None) -> None:
        """
        Update the status of a request.
        """
        requests = self.load_requests()
        for req in requests:
            if req.get("id") == request_id:
                req["status"] = status
                req["reviewed_at"] = datetime.utcnow().isoformat()
                if rejection_reason:
                    req["rejection_reason"] = rejection_reason
                break
        self.save_requests(requests)

    def delete_request(self, request_id: str) -> None:
        """
        Delete a request by ID.
        """
        requests = self.load_requests()
        requests = [req for req in requests if req.get("id") != request_id]
        self.save_requests(requests)