# AI_Toolkit / src / tracker.py
# NOTE(review): the lines below were Hugging Face Hub file-viewer chrome
# accidentally captured with the file (uploader "NavyDevilDoc", commit
# 3755446, "Upload 4 files", 8.98 kB) — preserved here as a comment so the
# module remains valid Python.
import streamlit as st
import streamlit_authenticator as stauth
import yaml
from yaml.loader import SafeLoader
import json
import os
import uuid
from datetime import datetime
import pytz
from huggingface_hub import HfApi, hf_hub_download, snapshot_download, CommitScheduler
from pathlib import Path
import bcrypt
# --- CONFIGURATION ---
DATASET_REPO_ID = "NavyDevilDoc/navy-ai-logs"  # HF dataset repo used for all persistence (config, logs, KBs)
LOG_FILE = "usage_log.json"       # per-day token usage log (JSON), kept in the persistence folder
CONFIG_FILE = "config.yaml"       # streamlit-authenticator credentials/cookie config
CHROMA_ROOT = "chroma_db"         # root folder holding per-user Chroma knowledge bases
HF_TOKEN = os.getenv("HF_TOKEN")  # write token for the dataset repo; cloud sync is skipped if unset
INVITE_CODE = os.getenv("INVITE_CODE", "CHANGE_ME_IN_SETTINGS") # Security Fix: invite code from env, not hard-coded
TIMEZONE = pytz.timezone("US/Eastern")  # all daily log bucketing uses US/Eastern dates
# --- DATA PERSISTENCE SETUP (The Fix) ---
# Create a local directory that the Scheduler will watch
LOCAL_DATA_DIR = Path("data_persistence")
LOCAL_DATA_DIR.mkdir(exist_ok=True)
# Initialize the Scheduler
# This runs in the background and pushes changes every 1 minute.
# NOTE: instantiated at import time — importing this module starts the
# background sync thread as a side effect.
scheduler = CommitScheduler(
    repo_id=DATASET_REPO_ID,
    repo_type="dataset",
    folder_path=LOCAL_DATA_DIR,
    path_in_repo=".", # Sync to root of dataset
    every=1, # Sync every 1 minute
    token=HF_TOKEN
)
# --- PATH HELPERS ---
def get_config_path():
    """Location of the locally-synced auth config inside the persistence folder."""
    return LOCAL_DATA_DIR.joinpath(CONFIG_FILE)
def get_log_path():
    """Location of the locally-synced usage log inside the persistence folder."""
    return LOCAL_DATA_DIR.joinpath(LOG_FILE)
# --- GENERIC FILE SYNC (Cached!) ---
@st.cache_data(ttl=60)  # Only check cloud every 60 seconds
def download_config_if_needed():
    """Refresh the local config.yaml from the dataset repo.

    The st.cache_data TTL rate-limits the cloud check to once per minute;
    without an HF token this is a no-op.
    """
    if not HF_TOKEN:
        return
    pull_args = dict(
        repo_id=DATASET_REPO_ID,
        filename=CONFIG_FILE,
        repo_type="dataset",
        local_dir=LOCAL_DATA_DIR,  # land directly in the scheduler-watched folder
        token=HF_TOKEN,
        force_download=True,
    )
    try:
        hf_hub_download(**pull_args)
        print("✅ Config refreshed from cloud.")
    except Exception as pull_err:
        print(f"⚠️ Cloud pull failed for config: {pull_err}")
# We don't cache logs because we need to write to them frequently
def ensure_log_exists():
    """Ensure the usage log exists locally, seeding it from the cloud if possible.

    Tries to download the log from the HF dataset repo; if that fails for any
    reason (no cloud copy yet, network error, missing token) an empty JSON
    object is written so later readers/writers always find a valid file.
    """
    log_path = get_log_path()  # consistency: reuse the path helper instead of rebuilding the path
    if log_path.exists():
        return
    try:
        hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=LOG_FILE,
            repo_type="dataset",
            local_dir=LOCAL_DATA_DIR,
            token=HF_TOKEN
        )
    except Exception:  # fix: was a bare `except:` (also caught KeyboardInterrupt/SystemExit)
        # Create empty log if it doesn't exist on cloud yet
        with open(log_path, "w") as f:
            json.dump({}, f)
# --- USER DB SYNC (For ChromaDB) ---
def download_user_db(username):
    """Restore ONLY this user's Knowledge Base from the dataset repo.

    Fetches files under chroma_db/<username>/ next to this script; no-op
    without an HF token.
    """
    if not HF_TOKEN:
        return
    script_dir = os.path.dirname(os.path.abspath(__file__))
    user_prefix = f"{CHROMA_ROOT}/{username}"
    try:
        # ChromaDB is not handled by the CommitScheduler yet (too large);
        # a one-shot snapshot_download is used instead.
        print(f"📥 Syncing Knowledge Base for {username}...")
        snapshot_download(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            allow_patterns=[f"{user_prefix}/*"],
            local_dir=script_dir,
            token=HF_TOKEN
        )
        print("✅ User Knowledge Base Restored.")
    except Exception as sync_err:
        print(f"⚠️ New user or sync error: {sync_err}")
def upload_user_db(username):
    """Back up ONLY this user's Knowledge Base to the dataset repo.

    Uploads chroma_db/<username>/ (relative to this script) if it exists;
    no-op without an HF token or when the user has no local KB yet.
    """
    if not HF_TOKEN:
        return
    script_dir = os.path.dirname(os.path.abspath(__file__))
    kb_rel = os.path.join(CHROMA_ROOT, username)
    kb_abs = os.path.join(script_dir, kb_rel)
    if not os.path.exists(kb_abs):
        return  # nothing to back up yet for this user
    try:
        HfApi(token=HF_TOKEN).upload_folder(
            folder_path=kb_abs,
            path_in_repo=kb_rel,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            commit_message=f"KB Update ({username}): {datetime.now(TIMEZONE)}"
        )
        print(f"✅ Knowledge Base Saved for {username}.")
    except Exception as sync_err:
        print(f"⚠️ DB sync failed: {sync_err}")
# --- AUTHENTICATION ---
def check_login():
    """Render the streamlit-authenticator login widget and gate the app on it.

    On success, populates st.session_state with `roles`, `username`, `name`
    and the `authenticator` instance (needed elsewhere for logout).

    Returns:
        bool: True when the user is authenticated, False otherwise
        (bad credentials, broken/missing config, or nothing entered yet).
    """
    # 1. Cached download — refreshes the local config at most once a minute.
    download_config_if_needed()
    try:
        config_path = get_config_path()
        if not config_path.exists():
            st.error(f"🚨 CRITICAL: Config not found at {config_path}")
            return False
        with open(config_path) as file:
            config = yaml.load(file, Loader=SafeLoader)
    except Exception as e:
        st.error(f"🚨 Config Error: {e}")
        return False
    authenticator = stauth.Authenticate(
        config['credentials'],
        config['cookie']['name'],
        config['cookie']['key'],
        config['cookie']['expiry_days']
    )
    authenticator.login(location='main')
    # Fix: use .get() — the raw key lookup raised KeyError if the widget had
    # not populated session state yet; None now falls through to the warning.
    status = st.session_state.get("authentication_status")
    if status:
        username = st.session_state["username"]
        # Roles are optional per-user metadata; default to no roles on any
        # malformed-config shape (fix: was a blanket `except Exception as e`
        # with an unused variable).
        try:
            user_roles = config['credentials']['usernames'].get(username, {}).get('roles', [])
        except (KeyError, TypeError, AttributeError):
            user_roles = []
        st.session_state.roles = user_roles
        st.session_state.username = username
        st.session_state.name = st.session_state.get("name")
        st.session_state.authenticator = authenticator
        return True
    if status is False:
        st.error('Username/password is incorrect')
    else:
        st.warning('Please enter your username and password')
    # Fix: explicit fallback so every path returns a bool, never None.
    return False
# --- REGISTRATION ---
def register_user(new_email, new_username, new_name, new_password, invite_code):
    """Create a new account in config.yaml, guarded by the invite code.

    Args:
        new_email: Email address stored on the account.
        new_username: Key under credentials.usernames; must be unused.
        new_name: Display name.
        new_password: Plaintext password; stored bcrypt-hashed.
        invite_code: Must match the INVITE_CODE environment setting.

    Returns:
        tuple[bool, str]: (success flag, human-readable message for the UI).
    """
    if invite_code != INVITE_CODE:
        return False, "Invalid Invite Code."
    # Ensure we have the latest config before writing.
    # NOTE(review): this call is cached with ttl=60, so the copy may be up to
    # a minute stale — confirm that's acceptable for concurrent signups.
    download_config_if_needed()
    config_path = get_config_path()
    # Fix: previously a missing config crashed with an uncaught
    # FileNotFoundError; surface it as a normal failure message instead.
    if not config_path.exists():
        return False, "Registration unavailable: config could not be loaded."
    # Lock the file for reading/writing
    # (The Scheduler handles the cloud sync, but we need to handle local consistency)
    with scheduler.lock:
        with open(config_path) as file:
            config = yaml.load(file, Loader=SafeLoader)
        # Fix: yaml.load returns None for an empty file, which previously
        # raised TypeError on the membership test below.
        if not config or 'credentials' not in config:
            return False, "Registration unavailable: config could not be loaded."
        if new_username in config['credentials']['usernames']:
            return False, "Username already exists."
        # Store only the bcrypt hash, never the plaintext password.
        hashed_bytes = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt())
        hashed_pwd = hashed_bytes.decode('utf-8')
        new_user_entry = {
            "email": new_email,
            "name": new_name,
            "password": hashed_pwd,
            "roles": ["user"]
        }
        config['credentials']['usernames'][new_username] = new_user_entry
        # The CommitScheduler picks up this local write and syncs it to the cloud.
        with open(config_path, 'w') as file:
            yaml.dump(config, file, default_flow_style=False)
    return True, "Account created! Please log in."
# --- LOGGING ---
def log_usage(model_name, input_tokens, output_tokens):
    """Record one model call in the daily usage log.

    Args:
        model_name: Identifier of the model used (currently kept for interface
            compatibility; not yet written into the log).
        input_tokens: Prompt tokens consumed by the call.
        output_tokens: Completion tokens produced by the call.

    The log is bucketed by US/Eastern date and per-username; the background
    CommitScheduler uploads the file after each local write.
    """
    ensure_log_exists()
    log_path = get_log_path()
    username = st.session_state.get("username", "anonymous")
    today = datetime.now(TIMEZONE).strftime("%Y-%m-%d")
    # Scheduler Lock guarantees atomic writes locally
    with scheduler.lock:
        data = {}
        if log_path.exists():
            with open(log_path, "r") as f:
                try:
                    data = json.load(f)
                except json.JSONDecodeError:  # fix: was a bare `except:`
                    data = {}  # corrupt log: start fresh rather than crash
        # setdefault replaces the manual "if key not in dict" initialization.
        day = data.setdefault(today, {"total_tokens": 0, "users": {}})
        user = day["users"].setdefault(username, {"input": 0, "output": 0, "calls": 0})
        day["total_tokens"] += (input_tokens + output_tokens)
        user["input"] += input_tokens
        user["output"] += output_tokens
        user["calls"] += 1
        with open(log_path, "w") as f:
            json.dump(data, f, indent=2)
    # No need to call upload_file() manually!
    # The Scheduler detects the file change and uploads it automatically.
def get_daily_stats():
    """Return today's usage stats from the log.

    Returns:
        dict: today's entry ({"total_tokens": int, "users": {...}}), or an
        empty-shaped default when the log is missing, corrupt, or has no
        entry for today's US/Eastern date.
    """
    ensure_log_exists()
    log_path = get_log_path()
    today = datetime.now(TIMEZONE).strftime("%Y-%m-%d")
    empty = {"total_tokens": 0, "users": {}}
    if not log_path.exists():
        return empty
    try:
        with open(log_path, "r") as f:
            data = json.load(f)
        return data[today]
    # fix: was a bare `except: pass`; narrow to the failures this can hit
    # (unreadable file, invalid JSON, date absent, non-dict payload).
    except (OSError, json.JSONDecodeError, KeyError, TypeError):
        return empty