File size: 6,678 Bytes
d0ace1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
657a21e
d0ace1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4be0f5d
d0ace1e
 
4be0f5d
 
 
 
 
 
 
 
d0ace1e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
database/auth_store.py — User authentication and profile storage.

Manages patient and doctor accounts in SQLite.
Passwords are hashed with bcrypt.

Tables:
  users    — account credentials + role (patient / doctor)
  profiles — patient medical profile (age, gender, conditions)
"""

import os
import sqlite3
import hashlib
import secrets
from datetime import datetime

DB_PATH = '/data/sessions.db' if os.path.isdir('/data') else './data/sessions.db'


def _hash_password(password: str, salt: str) -> str:
    return hashlib.sha256((salt + password).encode()).hexdigest()


class AuthStore:
    def __init__(self, db_path: str = DB_PATH):
        self.db_path = db_path
        os.makedirs(os.path.dirname(os.path.abspath(db_path)), exist_ok=True)
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id           INTEGER PRIMARY KEY AUTOINCREMENT,
                    username     TEXT    NOT NULL UNIQUE,
                    password_hash TEXT   NOT NULL,
                    salt         TEXT    NOT NULL,
                    role         TEXT    NOT NULL DEFAULT 'patient',
                    full_name    TEXT    DEFAULT '',
                    created_at   TEXT    NOT NULL
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS patient_profiles (
                    user_id      INTEGER PRIMARY KEY,
                    age          INTEGER DEFAULT 0,
                    gender       TEXT    DEFAULT 'male',
                    respiratory_condition INTEGER DEFAULT 0,
                    smoking      INTEGER DEFAULT 0,
                    notes        TEXT    DEFAULT '',
                    updated_at   TEXT    NOT NULL,
                    FOREIGN KEY (user_id) REFERENCES users(id)
                )
            """)
            conn.commit()

        # Create default doctor account if none exists
        if not self.get_user_by_username('doctor'):
            self.register_user('doctor', 'doctor123', 'doctor', 'Dr. Admin')

    def register_user(self, username: str, password: str,
                      role: str = 'patient', full_name: str = '') -> dict:
        """Register a new user. Returns dict with success/error."""
        salt = secrets.token_hex(16)
        pw_hash = _hash_password(password, salt)
        try:
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.execute("""
                    INSERT INTO users (username, password_hash, salt, role, full_name, created_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (username.strip().lower(), pw_hash, salt, role,
                      full_name.strip(), datetime.now().isoformat()))
                user_id = cursor.lastrowid

                if role == 'patient':
                    conn.execute("""
                        INSERT INTO patient_profiles (user_id, updated_at)
                        VALUES (?, ?)
                    """, (user_id, datetime.now().isoformat()))

                conn.commit()
            return {'success': True, 'user_id': user_id}
        except sqlite3.IntegrityError:
            return {'success': False, 'error': 'Username already exists'}

    def login(self, username: str, password: str) -> dict | None:
        """Verify credentials. Returns user dict or None."""
        user = self.get_user_by_username(username)
        if not user:
            return None
        pw_hash = _hash_password(password, user['salt'])
        if pw_hash == user['password_hash']:
            return {k: v for k, v in user.items()
                    if k not in ('password_hash', 'salt')}
        return None

    def get_user_by_username(self, username: str) -> dict | None:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT * FROM users WHERE username = ?",
                (username.strip().lower(),)
            )
            row = cursor.fetchone()
            if not row:
                return None
            cols = [d[0] for d in cursor.description]
            return dict(zip(cols, row))

    def get_user_by_id(self, user_id: int) -> dict | None:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT * FROM users WHERE id = ?", (user_id,)
            )
            row = cursor.fetchone()
            if not row:
                return None
            cols = [d[0] for d in cursor.description]
            return dict(zip(cols, row))

    def get_profile(self, user_id: int) -> dict | None:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT * FROM patient_profiles WHERE user_id = ?", (user_id,)
            )
            row = cursor.fetchone()
            if not row:
                return None
            cols = [d[0] for d in cursor.description]
            return dict(zip(cols, row))

    def update_profile(self, user_id: int, age: int, gender: str,
                       respiratory_condition: bool, smoking: bool,
                       notes: str = '') -> bool:
        now = datetime.now().isoformat()
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT INTO patient_profiles (user_id, age, gender, respiratory_condition, smoking, notes, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(user_id) DO UPDATE SET
                    age=excluded.age, gender=excluded.gender,
                    respiratory_condition=excluded.respiratory_condition,
                    smoking=excluded.smoking, notes=excluded.notes,
                    updated_at=excluded.updated_at
            """, (user_id, age, gender, int(respiratory_condition), int(smoking), notes, now))
            conn.commit()
        return True

    def get_all_patients(self) -> list:
        """Get all patient accounts with their profiles (for doctor view)."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT u.id, u.username, u.full_name, u.created_at,
                       p.age, p.gender, p.respiratory_condition, p.smoking, p.notes
                FROM users u
                LEFT JOIN patient_profiles p ON u.id = p.user_id
                WHERE u.role = 'patient'
                ORDER BY u.full_name
            """)
            rows = cursor.fetchall()
            cols = [d[0] for d in cursor.description]
        return [dict(zip(cols, row)) for row in rows]