"""Persistence, authentication, and usage-logging helpers for the Navy AI app.

All mutable app data (config, usage log) lives in a local folder that a
background ``CommitScheduler`` mirrors to a Hugging Face dataset repo.
"""

import json
import os
import uuid
from datetime import datetime
from pathlib import Path

import bcrypt
import pytz
import streamlit as st
import streamlit_authenticator as stauth
import yaml
from huggingface_hub import (
    CommitScheduler,
    HfApi,
    hf_hub_download,
    snapshot_download,
)
from yaml.loader import SafeLoader

# --- CONFIGURATION ---
DATASET_REPO_ID = "NavyDevilDoc/navy-ai-logs"
LOG_FILE = "usage_log.json"
CONFIG_FILE = "config.yaml"
CHROMA_ROOT = "chroma_db"
HF_TOKEN = os.getenv("HF_TOKEN")
INVITE_CODE = os.getenv("INVITE_CODE", "CHANGE_ME_IN_SETTINGS")  # Security Fix
TIMEZONE = pytz.timezone("US/Eastern")

# --- DATA PERSISTENCE SETUP (The Fix) ---
# Local directory watched by the CommitScheduler below.
LOCAL_DATA_DIR = Path("data_persistence")
LOCAL_DATA_DIR.mkdir(exist_ok=True)

# Background scheduler: detects changes in LOCAL_DATA_DIR and pushes them
# to the root of the dataset repo once per minute.
scheduler = CommitScheduler(
    repo_id=DATASET_REPO_ID,
    repo_type="dataset",
    folder_path=LOCAL_DATA_DIR,
    path_in_repo=".",  # Sync to root of dataset
    every=1,  # Sync every 1 minute
    token=HF_TOKEN,
)


# --- PATH HELPERS ---
def get_config_path():
    """Path of the LOCAL config file inside the persistence folder."""
    return LOCAL_DATA_DIR / CONFIG_FILE


def get_log_path():
    """Path of the LOCAL usage-log file inside the persistence folder."""
    return LOCAL_DATA_DIR / LOG_FILE


# --- GENERIC FILE SYNC (Cached!) ---
@st.cache_data(ttl=60)  # Only check cloud every 60 seconds
def download_config_if_needed():
    """Refresh the local config from the HF dataset, at most once per TTL.

    No-op when ``HF_TOKEN`` is unset. Failures are logged and swallowed so
    the app can keep running on the existing local copy.
    """
    if not HF_TOKEN:
        return
    try:
        hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=CONFIG_FILE,
            repo_type="dataset",
            local_dir=LOCAL_DATA_DIR,  # Download directly to watched folder
            token=HF_TOKEN,
            force_download=True,
        )
        print("✅ Config refreshed from cloud.")
    except Exception as e:
        print(f"⚠️ Cloud pull failed for config: {e}")


# We don't cache logs because we need to write to them frequently
def ensure_log_exists():
    """Make sure a local usage log exists, pulling from the cloud if possible."""
    log_path = LOCAL_DATA_DIR / LOG_FILE
    if log_path.exists():
        return
    try:
        hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=LOG_FILE,
            repo_type="dataset",
            local_dir=LOCAL_DATA_DIR,
            token=HF_TOKEN,
        )
    except Exception:  # was a bare except; keep best-effort but don't trap SystemExit
        # Create empty log if it doesn't exist on cloud yet
        with open(log_path, "w") as f:
            json.dump({}, f)


# --- USER DB SYNC (For ChromaDB) ---
def download_user_db(username):
    """Restore ONLY the specific user's Knowledge Base.

    Args:
        username: Account name; selects the ``chroma_db/<username>`` subtree.
    """
    if not HF_TOKEN:
        return
    target_dir = os.path.dirname(os.path.abspath(__file__))
    user_db_path = f"{CHROMA_ROOT}/{username}"
    try:
        # We don't use the scheduler for ChromaDB yet (too large).
        # We stick to snapshot_download for now.
        print(f"📥 Syncing Knowledge Base for {username}...")
        snapshot_download(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            allow_patterns=[f"{user_db_path}/*"],
            local_dir=target_dir,
            token=HF_TOKEN,
        )
        print("✅ User Knowledge Base Restored.")
    except Exception as e:
        print(f"⚠️ New user or sync error: {e}")


def upload_user_db(username):
    """Back up ONLY the specific user's Knowledge Base.

    Silently returns when there is no token or no local DB folder to upload.
    """
    if not HF_TOKEN:
        return
    target_dir = os.path.dirname(os.path.abspath(__file__))
    user_db_rel_path = os.path.join(CHROMA_ROOT, username)
    user_db_abs_path = os.path.join(target_dir, user_db_rel_path)
    if not os.path.exists(user_db_abs_path):
        return
    try:
        api = HfApi(token=HF_TOKEN)
        api.upload_folder(
            folder_path=user_db_abs_path,
            path_in_repo=user_db_rel_path,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            commit_message=f"KB Update ({username}): {datetime.now(TIMEZONE)}",
        )
        print(f"✅ Knowledge Base Saved for {username}.")
    except Exception as e:
        print(f"⚠️ DB sync failed: {e}")


# --- AUTHENTICATION ---
def check_login():
    """Render the login widget and populate session state on success.

    Returns:
        bool: True when the user is authenticated; False on bad credentials,
        missing/invalid config, or when the form has not been submitted yet.
    """
    # 1. Cached Download (at most once per 60s)
    download_config_if_needed()
    try:
        config_path = get_config_path()
        if not config_path.exists():
            st.error(f"🚨 CRITICAL: Config not found at {config_path}")
            return False
        with open(config_path) as file:
            config = yaml.load(file, Loader=SafeLoader)
    except Exception as e:
        st.error(f"🚨 Config Error: {e}")
        return False

    authenticator = stauth.Authenticate(
        config['credentials'],
        config['cookie']['name'],
        config['cookie']['key'],
        config['cookie']['expiry_days'],
    )
    authenticator.login(location='main')

    if st.session_state["authentication_status"]:
        username = st.session_state["username"]
        try:
            # Roles are optional in the YAML; default to no roles.
            user_data = config['credentials']['usernames'].get(username, {})
            user_roles = user_data.get('roles', [])
        except Exception:  # fixed: unused `as e` binding removed
            user_roles = []
        st.session_state.roles = user_roles
        st.session_state.username = username
        st.session_state.name = st.session_state.get("name")
        st.session_state.authenticator = authenticator
        return True
    if st.session_state["authentication_status"] is False:
        st.error('Username/password is incorrect')
        return False
    # authentication_status is None: form not submitted yet.
    st.warning('Please enter your username and password')
    return False


# --- REGISTRATION ---
def register_user(new_email, new_username, new_name, new_password, invite_code):
    """Create a new account in the YAML config, gated by an invite code.

    Returns:
        tuple[bool, str]: (success flag, user-facing message).
    """
    if invite_code != INVITE_CODE:
        return False, "Invalid Invite Code."

    # Ensure we have the latest config before writing.
    # NOTE(review): this pull is cached for 60s and force-downloads over the
    # local file — a registration written just before a cloud refresh could be
    # clobbered until the scheduler pushes it. Confirm this window is acceptable.
    download_config_if_needed()
    config_path = get_config_path()
    # Fixed: previously an unhandled exception if the config never downloaded.
    if not config_path.exists():
        return False, "Configuration unavailable. Please try again later."

    # The Scheduler handles the cloud sync, but we need to handle local
    # consistency: lock the file for the read-modify-write cycle.
    with scheduler.lock:
        with open(config_path) as file:
            config = yaml.load(file, Loader=SafeLoader)

        if new_username in config['credentials']['usernames']:
            return False, "Username already exists."

        # bcrypt with a per-user salt; stored as UTF-8 text in the YAML.
        hashed_bytes = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
        hashed_pwd = hashed_bytes.decode('utf-8')

        config['credentials']['usernames'][new_username] = {
            "email": new_email,
            "name": new_name,
            "password": hashed_pwd,
            "roles": ["user"],
        }

        with open(config_path, 'w') as file:
            yaml.dump(config, file, default_flow_style=False)

    return True, "Account created! Please log in."


# --- LOGGING ---
def log_usage(model_name, input_tokens, output_tokens):
    """Accumulate per-day, per-user token usage into the local JSON log.

    The CommitScheduler detects the file change and uploads it automatically,
    so no manual upload call is needed.

    Args:
        model_name: Currently unused; kept for interface stability.
        input_tokens: Prompt tokens consumed by this call.
        output_tokens: Completion tokens produced by this call.
    """
    ensure_log_exists()
    log_path = get_log_path()
    username = st.session_state.get("username", "anonymous")
    today = datetime.now(TIMEZONE).strftime("%Y-%m-%d")

    # Scheduler lock guarantees the read-modify-write is atomic locally.
    with scheduler.lock:
        data = {}
        if log_path.exists():
            with open(log_path, "r") as f:
                try:
                    data = json.load(f)
                except json.JSONDecodeError:  # was a bare except; corrupt log -> start fresh
                    data = {}

        day = data.setdefault(today, {"total_tokens": 0, "users": {}})
        user = day["users"].setdefault(
            username, {"input": 0, "output": 0, "calls": 0}
        )

        day["total_tokens"] += input_tokens + output_tokens
        user["input"] += input_tokens
        user["output"] += output_tokens
        user["calls"] += 1

        with open(log_path, "w") as f:
            json.dump(data, f, indent=2)
def get_daily_stats():
    """Return today's usage stats from the local log.

    Returns:
        dict: ``{"total_tokens": int, "users": {username: {...}}}`` for today
        (US/Eastern). Falls back to zeros when the log is missing, corrupt,
        or has no entry for today.
    """
    ensure_log_exists()
    log_path = get_log_path()
    today = datetime.now(TIMEZONE).strftime("%Y-%m-%d")
    if log_path.exists():
        with open(log_path, "r") as f:
            try:
                data = json.load(f)
            except json.JSONDecodeError:  # was a bare except; corrupt log -> zeros
                data = {}
        if today in data:
            return data[today]
    return {"total_tokens": 0, "users": {}}