Spaces:
Running
Running
File size: 14,114 Bytes
b5cb5bb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 | import logging
import os
import re
from urllib.parse import quote_plus, urlparse
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from .email_service import EmailSendResult, EmailService, EmailMessagePayload
from .email_templates import WelcomeCredentialsEmailContext, build_welcome_credentials_email
# Logger used by every message emitted from this module (fixed name, not __name__).
logger = logging.getLogger("mathpulse")
# Accepted role/status values; inputs are stripped and lower-cased before the
# membership test, so only lower-case spellings are listed here.
VALID_ROLES = {"student", "teacher", "admin"}
VALID_STATUSES = {"active", "inactive"}
# Shape check only: exactly one "@", no whitespace, and at least one "." in the
# part after the "@". This does not prove the address is deliverable.
EMAIL_REGEX = re.compile(r"^[^\s@]+@[^\s@]+\.[^\s@]+$")
# Password character-class probes, each applied with .search() on the raw
# password. Note the "special" class is "anything that is not an ASCII letter
# or digit", so whitespace and non-ASCII characters also satisfy it.
PASSWORD_UPPER_REGEX = re.compile(r"[A-Z]")
PASSWORD_LOWER_REGEX = re.compile(r"[a-z]")
PASSWORD_DIGIT_REGEX = re.compile(r"\d")
PASSWORD_SPECIAL_REGEX = re.compile(r"[^A-Za-z0-9]")
@dataclass
class AdminCreateUserInput:
    """Raw form values submitted by an admin when creating a user account.

    Values are stored exactly as received; trimming, case-folding and
    validation are performed by the code that consumes this object.
    """

    # Display name of the new user.
    name: str
    # Login email address.
    email: str
    # Plaintext credentials. Both are excluded from the generated ``repr`` so
    # that logging or printing the input object (or a traceback/debugger dump
    # that includes it) never exposes the password.
    password: str = field(repr=False)
    confirm_password: str = field(repr=False)
    # Role and status labels as typed by the admin.
    role: str
    status: str
    # Class placement labels.
    grade: str
    section: str
    # Learner reference number; optional at this level.
    lrn: Optional[str] = None
@dataclass
class CreateUserAndNotifyResult:
    """Combined outcome of creating a user and attempting the welcome email."""

    # UID of the created account.
    uid: str
    # Whether the account was created.
    user_created: bool
    # Whether the welcome email was reported as sent.
    email_sent: bool
    # Machine-readable outcome, e.g. "created_and_emailed" or
    # "created_email_failed".
    result_code: str
    # Human-readable summary of the outcome.
    message: str
    # Non-fatal problems to surface to the caller (empty when all went well).
    warnings: List[str] = field(default_factory=list)
    # Result object returned by the email service, when one is available.
    email_result: Optional[EmailSendResult] = None
class UserProvisioningError(Exception):
    """Provisioning failure carrying a machine code and an HTTP-style status.

    Attributes:
        code: Short machine-readable identifier (e.g. ``"invalid_email"``).
        message: Human-readable description; also what ``str(error)`` returns.
        status_code: Numeric status for the caller to report (default 400).
    """

    def __init__(self, code: str, message: str, status_code: int = 400) -> None:
        # Only the message is handed to Exception so str(error) stays the
        # plain message rather than a tuple of all three values.
        super().__init__(message)
        self.code = code
        self.message = message
        self.status_code = status_code

    def __reduce__(self):
        """Describe how to rebuild this error for ``copy`` and ``pickle``.

        The inherited reduction replays ``self.args`` (just the message) into
        ``__init__``, which requires both ``code`` and ``message`` and would
        therefore raise ``TypeError``. Supplying the full constructor
        arguments keeps the error copyable and picklable.
        """
        return (
            type(self),
            (self.code, self.message, self.status_code),
            self.__dict__,
        )
class UserProvisioningService:
    """Create user accounts and send the new user their welcome email.

    An account consists of two records: an authentication user created through
    the injected auth module, and a profile document stored in the Firestore
    ``users`` collection under the auth UID. Validation and infrastructure
    failures are reported as ``UserProvisioningError``.
    """

    def __init__(
        self,
        *,
        firebase_auth_module: Any,
        firestore_module: Any,
        firestore_server_timestamp: Any,
        email_service: "EmailService",
    ) -> None:
        """Store the injected collaborators.

        Args:
            firebase_auth_module: Object exposing ``get_user_by_email``,
                ``create_user`` and ``delete_user``; may be ``None`` when the
                auth backend is unavailable.
            firestore_module: Object exposing ``client()``; may be ``None``
                when Firestore is unavailable.
            firestore_server_timestamp: Value written to the ``createdAt`` and
                ``updatedAt`` profile fields.
            email_service: Service used to deliver the welcome email.
        """
        self._firebase_auth_module = firebase_auth_module
        self._firestore_module = firestore_module
        self._firestore_server_timestamp = firestore_server_timestamp
        self._email_service = email_service

    def _ensure_dependencies(self) -> None:
        """Raise a 503 ``UserProvisioningError`` if a backend module is missing."""
        if self._firebase_auth_module is None:
            raise UserProvisioningError("auth_unavailable", "Firebase Auth service is unavailable.", 503)
        if self._firestore_module is None:
            raise UserProvisioningError("firestore_unavailable", "Firestore service is unavailable.", 503)

    def _normalize_role(self, role: str) -> str:
        """Return the trimmed, lower-cased role or raise ``invalid_role``."""
        normalized = (role or "").strip().lower()
        if normalized not in VALID_ROLES:
            raise UserProvisioningError("invalid_role", "Role must be Student, Teacher, or Admin.", 400)
        return normalized

    def _normalize_status(self, status: str) -> str:
        """Return ``"Active"`` or ``"Inactive"``, or raise ``invalid_status``."""
        normalized = (status or "").strip().lower()
        if normalized not in VALID_STATUSES:
            raise UserProvisioningError("invalid_status", "Status must be Active or Inactive.", 400)
        return "Active" if normalized == "active" else "Inactive"

    def _validate_email(self, email: str) -> str:
        """Return the trimmed, lower-cased email or raise ``invalid_email``."""
        normalized = (email or "").strip().lower()
        if not normalized or not EMAIL_REGEX.match(normalized):
            raise UserProvisioningError("invalid_email", "Invalid email format.", 400)
        return normalized

    def _validate_password(self, password: str, confirm_password: str) -> str:
        """Check password strength, then confirmation; return the password.

        Strength rules are evaluated in a fixed order (length, uppercase,
        lowercase, digit, special character) and the first failure is
        reported as ``weak_password``. The confirmation comparison runs last
        and reports ``password_mismatch``.
        """
        value = password or ""
        if len(value) < 8:
            raise UserProvisioningError("weak_password", "Password must be at least 8 characters.", 400)
        if not PASSWORD_UPPER_REGEX.search(value):
            raise UserProvisioningError("weak_password", "Password must include at least one uppercase letter.", 400)
        if not PASSWORD_LOWER_REGEX.search(value):
            raise UserProvisioningError("weak_password", "Password must include at least one lowercase letter.", 400)
        if not PASSWORD_DIGIT_REGEX.search(value):
            raise UserProvisioningError("weak_password", "Password must include at least one number.", 400)
        if not PASSWORD_SPECIAL_REGEX.search(value):
            raise UserProvisioningError("weak_password", "Password must include at least one special character.", 400)
        if value != (confirm_password or ""):
            raise UserProvisioningError("password_mismatch", "Password and confirm password do not match.", 400)
        return value

    @staticmethod
    def _auth_user_not_found(error: Exception) -> bool:
        """Tell whether an auth lookup error means "no such user".

        The exception type is checked by class name first, so the decision
        does not hinge on message wording and no SDK import is needed here.
        NOTE(review): assumes the injected auth module raises a type named
        ``UserNotFoundError`` for missing users -- confirm against the SDK in
        use. The original message-based match is kept as a fallback.
        """
        if any(cls.__name__ == "UserNotFoundError" for cls in type(error).__mro__):
            return True
        message = str(error).lower()
        return "not found" in message or "no user record" in message

    @staticmethod
    def _slugify(value: str) -> str:
        """Lower-case ``value`` and join its alphanumeric runs with ``_``."""
        # Every maximal run of non-alphanumerics (underscores included) becomes
        # a single "_", so no separate collapse pass is needed afterwards.
        return re.sub(r"[^a-z0-9]+", "_", (value or "").strip().lower()).strip("_")

    @staticmethod
    def _build_default_avatar_url(display_name: str) -> str:
        """Return a ui-avatars.com URL for ``display_name`` (or ``User``)."""
        return f"https://ui-avatars.com/api/?name={quote_plus(display_name or 'User')}&background=0d9488&color=fff"

    @staticmethod
    def _derive_brand_avatar_url(login_url: str) -> str:
        """Pick the brand avatar URL used in the welcome email.

        Order of preference: the ``APP_BRAND_AVATAR_URL`` environment
        variable, then ``/avatar/avatar_icon.png`` on the login URL's origin
        when that URL is http(s), then the mathpulse.ai default.
        """
        configured = (os.getenv("APP_BRAND_AVATAR_URL", "") or "").strip()
        if configured:
            return configured
        parsed = urlparse(login_url or "")
        if parsed.scheme in {"http", "https"} and parsed.netloc:
            return f"{parsed.scheme}://{parsed.netloc}/avatar/avatar_icon.png"
        return "https://mathpulse.ai/avatar/avatar_icon.png"

    def _ensure_no_duplicate_email(self, email: str, firestore_client: Any) -> None:
        """Raise if ``email`` already exists in Auth or in a Firestore profile.

        Raises:
            UserProvisioningError: ``duplicate_email`` (409) when a match is
                found; ``auth_lookup_failed`` or ``firestore_lookup_failed``
                (503) when a lookup itself fails.
        """
        try:
            self._firebase_auth_module.get_user_by_email(email)
        except Exception as auth_lookup_error:
            # A "not found" error is the expected outcome for a new email;
            # anything else means the check could not be completed.
            if not self._auth_user_not_found(auth_lookup_error):
                logger.warning("Auth duplicate lookup failed for %s: %s", email, auth_lookup_error)
                raise UserProvisioningError(
                    "auth_lookup_failed", "Unable to verify duplicate email in Auth.", 503
                ) from auth_lookup_error
        else:
            raise UserProvisioningError("duplicate_email", "A user with this email already exists.", 409)

        try:
            existing_docs = list(
                firestore_client.collection("users").where("email", "==", email).limit(1).stream()
            )
        except Exception as firestore_lookup_error:
            logger.warning("Firestore duplicate lookup failed for %s: %s", email, firestore_lookup_error)
            raise UserProvisioningError(
                "firestore_lookup_failed", "Unable to verify duplicate email in Firestore.", 503
            ) from firestore_lookup_error
        if existing_docs:
            raise UserProvisioningError("duplicate_email", "A user profile with this email already exists.", 409)

    def _build_profile_payload(
        self,
        user_input: "AdminCreateUserInput",
        role_lower: str,
        normalized_status: str,
    ) -> Dict[str, Any]:
        """Assemble the Firestore profile document for the new user.

        Blank grade/section fall back to ``"Grade 11"`` / ``"Section A"``.
        Role-specific fields are added for students, teachers, and (for any
        other role) administrators.

        Raises:
            UserProvisioningError: ``missing_lrn`` when a student has no LRN.
        """
        display_name = (user_input.name or "").strip()
        grade = (user_input.grade or "").strip() or "Grade 11"
        section = (user_input.section or "").strip() or "Section A"
        class_section_id = self._slugify(f"{grade}_{section}") or "grade_11_section_a"
        payload: Dict[str, Any] = {
            "name": display_name,
            "email": (user_input.email or "").strip().lower(),
            "role": role_lower,
            "status": normalized_status,
            "grade": grade,
            "section": section,
            "classSectionId": class_section_id,
            "forcePasswordChange": True,
            "photo": self._build_default_avatar_url(display_name),
            "updatedAt": self._firestore_server_timestamp,
        }
        if role_lower == "student":
            lrn = (user_input.lrn or "").strip()
            if not lrn:
                raise UserProvisioningError("missing_lrn", "LRN is required for student accounts.", 400)
            payload.update(
                {
                    "lrn": lrn,
                    "level": 1,
                    "currentXP": 0,
                    "totalXP": 0,
                    "atRiskSubjects": [],
                    "hasTakenDiagnostic": False,
                }
            )
        elif role_lower == "teacher":
            payload.update(
                {
                    "department": f"{grade} - {section}",
                    "teacherId": f"TCH-{self._slugify(payload['email'])}",
                    "subject": "Mathematics",
                    "yearsOfExperience": "0",
                    "qualification": "",
                    "students": [],
                }
            )
        else:
            payload.update(
                {
                    "department": f"{grade} - {section}",
                    "adminId": f"ADM-{self._slugify(payload['email'])}",
                    "position": "Administrator",
                }
            )
        return payload

    def create_user(self, user_input: "AdminCreateUserInput") -> str:
        """Validate the input, create the Auth account, and write the profile.

        All local validation, including building the profile payload, runs
        before anything is written remotely. If the profile write fails after
        the Auth account exists, deletion of that account is attempted as a
        best-effort rollback.

        Returns:
            The UID of the created account.

        Raises:
            UserProvisioningError: For invalid input (400), duplicates (409),
                unavailable backends or failed lookups (503), and failed
                writes (500).
        """
        self._ensure_dependencies()

        display_name = (user_input.name or "").strip()
        if not display_name:
            raise UserProvisioningError("missing_name", "Name is required.", 400)
        normalized_email = self._validate_email(user_input.email)
        validated_password = self._validate_password(user_input.password, user_input.confirm_password)
        role_lower = self._normalize_role(user_input.role)
        normalized_status = self._normalize_status(user_input.status)

        # Build the profile up front: it can reject the request (missing LRN),
        # and rejecting after the Auth account exists would leave an orphaned
        # account with no profile and no rollback.
        profile_payload = self._build_profile_payload(user_input, role_lower, normalized_status)
        profile_payload["createdAt"] = self._firestore_server_timestamp

        firestore_client = self._firestore_module.client()
        self._ensure_no_duplicate_email(normalized_email, firestore_client)

        try:
            created_auth_user = self._firebase_auth_module.create_user(
                email=normalized_email,
                password=validated_password,
                display_name=display_name,
                disabled=(normalized_status == "Inactive"),
            )
        except Exception as auth_create_error:
            logger.error("Auth user creation failed for %s: %s", normalized_email, auth_create_error)
            auth_error_text_lower = str(auth_create_error).lower()
            if (
                "password_does_not_meet_requirements" in auth_error_text_lower
                or "password requirements" in auth_error_text_lower
            ):
                raise UserProvisioningError(
                    "weak_password",
                    "Password does not meet authentication policy requirements.",
                    400,
                ) from auth_create_error
            if "email already exists" in auth_error_text_lower or "email_exists" in auth_error_text_lower:
                raise UserProvisioningError(
                    "duplicate_email", "A user with this email already exists.", 409
                ) from auth_create_error
            raise UserProvisioningError(
                "auth_create_failed", "Failed to create authentication account.", 500
            ) from auth_create_error

        uid = str(getattr(created_auth_user, "uid", "") or "").strip()
        if not uid:
            raise UserProvisioningError("missing_uid", "Authentication account created without UID.", 500)

        try:
            firestore_client.collection("users").document(uid).set(profile_payload, merge=True)
        except Exception as firestore_write_error:
            logger.error("Firestore profile write failed for %s: %s", uid, firestore_write_error)
            # Best-effort rollback: a failure here is logged, not raised, so
            # the caller still receives the original profile-write error.
            try:
                self._firebase_auth_module.delete_user(uid)
                logger.info("Rolled back Auth user creation for %s after Firestore write failure.", uid)
            except Exception as rollback_error:
                logger.warning(
                    "Failed to roll back Auth user %s after Firestore write failure: %s",
                    uid,
                    rollback_error,
                )
            raise UserProvisioningError(
                "profile_write_failed", "Failed to create user profile in Firestore.", 500
            ) from firestore_write_error
        return uid

    def send_welcome_credentials_email(self, user_input: "AdminCreateUserInput") -> "EmailSendResult":
        """Render and send the welcome email containing the login credentials.

        The login URL comes from the ``APP_LOGIN_URL`` environment variable,
        defaulting to ``https://mathpulse.ai``. The password from
        ``user_input`` is passed to the template as the temporary password.

        Returns:
            Whatever the email service returns for the send attempt.
        """
        display_name = (user_input.name or "").strip()
        recipient_email = (user_input.email or "").strip().lower()
        login_url = (os.getenv("APP_LOGIN_URL", "") or "").strip() or "https://mathpulse.ai"
        brand_avatar_url = self._derive_brand_avatar_url(login_url)
        recipient_avatar_url = self._build_default_avatar_url(display_name)
        template = build_welcome_credentials_email(
            WelcomeCredentialsEmailContext(
                recipient_name=display_name,
                login_email=recipient_email,
                temporary_password=user_input.password,
                role=(user_input.role or "").strip().title(),
                login_url=login_url,
                brand_avatar_url=brand_avatar_url,
                recipient_avatar_url=recipient_avatar_url,
            )
        )
        message = EmailMessagePayload(
            to_name=display_name,
            to_email=recipient_email,
            subject=template["subject"],
            html_content=template["html"],
            text_content=template["text"],
        )
        return self._email_service.send_transactional_email(message)

    def create_user_and_notify(self, user_input: "AdminCreateUserInput") -> "CreateUserAndNotifyResult":
        """Create the user, then try to email their credentials.

        Errors from account creation propagate unchanged. Once the account
        exists, email problems never fail the call: both an unsuccessful send
        result and an exception from the email path are reported as
        ``created_email_failed`` so the caller knows the account is live.
        """
        uid = self.create_user(user_input)

        try:
            email_result = self.send_welcome_credentials_email(user_input)
        except Exception:
            # The account already exists; raising here would look like a total
            # failure and a retry would then hit "duplicate_email".
            logger.exception("Welcome email raised for user %s; the account was still created.", uid)
            return CreateUserAndNotifyResult(
                uid=uid,
                user_created=True,
                email_sent=False,
                result_code="created_email_failed",
                message="User account was created, but welcome email failed to send.",
                warnings=["User was created but welcome email delivery failed."],
                email_result=None,
            )

        if email_result.success:
            return CreateUserAndNotifyResult(
                uid=uid,
                user_created=True,
                email_sent=True,
                result_code="created_and_emailed",
                message="User account was created and welcome email was sent.",
                warnings=[],
                email_result=email_result,
            )

        warnings: List[str] = ["User was created but welcome email delivery failed."]
        if email_result.error_message:
            warnings.append(email_result.error_message)
        return CreateUserAndNotifyResult(
            uid=uid,
            user_created=True,
            email_sent=False,
            result_code="created_email_failed",
            message="User account was created, but welcome email failed to send.",
            warnings=warnings,
            email_result=email_result,
        )
|