|
|
"""
|
|
|
Core validation logic module.
|
|
|
Contains authentication, RAG setup, and common validation routines.
|
|
|
"""
|
|
|
import os
|
|
|
import shutil
|
|
|
import hashlib
|
|
|
import logging
|
|
|
from typing import Optional, List, Dict, Any, Union
|
|
|
|
|
|
import streamlit as st
|
|
|
try:
|
|
|
from langchain_openai import AzureOpenAI
|
|
|
from langchain_openai import AzureOpenAIEmbeddings
|
|
|
from langchain_chroma import Chroma
|
|
|
from langchain.chains import RetrievalQA
|
|
|
AI_AVAILABLE = True
|
|
|
except ImportError:
|
|
|
AI_AVAILABLE = False
|
|
|
|
|
|
AzureOpenAI = None
|
|
|
AzureOpenAIEmbeddings = None
|
|
|
Chroma = None
|
|
|
RetrievalQA = None
|
|
|
|
|
|
from core.settings import settings
|
|
|
|
|
|
|
|
|
try:
|
|
|
from db.database import (
|
|
|
get_user,
|
|
|
update_last_login,
|
|
|
save_chat_message as db_save_chat_message,
|
|
|
get_chat_history as db_get_chat_history,
|
|
|
log_audit_event,
|
|
|
log_analytics_event
|
|
|
)
|
|
|
DB_AVAILABLE = True
|
|
|
except ImportError:
|
|
|
DB_AVAILABLE = False
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def hash_password(password: str) -> str:
|
|
|
"""Hash a password using SHA-256."""
|
|
|
return hashlib.sha256(password.encode()).hexdigest()
|
|
|
|
|
|
def get_default_users() -> Dict[str, Dict[str, str]]:
|
|
|
"""Return default users from settings (in-memory)."""
|
|
|
return {
|
|
|
"admin": {
|
|
|
"password": settings.ADMIN_PASSWORD,
|
|
|
"role": "admin"
|
|
|
},
|
|
|
"nurse": {
|
|
|
"password": settings.NURSE_PASSWORD,
|
|
|
"role": "nurse"
|
|
|
},
|
|
|
"clinician": {
|
|
|
"password": settings.CLINICIAN_PASSWORD,
|
|
|
"role": "clinician"
|
|
|
},
|
|
|
}
|
|
|
|
|
|
def authenticate_user(username: str, password: str) -> Optional[str]:
|
|
|
"""
|
|
|
Authenticate user against database (if enabled) or default in-memory users.
|
|
|
Returns the role if successful, None otherwise.
|
|
|
"""
|
|
|
if not username or not password:
|
|
|
logger.warning("Invalid authentication attempt - empty credentials")
|
|
|
return None
|
|
|
|
|
|
|
|
|
if settings.USE_DATABASE and DB_AVAILABLE:
|
|
|
try:
|
|
|
user = get_user(username)
|
|
|
if user:
|
|
|
password_hash = hash_password(password)
|
|
|
if user["password_hash"] == password_hash and user["is_active"]:
|
|
|
logger.info(f"User authenticated from database: {username}")
|
|
|
update_last_login(user["id"])
|
|
|
return user["role"]
|
|
|
else:
|
|
|
logger.warning(
|
|
|
f"Failed auth for DB user: {username} "
|
|
|
"(invalid password or inactive)"
|
|
|
)
|
|
|
return None
|
|
|
except Exception as e:
|
|
|
from core.safe_logging import log_exception_safe
|
|
|
log_exception_safe(logger, "Database authentication error", e)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
defaults = get_default_users()
|
|
|
if username in defaults:
|
|
|
if defaults[username]["password"] == password:
|
|
|
logger.info(f"User authenticated from defaults: {username}")
|
|
|
return defaults[username]["role"]
|
|
|
else:
|
|
|
logger.warning(f"Failed auth for default user: {username}")
|
|
|
return None
|
|
|
|
|
|
logger.warning(f"User not found: {username}")
|
|
|
return None
|
|
|
|
|
|
@st.cache_resource
|
|
|
def load_vector_db():
|
|
|
"""Load and cache ChromaDB vector database."""
|
|
|
if not AI_AVAILABLE:
|
|
|
logger.warning("AI modules not available - vector DB disabled")
|
|
|
return None
|
|
|
|
|
|
vector_db_path = settings.VECTOR_DB_PATH
|
|
|
local_db_path = settings.LOCAL_DB_PATH
|
|
|
|
|
|
if not os.path.exists(vector_db_path):
|
|
|
logger.warning(f"Vector database not found at {vector_db_path}")
|
|
|
return None
|
|
|
|
|
|
if not os.path.exists(local_db_path):
|
|
|
try:
|
|
|
logger.info(f"Copying vector DB to {local_db_path}")
|
|
|
shutil.copytree(vector_db_path, local_db_path, dirs_exist_ok=True)
|
|
|
except Exception as e:
|
|
|
from core.safe_logging import log_exception_safe
|
|
|
log_exception_safe(logger, "Failed to copy vector DB", e)
|
|
|
return None
|
|
|
|
|
|
try:
|
|
|
logger.debug("Loading embeddings...")
|
|
|
embeddings = AzureOpenAIEmbeddings(
|
|
|
azure_deployment=settings.EMBEDDING_MODEL,
|
|
|
api_key=settings.AZURE_OPENAI_API_KEY,
|
|
|
azure_endpoint=settings.AZURE_OPENAI_ENDPOINT,
|
|
|
api_version=settings.AZURE_OPENAI_API_VERSION
|
|
|
)
|
|
|
logger.debug("Loading ChromaDB...")
|
|
|
db = Chroma(persist_directory=local_db_path, embedding_function=embeddings)
|
|
|
logger.info("Vector DB loaded successfully")
|
|
|
return db
|
|
|
except Exception as e:
|
|
|
from core.safe_logging import log_exception_safe
|
|
|
log_exception_safe(logger, "Failed to load vector DB", e)
|
|
|
return None
|
|
|
|
|
|
def save_chat_message(username: str, role: str, content: str, chat_history_file: str = ".chat_history.json") -> bool:
|
|
|
"""Save chat message to database or JSON file."""
|
|
|
|
|
|
if settings.USE_DATABASE and DB_AVAILABLE:
|
|
|
try:
|
|
|
user = get_user(username)
|
|
|
if user:
|
|
|
db_save_chat_message(user["id"], role, content)
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
from core.safe_logging import log_exception_safe
|
|
|
log_exception_safe(logger, "Failed to save to database", e, level="warning")
|
|
|
|
|
|
|
|
|
try:
|
|
|
history_data = {}
|
|
|
if os.path.exists(chat_history_file):
|
|
|
import json
|
|
|
with open(chat_history_file, "r") as f:
|
|
|
history_data = json.load(f)
|
|
|
|
|
|
if username not in history_data:
|
|
|
history_data[username] = []
|
|
|
|
|
|
history_data[username].append({"role": role, "content": content})
|
|
|
|
|
|
import json
|
|
|
with open(chat_history_file, "w") as f:
|
|
|
json.dump(history_data, f, indent=2)
|
|
|
return True
|
|
|
except Exception as e:
|
|
|
from core.safe_logging import log_exception_safe
|
|
|
log_exception_safe(logger, "Failed to save chat history to file", e)
|
|
|
return False
|
|
|
|
|
|
def load_chat_history(username: str, chat_history_file: str = ".chat_history.json") -> List[Dict[str, Any]]:
|
|
|
"""Load chat history from database or JSON."""
|
|
|
|
|
|
if settings.USE_DATABASE and DB_AVAILABLE:
|
|
|
try:
|
|
|
user = get_user(username)
|
|
|
if user:
|
|
|
messages = db_get_chat_history(user["id"], limit=100)
|
|
|
return [
|
|
|
{"role": msg["role"], "content": msg["content"]}
|
|
|
for msg in messages
|
|
|
]
|
|
|
except Exception as e:
|
|
|
from core.safe_logging import log_exception_safe
|
|
|
log_exception_safe(logger, "Failed to load from database", e, level="warning")
|
|
|
|
|
|
|
|
|
try:
|
|
|
import json
|
|
|
if os.path.exists(chat_history_file):
|
|
|
with open(chat_history_file, "r") as f:
|
|
|
history_data = json.load(f)
|
|
|
return history_data.get(username, [])
|
|
|
except Exception as e:
|
|
|
from core.safe_logging import log_exception_safe
|
|
|
log_exception_safe(logger, "Failed to load chat history from file", e, level="warning")
|
|
|
|
|
|
return []
|
|
|
|
|
|
def audit_log(username: str, action: str, details: Optional[Dict] = None):
|
|
|
"""Log user action to database if available, otherwise just logger."""
|
|
|
if settings.USE_DATABASE and DB_AVAILABLE:
|
|
|
try:
|
|
|
user = get_user(username)
|
|
|
if user:
|
|
|
log_audit_event(user["id"], action, changes=details)
|
|
|
except Exception as e:
|
|
|
from core.safe_logging import log_exception_safe
|
|
|
log_exception_safe(logger, "Failed to log audit event to DB", e, level="warning")
|
|
|
|
|
|
logger.info(f"AUDIT - User: {username}, Action: {action}, Details: {details}")
|
|
|
|
|
|
def analytics_log(username: str, event_type: str, event_name: str, data: Optional[Dict] = None):
|
|
|
"""Log analytics event."""
|
|
|
if settings.USE_DATABASE and DB_AVAILABLE:
|
|
|
try:
|
|
|
user = get_user(username)
|
|
|
if user:
|
|
|
log_analytics_event(user["id"], event_type, event_name, data)
|
|
|
except Exception as e:
|
|
|
from core.safe_logging import log_exception_safe
|
|
|
log_exception_safe(logger, "Failed to log analytics to DB", e, level="warning")
|
|
|
|