# AI_Toolkit / src/tracker.py
# Web page header captured with the file (not code): NavyDevilDoc,
# "Update src/tracker.py", commit 9e01cfe (verified).
import streamlit as st
import streamlit_authenticator as stauth
import yaml
from yaml.loader import SafeLoader
import json
import os
import stat # <--- NEW IMPORT for permission handling
import uuid
from datetime import datetime
import pytz
from huggingface_hub import HfApi, hf_hub_download, snapshot_download, CommitScheduler
from pathlib import Path
import bcrypt
# --- CONFIGURATION ---
# Hugging Face repo used as remote storage; every Hub call in this file
# passes it together with repo_type="dataset".
DATASET_REPO_ID = "NavyDevilDoc/navy-ai-logs"
# File name of the JSON usage log kept inside LOCAL_DATA_DIR.
LOG_FILE = "usage_log.json"
# File name of the YAML config (credentials + cookie settings) kept inside
# LOCAL_DATA_DIR.
CONFIG_FILE = "config.yaml"
# Folder, relative to this source file, under which each user's knowledge
# base is stored as <CHROMA_ROOT>/<username>.
CHROMA_ROOT = "chroma_db"
# Hub access token from the environment. It is None when the variable is
# unset; the config and knowledge-base sync helpers then return immediately.
HF_TOKEN = os.getenv("HF_TOKEN")
# Code a new user must supply to register_user.
# NOTE(review): when the env var is unset, the placeholder below is itself
# the accepted invite code.
INVITE_CODE = os.getenv("INVITE_CODE", "CHANGE_ME_IN_SETTINGS")
# Time zone used to pick the usage log's per-day key and to stamp
# knowledge-base commit messages.
TIMEZONE = pytz.timezone("US/Eastern")
# --- DATA PERSISTENCE SETUP ---
# Local folder that holds the config file and the usage log. It is created
# at import time if it does not exist yet.
LOCAL_DATA_DIR = Path("data_persistence")
LOCAL_DATA_DIR.mkdir(exist_ok=True)
# Background scheduler that mirrors LOCAL_DATA_DIR to the root (".") of the
# dataset repo. register_user and log_usage hold `scheduler.lock` while they
# rewrite files in that folder.
# NOTE(review): per the huggingface_hub docs `every` is expressed in
# minutes, so this would push once a minute — confirm against the installed
# version.
scheduler = CommitScheduler(
repo_id=DATASET_REPO_ID,
repo_type="dataset",
folder_path=LOCAL_DATA_DIR,
path_in_repo=".",
every=1,
token=HF_TOKEN
)
# --- PATH HELPERS ---
def get_config_path():
    """Return the location of the config file inside LOCAL_DATA_DIR."""
    return LOCAL_DATA_DIR.joinpath(CONFIG_FILE)
def get_log_path():
    """Return the location of the usage log inside LOCAL_DATA_DIR."""
    return LOCAL_DATA_DIR.joinpath(LOG_FILE)
# --- GENERIC FILE SYNC ---
@st.cache_data(ttl=60)
def download_config_if_needed():
    """Fetch the config file from the dataset repo into LOCAL_DATA_DIR.

    Does nothing when HF_TOKEN is not set. The download is forced so a
    locally cached copy is replaced; the Streamlit cache decorator (ttl=60)
    limits how often the body actually runs. Any failure is reported on
    stdout and otherwise ignored, so callers fall back to whatever config
    is already on disk.
    """
    if not HF_TOKEN:
        return

    download_args = dict(
        repo_id=DATASET_REPO_ID,
        filename=CONFIG_FILE,
        repo_type="dataset",
        local_dir=LOCAL_DATA_DIR,
        token=HF_TOKEN,
        force_download=True,
    )
    try:
        hf_hub_download(**download_args)
        print("✅ Config refreshed from cloud.")
    except Exception as e:
        # Best effort: a failed pull must not break the page.
        print(f"⚠️ Cloud pull failed for config: {e}")
def ensure_log_exists():
    """Make sure the usage log file is present in LOCAL_DATA_DIR.

    When the file is missing locally, try to download it from the dataset
    repo. If that download fails for any reason, report the reason on
    stdout and start a fresh log containing an empty JSON object.
    """
    log_path = get_log_path()
    if log_path.exists():
        return

    try:
        hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=LOG_FILE,
            repo_type="dataset",
            local_dir=LOCAL_DATA_DIR,
            token=HF_TOKEN,
        )
    except Exception as e:
        # Catch Exception rather than using a bare ``except:`` so that
        # KeyboardInterrupt / SystemExit still propagate, and say why the
        # pull failed instead of swallowing it silently.
        print(f"⚠️ Cloud pull failed for usage log, starting a new one: {e}")
        with open(log_path, "w") as f:
            json.dump({}, f)
# --- USER DB SYNC (download also restores write permissions) ---
def download_user_db(username):
    """Restore a single user's knowledge base and make it writable again.

    Only files under ``<CHROMA_ROOT>/<username>/`` are pulled from the
    dataset repo, into the directory that contains this source file.
    Afterwards the user's folder and everything below it get the owner
    write bit added (directories also get the owner execute bit) so the
    database can be opened for writing. Does nothing when HF_TOKEN is not
    set; any error is reported on stdout and otherwise ignored.
    """
    if not HF_TOKEN:
        return

    target_dir = os.path.dirname(os.path.abspath(__file__))
    user_db_path = f"{CHROMA_ROOT}/{username}"
    # Directories need the execute bit as well, otherwise they cannot be
    # traversed.
    dir_bits = stat.S_IWUSR | stat.S_IXUSR

    try:
        print(f"📥 Syncing Knowledge Base for {username}...")
        snapshot_download(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            allow_patterns=[f"{user_db_path}/*"],
            local_dir=target_dir,
            token=HF_TOKEN,
        )

        # Downloaded files can end up read-only, which makes the database
        # fail with a "readonly database" error; add write access back.
        kb_dir = Path(target_dir) / CHROMA_ROOT / username
        if kb_dir.exists():
            # The user's own folder first ...
            kb_dir.chmod(kb_dir.stat().st_mode | dir_bits)
            # ... then every file and sub-folder beneath it.
            for entry in kb_dir.rglob('*'):
                mode = entry.stat().st_mode
                extra_bits = dir_bits if entry.is_dir() else stat.S_IWUSR
                entry.chmod(mode | extra_bits)
            print("✅ User Knowledge Base Restored & Unlocked.")
    except Exception as e:
        print(f"⚠️ New user or sync error: {e}")
def upload_user_db(username):
    """Back up a single user's knowledge base to the dataset repo.

    Uploads the local folder ``<this file's directory>/<CHROMA_ROOT>/<username>``
    to ``<CHROMA_ROOT>/<username>`` in the repo. Does nothing when HF_TOKEN
    is not set or when the local folder does not exist. Upload errors are
    reported on stdout and otherwise ignored.
    """
    if not HF_TOKEN:
        return

    target_dir = os.path.dirname(os.path.abspath(__file__))
    user_db_abs_path = os.path.join(target_dir, CHROMA_ROOT, username)
    if not os.path.exists(user_db_abs_path):
        return

    # Repo paths are always "/"-separated. Build the destination exactly
    # like download_user_db builds its pattern instead of relying on
    # os.path.join, whose separator depends on the host OS.
    user_db_repo_path = f"{CHROMA_ROOT}/{username}"

    try:
        api = HfApi(token=HF_TOKEN)
        api.upload_folder(
            folder_path=user_db_abs_path,
            path_in_repo=user_db_repo_path,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            commit_message=f"KB Update ({username}): {datetime.now(TIMEZONE)}",
        )
        print(f"✅ Knowledge Base Saved for {username}.")
    except Exception as e:
        # Best effort: a failed backup must not break the caller.
        print(f"⚠️ DB sync failed: {e}")
# --- AUTHENTICATION ---
def check_login():
    """Show the login form and report whether the session is authenticated.

    The config is refreshed from the dataset repo first, then loaded from
    disk. A missing or unreadable config is shown as an error and yields
    False.

    Returns:
        True when the authentication status is truthy; in that case the
        user's roles, username, name and the authenticator object are
        stored in ``st.session_state``. False when the status is ``False``
        (wrong credentials) or ``None`` (nothing entered yet), after
        showing the matching message. Any other status falls through and
        returns None.
    """
    download_config_if_needed()

    try:
        config_path = get_config_path()
        if not config_path.exists():
            st.error(f"🚨 CRITICAL: Config not found at {config_path}")
            return False
        with open(config_path) as handle:
            config = yaml.load(handle, Loader=SafeLoader)
    except Exception as e:
        st.error(f"🚨 Config Error: {e}")
        return False

    credentials = config['credentials']
    cookie_cfg = config['cookie']
    authenticator = stauth.Authenticate(
        credentials,
        cookie_cfg['name'],
        cookie_cfg['key'],
        cookie_cfg['expiry_days'],
    )
    authenticator.login(location='main')

    auth_status = st.session_state["authentication_status"]

    if auth_status:
        username = st.session_state["username"]
        try:
            # Users without a 'roles' entry (or unknown users) get no roles.
            roles = config['credentials']['usernames'].get(username, {}).get('roles', [])
        except Exception:
            roles = []
        st.session_state.roles = roles
        st.session_state.username = username
        st.session_state.name = st.session_state.get("name")
        st.session_state.authenticator = authenticator
        return True

    if auth_status is False:
        st.error('Username/password is incorrect')
        return False

    if auth_status is None:
        st.warning('Please enter your username and password')
        return False

    return None
# --- REGISTRATION ---
def register_user(new_email, new_username, new_name, new_password, invite_code):
    """Add a new account to the config file when the invite code matches.

    Args:
        new_email: E-mail address stored with the account.
        new_username: Login name; also the key of the new entry.
        new_name: Display name stored with the account.
        new_password: Plain-text password; only its bcrypt hash is stored.
        invite_code: Must equal INVITE_CODE for registration to proceed.

    Returns:
        A ``(success, message)`` tuple. ``success`` is False when the invite
        code is wrong, the username or password is unusable, the username is
        taken, or the config cannot be read.
    """
    if invite_code != INVITE_CODE:
        return False, "Invalid Invite Code."

    # The username comes straight from the registration form and is later
    # used as a directory name by download_user_db / upload_user_db, so it
    # must not be empty or able to point outside the per-user folder.
    if (
        not new_username
        or new_username in (".", "..")
        or "/" in new_username
        or "\\" in new_username
    ):
        return False, "Invalid username."
    if not new_password:
        return False, "Password cannot be empty."

    download_config_if_needed()
    config_path = get_config_path()

    # Hold the scheduler lock for the whole read-modify-write so the file is
    # not pushed half-written.
    with scheduler.lock:
        try:
            with open(config_path) as file:
                config = yaml.load(file, Loader=SafeLoader)
            credentials = config['credentials']
            if credentials.get('usernames') is None:
                # An empty "usernames:" key loads as None.
                credentials['usernames'] = {}
            users = credentials['usernames']
        except (OSError, yaml.YAMLError, KeyError, TypeError, AttributeError) as e:
            # Missing, empty or malformed config: report it like check_login
            # does instead of crashing.
            return False, f"Config Error: {e}"

        if new_username in users:
            return False, "Username already exists."

        hashed_bytes = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
        users[new_username] = {
            "email": new_email,
            "name": new_name,
            "password": hashed_bytes.decode('utf-8'),
            "roles": ["user"],
        }

        # NOTE(review): the change reaches the dataset repo only on the
        # scheduler's next push; a forced config re-download before that
        # push could overwrite it — confirm whether this can happen.
        with open(config_path, 'w') as file:
            yaml.dump(config, file, default_flow_style=False)

    return True, "Account created! Please log in."
# --- LOGGING ---
def log_usage(model_name, input_tokens, output_tokens):
    """Add one model call to today's per-user counters in the usage log.

    The log is a JSON object keyed by date (``YYYY-MM-DD`` in TIMEZONE).
    Each day holds ``total_tokens`` plus a ``users`` mapping whose entries
    count ``input`` tokens, ``output`` tokens and ``calls``.

    Args:
        model_name: Not used by this function; kept so callers keep working.
        input_tokens: Number of input tokens to add.
        output_tokens: Number of output tokens to add.
    """
    ensure_log_exists()
    log_path = get_log_path()

    # ``.get(key, default)`` would return None when the key exists but holds
    # None. A None key is written to JSON as "null" and never matches again
    # on reload, so fall back to "anonymous" for any falsy value.
    username = st.session_state.get("username") or "anonymous"
    today = datetime.now(TIMEZONE).strftime("%Y-%m-%d")

    # Hold the scheduler lock for the whole read-modify-write so the file is
    # not pushed half-written.
    with scheduler.lock:
        data = {}
        if log_path.exists():
            try:
                with open(log_path, "r") as f:
                    data = json.load(f)
            except ValueError as e:
                # Invalid JSON (or undecodable bytes): start over, but say so.
                print(f"⚠️ Usage log unreadable, resetting: {e}")
                data = {}
            if not isinstance(data, dict):
                data = {}

        day_entry = data.setdefault(today, {"total_tokens": 0, "users": {}})
        user_entry = day_entry["users"].setdefault(
            username, {"input": 0, "output": 0, "calls": 0}
        )

        day_entry["total_tokens"] += input_tokens + output_tokens
        user_entry["input"] += input_tokens
        user_entry["output"] += output_tokens
        user_entry["calls"] += 1

        with open(log_path, "w") as f:
            json.dump(data, f, indent=2)
def get_daily_stats():
    """Return today's entry from the usage log.

    Returns:
        The dict stored under today's date (``YYYY-MM-DD`` in TIMEZONE), or
        ``{"total_tokens": 0, "users": {}}`` when the log is missing, is not
        valid JSON, is not a JSON object, or has no entry for today.
    """
    ensure_log_exists()
    log_path = get_log_path()
    today = datetime.now(TIMEZONE).strftime("%Y-%m-%d")
    empty_stats = {"total_tokens": 0, "users": {}}

    if not log_path.exists():
        return empty_stats

    try:
        with open(log_path, "r") as f:
            data = json.load(f)
    except ValueError:
        # Invalid JSON: treat as "no data yet". ValueError replaces the bare
        # ``except:`` so KeyboardInterrupt / SystemExit are not swallowed.
        return empty_stats

    if isinstance(data, dict) and today in data:
        return data[today]
    return empty_stats