legislation-tracker / user_management.py
ramanna's picture
Update user_management.py
72c0b00 verified
"""
User Management Module for HuggingFace-backed authentication
Stores user credentials in HuggingFace dataset for persistence
"""
import json
import os
from pathlib import Path
import requests
from huggingface_hub import HfApi
import streamlit as st
def get_secret(key: str, default=None):
"""Get a value from st.secrets if available, otherwise from environment."""
# Try Streamlit secrets (local / Streamlit Cloud)
try:
# Accessing st.secrets may raise if there is no secrets.toml
_ = st.secrets # just to trigger loading
if key in st.secrets:
return st.secrets[key]
except Exception:
pass
# Fallback: Hugging Face Spaces exposes them as env vars
return os.getenv(key, default)
def has_secret(key: str) -> bool:
"""Check if a key exists in st.secrets or environment."""
try:
_ = st.secrets
if key in st.secrets:
return True
except Exception:
pass
return key in os.environ
class HuggingFaceUserManager:
"""Manage users stored in HuggingFace dataset"""
def __init__(self):
"""Initialize the user manager for HuggingFace Spaces"""
# Get token + repo from secrets OR environment
self.token = get_secret("HUGGINGFACE_HUB_TOKEN")
if not self.token:
raise ValueError("Missing HUGGINGFACE_HUB_TOKEN in secrets/env")
repo = get_secret("dataset_repo")
if not repo:
raise ValueError("Missing dataset_repo in secrets/env")
self.repo_id = repo
self.users_file = "users.json"
self.api = HfApi()
def get_users_url(self):
"""Get the URL to the users.json file"""
return f"https://huggingface.co/datasets/{self.repo_id}/resolve/main/{self.users_file}"
def load_users(self, use_cache=True):
"""
Load users from HuggingFace dataset
"""
try:
url = self.get_users_url()
if not use_cache:
import time
url = f"{url}?t={int(time.time())}"
response = requests.get(url, timeout=30)
response.raise_for_status()
users_data = response.json()
if "usernames" not in users_data:
users_data = {"usernames": {}}
return users_data
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
print("users.json not found on HuggingFace, creating new structure")
return {"usernames": {}}
else:
raise
except Exception as e:
print(f"Error loading users from HuggingFace: {e}")
return {"usernames": {}}
def save_users(self, users_data):
"""
Save users to HuggingFace dataset
"""
temp_file = Path("temp_users.json")
try:
with temp_file.open("w", encoding="utf-8") as f:
json.dump(users_data, f, indent=2, ensure_ascii=False)
# Upload to HuggingFace
commit_info = self.api.upload_file(
path_or_fileobj=str(temp_file),
path_in_repo=self.users_file,
repo_id=self.repo_id,
repo_type="dataset",
token=self.token,
commit_message="Update user credentials",
)
temp_file.unlink()
if isinstance(commit_info, str):
return commit_info
elif hasattr(commit_info, "commit_url"):
return commit_info.commit_url
else:
return f"https://huggingface.co/datasets/{self.repo_id}"
except Exception as e:
if temp_file.exists():
temp_file.unlink()
raise Exception(f"Failed to save users to HuggingFace: {str(e)}")
def add_user(self, username, email, name, hashed_password):
"""
Add a new user
"""
try:
users_data = self.load_users()
if username in users_data["usernames"]:
return False, f"Username '{username}' already exists", None
users_data["usernames"][username] = {
"email": email,
"name": name,
"password": hashed_password,
}
commit_url = self.save_users(users_data)
return True, f"User '{username}' added successfully", commit_url
except Exception as e:
return False, f"Error adding user: {str(e)}", None
def remove_user(self, username):
"""
Remove a user
"""
try:
users_data = self.load_users()
if username not in users_data["usernames"]:
return False, f"Username '{username}' not found", None
del users_data["usernames"][username]
commit_url = self.save_users(users_data)
return True, f"User '{username}' removed successfully", commit_url
except Exception as e:
return False, f"Error removing user: {str(e)}", None
def update_user_password(self, username, new_hashed_password):
"""
Update a user's password
"""
try:
users_data = self.load_users()
if username not in users_data["usernames"]:
return False, f"Username '{username}' not found", None
users_data["usernames"][username]["password"] = new_hashed_password
commit_url = self.save_users(users_data)
return True, f"Password updated for '{username}'", commit_url
except Exception as e:
return False, f"Error updating password: {str(e)}", None
def update_user(self, username, new_email=None, new_name=None, new_password=None):
"""
Update a user's details
"""
try:
users_data = self.load_users()
if username not in users_data["usernames"]:
return False, f"Username '{username}' not found", None
if new_email is not None:
users_data["usernames"][username]["email"] = new_email
if new_name is not None:
users_data["usernames"][username]["name"] = new_name
if new_password is not None:
users_data["usernames"][username]["password"] = new_password
commit_url = self.save_users(users_data)
return True, f"User '{username}' updated successfully", commit_url
except Exception as e:
return False, f"Error updating user: {str(e)}", None
def get_config_for_authenticator(self):
"""
Get user data formatted for streamlit-authenticator
"""
users_data = self.load_users()
# Cookie config from flat secrets/env (not nested auth.cookie anymore)
cookie_config = {
"name": get_secret("COOKIE_NAME", "ai_tracker_auth_v2"),
"key": get_secret("COOKIE_KEY", "random_key"),
"expiry_days": int(get_secret("COOKIE_EXPIRY_DAYS", "30")),
}
return {
"credentials": users_data,
"cookie": cookie_config,
"preauthorized": {"emails": []},
}
@st.cache_data(ttl=300)
def load_user_config():
"""
Load user authentication config.
On Hugging Face Spaces, secrets come from environment variables.
Locally / on Streamlit Cloud, they come from st.secrets.
"""
# ---- 1. Try HuggingFace-managed users (if token + repo are set) ----
if has_secret("HUGGINGFACE_HUB_TOKEN") and has_secret("dataset_repo"):
try:
manager = HuggingFaceUserManager()
config = manager.get_config_for_authenticator()
return config, True
except Exception as e:
print("HF user manager failed:", e)
# ---- 2. Fallback: single local user from AUTH_* and COOKIE_* ----
required = [
"AUTH_EMAIL",
"AUTH_NAME",
"AUTH_PASSWORD_HASH",
"COOKIE_NAME",
"COOKIE_KEY",
"COOKIE_EXPIRY_DAYS",
]
if all(has_secret(k) for k in required):
credentials = {
"usernames": {
"ramanna": {
"email": get_secret("AUTH_EMAIL"),
"name": get_secret("AUTH_NAME"),
"password": get_secret("AUTH_PASSWORD_HASH"),
}
}
}
cookie = {
"name": get_secret("COOKIE_NAME"),
"key": get_secret("COOKIE_KEY"),
"expiry_days": int(get_secret("COOKIE_EXPIRY_DAYS")),
}
return {
"credentials": credentials,
"cookie": cookie,
"preauthorized": {"emails": []},
}, False
# ---- 3. Nothing configured ----
return None, False