# datapass-server / subscriptions_ledger.py
# Last commit f1b8a40 by waroca: "Upload folder using huggingface_hub"
import json
import os
import secrets
import tempfile
from datetime import datetime
from typing import Dict, Any, Optional
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError
# --- Ledger storage configuration -------------------------------------------
# Hugging Face dataset repo holding the ledger; an empty value disables HF mode.
LEDGER_REPO = os.getenv("LEDGER_DATASET_ID", "")
# JSON Lines file name inside that dataset repo.
LEDGER_FILE = "subscriptions.jsonl"
# Token used for both reading and writing the dataset.
HF_TOKEN = os.getenv("HF_TOKEN")

# Local ledger used when LEDGER_DATASET_ID is unset (local development).
# NOTE(review): this path is resolved relative to the process working
# directory, not to this module — confirm the server is always started from
# the expected directory.
LOCAL_LEDGER_FILE = "../subscriptions_ledger/subscriptions.jsonl"

# Shared Hub client; stays None when no token is configured.
api = None
if HF_TOKEN:
    api = HfApi(token=HF_TOKEN)
def _use_hf_storage() -> bool:
    """Return True when the HF Dataset backend is fully configured.

    All three must be truthy: the dataset id (LEDGER_REPO), the token
    (HF_TOKEN) and the module-level API client (api).
    """
    configured = all((LEDGER_REPO, HF_TOKEN, api))
    return configured
def _download_ledger() -> Optional[str]:
    """Fetch the ledger file from the HF Dataset and return its local path.

    Returns None when HF storage is not configured, when the ledger file does
    not exist in the dataset yet, or when the download fails for any other
    reason (the error is printed). Callers cannot tell these cases apart.
    """
    if _use_hf_storage():
        try:
            return hf_hub_download(
                repo_id=LEDGER_REPO,
                filename=LEDGER_FILE,
                repo_type="dataset",
                token=HF_TOKEN,
            )
        except EntryNotFoundError:
            # Nothing has been written to the dataset yet.
            pass
        except Exception as exc:
            print(f"Error downloading ledger: {exc}")
    return None
def _upload_ledger(local_path: str) -> bool:
    """Upload ``local_path`` to the dataset as the new ledger file.

    Args:
        local_path: Path of the file whose contents replace LEDGER_FILE in
            the dataset repo.

    Returns:
        True on success; False if HF storage is not configured or the upload
        raised (the error is printed).
    """
    if not _use_hf_storage():
        return False
    message = f"Update subscriptions ledger - {datetime.utcnow().isoformat()}"
    try:
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=LEDGER_FILE,
            repo_id=LEDGER_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message=message,
        )
    except Exception as exc:
        print(f"Error uploading ledger: {exc}")
        return False
    return True
def _get_ledger_path() -> str:
    """Return a path from which the ledger can be read.

    Prefers a freshly downloaded copy from the HF Dataset. Falls back to
    LOCAL_LEDGER_FILE when HF storage is off or the download yields nothing.
    """
    # NOTE(review): in HF mode a failed download also falls back to the local
    # file, which may be absent or stale — confirm this is intended.
    downloaded = _download_ledger() if _use_hf_storage() else None
    return downloaded or LOCAL_LEDGER_FILE
def append_subscription_event(event: Dict[str, Any]) -> bool:
    """Append one subscription event to the ledger.

    If ``event`` has no ``created_at`` key, a UTC ISO 8601 timestamp with a
    trailing ``Z`` is added to the caller's dict in place.

    With HF Dataset storage, the whole ledger is downloaded, the event is
    appended to a temporary copy, and the copy is uploaded back. Without it,
    the event is appended to LOCAL_LEDGER_FILE, creating its directory first.

    Args:
        event: JSON-serialisable event record.

    Returns:
        True if the event was persisted, False otherwise. In HF mode, nothing
        is written when the existing ledger cannot be downloaded. This
        prevents a transient failure from overwriting the ledger with a
        single line.

    Raises:
        TypeError: if ``event`` is not JSON-serialisable (from json.dumps).
    """
    if "created_at" not in event:
        event["created_at"] = datetime.utcnow().isoformat() + "Z"
    line = json.dumps(event) + "\n"

    if not _use_hf_storage():
        # Local file storage.
        parent_dir = os.path.dirname(LOCAL_LEDGER_FILE)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(LOCAL_LEDGER_FILE, "a", encoding="utf-8") as f:
            f.write(line)
        return True

    # Download directly rather than via _download_ledger(). That helper
    # returns None for both "file not created yet" and "download failed",
    # and treating a failure as an empty ledger would wipe every event.
    try:
        current_path = hf_hub_download(
            repo_id=LEDGER_REPO,
            filename=LEDGER_FILE,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except EntryNotFoundError:
        current_path = None  # first event ever: start a new ledger
    except Exception as e:
        print(f"Error downloading ledger: {e}")
        return False

    # NOTE(review): this download/append/upload cycle is not atomic. Two
    # concurrent appends can still lose one of the events.
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".jsonl", delete=False, encoding="utf-8"
        ) as tmp:
            tmp_path = tmp.name
            if current_path and os.path.exists(current_path):
                with open(current_path, "r", encoding="utf-8") as f:
                    existing = f.read()
                tmp.write(existing)
                # Keep one JSON object per line even if the stored file lacks
                # a final newline.
                if existing and not existing.endswith("\n"):
                    tmp.write("\n")
            tmp.write(line)
        return _upload_ledger(tmp_path)
    finally:
        # Remove the temp copy even if reading, writing or uploading raised.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
def get_all_subscriptions() -> Dict[tuple, Dict[str, Any]]:
    """Fold the ledger into the current record for each (hf_user, dataset_id).

    The ledger is JSON Lines and append-only, so the last event seen for a
    key is treated as that key's current state. The following are skipped:
    blank lines, malformed JSON, JSON values that are not objects, and events
    missing a truthy ``hf_user`` or ``dataset_id``. Any other read error is
    printed, and whatever was folded so far is returned.

    Returns:
        Dict mapping ``(hf_user, dataset_id)`` to the latest event dict
        (empty if no ledger file is available).
    """
    subscriptions: Dict[tuple, Dict[str, Any]] = {}
    ledger_path = _get_ledger_path()
    if not ledger_path or not os.path.exists(ledger_path):
        return subscriptions
    try:
        with open(ledger_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line:
                    continue
                try:
                    event = json.loads(line)
                except json.JSONDecodeError:
                    continue  # skip a corrupt line rather than failing the read
                # A valid JSON line that is not an object (e.g. a list) would
                # otherwise raise AttributeError and abort the whole read.
                if not isinstance(event, dict):
                    continue
                hf_user = event.get("hf_user")
                dataset_id = event.get("dataset_id")
                if hf_user and dataset_id:
                    # Append-only ledger: a later event replaces earlier ones.
                    subscriptions[(hf_user, dataset_id)] = event
    except Exception as e:
        print(f"Error reading ledger: {e}")
    return subscriptions
def get_active_subscription(hf_user: str, dataset_id: str) -> Optional[Dict[str, Any]]:
    """Return the user's current subscription to a dataset if it is active.

    A subscription is active when its ``subscription_end`` parses as an
    ISO 8601 timestamp later than the current UTC time. A trailing ``Z`` or
    an explicit UTC offset is accepted; naive timestamps are compared as UTC.

    Args:
        hf_user: Hugging Face user name, as stored in the ledger.
        dataset_id: Dataset identifier, as stored in the ledger.

    Returns:
        The latest ledger event for ``(hf_user, dataset_id)``. Returns None if
        there is no record, no string end date, an unparseable end date (the
        value is printed), or an end date that has already passed.
    """
    sub = get_all_subscriptions().get((hf_user, dataset_id))
    if not sub:
        return None

    subscription_end = sub.get("subscription_end")
    if not subscription_end or not isinstance(subscription_end, str):
        return None

    # datetime.fromisoformat() only accepts a "Z" suffix from Python 3.11 on.
    if subscription_end.endswith("Z"):
        subscription_end = subscription_end[:-1]
    try:
        end_date = datetime.fromisoformat(subscription_end)
    except ValueError:
        print(f"Error parsing date: {subscription_end}")
        return None

    # Normalise offset-aware values to naive UTC. Comparing an aware datetime
    # with the naive utcnow() would raise TypeError.
    offset = end_date.utcoffset()
    if offset is not None:
        end_date = end_date.replace(tzinfo=None) - offset

    return sub if end_date > datetime.utcnow() else None
def get_user_subscriptions(hf_user: str) -> list:
    """List every current subscription record belonging to ``hf_user``.

    Each entry is a shallow copy of the latest ledger event for one dataset,
    plus an ``is_active`` flag. The flag is True when ``subscription_end`` is
    a parseable ISO 8601 string later than the current UTC time. A trailing
    ``Z`` or a UTC offset is accepted; naive values are compared as UTC.

    Args:
        hf_user: Hugging Face user name, as stored in the ledger.

    Returns:
        List of dicts (possibly empty).
    """
    all_subs = get_all_subscriptions()
    now = datetime.utcnow()
    user_subs = []
    for (user, _dataset_id), sub in all_subs.items():
        if user != hf_user:
            continue
        end_str = sub.get("subscription_end", "")
        is_active = False
        if end_str and isinstance(end_str, str):
            # datetime.fromisoformat() only accepts "Z" from Python 3.11 on.
            if end_str.endswith("Z"):
                end_str = end_str[:-1]
            try:
                end_date = datetime.fromisoformat(end_str)
            except ValueError:
                pass  # unparseable end date: report as inactive
            else:
                # Normalise offset-aware values to naive UTC so the comparison
                # with the naive `now` does not raise TypeError.
                offset = end_date.utcoffset()
                if offset is not None:
                    end_date = end_date.replace(tzinfo=None) - offset
                is_active = end_date > now
        user_subs.append({**sub, "is_active": is_active})
    return user_subs
def generate_access_token() -> str:
    """Create a new random, URL-safe access token prefixed with ``hfdata_``.

    The random part comes from ``secrets.token_urlsafe(32)``, i.e. 32 random
    bytes encoded as URL-safe base64.
    """
    random_part = secrets.token_urlsafe(32)
    return "hfdata_" + random_part
def validate_access_token(access_token: str) -> Optional[Dict[str, Any]]:
    """Resolve an access token to its subscription, if that subscription is active.

    Only the latest ledger event per (hf_user, dataset_id) is searched (as
    returned by get_all_subscriptions). A token carried only by an older,
    superseded event is therefore not found.

    Args:
        access_token: Token presented by the client.

    Returns:
        ``{"hf_user": ..., "dataset_id": ..., "subscription": <event>}`` when
        the token matches and ``subscription_end`` is later than the current
        UTC time. Returns None for an empty or non-string token, an unknown
        token, or a matching token whose end date is missing, unparseable or
        past.
    """
    # Without this guard, None would "match" any record lacking an
    # access_token field, since sub.get("access_token") would also be None.
    if not access_token or not isinstance(access_token, str):
        return None
    presented = access_token.encode("utf-8")

    for (hf_user, dataset_id), sub in get_all_subscriptions().items():
        stored = sub.get("access_token")
        if not isinstance(stored, str):
            continue
        # Constant-time comparison so response timing does not leak how much
        # of the token matched.
        if not secrets.compare_digest(stored.encode("utf-8"), presented):
            continue

        end_str = sub.get("subscription_end", "")
        if end_str and isinstance(end_str, str):
            # datetime.fromisoformat() only accepts "Z" from Python 3.11 on.
            if end_str.endswith("Z"):
                end_str = end_str[:-1]
            try:
                end_date = datetime.fromisoformat(end_str)
            except ValueError:
                return None  # unparseable end date: treat as not active
            # Normalise offset-aware values to naive UTC so the comparison
            # with the naive utcnow() does not raise TypeError.
            offset = end_date.utcoffset()
            if offset is not None:
                end_date = end_date.replace(tzinfo=None) - offset
            if end_date > datetime.utcnow():
                return {
                    "hf_user": hf_user,
                    "dataset_id": dataset_id,
                    "subscription": sub,
                }
        return None  # Token found but subscription expired
    return None  # Token not found
def get_subscription_by_token(access_token: str) -> Optional[Dict[str, Any]]:
    """Look up an active subscription by its access token.

    Descriptively named alias: delegates to validate_access_token and returns
    its result unchanged.
    """
    result = validate_access_token(access_token)
    return result