Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| """ | |
| User Management Module for HuggingFace-backed authentication | |
| Stores user credentials in HuggingFace dataset for persistence | |
| """ | |
| import json | |
| import os | |
| from pathlib import Path | |
| import requests | |
| from huggingface_hub import HfApi | |
| import streamlit as st | |
def get_secret(key: str, default=None):
    """Look up *key* in Streamlit secrets first, then in the process environment.

    Any failure while touching ``st.secrets`` (for example when no
    ``secrets.toml`` exists) is ignored so the environment lookup still runs.
    Returns *default* when the key is found in neither place.
    """
    try:
        # Streamlit secrets (local / Streamlit Cloud); access may raise.
        store = st.secrets
        found = key in store
        value = store[key] if found else None
    except Exception:
        found = False

    if found:
        return value

    # Hugging Face Spaces exposes secrets as environment variables.
    return os.getenv(key, default)
def has_secret(key: str) -> bool:
    """Report whether *key* is defined in Streamlit secrets or the environment.

    Errors raised while reading ``st.secrets`` are treated as "not present
    there" and the environment is consulted instead.
    """
    try:
        in_streamlit = key in st.secrets
    except Exception:
        in_streamlit = False
    return in_streamlit or key in os.environ
class HuggingFaceUserManager:
    """Manage users stored in a ``users.json`` file of a HuggingFace dataset.

    The file is expected to hold ``{"usernames": {<username>: {...}}}``.
    Reads go through the dataset's ``resolve`` URL; writes go through
    ``HfApi.upload_file``. Every mutating method returns a
    ``(success, message, commit_url)`` tuple and never raises.
    """

    def __init__(self):
        """Read the token and dataset repo id from secrets or environment.

        Raises:
            ValueError: if ``HUGGINGFACE_HUB_TOKEN`` or ``dataset_repo`` is
                missing or empty.
        """
        self.token = get_secret("HUGGINGFACE_HUB_TOKEN")
        if not self.token:
            raise ValueError("Missing HUGGINGFACE_HUB_TOKEN in secrets/env")
        repo = get_secret("dataset_repo")
        if not repo:
            raise ValueError("Missing dataset_repo in secrets/env")
        self.repo_id = repo
        self.users_file = "users.json"
        self.api = HfApi()

    def get_users_url(self):
        """Return the ``resolve/main`` URL of the users file."""
        return f"https://huggingface.co/datasets/{self.repo_id}/resolve/main/{self.users_file}"

    def _fetch_users(self, use_cache=True):
        """Download and parse the users file, raising on any failure.

        Args:
            use_cache: when False, a timestamp query parameter is appended so
                intermediate caches do not serve a stale copy.

        Returns:
            The parsed document, or ``{"usernames": {}}`` when the document
            is not a mapping containing a ``"usernames"`` key.

        Raises:
            requests.exceptions.HTTPError: on any non-2xx response (incl. 404).
            Exception: network or JSON decoding errors propagate unchanged.
        """
        url = self.get_users_url()
        if not use_cache:
            import time

            url = f"{url}?t={int(time.time())}"
        # Send the token so private dataset repos (the sensible setup for a
        # credentials file) can be read as well as public ones.
        headers = {"Authorization": f"Bearer {self.token}"} if self.token else {}
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        users_data = response.json()
        if not isinstance(users_data, dict) or "usernames" not in users_data:
            return {"usernames": {}}
        return users_data

    @staticmethod
    def _is_not_found(error):
        """Return True when *error* is an HTTP error carrying a 404 response."""
        response = getattr(error, "response", None)
        return response is not None and response.status_code == 404

    def load_users(self, use_cache=True):
        """Load users for read-only use, degrading to an empty structure.

        A 404, a network failure or an unparsable file all yield
        ``{"usernames": {}}`` (with a message printed). Any other HTTP error
        is re-raised.
        """
        try:
            return self._fetch_users(use_cache)
        except requests.exceptions.HTTPError as e:
            if self._is_not_found(e):
                print("users.json not found on HuggingFace, creating new structure")
                return {"usernames": {}}
            raise
        except Exception as e:
            print(f"Error loading users from HuggingFace: {e}")
            return {"usernames": {}}

    def _load_users_for_update(self):
        """Load users for a read-modify-write cycle.

        Unlike ``load_users`` this only treats a 404 (file not created yet)
        as "no users". Every other failure propagates, so that a transient
        network or parse error can never cause the stored users to be
        overwritten with an empty structure. The cache is bypassed so the
        modification is applied to the freshest copy available.
        """
        try:
            return self._fetch_users(use_cache=False)
        except requests.exceptions.HTTPError as e:
            if self._is_not_found(e):
                return {"usernames": {}}
            raise

    def save_users(self, users_data):
        """Upload *users_data* as the dataset's users file.

        The JSON is uploaded from memory: no temporary file is written, so
        concurrent sessions cannot clobber each other's file and credentials
        never touch the local disk.

        Returns:
            The commit URL when the upload result provides one, otherwise
            the dataset URL.

        Raises:
            RuntimeError: if serialisation or the upload fails (the original
                error is chained as ``__cause__``).
        """
        try:
            payload = json.dumps(users_data, indent=2, ensure_ascii=False).encode("utf-8")
            commit_info = self.api.upload_file(
                path_or_fileobj=payload,
                path_in_repo=self.users_file,
                repo_id=self.repo_id,
                repo_type="dataset",
                token=self.token,
                commit_message="Update user credentials",
            )
        except Exception as e:
            raise RuntimeError(f"Failed to save users to HuggingFace: {str(e)}") from e

        # Depending on the huggingface_hub version the result is either a
        # plain URL string or an object exposing ``commit_url``.
        if isinstance(commit_info, str):
            return commit_info
        if hasattr(commit_info, "commit_url"):
            return commit_info.commit_url
        return f"https://huggingface.co/datasets/{self.repo_id}"

    def add_user(self, username, email, name, hashed_password):
        """Add a new user; fails if *username* already exists.

        Returns:
            ``(success, message, commit_url)``; ``commit_url`` is None on failure.
        """
        try:
            users_data = self._load_users_for_update()
            if username in users_data["usernames"]:
                return False, f"Username '{username}' already exists", None
            users_data["usernames"][username] = {
                "email": email,
                "name": name,
                "password": hashed_password,
            }
            commit_url = self.save_users(users_data)
            return True, f"User '{username}' added successfully", commit_url
        except Exception as e:
            return False, f"Error adding user: {str(e)}", None

    def remove_user(self, username):
        """Remove an existing user.

        Returns:
            ``(success, message, commit_url)``; ``commit_url`` is None on failure.
        """
        try:
            users_data = self._load_users_for_update()
            if username not in users_data["usernames"]:
                return False, f"Username '{username}' not found", None
            del users_data["usernames"][username]
            commit_url = self.save_users(users_data)
            return True, f"User '{username}' removed successfully", commit_url
        except Exception as e:
            return False, f"Error removing user: {str(e)}", None

    def update_user_password(self, username, new_hashed_password):
        """Replace the stored password hash of an existing user.

        Returns:
            ``(success, message, commit_url)``; ``commit_url`` is None on failure.
        """
        try:
            users_data = self._load_users_for_update()
            if username not in users_data["usernames"]:
                return False, f"Username '{username}' not found", None
            users_data["usernames"][username]["password"] = new_hashed_password
            commit_url = self.save_users(users_data)
            return True, f"Password updated for '{username}'", commit_url
        except Exception as e:
            return False, f"Error updating password: {str(e)}", None

    def update_user(self, username, new_email=None, new_name=None, new_password=None):
        """Update any of email, name and password of an existing user.

        Arguments left as None keep their stored value. The file is saved
        even when no field is changed.

        Returns:
            ``(success, message, commit_url)``; ``commit_url`` is None on failure.
        """
        try:
            users_data = self._load_users_for_update()
            if username not in users_data["usernames"]:
                return False, f"Username '{username}' not found", None
            record = users_data["usernames"][username]
            if new_email is not None:
                record["email"] = new_email
            if new_name is not None:
                record["name"] = new_name
            if new_password is not None:
                record["password"] = new_password
            commit_url = self.save_users(users_data)
            return True, f"User '{username}' updated successfully", commit_url
        except Exception as e:
            return False, f"Error updating user: {str(e)}", None

    def get_config_for_authenticator(self):
        """Build a config dict with ``credentials``, ``cookie`` and ``preauthorized``.

        Raises:
            ValueError: if ``COOKIE_EXPIRY_DAYS`` is set to a non-integer value.
        """
        users_data = self.load_users()
        # Cookie config from flat secrets/env (not nested auth.cookie anymore)
        # NOTE(review): the "random_key" fallback is a publicly known signing
        # key; deployments should always set COOKIE_KEY explicitly.
        cookie_config = {
            "name": get_secret("COOKIE_NAME", "ai_tracker_auth_v2"),
            "key": get_secret("COOKIE_KEY", "random_key"),
            "expiry_days": int(get_secret("COOKIE_EXPIRY_DAYS", "30")),
        }
        return {
            "credentials": users_data,
            "cookie": cookie_config,
            "preauthorized": {"emails": []},
        }
def load_user_config():
    """Assemble the authentication config and report where it came from.

    Secrets are read through ``has_secret`` / ``get_secret`` (Streamlit
    secrets first, environment variables otherwise).

    Returns:
        ``(config, True)`` when the HuggingFace-backed user store is used,
        ``(config, False)`` for the single-user fallback built from the
        ``AUTH_*`` / ``COOKIE_*`` settings, and ``(None, False)`` when
        neither source is fully configured.
    """
    # 1. HuggingFace-managed users, when both token and repo are configured.
    hf_configured = has_secret("HUGGINGFACE_HUB_TOKEN") and has_secret("dataset_repo")
    if hf_configured:
        try:
            return HuggingFaceUserManager().get_config_for_authenticator(), True
        except Exception as err:
            # Fall through to the local single-user configuration.
            print("HF user manager failed:", err)

    # 2. Single local user: every one of these settings must be present.
    fallback_keys = (
        "AUTH_EMAIL",
        "AUTH_NAME",
        "AUTH_PASSWORD_HASH",
        "COOKIE_NAME",
        "COOKIE_KEY",
        "COOKIE_EXPIRY_DAYS",
    )
    for secret_name in fallback_keys:
        if not has_secret(secret_name):
            # 3. Nothing (or not enough) configured.
            return None, False

    local_config = {
        "credentials": {
            "usernames": {
                "ramanna": {
                    "email": get_secret("AUTH_EMAIL"),
                    "name": get_secret("AUTH_NAME"),
                    "password": get_secret("AUTH_PASSWORD_HASH"),
                }
            }
        },
        "cookie": {
            "name": get_secret("COOKIE_NAME"),
            "key": get_secret("COOKIE_KEY"),
            "expiry_days": int(get_secret("COOKIE_EXPIRY_DAYS")),
        },
        "preauthorized": {"emails": []},
    }
    return local_config, False