File size: 14,114 Bytes
b5cb5bb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
import logging
import os
import re
from urllib.parse import quote_plus, urlparse
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

from .email_service import EmailSendResult, EmailService, EmailMessagePayload
from .email_templates import WelcomeCredentialsEmailContext, build_welcome_credentials_email


# Module logger; the logger name is the fixed string "mathpulse" rather than __name__.
logger = logging.getLogger("mathpulse")


# Accepted role/status values. Inputs are stripped and lower-cased before the
# membership test, so these sets hold lower-case tokens only.
VALID_ROLES = {"student", "teacher", "admin"}
VALID_STATUSES = {"active", "inactive"}
# Loose shape check for an email address: exactly one "@", no whitespace,
# and at least one "." inside the part after the "@".
EMAIL_REGEX = re.compile(r"^[^\s@]+@[^\s@]+\.[^\s@]+$")
# Character-class probes used by the password policy; each is applied with
# .search(), so a single matching character anywhere satisfies the rule.
PASSWORD_UPPER_REGEX = re.compile(r"[A-Z]")
PASSWORD_LOWER_REGEX = re.compile(r"[a-z]")
PASSWORD_DIGIT_REGEX = re.compile(r"\d")
# "Special" means anything that is not an ASCII letter or digit.
PASSWORD_SPECIAL_REGEX = re.compile(r"[^A-Za-z0-9]")


@dataclass
class AdminCreateUserInput:
    """Raw field values describing a user account to be provisioned.

    The values are stored as given; validation and normalization are done by
    ``UserProvisioningService`` when the input is processed.
    """

    # Display name; rejected by the service when blank after stripping.
    name: str
    # Login email; stripped, lower-cased and shape-checked by the service.
    email: str
    # Initial password and its confirmation; the service requires both to match.
    password: str
    confirm_password: str
    # Compared case-insensitively against VALID_ROLES.
    role: str
    # Compared case-insensitively against VALID_STATUSES.
    status: str
    # Blank values fall back to "Grade 11" / "Section A" in the profile payload.
    grade: str
    section: str
    # Required (non-blank) when the role is "student"; unused for other roles.
    lrn: Optional[str] = None


@dataclass
class CreateUserAndNotifyResult:
    """Outcome of creating a user account and attempting the welcome email."""

    # UID of the created authentication account.
    uid: str
    # Whether the account was created.
    user_created: bool
    # Whether the welcome email was reported as sent.
    email_sent: bool
    # Short machine-readable outcome token, e.g. "created_and_emailed"
    # or "created_email_failed".
    result_code: str
    # Human-readable summary of the outcome.
    message: str
    # Non-fatal problems encountered along the way (empty when none).
    warnings: List[str] = field(default_factory=list)
    # Result object returned by the email service, when one is available.
    email_result: Optional[EmailSendResult] = None


class UserProvisioningError(Exception):
    """Provisioning failure carrying a machine-readable code and a status code.

    Attributes:
        code: Short identifier for the failure (e.g. ``"invalid_role"``).
        message: Human-readable description; also used as the exception text.
        status_code: Numeric status associated with the failure (default 400).
    """

    def __init__(self, code: str, message: str, status_code: int = 400) -> None:
        # The message doubles as the standard exception argument so that
        # str(error) yields the human-readable text.
        super().__init__(message)
        self.status_code = status_code
        self.message = message
        self.code = code


class UserProvisioningService:
    def __init__(
        self,
        *,
        firebase_auth_module: Any,
        firestore_module: Any,
        firestore_server_timestamp: Any,
        email_service: EmailService,
    ) -> None:
        self._firebase_auth_module = firebase_auth_module
        self._firestore_module = firestore_module
        self._firestore_server_timestamp = firestore_server_timestamp
        self._email_service = email_service

    def _ensure_dependencies(self) -> None:
        if self._firebase_auth_module is None:
            raise UserProvisioningError("auth_unavailable", "Firebase Auth service is unavailable.", 503)
        if self._firestore_module is None:
            raise UserProvisioningError("firestore_unavailable", "Firestore service is unavailable.", 503)

    def _normalize_role(self, role: str) -> str:
        """Return the stripped, lower-cased role if it is one of VALID_ROLES.

        Raises:
            UserProvisioningError: ``invalid_role`` (400) for any other value,
                including empty or ``None``.
        """
        candidate = (role or "").strip().lower()
        if candidate in VALID_ROLES:
            return candidate
        raise UserProvisioningError("invalid_role", "Role must be Student, Teacher, or Admin.", 400)

    def _normalize_status(self, status: str) -> str:
        """Validate the status and return it in title form.

        Returns:
            ``"Active"`` or ``"Inactive"``.

        Raises:
            UserProvisioningError: ``invalid_status`` (400) when the stripped,
                lower-cased value is not in VALID_STATUSES.
        """
        candidate = (status or "").strip().lower()
        if candidate not in VALID_STATUSES:
            raise UserProvisioningError("invalid_status", "Status must be Active or Inactive.", 400)
        if candidate == "active":
            return "Active"
        return "Inactive"

    def _validate_email(self, email: str) -> str:
        """Return the stripped, lower-cased email if it matches EMAIL_REGEX.

        Raises:
            UserProvisioningError: ``invalid_email`` (400) when the value is
                empty or does not match the pattern.
        """
        candidate = (email or "").strip().lower()
        if candidate and EMAIL_REGEX.match(candidate):
            return candidate
        raise UserProvisioningError("invalid_email", "Invalid email format.", 400)

    def _validate_password(self, password: str, confirm_password: str) -> str:
        """Check the password policy and the confirmation, returning the password.

        The checks run in a fixed order and the first failure wins: minimum
        length, uppercase, lowercase, digit, special character, and finally
        equality with the confirmation value.

        Raises:
            UserProvisioningError: ``weak_password`` (400) for a policy
                violation, ``password_mismatch`` (400) when the confirmation
                differs.
        """
        candidate = password or ""
        if len(candidate) < 8:
            raise UserProvisioningError("weak_password", "Password must be at least 8 characters.", 400)

        # Each rule pairs a character-class probe with its failure text.
        character_rules = (
            (PASSWORD_UPPER_REGEX, "Password must include at least one uppercase letter."),
            (PASSWORD_LOWER_REGEX, "Password must include at least one lowercase letter."),
            (PASSWORD_DIGIT_REGEX, "Password must include at least one number."),
            (PASSWORD_SPECIAL_REGEX, "Password must include at least one special character."),
        )
        for probe, failure_message in character_rules:
            if probe.search(candidate) is None:
                raise UserProvisioningError("weak_password", failure_message, 400)

        if candidate != (confirm_password or ""):
            raise UserProvisioningError("password_mismatch", "Password and confirm password do not match.", 400)
        return candidate

    @staticmethod
    def _auth_user_not_found(error: Exception) -> bool:
        message = str(error).lower()
        return "not found" in message or "no user record" in message

    @staticmethod
    def _slugify(value: str) -> str:
        token = re.sub(r"[^a-z0-9]+", "_", (value or "").strip().lower())
        return re.sub(r"_+", "_", token).strip("_")

    @staticmethod
    def _build_default_avatar_url(display_name: str) -> str:
        return f"https://ui-avatars.com/api/?name={quote_plus(display_name or 'User')}&background=0d9488&color=fff"

    @staticmethod
    def _derive_brand_avatar_url(login_url: str) -> str:
        configured = (os.getenv("APP_BRAND_AVATAR_URL", "") or "").strip()
        if configured:
            return configured

        parsed = urlparse(login_url or "")
        if parsed.scheme in {"http", "https"} and parsed.netloc:
            return f"{parsed.scheme}://{parsed.netloc}/avatar/avatar_icon.png"

        return "https://mathpulse.ai/avatar/avatar_icon.png"

    def _ensure_no_duplicate_email(self, email: str, firestore_client: Any) -> None:
        """Reject the email if it already exists in Auth or in the users collection.

        Args:
            email: Email to look up (passed through unchanged to both lookups).
            firestore_client: Client used to query the ``users`` collection.

        Raises:
            UserProvisioningError: ``duplicate_email`` (409) when either store
                already has the email; ``auth_lookup_failed`` or
                ``firestore_lookup_failed`` (503) when a lookup itself fails.
        """
        # Auth: a successful lookup means the email is taken. A lookup error
        # that reads as "user not found" is the expected, non-duplicate case.
        try:
            self._firebase_auth_module.get_user_by_email(email)
        except UserProvisioningError:
            raise
        except Exception as lookup_exc:
            if not self._auth_user_not_found(lookup_exc):
                logger.warning("Auth duplicate lookup failed for %s: %s", email, lookup_exc)
                raise UserProvisioningError("auth_lookup_failed", "Unable to verify duplicate email in Auth.", 503)
        else:
            raise UserProvisioningError("duplicate_email", "A user with this email already exists.", 409)

        # Firestore: one matching profile document is enough to reject.
        try:
            email_query = firestore_client.collection("users").where("email", "==", email).limit(1)
            matching_docs = list(email_query.stream())
        except UserProvisioningError:
            raise
        except Exception as query_exc:
            logger.warning("Firestore duplicate lookup failed for %s: %s", email, query_exc)
            raise UserProvisioningError("firestore_lookup_failed", "Unable to verify duplicate email in Firestore.", 503)

        if matching_docs:
            raise UserProvisioningError("duplicate_email", "A user profile with this email already exists.", 409)

    def _build_profile_payload(self, user_input: AdminCreateUserInput, role_lower: str, normalized_status: str) -> Dict[str, Any]:
        display_name = (user_input.name or "").strip()
        grade = (user_input.grade or "").strip() or "Grade 11"
        section = (user_input.section or "").strip() or "Section A"
        class_section_id = self._slugify(f"{grade}_{section}") or "grade_11_section_a"

        payload: Dict[str, Any] = {
            "name": display_name,
            "email": (user_input.email or "").strip().lower(),
            "role": role_lower,
            "status": normalized_status,
            "grade": grade,
            "section": section,
            "classSectionId": class_section_id,
            "forcePasswordChange": True,
            "photo": self._build_default_avatar_url(display_name),
            "updatedAt": self._firestore_server_timestamp,
        }

        if role_lower == "student":
            lrn = (user_input.lrn or "").strip()
            if not lrn:
                raise UserProvisioningError("missing_lrn", "LRN is required for student accounts.", 400)
            payload.update(
                {
                    "lrn": lrn,
                    "level": 1,
                    "currentXP": 0,
                    "totalXP": 0,
                    "atRiskSubjects": [],
                    "hasTakenDiagnostic": False,
                }
            )
        elif role_lower == "teacher":
            payload.update(
                {
                    "department": f"{grade} - {section}",
                    "teacherId": f"TCH-{self._slugify(payload['email'])}",
                    "subject": "Mathematics",
                    "yearsOfExperience": "0",
                    "qualification": "",
                    "students": [],
                }
            )
        else:
            payload.update(
                {
                    "department": f"{grade} - {section}",
                    "adminId": f"ADM-{self._slugify(payload['email'])}",
                    "position": "Administrator",
                }
            )

        return payload

    def create_user(self, user_input: AdminCreateUserInput) -> str:
        """Validate the input, create the Auth account, and write the profile.

        All input validation runs before any remote write. That includes
        building the role-specific profile payload, which rejects a student
        without an LRN. A validation failure therefore cannot leave behind an
        Auth account that has no profile document.

        Args:
            user_input: Raw account details to validate and provision.

        Returns:
            The UID of the newly created Auth account.

        Raises:
            UserProvisioningError: For unavailable backends (503), invalid
                input (400), duplicate email (409), or failures while creating
                the Auth account or writing the profile (500).
        """
        self._ensure_dependencies()

        display_name = (user_input.name or "").strip()
        if not display_name:
            raise UserProvisioningError("missing_name", "Name is required.", 400)

        normalized_email = self._validate_email(user_input.email)
        validated_password = self._validate_password(user_input.password, user_input.confirm_password)
        role_lower = self._normalize_role(user_input.role)
        normalized_status = self._normalize_status(user_input.status)

        # Build the profile up front: this step can raise (missing LRN for a
        # student), and doing so after the Auth account exists would orphan
        # that account with nothing to roll it back.
        profile_payload = self._build_profile_payload(user_input, role_lower, normalized_status)
        profile_payload["createdAt"] = self._firestore_server_timestamp

        firestore_client = self._firestore_module.client()
        self._ensure_no_duplicate_email(normalized_email, firestore_client)

        try:
            created_auth_user = self._firebase_auth_module.create_user(
                email=normalized_email,
                password=validated_password,
                display_name=display_name,
                disabled=(normalized_status == "Inactive"),
            )
        except Exception as auth_create_error:
            logger.error("Auth user creation failed for %s: %s", normalized_email, auth_create_error)
            auth_error_text_lower = str(auth_create_error).lower()

            # Map recognizable backend failures onto specific error codes.
            if "password_does_not_meet_requirements" in auth_error_text_lower or "password requirements" in auth_error_text_lower:
                raise UserProvisioningError(
                    "weak_password",
                    "Password does not meet authentication policy requirements.",
                    400,
                ) from auth_create_error

            if "email already exists" in auth_error_text_lower or "email_exists" in auth_error_text_lower:
                raise UserProvisioningError(
                    "duplicate_email", "A user with this email already exists.", 409
                ) from auth_create_error

            raise UserProvisioningError(
                "auth_create_failed", "Failed to create authentication account.", 500
            ) from auth_create_error

        uid = str(getattr(created_auth_user, "uid", "") or "").strip()
        if not uid:
            raise UserProvisioningError("missing_uid", "Authentication account created without UID.", 500)

        try:
            firestore_client.collection("users").document(uid).set(profile_payload, merge=True)
        except Exception as firestore_write_error:
            logger.error("Firestore profile write failed for %s: %s", uid, firestore_write_error)
            # Best-effort rollback so the Auth account does not outlive the
            # failed profile write; a rollback failure is logged, not raised.
            try:
                self._firebase_auth_module.delete_user(uid)
                logger.info("Rolled back Auth user creation for %s after Firestore write failure.", uid)
            except Exception as rollback_error:
                logger.warning(
                    "Failed to roll back Auth user %s after Firestore write failure: %s",
                    uid,
                    rollback_error,
                )
            raise UserProvisioningError(
                "profile_write_failed", "Failed to create user profile in Firestore.", 500
            ) from firestore_write_error

        return uid

    def send_welcome_credentials_email(self, user_input: AdminCreateUserInput) -> EmailSendResult:
        """Render and send the welcome email containing the login credentials.

        The login URL comes from the ``APP_LOGIN_URL`` environment variable,
        falling back to ``https://mathpulse.ai`` when unset or blank. The
        password from ``user_input`` is passed to the template unchanged.

        Args:
            user_input: Source of the recipient name, email, password and role.

        Returns:
            Whatever the email service returns for the send attempt.
        """
        recipient_name = (user_input.name or "").strip()
        portal_url = (os.getenv("APP_LOGIN_URL", "") or "").strip() or "https://mathpulse.ai"
        brand_image_url = self._derive_brand_avatar_url(portal_url)
        recipient_image_url = self._build_default_avatar_url(recipient_name)
        recipient_email = (user_input.email or "").strip().lower()

        context = WelcomeCredentialsEmailContext(
            recipient_name=recipient_name,
            login_email=recipient_email,
            temporary_password=user_input.password,
            role=(user_input.role or "").strip().title(),
            login_url=portal_url,
            brand_avatar_url=brand_image_url,
            recipient_avatar_url=recipient_image_url,
        )
        rendered = build_welcome_credentials_email(context)

        outgoing = EmailMessagePayload(
            to_name=recipient_name,
            to_email=recipient_email,
            subject=rendered["subject"],
            html_content=rendered["html"],
            text_content=rendered["text"],
        )
        return self._email_service.send_transactional_email(outgoing)

    def create_user_and_notify(self, user_input: AdminCreateUserInput) -> CreateUserAndNotifyResult:
        """Create the user, then attempt the welcome email, and report both.

        Account creation errors propagate to the caller. Once the account
        exists, an email problem is never raised: whether the email service
        reports a failure or the send attempt throws, the result is returned
        with ``result_code="created_email_failed"``. Raising there would make
        the caller believe nothing was created, and a retry would then be
        rejected as a duplicate email.

        Args:
            user_input: Raw account details to provision and notify.

        Returns:
            A result with ``result_code`` ``"created_and_emailed"`` or
            ``"created_email_failed"``. ``email_result`` is ``None`` when the
            send attempt raised instead of returning a result.

        Raises:
            UserProvisioningError: Propagated from ``create_user``.
        """
        uid = self.create_user(user_input)

        email_result: Optional[EmailSendResult] = None
        try:
            email_result = self.send_welcome_credentials_email(user_input)
        except Exception:
            # Boundary catch: the account already exists, so any failure while
            # rendering or sending is downgraded to a reported email failure.
            # Details go to the log only, not into the returned warnings.
            logger.exception("Welcome email dispatch raised unexpectedly for %s.", uid)

        if email_result is not None and email_result.success:
            return CreateUserAndNotifyResult(
                uid=uid,
                user_created=True,
                email_sent=True,
                result_code="created_and_emailed",
                message="User account was created and welcome email was sent.",
                warnings=[],
                email_result=email_result,
            )

        warnings: List[str] = ["User was created but welcome email delivery failed."]
        if email_result is not None and email_result.error_message:
            warnings.append(email_result.error_message)

        return CreateUserAndNotifyResult(
            uid=uid,
            user_created=True,
            email_sent=False,
            result_code="created_email_failed",
            message="User account was created, but welcome email failed to send.",
            warnings=warnings,
            email_result=email_result,
        )