File size: 9,011 Bytes
6d12932
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""

Core validation logic module.

Contains authentication, RAG setup, and common validation routines.

"""
import os
import shutil
import hashlib
import logging
from typing import Optional, List, Dict, Any, Union

import streamlit as st
try:
    from langchain_openai import AzureOpenAI
    from langchain_openai import AzureOpenAIEmbeddings
    from langchain_chroma import Chroma
    from langchain.chains import RetrievalQA
    AI_AVAILABLE = True
except ImportError:
    AI_AVAILABLE = False
    # Define dummy classes or None to prevent NameError if used in type hints or default args
    AzureOpenAI = None
    AzureOpenAIEmbeddings = None
    Chroma = None
    RetrievalQA = None

from core.settings import settings

# Attempt to import DB modules
try:
    from db.database import (
        get_user,
        update_last_login,
        save_chat_message as db_save_chat_message,
        get_chat_history as db_get_chat_history,
        log_audit_event,
        log_analytics_event
    )
    DB_AVAILABLE = True
except ImportError:
    DB_AVAILABLE = False

logger = logging.getLogger(__name__)

def hash_password(password: str) -> str:
    """Hash a password using SHA-256."""
    return hashlib.sha256(password.encode()).hexdigest()

def get_default_users() -> Dict[str, Dict[str, str]]:
    """Return default users from settings (in-memory)."""
    return {
        "admin": {
            "password": settings.ADMIN_PASSWORD,
            "role": "admin"
        },
        "nurse": {
            "password": settings.NURSE_PASSWORD,
            "role": "nurse"
        },
        "clinician": {
            "password": settings.CLINICIAN_PASSWORD,
            "role": "clinician"
        },
    }

def authenticate_user(username: str, password: str) -> Optional[str]:
    """

    Authenticate user against database (if enabled) or default in-memory users.

    Returns the role if successful, None otherwise.

    """
    if not username or not password:
        logger.warning("Invalid authentication attempt - empty credentials")
        return None

    # 1. Try Database
    if settings.USE_DATABASE and DB_AVAILABLE:
        try:
            user = get_user(username)
            if user:
                password_hash = hash_password(password)
                if user["password_hash"] == password_hash and user["is_active"]:
                    logger.info(f"User authenticated from database: {username}")
                    update_last_login(user["id"])
                    return user["role"]
                else:
                    logger.warning(
                        f"Failed auth for DB user: {username} "
                        "(invalid password or inactive)"
                    )
                    return None
        except Exception as e:
            from core.safe_logging import log_exception_safe
            log_exception_safe(logger, "Database authentication error", e)
            # Fall through to defaults if DB fails (optional behavior, strictly speaking should fail closed,
            # but legacy app.py allowed fallback. For security, maybe we should stop here if DB is configured.)
            # However, prompt implies 'Phase 2' supports fallback. I will keep fallback logic but log it.

    # 2. Fallback to Default Users
    defaults = get_default_users()
    if username in defaults:
        if defaults[username]["password"] == password:
            logger.info(f"User authenticated from defaults: {username}")
            return defaults[username]["role"]
        else:
            logger.warning(f"Failed auth for default user: {username}")
            return None

    logger.warning(f"User not found: {username}")
    return None

@st.cache_resource
def load_vector_db():
    """Load and cache ChromaDB vector database."""
    if not AI_AVAILABLE:
        logger.warning("AI modules not available - vector DB disabled")
        return None

    vector_db_path = settings.VECTOR_DB_PATH
    local_db_path = settings.LOCAL_DB_PATH

    if not os.path.exists(vector_db_path):
        logger.warning(f"Vector database not found at {vector_db_path}")
        return None

    if not os.path.exists(local_db_path):
        try:
            logger.info(f"Copying vector DB to {local_db_path}")
            shutil.copytree(vector_db_path, local_db_path, dirs_exist_ok=True)
        except Exception as e:
            from core.safe_logging import log_exception_safe
            log_exception_safe(logger, "Failed to copy vector DB", e)
            return None

    try:
        logger.debug("Loading embeddings...")
        embeddings = AzureOpenAIEmbeddings(
            azure_deployment=settings.EMBEDDING_MODEL,
            api_key=settings.AZURE_OPENAI_API_KEY,
            azure_endpoint=settings.AZURE_OPENAI_ENDPOINT,
            api_version=settings.AZURE_OPENAI_API_VERSION
        )
        logger.debug("Loading ChromaDB...")
        db = Chroma(persist_directory=local_db_path, embedding_function=embeddings)
        logger.info("Vector DB loaded successfully")
        return db
    except Exception as e:
        from core.safe_logging import log_exception_safe
        log_exception_safe(logger, "Failed to load vector DB", e)
        return None

def save_chat_message(username: str, role: str, content: str, chat_history_file: str = ".chat_history.json") -> bool:
    """Save chat message to database or JSON file."""
    # 1. Try Database
    if settings.USE_DATABASE and DB_AVAILABLE:
        try:
            user = get_user(username)
            if user:
                db_save_chat_message(user["id"], role, content)
                return True
        except Exception as e:
            from core.safe_logging import log_exception_safe
            log_exception_safe(logger, "Failed to save to database", e, level="warning")

    # 2. Fallback to JSON
    try:
        history_data = {}
        if os.path.exists(chat_history_file):
            import json
            with open(chat_history_file, "r") as f:
                history_data = json.load(f)

        if username not in history_data:
            history_data[username] = []

        history_data[username].append({"role": role, "content": content})

        import json
        with open(chat_history_file, "w") as f:
            json.dump(history_data, f, indent=2)
        return True
    except Exception as e:
        from core.safe_logging import log_exception_safe
        log_exception_safe(logger, "Failed to save chat history to file", e)
        return False

def load_chat_history(username: str, chat_history_file: str = ".chat_history.json") -> List[Dict[str, Any]]:
    """Load chat history from database or JSON."""
    # 1. Try Database
    if settings.USE_DATABASE and DB_AVAILABLE:
        try:
            user = get_user(username)
            if user:
                messages = db_get_chat_history(user["id"], limit=100)
                return [
                    {"role": msg["role"], "content": msg["content"]}
                    for msg in messages
                ]
        except Exception as e:
            from core.safe_logging import log_exception_safe
            log_exception_safe(logger, "Failed to load from database", e, level="warning")

    # 2. Fallback to JSON
    try:
        import json
        if os.path.exists(chat_history_file):
            with open(chat_history_file, "r") as f:
                history_data = json.load(f)
                return history_data.get(username, [])
    except Exception as e:
        from core.safe_logging import log_exception_safe
        log_exception_safe(logger, "Failed to load chat history from file", e, level="warning")

    return []

def audit_log(username: str, action: str, details: Optional[Dict] = None):
    """Log user action to database if available, otherwise just logger."""
    if settings.USE_DATABASE and DB_AVAILABLE:
        try:
            user = get_user(username)
            if user:
                log_audit_event(user["id"], action, changes=details)
        except Exception as e:
            from core.safe_logging import log_exception_safe
            log_exception_safe(logger, "Failed to log audit event to DB", e, level="warning")

    logger.info(f"AUDIT - User: {username}, Action: {action}, Details: {details}")

def analytics_log(username: str, event_type: str, event_name: str, data: Optional[Dict] = None):
    """Log analytics event."""
    if settings.USE_DATABASE and DB_AVAILABLE:
        try:
            user = get_user(username)
            if user:
                log_analytics_event(user["id"], event_type, event_name, data)
        except Exception as e:
            from core.safe_logging import log_exception_safe
            log_exception_safe(logger, "Failed to log analytics to DB", e, level="warning")