import streamlit as st import streamlit_authenticator as stauth import yaml from yaml.loader import SafeLoader import json import os import stat # <--- NEW IMPORT for permission handling import uuid from datetime import datetime import pytz from huggingface_hub import HfApi, hf_hub_download, snapshot_download, CommitScheduler from pathlib import Path import bcrypt # --- CONFIGURATION --- DATASET_REPO_ID = "NavyDevilDoc/navy-ai-logs" LOG_FILE = "usage_log.json" CONFIG_FILE = "config.yaml" CHROMA_ROOT = "chroma_db" HF_TOKEN = os.getenv("HF_TOKEN") INVITE_CODE = os.getenv("INVITE_CODE", "CHANGE_ME_IN_SETTINGS") TIMEZONE = pytz.timezone("US/Eastern") # --- DATA PERSISTENCE SETUP --- LOCAL_DATA_DIR = Path("data_persistence") LOCAL_DATA_DIR.mkdir(exist_ok=True) scheduler = CommitScheduler( repo_id=DATASET_REPO_ID, repo_type="dataset", folder_path=LOCAL_DATA_DIR, path_in_repo=".", every=1, token=HF_TOKEN ) # --- PATH HELPERS --- def get_config_path(): return LOCAL_DATA_DIR / CONFIG_FILE def get_log_path(): return LOCAL_DATA_DIR / LOG_FILE # --- GENERIC FILE SYNC --- @st.cache_data(ttl=60) def download_config_if_needed(): if not HF_TOKEN: return try: hf_hub_download( repo_id=DATASET_REPO_ID, filename=CONFIG_FILE, repo_type="dataset", local_dir=LOCAL_DATA_DIR, token=HF_TOKEN, force_download=True ) print("✅ Config refreshed from cloud.") except Exception as e: print(f"⚠️ Cloud pull failed for config: {e}") def ensure_log_exists(): if not (LOCAL_DATA_DIR / LOG_FILE).exists(): try: hf_hub_download( repo_id=DATASET_REPO_ID, filename=LOG_FILE, repo_type="dataset", local_dir=LOCAL_DATA_DIR, token=HF_TOKEN ) except: with open(LOCAL_DATA_DIR / LOG_FILE, "w") as f: json.dump({}, f) # --- USER DB SYNC (THE FIX IS HERE) --- def download_user_db(username): """Restores ONLY the specific user's Knowledge Base and unlocks permissions.""" if not HF_TOKEN: return target_dir = os.path.dirname(os.path.abspath(__file__)) user_db_path = f"{CHROMA_ROOT}/{username}" try: print(f"📥 Syncing Knowledge Base for {username}...") snapshot_download( repo_id=DATASET_REPO_ID, repo_type="dataset", allow_patterns=[f"{user_db_path}/*"], local_dir=target_dir, token=HF_TOKEN ) # --- PERMISSION FIX --- # Force-add Write permissions to the downloaded folder and files. # This solves the "readonly database" (Error 1032). user_folder = Path(target_dir) / CHROMA_ROOT / username if user_folder.exists(): # 1. Unlock the User Directory itself current_mode = user_folder.stat().st_mode user_folder.chmod(current_mode | stat.S_IWUSR | stat.S_IXUSR) # 2. Unlock every file and subfolder inside for item in user_folder.rglob('*'): current_mode = item.stat().st_mode if item.is_dir(): # Directories need Execute (IX) to be traversable item.chmod(current_mode | stat.S_IWUSR | stat.S_IXUSR) else: # Files need Write (IW) to be modifiable item.chmod(current_mode | stat.S_IWUSR) print("✅ User Knowledge Base Restored & Unlocked.") except Exception as e: print(f"⚠️ New user or sync error: {e}") def upload_user_db(username): """Backs up ONLY the specific user's Knowledge Base.""" if not HF_TOKEN: return target_dir = os.path.dirname(os.path.abspath(__file__)) user_db_rel_path = os.path.join(CHROMA_ROOT, username) user_db_abs_path = os.path.join(target_dir, user_db_rel_path) if not os.path.exists(user_db_abs_path): return try: api = HfApi(token=HF_TOKEN) api.upload_folder( folder_path=user_db_abs_path, path_in_repo=user_db_rel_path, repo_id=DATASET_REPO_ID, repo_type="dataset", commit_message=f"KB Update ({username}): {datetime.now(TIMEZONE)}" ) print(f"✅ Knowledge Base Saved for {username}.") except Exception as e: print(f"⚠️ DB sync failed: {e}") # --- AUTHENTICATION --- def check_login(): download_config_if_needed() try: config_path = get_config_path() if not config_path.exists(): st.error(f"🚨 CRITICAL: Config not found at {config_path}") return False with open(config_path) as file: config = yaml.load(file, Loader=SafeLoader) except Exception as e: st.error(f"🚨 Config Error: {e}") return False authenticator = stauth.Authenticate( config['credentials'], config['cookie']['name'], config['cookie']['key'], config['cookie']['expiry_days'] ) authenticator.login(location='main') if st.session_state["authentication_status"]: username = st.session_state["username"] try: user_data = config['credentials']['usernames'].get(username, {}) user_roles = user_data.get('roles', []) except Exception as e: user_roles = [] st.session_state.roles = user_roles st.session_state.username = username st.session_state.name = st.session_state.get("name") st.session_state.authenticator = authenticator return True elif st.session_state["authentication_status"] is False: st.error('Username/password is incorrect') return False elif st.session_state["authentication_status"] is None: st.warning('Please enter your username and password') return False # --- REGISTRATION --- def register_user(new_email, new_username, new_name, new_password, invite_code): if invite_code != INVITE_CODE: return False, "Invalid Invite Code." download_config_if_needed() config_path = get_config_path() with scheduler.lock: with open(config_path) as file: config = yaml.load(file, Loader=SafeLoader) if new_username in config['credentials']['usernames']: return False, "Username already exists." hashed_bytes = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()) hashed_pwd = hashed_bytes.decode('utf-8') new_user_entry = { "email": new_email, "name": new_name, "password": hashed_pwd, "roles": ["user"] } config['credentials']['usernames'][new_username] = new_user_entry with open(config_path, 'w') as file: yaml.dump(config, file, default_flow_style=False) return True, "Account created! Please log in." # --- LOGGING --- def log_usage(model_name, input_tokens, output_tokens): ensure_log_exists() log_path = get_log_path() username = st.session_state.get("username", "anonymous") now_est = datetime.now(TIMEZONE) today = now_est.strftime("%Y-%m-%d") with scheduler.lock: data = {} if log_path.exists(): with open(log_path, "r") as f: try: data = json.load(f) except: data = {} if today not in data: data[today] = {"total_tokens": 0, "users": {}} if username not in data[today]["users"]: data[today]["users"][username] = {"input": 0, "output": 0, "calls": 0} data[today]["total_tokens"] += (input_tokens + output_tokens) data[today]["users"][username]["input"] += input_tokens data[today]["users"][username]["output"] += output_tokens data[today]["users"][username]["calls"] += 1 with open(log_path, "w") as f: json.dump(data, f, indent=2) def get_daily_stats(): ensure_log_exists() log_path = get_log_path() now_est = datetime.now(TIMEZONE) today = now_est.strftime("%Y-%m-%d") if log_path.exists(): with open(log_path, "r") as f: try: data = json.load(f) if today in data: return data[today] except: pass return {"total_tokens": 0, "users": {}}