NurseCitizenDeveloper's picture
Deploy Open Nursing Validator (Docker)
6d12932 verified
"""
Core validation logic module.
Contains authentication, RAG setup, and common validation routines.
"""
import os
import shutil
import hashlib
import logging
from typing import Optional, List, Dict, Any, Union
import streamlit as st
try:
from langchain_openai import AzureOpenAI
from langchain_openai import AzureOpenAIEmbeddings
from langchain_chroma import Chroma
from langchain.chains import RetrievalQA
AI_AVAILABLE = True
except ImportError:
AI_AVAILABLE = False
# Define dummy classes or None to prevent NameError if used in type hints or default args
AzureOpenAI = None
AzureOpenAIEmbeddings = None
Chroma = None
RetrievalQA = None
from core.settings import settings
# Attempt to import DB modules
try:
from db.database import (
get_user,
update_last_login,
save_chat_message as db_save_chat_message,
get_chat_history as db_get_chat_history,
log_audit_event,
log_analytics_event
)
DB_AVAILABLE = True
except ImportError:
DB_AVAILABLE = False
logger = logging.getLogger(__name__)
def hash_password(password: str) -> str:
"""Hash a password using SHA-256."""
return hashlib.sha256(password.encode()).hexdigest()
def get_default_users() -> Dict[str, Dict[str, str]]:
"""Return default users from settings (in-memory)."""
return {
"admin": {
"password": settings.ADMIN_PASSWORD,
"role": "admin"
},
"nurse": {
"password": settings.NURSE_PASSWORD,
"role": "nurse"
},
"clinician": {
"password": settings.CLINICIAN_PASSWORD,
"role": "clinician"
},
}
def authenticate_user(username: str, password: str) -> Optional[str]:
"""
Authenticate user against database (if enabled) or default in-memory users.
Returns the role if successful, None otherwise.
"""
if not username or not password:
logger.warning("Invalid authentication attempt - empty credentials")
return None
# 1. Try Database
if settings.USE_DATABASE and DB_AVAILABLE:
try:
user = get_user(username)
if user:
password_hash = hash_password(password)
if user["password_hash"] == password_hash and user["is_active"]:
logger.info(f"User authenticated from database: {username}")
update_last_login(user["id"])
return user["role"]
else:
logger.warning(
f"Failed auth for DB user: {username} "
"(invalid password or inactive)"
)
return None
except Exception as e:
from core.safe_logging import log_exception_safe
log_exception_safe(logger, "Database authentication error", e)
# Fall through to defaults if DB fails (optional behavior, strictly speaking should fail closed,
# but legacy app.py allowed fallback. For security, maybe we should stop here if DB is configured.)
# However, prompt implies 'Phase 2' supports fallback. I will keep fallback logic but log it.
# 2. Fallback to Default Users
defaults = get_default_users()
if username in defaults:
if defaults[username]["password"] == password:
logger.info(f"User authenticated from defaults: {username}")
return defaults[username]["role"]
else:
logger.warning(f"Failed auth for default user: {username}")
return None
logger.warning(f"User not found: {username}")
return None
@st.cache_resource
def load_vector_db():
"""Load and cache ChromaDB vector database."""
if not AI_AVAILABLE:
logger.warning("AI modules not available - vector DB disabled")
return None
vector_db_path = settings.VECTOR_DB_PATH
local_db_path = settings.LOCAL_DB_PATH
if not os.path.exists(vector_db_path):
logger.warning(f"Vector database not found at {vector_db_path}")
return None
if not os.path.exists(local_db_path):
try:
logger.info(f"Copying vector DB to {local_db_path}")
shutil.copytree(vector_db_path, local_db_path, dirs_exist_ok=True)
except Exception as e:
from core.safe_logging import log_exception_safe
log_exception_safe(logger, "Failed to copy vector DB", e)
return None
try:
logger.debug("Loading embeddings...")
embeddings = AzureOpenAIEmbeddings(
azure_deployment=settings.EMBEDDING_MODEL,
api_key=settings.AZURE_OPENAI_API_KEY,
azure_endpoint=settings.AZURE_OPENAI_ENDPOINT,
api_version=settings.AZURE_OPENAI_API_VERSION
)
logger.debug("Loading ChromaDB...")
db = Chroma(persist_directory=local_db_path, embedding_function=embeddings)
logger.info("Vector DB loaded successfully")
return db
except Exception as e:
from core.safe_logging import log_exception_safe
log_exception_safe(logger, "Failed to load vector DB", e)
return None
def save_chat_message(username: str, role: str, content: str, chat_history_file: str = ".chat_history.json") -> bool:
"""Save chat message to database or JSON file."""
# 1. Try Database
if settings.USE_DATABASE and DB_AVAILABLE:
try:
user = get_user(username)
if user:
db_save_chat_message(user["id"], role, content)
return True
except Exception as e:
from core.safe_logging import log_exception_safe
log_exception_safe(logger, "Failed to save to database", e, level="warning")
# 2. Fallback to JSON
try:
history_data = {}
if os.path.exists(chat_history_file):
import json
with open(chat_history_file, "r") as f:
history_data = json.load(f)
if username not in history_data:
history_data[username] = []
history_data[username].append({"role": role, "content": content})
import json
with open(chat_history_file, "w") as f:
json.dump(history_data, f, indent=2)
return True
except Exception as e:
from core.safe_logging import log_exception_safe
log_exception_safe(logger, "Failed to save chat history to file", e)
return False
def load_chat_history(username: str, chat_history_file: str = ".chat_history.json") -> List[Dict[str, Any]]:
"""Load chat history from database or JSON."""
# 1. Try Database
if settings.USE_DATABASE and DB_AVAILABLE:
try:
user = get_user(username)
if user:
messages = db_get_chat_history(user["id"], limit=100)
return [
{"role": msg["role"], "content": msg["content"]}
for msg in messages
]
except Exception as e:
from core.safe_logging import log_exception_safe
log_exception_safe(logger, "Failed to load from database", e, level="warning")
# 2. Fallback to JSON
try:
import json
if os.path.exists(chat_history_file):
with open(chat_history_file, "r") as f:
history_data = json.load(f)
return history_data.get(username, [])
except Exception as e:
from core.safe_logging import log_exception_safe
log_exception_safe(logger, "Failed to load chat history from file", e, level="warning")
return []
def audit_log(username: str, action: str, details: Optional[Dict] = None):
"""Log user action to database if available, otherwise just logger."""
if settings.USE_DATABASE and DB_AVAILABLE:
try:
user = get_user(username)
if user:
log_audit_event(user["id"], action, changes=details)
except Exception as e:
from core.safe_logging import log_exception_safe
log_exception_safe(logger, "Failed to log audit event to DB", e, level="warning")
logger.info(f"AUDIT - User: {username}, Action: {action}, Details: {details}")
def analytics_log(username: str, event_type: str, event_name: str, data: Optional[Dict] = None):
"""Log analytics event."""
if settings.USE_DATABASE and DB_AVAILABLE:
try:
user = get_user(username)
if user:
log_analytics_event(user["id"], event_type, event_name, data)
except Exception as e:
from core.safe_logging import log_exception_safe
log_exception_safe(logger, "Failed to log analytics to DB", e, level="warning")