import json
import os
import secrets
import tempfile
from datetime import datetime, timezone
from typing import Dict, Any, Optional

from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError

# Configuration for HF Dataset-based ledger
LEDGER_REPO = os.getenv("LEDGER_DATASET_ID", "")
LEDGER_FILE = "subscriptions.jsonl"
HF_TOKEN = os.getenv("HF_TOKEN")

# Fallback to local file if LEDGER_DATASET_ID not set (for local dev).
# NOTE: this relative path is resolved against the current working directory,
# not against this module's location.
LOCAL_LEDGER_FILE = "../subscriptions_ledger/subscriptions.jsonl"

# Initialize HF API (only when a token is available)
api = HfApi(token=HF_TOKEN) if HF_TOKEN else None


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _use_hf_storage() -> bool:
    """Check if we should use HF Dataset storage."""
    return bool(LEDGER_REPO and HF_TOKEN and api)


def _fetch_ledger() -> Optional[str]:
    """Download the ledger file from the HF Dataset.

    Returns:
        The local path returned by ``hf_hub_download``, or None if the ledger
        file does not exist in the dataset yet.

    Raises:
        Any error from ``hf_hub_download`` other than ``EntryNotFoundError``
        (network, auth, ...). Callers that must distinguish "no ledger yet"
        from "could not read the ledger" rely on this.
    """
    try:
        return hf_hub_download(
            repo_id=LEDGER_REPO,
            filename=LEDGER_FILE,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except EntryNotFoundError:
        # File doesn't exist yet in the dataset
        return None


def _download_ledger() -> Optional[str]:
    """Download current ledger from HF Dataset (best-effort).

    Returns the downloaded path, or None when HF storage is disabled, the
    file does not exist yet, or the download failed (the error is printed).
    """
    if not _use_hf_storage():
        return None
    try:
        return _fetch_ledger()
    except Exception as e:
        print(f"Error downloading ledger: {e}")
        return None


def _upload_ledger(local_path: str) -> bool:
    """Upload ``local_path`` as the ledger file of the HF Dataset.

    Returns True on success, False if HF storage is disabled or the upload
    raised (the error is printed).
    """
    if not _use_hf_storage():
        return False
    # Same naive-ISO format as before (no offset suffix) for the commit message.
    timestamp = _utcnow().replace(tzinfo=None).isoformat()
    try:
        api.upload_file(
            path_or_fileobj=local_path,
            path_in_repo=LEDGER_FILE,
            repo_id=LEDGER_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message=f"Update subscriptions ledger - {timestamp}",
        )
        return True
    except Exception as e:
        print(f"Error uploading ledger: {e}")
        return False


def _get_ledger_path() -> str:
    """Get the path to read the ledger from.

    Uses the HF Dataset copy when HF storage is enabled and the download
    succeeds; otherwise falls back to ``LOCAL_LEDGER_FILE``.
    """
    if _use_hf_storage():
        hf_path = _download_ledger()
        if hf_path:
            return hf_path
    # Fallback to local file
    return LOCAL_LEDGER_FILE


def _parse_subscription_end(value: Any) -> Optional[datetime]:
    """Parse a stored ``subscription_end`` value into an aware UTC datetime.

    Accepts ISO-8601 strings with an optional trailing "Z". Naive timestamps
    are treated as UTC, matching the previous comparison against
    ``datetime.utcnow()``. Returns None for missing, non-string or
    unparseable values (unparseable strings are printed).
    """
    if not isinstance(value, str) or not value:
        return None
    # datetime.fromisoformat() only accepts a "Z" suffix from Python 3.11 on.
    text = value[:-1] if value.endswith("Z") else value
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        print(f"Error parsing date: {text}")
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def _is_subscription_active(sub: Dict[str, Any], now: Optional[datetime] = None) -> bool:
    """Return True if ``sub["subscription_end"]`` parses and lies after ``now`` (default: current UTC time)."""
    end_date = _parse_subscription_end(sub.get("subscription_end"))
    if end_date is None:
        return False
    return end_date > (now if now is not None else _utcnow())


def append_subscription_event(event: Dict[str, Any]) -> bool:
    """
    Appends a subscription event to the ledger.

    If ``event`` has no "created_at" key, one is added in place (UTC ISO
    timestamp with a "Z" suffix), so the caller's dict is modified.

    With HF Dataset storage the current ledger is downloaded, the event is
    appended, and the whole file is re-uploaded. If the existing ledger cannot
    be downloaded (for any reason other than "does not exist yet") nothing is
    uploaded and False is returned, so a transient error cannot overwrite the
    ledger history with a single event.

    NOTE(review): the HF path is an unsynchronized read-modify-write; two
    concurrent appenders can lose one of the events.

    Returns:
        True if the event was persisted, False otherwise.

    Raises:
        TypeError: if ``event`` is not JSON-serializable (raised before any
        file is touched).
    """
    # Ensure timestamp is present
    if "created_at" not in event:
        event["created_at"] = _utcnow().replace(tzinfo=None).isoformat() + "Z"

    # Serialize first so a bad event fails before any I/O happens.
    record_line = json.dumps(event) + "\n"

    if not _use_hf_storage():
        # Local file storage
        parent_dir = os.path.dirname(LOCAL_LEDGER_FILE)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        with open(LOCAL_LEDGER_FILE, "a", encoding="utf-8") as f:
            f.write(record_line)
        return True

    try:
        current_path = _fetch_ledger()
    except Exception as e:
        print(f"Error downloading ledger: {e}")
        return False

    existing = ""
    if current_path and os.path.exists(current_path):
        with open(current_path, "r", encoding="utf-8") as f:
            existing = f.read()
    # Without a separator the new record would be glued onto the last line.
    if existing and not existing.endswith("\n"):
        existing += "\n"

    tmp = tempfile.NamedTemporaryFile(
        mode="w", suffix=".jsonl", delete=False, encoding="utf-8"
    )
    try:
        with tmp:
            tmp.write(existing)
            tmp.write(record_line)
        # Upload only after the file is closed and fully flushed.
        return _upload_ledger(tmp.name)
    finally:
        try:
            os.unlink(tmp.name)
        except OSError:
            pass


def get_all_subscriptions() -> Dict[tuple, Dict[str, Any]]:
    """
    Reads the ledger and folds events to find the latest state for each (user, dataset).

    Blank lines, lines that are not valid JSON, and JSON values that are not
    objects are skipped. Events lacking a truthy "hf_user" or "dataset_id"
    are ignored. Read errors are printed and whatever was folded so far is
    returned.

    Returns a dict: key=(hf_user, dataset_id), value=subscription_record
    """
    subscriptions: Dict[tuple, Dict[str, Any]] = {}
    ledger_path = _get_ledger_path()

    if not ledger_path or not os.path.exists(ledger_path):
        return subscriptions

    try:
        with open(ledger_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                raw_line = raw_line.strip()
                if not raw_line:
                    continue
                try:
                    event = json.loads(raw_line)
                except json.JSONDecodeError:
                    continue
                # A valid-JSON but non-object line (e.g. a list) must not
                # abort the whole fold.
                if not isinstance(event, dict):
                    continue
                hf_user = event.get("hf_user")
                dataset_id = event.get("dataset_id")
                if hf_user and dataset_id:
                    # Append-only ledger: latest record is current state
                    subscriptions[(hf_user, dataset_id)] = event
    except Exception as e:
        print(f"Error reading ledger: {e}")

    return subscriptions


def get_active_subscription(hf_user: str, dataset_id: str) -> Optional[Dict[str, Any]]:
    """Checks if a user has an active subscription to a dataset.

    Returns the latest subscription record for (hf_user, dataset_id) if its
    "subscription_end" is in the future, otherwise None.
    """
    sub = get_all_subscriptions().get((hf_user, dataset_id))
    if not sub:
        return None
    return sub if _is_subscription_active(sub) else None


def get_user_subscriptions(hf_user: str) -> list:
    """Get all subscriptions for a specific user.

    Each returned item is a copy of the latest subscription record with an
    added boolean "is_active" key.
    """
    now = _utcnow()
    return [
        {**sub, "is_active": _is_subscription_active(sub, now)}
        for (user, _dataset_id), sub in get_all_subscriptions().items()
        if user == hf_user
    ]


def generate_access_token() -> str:
    """Generate a secure random access token."""
    return f"hfdata_{secrets.token_urlsafe(32)}"


def validate_access_token(access_token: str) -> Optional[Dict[str, Any]]:
    """
    Validate an access token and return the subscription info if valid.

    Returns a dict with "hf_user", "dataset_id" and "subscription" for the
    first subscription whose stored token matches and is still active.
    Returns None if the token is empty/invalid, not found, or the matching
    subscription has expired.
    """
    # Reject empty/non-string tokens: previously None matched any record
    # that simply had no "access_token" key.
    if not isinstance(access_token, str) or not access_token:
        return None
    presented = access_token.encode("utf-8")

    for (hf_user, dataset_id), sub in get_all_subscriptions().items():
        stored = sub.get("access_token")
        if not isinstance(stored, str) or not stored:
            continue
        # Constant-time comparison to avoid leaking token prefixes via timing.
        if not secrets.compare_digest(stored.encode("utf-8"), presented):
            continue
        if _is_subscription_active(sub):
            return {
                "hf_user": hf_user,
                "dataset_id": dataset_id,
                "subscription": sub,
            }
        return None  # Token found but subscription expired
    return None  # Token not found


def get_subscription_by_token(access_token: str) -> Optional[Dict[str, Any]]:
    """
    Get subscription details by access token.
    Alias for validate_access_token for clarity.
    """
    return validate_access_token(access_token)