Spaces:
Running
Running
File size: 7,622 Bytes
f1b8a40 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 |
import json
import os
import secrets
import tempfile
from datetime import datetime
from typing import Dict, Any, Optional
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError
# Configuration for the HF Dataset-based ledger.
# LEDGER_REPO is the dataset repo id, read from the LEDGER_DATASET_ID
# environment variable; it defaults to "" (falsy) when the variable is unset.
LEDGER_REPO = os.getenv("LEDGER_DATASET_ID", "")
# Name of the JSON-lines ledger file inside the dataset repo.
LEDGER_FILE = "subscriptions.jsonl"
# Hub token read from the HF_TOKEN environment variable; None when unset.
HF_TOKEN = os.getenv("HF_TOKEN")
# Fallback to a local file when HF storage is not usable (for local dev).
# NOTE(review): this is a relative path, so it resolves against the process's
# current working directory rather than this module's location.
LOCAL_LEDGER_FILE = "../subscriptions_ledger/subscriptions.jsonl"
# Initialize the HF API client once at import time; stays None without a token.
api = HfApi(token=HF_TOKEN) if HF_TOKEN else None
def _use_hf_storage() -> bool:
    """Report whether the ledger should be kept in the HF Dataset repo.

    HF storage is used only when a dataset repo id and a token are both
    configured and the module-level ``api`` client was created.
    """
    if not LEDGER_REPO:
        return False
    if not HF_TOKEN:
        return False
    return bool(api)
def _download_ledger() -> Optional[str]:
    """Fetch the ledger file from the HF Dataset repo.

    Returns:
        The local path reported by ``hf_hub_download``, or ``None`` when HF
        storage is not configured, the file does not exist in the dataset
        yet, or the download failed (the error is printed).
    """
    if _use_hf_storage():
        try:
            return hf_hub_download(
                repo_id=LEDGER_REPO,
                filename=LEDGER_FILE,
                repo_type="dataset",
                token=HF_TOKEN,
            )
        except EntryNotFoundError:
            # The ledger file has not been created in the dataset yet.
            pass
        except Exception as exc:
            print(f"Error downloading ledger: {exc}")
    return None
def _upload_ledger(local_path: str) -> bool:
    """Push a local ledger file to the HF Dataset repo.

    Args:
        local_path: Path of the file whose content replaces the ledger.

    Returns:
        ``True`` when the upload call completed, ``False`` when HF storage is
        not configured or the upload raised (the error is printed).
    """
    if not _use_hf_storage():
        return False
    try:
        upload_kwargs = {
            "path_or_fileobj": local_path,
            "path_in_repo": LEDGER_FILE,
            "repo_id": LEDGER_REPO,
            "repo_type": "dataset",
            "token": HF_TOKEN,
            "commit_message": f"Update subscriptions ledger - {datetime.utcnow().isoformat()}",
        }
        api.upload_file(**upload_kwargs)
    except Exception as exc:
        print(f"Error uploading ledger: {exc}")
        return False
    return True
def _get_ledger_path() -> str:
    """Return the path the ledger should be read from.

    Prefers a freshly downloaded copy from the HF Dataset when HF storage is
    enabled and the download yields a path; otherwise falls back to the
    local ledger file.
    """
    downloaded = _download_ledger() if _use_hf_storage() else None
    return downloaded if downloaded else LOCAL_LEDGER_FILE
def append_subscription_event(event: Dict[str, Any]) -> bool:
    """Append one subscription event to the ledger as a JSON line.

    With HF Dataset storage the current ledger is downloaded, the event is
    appended to a temporary copy, and the copy is uploaded back. Without it
    the event is appended to the local ledger file.

    Args:
        event: The event to record. A ``created_at`` UTC timestamp is added
            to this dict in place when the key is missing.

    Returns:
        ``True`` when the event was stored; ``False`` when the HF download or
        upload failed (nothing is uploaded after a failed download).

    Raises:
        TypeError: If ``event`` is not JSON serializable.
        OSError: If a local file cannot be read or written.
    """
    # Ensure timestamp is present (mutates the caller's dict, as before).
    if "created_at" not in event:
        event["created_at"] = datetime.utcnow().isoformat() + "Z"
    # Serialize up front so a bad event fails before any file is touched.
    record = json.dumps(event) + "\n"

    if not _use_hf_storage():
        # Local file storage
        parent_dir = os.path.dirname(LOCAL_LEDGER_FILE)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(LOCAL_LEDGER_FILE, "a", encoding="utf-8") as f:
            f.write(record)
        return True

    # Imported here because it is only needed on the HF path. It subclasses
    # EntryNotFoundError but means "Hub unreachable and no cached copy", not
    # "the file does not exist", so it must be handled first.
    from huggingface_hub.utils import LocalEntryNotFoundError

    # Download directly so "ledger does not exist yet" can be told apart from
    # "download failed". Treating a failed download as an empty ledger would
    # upload a file holding only this event and overwrite the whole history.
    try:
        current_path = hf_hub_download(
            repo_id=LEDGER_REPO,
            filename=LEDGER_FILE,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except LocalEntryNotFoundError as e:
        print(f"Error downloading ledger: {e}")
        return False
    except EntryNotFoundError:
        # File doesn't exist yet in the dataset: start a new ledger.
        current_path = None
    except Exception as e:
        print(f"Error downloading ledger: {e}")
        return False

    existing = ""
    if current_path and os.path.exists(current_path):
        with open(current_path, "r", encoding="utf-8") as f:
            existing = f.read()
    # Keep one JSON object per line even if the stored file lacks a final
    # newline; otherwise the new record would be glued onto the last line.
    if existing and not existing.endswith("\n"):
        existing += "\n"

    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".jsonl", delete=False, encoding="utf-8"
        ) as tmp:
            tmp_path = tmp.name
            tmp.write(existing)
            tmp.write(record)
        # The temp file is closed (and flushed) before it is uploaded.
        return _upload_ledger(tmp_path)
    finally:
        # Best-effort cleanup that also runs when writing or uploading raises.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
def get_all_subscriptions() -> Dict[tuple, Dict[str, Any]]:
    """Read the ledger and fold events into the latest state per subscription.

    The ledger is append-only, so the last event seen for a given
    ``(hf_user, dataset_id)`` pair is its current state. Blank lines, lines
    that are not valid JSON, lines whose JSON value is not an object, and
    events missing ``hf_user`` or ``dataset_id`` are skipped.

    Returns:
        A dict keyed by ``(hf_user, dataset_id)`` whose values are the latest
        event dicts. Empty when the ledger does not exist. If reading fails
        part-way, the error is printed and what was read so far is returned.
    """
    subscriptions: Dict[tuple, Dict[str, Any]] = {}
    ledger_path = _get_ledger_path()
    if not ledger_path or not os.path.exists(ledger_path):
        return subscriptions
    try:
        with open(ledger_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A valid JSON line need not be an object (e.g. `[]` or `3`).
                # Skip it instead of letting `.get` raise and abort the read,
                # which would silently drop every later event.
                if not isinstance(event, dict):
                    continue
                hf_user = event.get("hf_user")
                dataset_id = event.get("dataset_id")
                if not (hf_user and dataset_id):
                    continue
                try:
                    # Append-only ledger: latest record is current state
                    subscriptions[(hf_user, dataset_id)] = event
                except TypeError:
                    # Unhashable key parts (e.g. a list): skip this event only.
                    continue
    except Exception as e:
        print(f"Error reading ledger: {e}")
    return subscriptions
def get_active_subscription(hf_user: str, dataset_id: str) -> Optional[Dict[str, Any]]:
    """Return the user's subscription to a dataset if it has not expired.

    Args:
        hf_user: User name to look up.
        dataset_id: Dataset identifier to look up.

    Returns:
        The latest ledger record for ``(hf_user, dataset_id)`` when its
        ``subscription_end`` lies in the future (UTC); ``None`` when there is
        no record, no end date, or the end date is unparseable or past.
    """
    sub = get_all_subscriptions().get((hf_user, dataset_id))
    if not sub:
        return None
    # Check expiry
    subscription_end = sub.get("subscription_end")
    if not subscription_end:
        return None
    if not isinstance(subscription_end, str):
        # A non-string value cannot be an ISO timestamp; report it the same
        # way as an unparseable one instead of raising AttributeError.
        print(f"Error parsing date: {subscription_end}")
        return None
    # Handle Z suffix if present (fromisoformat rejects it before Python 3.11)
    if subscription_end.endswith("Z"):
        subscription_end = subscription_end[:-1]
    try:
        end_date = datetime.fromisoformat(subscription_end)
        # A timestamp with an explicit offset parses as timezone-aware, and
        # comparing it to naive utcnow() raises TypeError. Convert it to
        # naive UTC first so both styles of timestamp are supported.
        offset = end_date.utcoffset()
        if offset is not None:
            end_date = (end_date - offset).replace(tzinfo=None)
    except (ValueError, OverflowError):
        print(f"Error parsing date: {subscription_end}")
        return None
    if end_date > datetime.utcnow():
        return sub
    return None
def get_user_subscriptions(hf_user: str) -> list:
    """Get all subscriptions for a specific user.

    Args:
        hf_user: User name whose subscriptions are listed.

    Returns:
        One dict per dataset the user has a ledger record for: a copy of the
        latest record plus ``"is_active"``, which is ``True`` only when
        ``subscription_end`` parses and lies in the future (UTC).
    """
    now = datetime.utcnow()
    user_subs = []
    for (user, _dataset_id), sub in get_all_subscriptions().items():
        if user != hf_user:
            continue
        is_active = False
        end_str = sub.get("subscription_end", "")
        if isinstance(end_str, str) and end_str:
            # fromisoformat rejects the "Z" suffix before Python 3.11.
            if end_str.endswith("Z"):
                end_str = end_str[:-1]
            try:
                end_date = datetime.fromisoformat(end_str)
                # Convert offset-aware timestamps to naive UTC so the
                # comparison with utcnow() is valid rather than a TypeError.
                offset = end_date.utcoffset()
                if offset is not None:
                    end_date = (end_date - offset).replace(tzinfo=None)
                is_active = end_date > now
            except (ValueError, OverflowError):
                # Unparseable end date: report the subscription as inactive.
                pass
        user_subs.append({**sub, "is_active": is_active})
    return user_subs
def generate_access_token() -> str:
    """Create a new random access token carrying the ``hfdata_`` prefix."""
    random_part = secrets.token_urlsafe(32)
    return "hfdata_" + random_part
def validate_access_token(access_token: str) -> Optional[Dict[str, Any]]:
    """Validate an access token and return the subscription it belongs to.

    Args:
        access_token: Token presented by the caller.

    Returns:
        ``{"hf_user", "dataset_id", "subscription"}`` for the record holding
        this token when its ``subscription_end`` lies in the future (UTC).
        ``None`` when the token is empty or not a string, matches no record,
        or the matching subscription is expired or has an unparseable end
        date.
    """
    # Reject missing/empty tokens up front. Otherwise `None` (or "") would
    # compare equal to the `access_token` of any record that lacks one and
    # be accepted as valid.
    if not isinstance(access_token, str) or not access_token:
        return None
    presented = access_token.encode("utf-8")

    for (hf_user, dataset_id), sub in get_all_subscriptions().items():
        stored = sub.get("access_token")
        if not isinstance(stored, str) or not stored:
            continue
        # Constant-time comparison so response timing does not reveal how
        # much of a guessed token matched. Bytes are used because
        # compare_digest rejects non-ASCII str arguments.
        if not secrets.compare_digest(stored.encode("utf-8"), presented):
            continue

        # Token found: it is valid only while the subscription is active.
        end_str = sub.get("subscription_end", "")
        if isinstance(end_str, str) and end_str:
            # fromisoformat rejects the "Z" suffix before Python 3.11.
            if end_str.endswith("Z"):
                end_str = end_str[:-1]
            try:
                end_date = datetime.fromisoformat(end_str)
                # Convert offset-aware timestamps to naive UTC so the
                # comparison with utcnow() is valid rather than a TypeError.
                offset = end_date.utcoffset()
                if offset is not None:
                    end_date = (end_date - offset).replace(tzinfo=None)
                if end_date > datetime.utcnow():
                    return {
                        "hf_user": hf_user,
                        "dataset_id": dataset_id,
                        "subscription": sub,
                    }
            except (ValueError, OverflowError):
                # Unparseable end date: treat the subscription as expired.
                pass
        return None  # Token found but subscription expired
    return None  # Token not found
def get_subscription_by_token(access_token: str) -> Optional[Dict[str, Any]]:
    """Look up subscription details for an access token.

    A more descriptively named alias: the work is delegated unchanged to
    ``validate_access_token``.
    """
    subscription_info = validate_access_token(access_token)
    return subscription_info
|