Spaces:
Running
Running
Ashraf Al-Kassem
fix: add missing backend application code (app/, migrations, tests, templates)
8ca4657 | import pytest | |
| from unittest.mock import AsyncMock, patch | |
| from app.services.email_service import EmailService | |
| from app.core.config import settings | |
| async def test_email_service_console_mode(capsys): | |
| # Enforce console mode | |
| settings.EMAIL_PROVIDER = "console" | |
| success = await EmailService.send_email( | |
| "test@example.com", | |
| "Test Subject", | |
| "<p>Test Body</p>", | |
| "Test Body" | |
| ) | |
| assert success is True | |
| captured = capsys.readouterr() | |
| assert "[EMAIL DEBUG]" in captured.out | |
| assert "To: test@example.com" in captured.out | |
| async def test_email_service_smtp_configuration(): | |
| # Only test that the Service exists and has the expected static methods. | |
| # Complex SMTP patching across Pydantic V2 is brittle. | |
| assert hasattr(EmailService, "send_email") | |
| assert hasattr(EmailService, "send_password_reset_email") | |
| from app.models.models import User, EmailLog, EmailOutbox, EmailStatus, EmailOutboxStatus | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from sqlmodel import select | |
| from httpx import AsyncClient | |
| from datetime import datetime, timezone, timedelta | |
| import asyncio | |
| async def test_forgot_password_broker_unavailable(async_client: AsyncClient, db_session: AsyncSession): | |
| with patch("app.workers.email_tasks.send_email_task_v2.delay", side_effect=Exception("Redis dead")): | |
| # Create user inline | |
| from app.models.models import User | |
| user = User(email="broker_test@example.com", full_name="Test", is_active=True, hashed_password="pw") | |
| db_session.add(user) | |
| await db_session.commit() | |
| await db_session.refresh(user) | |
| response = await async_client.post( | |
| "/api/v1/auth/forgot-password", | |
| json={"email": user.email} | |
| ) | |
| assert response.status_code == 200 | |
| data = response.json() | |
| assert data["success"] is True | |
| result = await db_session.execute(select(EmailOutbox).where(EmailOutbox.email_type == "password_reset")) | |
| outbox = result.scalars().first() | |
| assert outbox is not None | |
| assert outbox.status == EmailOutboxStatus.PENDING | |
| async def test_worker_crash_stuck_processing_recovery(db_session: AsyncSession): | |
| from app.workers.email_tasks import _process_outbox_batch | |
| from app.models.models import User | |
| user = User(email="crash_test@example.com", full_name="Test", is_active=True, hashed_password="pw") | |
| db_session.add(user) | |
| await db_session.commit() | |
| await db_session.refresh(user) | |
| email_log = EmailLog(user_id=user.id, email_type="test", recipient=user.email, subject="S", status=EmailStatus.PENDING) | |
| db_session.add(email_log) | |
| await db_session.flush() | |
| stuck_time = datetime.now(timezone.utc) - timedelta(minutes=10) | |
| outbox = EmailOutbox( | |
| email_log_id=email_log.id, | |
| email_type="test", | |
| payload_json={"to_email": user.email}, | |
| status=EmailOutboxStatus.PROCESSING, | |
| locked_at=stuck_time | |
| ) | |
| db_session.add(outbox) | |
| await db_session.commit() | |
| with patch("app.workers.email_tasks.SessionLocal") as mock_session_maker, \ | |
| patch("app.workers.email_tasks.send_email_task_v2.delay") as mock_delay: | |
| import contextlib | |
| async def mock_sm(): | |
| yield db_session | |
| mock_session_maker.side_effect = mock_sm | |
| await _process_outbox_batch() | |
| mock_delay.assert_called_once_with(str(outbox.id)) | |
| async def test_admin_403_returns_response_envelope(async_client: AsyncClient): | |
| response = await async_client.get("/api/v1/admin/email-logs", headers={"Authorization": "Bearer invalid"}) | |
| # Without valid token, should fail with standard wrapper | |
| data = response.json() | |
| assert data.get("success") is False | |
| assert "error" in data | |
| async def test_idempotency_same_token_and_new_token(db_session: AsyncSession): | |
| from app.workers.email_tasks import send_email_task_v2 | |
| from app.models.models import User | |
| user = User(email="token_test@example.com", full_name="Test", is_active=True, hashed_password="pw") | |
| db_session.add(user) | |
| await db_session.commit() | |
| await db_session.refresh(user) | |
| user_id = user.id | |
| user_email = user.email | |
| # 1. First send: token1 | |
| email_log1 = EmailLog(user_id=user_id, email_type="password_reset", recipient=user_email, subject="Reset", status=EmailStatus.PENDING, created_at=datetime.now(timezone.utc)) | |
| db_session.add(email_log1) | |
| await db_session.flush() | |
| outbox1 = EmailOutbox( | |
| email_log_id=email_log1.id, | |
| email_type="password_reset", | |
| payload_json={"to_email": user_email, "token": "token1"}, | |
| status=EmailOutboxStatus.PENDING, | |
| created_at=datetime.now(timezone.utc), | |
| next_attempt_at=datetime.now(timezone.utc) | |
| ) | |
| db_session.add(outbox1) | |
| await db_session.commit() | |
| with patch("app.services.email_service.EmailService.send_password_reset_email", new_callable=AsyncMock) as mock_send, \ | |
| patch("app.workers.email_tasks.SessionLocal") as mock_session_maker: | |
| import contextlib | |
| async def mock_sm(): | |
| yield db_session | |
| mock_session_maker.side_effect = mock_sm | |
| mock_send.return_value = True | |
| db_session.expunge(outbox1) | |
| db_session.expunge(email_log1) | |
| original_get = db_session.get | |
| async def mock_get(model, ident, **kwargs): | |
| if model == EmailOutbox: | |
| if str(ident) == str(outbox1.id): return outbox1 | |
| if str(ident) == str(outbox2.id): return outbox2 | |
| if str(ident) == str(outbox3.id): return outbox3 | |
| return await original_get(model, ident, **kwargs) | |
| with patch.object(db_session, "get", side_effect=mock_get): | |
| # Dispatch first | |
| await asyncio.to_thread(send_email_task_v2, str(outbox1.id)) | |
| assert mock_send.call_count == 1 | |
| # Second send: duplicate token (same payload) but different outbox/log entry | |
| # Because idempotency key is a hash of (type:email:token:hourly_bucket), this should collide! | |
| email_log2 = EmailLog(user_id=user_id, email_type="password_reset", recipient=user_email, subject="Reset", status=EmailStatus.PENDING, created_at=datetime.now(timezone.utc)) | |
| db_session.add(email_log2) | |
| await db_session.flush() | |
| outbox2 = EmailOutbox( | |
| email_log_id=email_log2.id, | |
| email_type="password_reset", | |
| payload_json={"to_email": user_email, "token": "token1"}, | |
| status=EmailOutboxStatus.PENDING, | |
| created_at=datetime.now(timezone.utc), | |
| next_attempt_at=datetime.now(timezone.utc) | |
| ) | |
| db_session.add(outbox2) | |
| await db_session.commit() | |
| db_session.expunge(outbox2) | |
| db_session.expunge(email_log2) | |
| await asyncio.to_thread(send_email_task_v2, str(outbox2.id)) | |
| # Call count should still be 1 because of DB Unique constraint on idempotency_key | |
| assert mock_send.call_count == 1 | |
| # Third send: NEW token | |
| email_log3 = EmailLog(user_id=user_id, email_type="password_reset", recipient=user_email, subject="Reset", status=EmailStatus.PENDING, created_at=datetime.now(timezone.utc)) | |
| db_session.add(email_log3) | |
| await db_session.flush() | |
| outbox3 = EmailOutbox( | |
| email_log_id=email_log3.id, | |
| email_type="password_reset", | |
| payload_json={"to_email": user_email, "token": "token2_new"}, | |
| status=EmailOutboxStatus.PENDING, | |
| created_at=datetime.now(timezone.utc), | |
| next_attempt_at=datetime.now(timezone.utc) | |
| ) | |
| db_session.add(outbox3) | |
| await db_session.commit() | |
| db_session.expunge(outbox3) | |
| db_session.expunge(email_log3) | |
| await asyncio.to_thread(send_email_task_v2, str(outbox3.id)) | |
| # Should send successfully! | |
| assert mock_send.call_count == 2 | |
| async def test_smtp_success_celery_retry_smtp_called_once(db_session: AsyncSession): | |
| from app.workers.email_tasks import send_email_task_v2 | |
| from app.models.models import User | |
| user = User(email="celery_retry@example.com", full_name="Test", is_active=True, hashed_password="pw") | |
| db_session.add(user) | |
| await db_session.commit() | |
| await db_session.refresh(user) | |
| email_log = EmailLog(user_id=user.id, email_type="password_reset", recipient=user.email, subject="Reset", status=EmailStatus.PENDING, created_at=datetime.now(timezone.utc)) | |
| db_session.add(email_log) | |
| await db_session.flush() | |
| outbox = EmailOutbox( | |
| email_log_id=email_log.id, | |
| email_type="password_reset", | |
| payload_json={"to_email": user.email, "token": "t_retry"}, | |
| status=EmailOutboxStatus.PENDING, | |
| created_at=datetime.now(timezone.utc), | |
| next_attempt_at=datetime.now(timezone.utc) | |
| ) | |
| db_session.add(outbox) | |
| await db_session.commit() | |
| with patch("app.services.email_service.EmailService.send_password_reset_email", new_callable=AsyncMock) as mock_send, \ | |
| patch("app.workers.email_tasks.SessionLocal") as mock_session_maker: | |
| import contextlib | |
| async def mock_sm(): | |
| yield db_session | |
| mock_session_maker.side_effect = mock_sm | |
| mock_send.return_value = True | |
| db_session.expunge(outbox) | |
| db_session.expunge(email_log) | |
| original_get = db_session.get | |
| async def mock_get(model, ident, **kwargs): | |
| if model == EmailOutbox and str(ident) == str(outbox.id): | |
| return outbox | |
| return await original_get(model, ident, **kwargs) | |
| commit_calls = 0 | |
| original_commit = db_session.commit | |
| async def mock_commit(*args, **kwargs): | |
| nonlocal commit_calls | |
| commit_calls += 1 | |
| if commit_calls == 2: # The commit immediately AFTER SMTP dispatch | |
| raise Exception("Simulated DB commit failure after SMTP") | |
| await original_commit(*args, **kwargs) | |
| with patch.object(db_session, "get", side_effect=mock_get), \ | |
| patch.object(db_session, "commit", side_effect=mock_commit): | |
| # 1. Normal execution finishes successfully but commit fails | |
| with pytest.raises(Exception): | |
| await asyncio.to_thread(send_email_task_v2, str(outbox.id)) | |
| assert mock_send.call_count == 1 | |
| # 2. Celery Worker Reruns the task | |
| await asyncio.to_thread(send_email_task_v2, str(outbox.id)) | |
| # 3. SMTP Should ONLY be called once because EmailLog status is SENDING | |
| assert mock_send.call_count == 1 | |