Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import streamlit_authenticator as stauth | |
| import yaml | |
| from yaml.loader import SafeLoader | |
| import json | |
| import os | |
| import stat # <--- NEW IMPORT for permission handling | |
| import uuid | |
| from datetime import datetime | |
| import pytz | |
| from huggingface_hub import HfApi, hf_hub_download, snapshot_download, CommitScheduler | |
| from pathlib import Path | |
| import bcrypt | |
# --- CONFIGURATION ---
DATASET_REPO_ID = "NavyDevilDoc/navy-ai-logs"  # HF dataset repo backing logs, config, and KBs
LOG_FILE = "usage_log.json"  # per-day token-usage log stored in the dataset repo
CONFIG_FILE = "config.yaml"  # streamlit-authenticator credentials/cookie config
CHROMA_ROOT = "chroma_db"  # root folder for per-user Chroma knowledge bases
HF_TOKEN = os.getenv("HF_TOKEN")  # write token for the dataset repo; cloud sync is skipped when unset
INVITE_CODE = os.getenv("INVITE_CODE", "CHANGE_ME_IN_SETTINGS")  # gate for self-registration
TIMEZONE = pytz.timezone("US/Eastern")  # all log timestamps/dates are bucketed in US/Eastern
# --- DATA PERSISTENCE SETUP ---
# Local staging folder whose contents are mirrored to the HF dataset repo.
LOCAL_DATA_DIR = Path("data_persistence")
LOCAL_DATA_DIR.mkdir(exist_ok=True)
# Background scheduler commits LOCAL_DATA_DIR to the dataset repo periodically.
# scheduler.lock is used elsewhere in this file to serialize file writes so a
# background commit never snapshots a half-written config or log.
scheduler = CommitScheduler(
    repo_id=DATASET_REPO_ID,
    repo_type="dataset",
    folder_path=LOCAL_DATA_DIR,
    path_in_repo=".",
    every=1,  # minutes between commit scans
    token=HF_TOKEN
)
# --- PATH HELPERS ---
def get_config_path():
    """Return the local path of the synced authenticator config file."""
    return LOCAL_DATA_DIR.joinpath(CONFIG_FILE)
def get_log_path():
    """Return the local path of the synced usage-log file."""
    return LOCAL_DATA_DIR.joinpath(LOG_FILE)
# --- GENERIC FILE SYNC ---
def download_config_if_needed():
    """Best-effort pull of the latest config.yaml from the dataset repo.

    No-op without an HF token; failures are logged to stdout, never raised.
    """
    if not HF_TOKEN:
        return
    pull_args = dict(
        repo_id=DATASET_REPO_ID,
        filename=CONFIG_FILE,
        repo_type="dataset",
        local_dir=LOCAL_DATA_DIR,
        token=HF_TOKEN,
        force_download=True,  # always prefer the cloud copy over a stale local one
    )
    try:
        hf_hub_download(**pull_args)
    except Exception as e:
        print(f"⚠️ Cloud pull failed for config: {e}")
    else:
        print("✅ Config refreshed from cloud.")
def ensure_log_exists():
    """Guarantee a local usage log exists, pulling from the cloud when possible.

    If the log is absent locally, first try to download it from the dataset
    repo; if that fails for any reason (file never uploaded, no/invalid token,
    network error), fall back to creating an empty JSON object so every caller
    can safely read the file afterwards.
    """
    log_path = LOCAL_DATA_DIR / LOG_FILE
    if log_path.exists():
        return
    try:
        hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=LOG_FILE,
            repo_type="dataset",
            local_dir=LOCAL_DATA_DIR,
            token=HF_TOKEN
        )
    except Exception:
        # Fix: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt. Only ordinary failures should seed an empty log.
        with open(log_path, "w") as f:
            json.dump({}, f)
# --- USER DB SYNC ---
def _make_writable(path, is_dir):
    """Add owner write (plus execute for directories) to *path*'s mode bits."""
    mode = path.stat().st_mode
    extra_bits = stat.S_IWUSR | (stat.S_IXUSR if is_dir else 0)
    path.chmod(mode | extra_bits)


def download_user_db(username):
    """Restores ONLY the specific user's Knowledge Base and unlocks permissions."""
    if not HF_TOKEN:
        return
    app_root = os.path.dirname(os.path.abspath(__file__))
    remote_prefix = f"{CHROMA_ROOT}/{username}"
    try:
        print(f"📥 Syncing Knowledge Base for {username}...")
        snapshot_download(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            allow_patterns=[f"{remote_prefix}/*"],
            local_dir=app_root,
            token=HF_TOKEN
        )
        # --- PERMISSION FIX ---
        # Downloaded snapshots can arrive read-only; force-add owner write
        # (and execute, for traversability, on directories) so the embedded
        # SQLite DB is modifiable. This solves the "readonly database"
        # (Error 1032).
        kb_dir = Path(app_root) / CHROMA_ROOT / username
        if kb_dir.exists():
            _make_writable(kb_dir, is_dir=True)
            for entry in kb_dir.rglob('*'):
                _make_writable(entry, is_dir=entry.is_dir())
        print("✅ User Knowledge Base Restored & Unlocked.")
    except Exception as e:
        print(f"⚠️ New user or sync error: {e}")
def upload_user_db(username):
    """Backs up ONLY the specific user's Knowledge Base."""
    if not HF_TOKEN:
        return
    app_root = os.path.dirname(os.path.abspath(__file__))
    rel_path = os.path.join(CHROMA_ROOT, username)
    abs_path = os.path.join(app_root, rel_path)
    if not os.path.exists(abs_path):
        # Nothing to back up yet (e.g. brand-new user with no KB folder).
        return
    try:
        HfApi(token=HF_TOKEN).upload_folder(
            folder_path=abs_path,
            path_in_repo=rel_path,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            commit_message=f"KB Update ({username}): {datetime.now(TIMEZONE)}"
        )
        print(f"✅ Knowledge Base Saved for {username}.")
    except Exception as e:
        print(f"⚠️ DB sync failed: {e}")
# --- AUTHENTICATION ---
def check_login():
    """Render the streamlit-authenticator login form and resolve the session.

    Pulls the freshest config.yaml from the cloud, builds the Authenticate
    widget from it, and on success stores username/name/roles/authenticator
    into st.session_state. Returns True only when login succeeded.
    """
    download_config_if_needed()
    try:
        config_path = get_config_path()
        if not config_path.exists():
            st.error(f"🚨 CRITICAL: Config not found at {config_path}")
            return False
        with open(config_path) as file:
            config = yaml.load(file, Loader=SafeLoader)
    except Exception as e:
        st.error(f"🚨 Config Error: {e}")
        return False
    authenticator = stauth.Authenticate(
        config['credentials'],
        config['cookie']['name'],
        config['cookie']['key'],
        config['cookie']['expiry_days']
    )
    # Renders the widget; the outcome lands in
    # st.session_state["authentication_status"] (True/False/None).
    authenticator.login(location='main')
    if st.session_state["authentication_status"]:
        username = st.session_state["username"]
        try:
            # Roles come straight from the YAML credentials block; empty if absent.
            user_data = config['credentials']['usernames'].get(username, {})
            user_roles = user_data.get('roles', [])
        except Exception as e:
            user_roles = []
        st.session_state.roles = user_roles
        st.session_state.username = username
        st.session_state.name = st.session_state.get("name")
        # Kept so other pages can call authenticator.logout() later.
        st.session_state.authenticator = authenticator
        return True
    elif st.session_state["authentication_status"] is False:
        st.error('Username/password is incorrect')
        return False
    elif st.session_state["authentication_status"] is None:
        st.warning('Please enter your username and password')
        return False
# --- REGISTRATION ---
def register_user(new_email, new_username, new_name, new_password, invite_code):
    """Create a new account in config.yaml behind an invite-code gate.

    Returns:
        (success: bool, message: str) — suitable for direct display in the UI.
    """
    if invite_code != INVITE_CODE:
        return False, "Invalid Invite Code."
    download_config_if_needed()
    config_path = get_config_path()
    # Serialize against the CommitScheduler so a background commit never
    # snapshots a half-written config.yaml.
    with scheduler.lock:
        # Fix: the original raised an unhandled FileNotFoundError when the
        # config had never been synced locally, crashing the registration UI
        # instead of honoring the (success, message) contract.
        if not config_path.exists():
            return False, "Server configuration unavailable. Please try again later."
        with open(config_path) as file:
            config = yaml.load(file, Loader=SafeLoader)
        if new_username in config['credentials']['usernames']:
            return False, "Username already exists."
        # streamlit-authenticator expects bcrypt hashes stored as str.
        hashed_bytes = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
        hashed_pwd = hashed_bytes.decode('utf-8')
        new_user_entry = {
            "email": new_email,
            "name": new_name,
            "password": hashed_pwd,
            "roles": ["user"]  # new accounts always start as plain users
        }
        config['credentials']['usernames'][new_username] = new_user_entry
        with open(config_path, 'w') as file:
            yaml.dump(config, file, default_flow_style=False)
    return True, "Account created! Please log in."
# --- LOGGING ---
def log_usage(model_name, input_tokens, output_tokens):
    """Accumulate per-day, per-user token usage into the shared JSON log.

    Args:
        model_name: Model identifier (accepted for interface compatibility;
            not currently persisted in the log).
        input_tokens: Prompt tokens consumed by this call.
        output_tokens: Completion tokens produced by this call.
    """
    ensure_log_exists()
    log_path = get_log_path()
    username = st.session_state.get("username", "anonymous")
    today = datetime.now(TIMEZONE).strftime("%Y-%m-%d")
    # scheduler.lock keeps the background commit from reading a half-written log.
    with scheduler.lock:
        data = {}
        if log_path.exists():
            with open(log_path, "r") as f:
                try:
                    data = json.load(f)
                except ValueError:
                    # Fix: was a bare `except:`. Only a corrupt/empty log file
                    # (json.JSONDecodeError is a ValueError) should reset the
                    # data; everything else should surface.
                    data = {}
        day_bucket = data.setdefault(today, {"total_tokens": 0, "users": {}})
        user_bucket = day_bucket["users"].setdefault(
            username, {"input": 0, "output": 0, "calls": 0}
        )
        day_bucket["total_tokens"] += (input_tokens + output_tokens)
        user_bucket["input"] += input_tokens
        user_bucket["output"] += output_tokens
        user_bucket["calls"] += 1
        with open(log_path, "w") as f:
            json.dump(data, f, indent=2)
def get_daily_stats():
    """Return today's usage bucket, or an empty one when nothing was logged yet.

    Returns:
        dict with keys "total_tokens" (int) and "users" (per-user counters).
    """
    ensure_log_exists()
    log_path = get_log_path()
    today = datetime.now(TIMEZONE).strftime("%Y-%m-%d")
    if log_path.exists():
        with open(log_path, "r") as f:
            try:
                data = json.load(f)
            except ValueError:
                # Fix: was a bare `except: pass`. Only a corrupt log file
                # (json.JSONDecodeError is a ValueError) should fall through
                # to the empty default below.
                data = {}
        if today in data:
            return data[today]
    return {"total_tokens": 0, "users": {}}