File size: 7,622 Bytes
f1b8a40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
import json
import os
import secrets
import tempfile
from datetime import datetime
from typing import Dict, Any, Optional
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError

# Configuration for HF Dataset-based ledger
LEDGER_REPO = os.getenv("LEDGER_DATASET_ID", "")
LEDGER_FILE = "subscriptions.jsonl"
HF_TOKEN = os.getenv("HF_TOKEN")

# Fallback to local file if LEDGER_DATASET_ID not set (for local dev)
LOCAL_LEDGER_FILE = "../subscriptions_ledger/subscriptions.jsonl"

# Initialize HF API
api = HfApi(token=HF_TOKEN) if HF_TOKEN else None


def _use_hf_storage() -> bool:
    """Check if we should use HF Dataset storage."""
    return bool(LEDGER_REPO and HF_TOKEN and api)


def _download_ledger() -> Optional[str]:
    """Download current ledger from HF Dataset."""
    if not _use_hf_storage():
        return None

    try:
        path = hf_hub_download(
            repo_id=LEDGER_REPO,
            filename=LEDGER_FILE,
            repo_type="dataset",
            token=HF_TOKEN
        )
        return path
    except EntryNotFoundError:
        # File doesn't exist yet in the dataset
        return None
    except Exception as e:
        print(f"Error downloading ledger: {e}")
        return None


def _upload_ledger(local_path: str) -> bool:
    """Upload ledger to HF Dataset."""
    if not _use_hf_storage():
        return False

    try:
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=LEDGER_FILE,
            repo_id=LEDGER_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message=f"Update subscriptions ledger - {datetime.utcnow().isoformat()}"
        )
        return True
    except Exception as e:
        print(f"Error uploading ledger: {e}")
        return False


def _get_ledger_path() -> str:
    """Get the path to read the ledger from."""
    if _use_hf_storage():
        hf_path = _download_ledger()
        if hf_path:
            return hf_path

    # Fallback to local file
    return LOCAL_LEDGER_FILE


def append_subscription_event(event: Dict[str, Any]) -> bool:
    """
    Appends a subscription event to the ledger.
    If using HF Dataset, downloads, appends, and re-uploads.
    """
    # Ensure timestamp is present
    if "created_at" not in event:
        event["created_at"] = datetime.utcnow().isoformat() + "Z"

    if _use_hf_storage():
        # Download current ledger
        current_path = _download_ledger()

        # Create temp file to work with
        with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as tmp:
            tmp_path = tmp.name

            # Copy existing content if any
            if current_path and os.path.exists(current_path):
                with open(current_path, 'r') as f:
                    tmp.write(f.read())

            # Append new event
            tmp.write(json.dumps(event) + "\n")

        # Upload back to HF
        success = _upload_ledger(tmp_path)

        # Clean up temp file
        try:
            os.unlink(tmp_path)
        except:
            pass

        return success
    else:
        # Local file storage
        parent_dir = os.path.dirname(LOCAL_LEDGER_FILE)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(LOCAL_LEDGER_FILE, "a") as f:
            f.write(json.dumps(event) + "\n")
        return True


def get_all_subscriptions() -> Dict[tuple, Dict[str, Any]]:
    """
    Reads the ledger and folds events to find the latest state for each (user, dataset).
    Returns a dict: key=(hf_user, dataset_id), value=subscription_record
    """
    subscriptions = {}

    ledger_path = _get_ledger_path()

    if not ledger_path or not os.path.exists(ledger_path):
        return subscriptions

    try:
        with open(ledger_path, "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                    hf_user = event.get("hf_user")
                    dataset_id = event.get("dataset_id")

                    if hf_user and dataset_id:
                        key = (hf_user, dataset_id)
                        # Append-only ledger: latest record is current state
                        subscriptions[key] = event
                except json.JSONDecodeError:
                    continue
    except Exception as e:
        print(f"Error reading ledger: {e}")

    return subscriptions


def get_active_subscription(hf_user: str, dataset_id: str) -> Optional[Dict[str, Any]]:
    """Checks if a user has an active subscription to a dataset."""
    all_subs = get_all_subscriptions()
    sub = all_subs.get((hf_user, dataset_id))

    if not sub:
        return None

    # Check expiry
    subscription_end = sub.get("subscription_end")
    if not subscription_end:
        return None

    try:
        # Handle Z suffix if present
        if subscription_end.endswith("Z"):
            subscription_end = subscription_end[:-1]
        end_date = datetime.fromisoformat(subscription_end)
        if end_date > datetime.utcnow():
            return sub
    except ValueError:
        print(f"Error parsing date: {subscription_end}")
        return None

    return None


def get_user_subscriptions(hf_user: str) -> list:
    """Get all subscriptions for a specific user."""
    all_subs = get_all_subscriptions()
    user_subs = []

    for (user, dataset_id), sub in all_subs.items():
        if user == hf_user:
            # Add active status
            end_str = sub.get("subscription_end", "")
            is_active = False
            if end_str:
                try:
                    if end_str.endswith("Z"):
                        end_str = end_str[:-1]
                    end_date = datetime.fromisoformat(end_str)
                    is_active = end_date > datetime.utcnow()
                except:
                    pass

            user_subs.append({
                **sub,
                "is_active": is_active
            })

    return user_subs


def generate_access_token() -> str:
    """Generate a secure random access token."""
    return f"hfdata_{secrets.token_urlsafe(32)}"


def validate_access_token(access_token: str) -> Optional[Dict[str, Any]]:
    """
    Validate an access token and return the subscription info if valid.
    Returns None if token is invalid or subscription expired.
    """
    all_subs = get_all_subscriptions()

    for (hf_user, dataset_id), sub in all_subs.items():
        if sub.get("access_token") == access_token:
            # Check if subscription is still active
            end_str = sub.get("subscription_end", "")
            if end_str:
                try:
                    if end_str.endswith("Z"):
                        end_str = end_str[:-1]
                    end_date = datetime.fromisoformat(end_str)
                    if end_date > datetime.utcnow():
                        return {
                            "hf_user": hf_user,
                            "dataset_id": dataset_id,
                            "subscription": sub
                        }
                except:
                    pass
            return None  # Token found but subscription expired

    return None  # Token not found


def get_subscription_by_token(access_token: str) -> Optional[Dict[str, Any]]:
    """
    Get subscription details by access token.
    Alias for validate_access_token for clarity.
    """
    return validate_access_token(access_token)