LeadPilot / backend /tests /test_email_service.py
Ashraf Al-Kassem
fix: add missing backend application code (app/, migrations, tests, templates)
8ca4657
raw
history blame
11.3 kB
import pytest
from unittest.mock import AsyncMock, patch
from app.services.email_service import EmailService
from app.core.config import settings
@pytest.mark.asyncio
async def test_email_service_console_mode(capsys):
# Enforce console mode
settings.EMAIL_PROVIDER = "console"
success = await EmailService.send_email(
"test@example.com",
"Test Subject",
"<p>Test Body</p>",
"Test Body"
)
assert success is True
captured = capsys.readouterr()
assert "[EMAIL DEBUG]" in captured.out
assert "To: test@example.com" in captured.out
@pytest.mark.asyncio
async def test_email_service_smtp_configuration():
# Only test that the Service exists and has the expected static methods.
# Complex SMTP patching across Pydantic V2 is brittle.
assert hasattr(EmailService, "send_email")
assert hasattr(EmailService, "send_password_reset_email")
from app.models.models import User, EmailLog, EmailOutbox, EmailStatus, EmailOutboxStatus
from sqlalchemy.ext.asyncio import AsyncSession
from sqlmodel import select
from httpx import AsyncClient
from datetime import datetime, timezone, timedelta
import asyncio
@pytest.mark.asyncio
async def test_forgot_password_broker_unavailable(async_client: AsyncClient, db_session: AsyncSession):
with patch("app.workers.email_tasks.send_email_task_v2.delay", side_effect=Exception("Redis dead")):
# Create user inline
from app.models.models import User
user = User(email="broker_test@example.com", full_name="Test", is_active=True, hashed_password="pw")
db_session.add(user)
await db_session.commit()
await db_session.refresh(user)
response = await async_client.post(
"/api/v1/auth/forgot-password",
json={"email": user.email}
)
assert response.status_code == 200
data = response.json()
assert data["success"] is True
result = await db_session.execute(select(EmailOutbox).where(EmailOutbox.email_type == "password_reset"))
outbox = result.scalars().first()
assert outbox is not None
assert outbox.status == EmailOutboxStatus.PENDING
@pytest.mark.asyncio
async def test_worker_crash_stuck_processing_recovery(db_session: AsyncSession):
from app.workers.email_tasks import _process_outbox_batch
from app.models.models import User
user = User(email="crash_test@example.com", full_name="Test", is_active=True, hashed_password="pw")
db_session.add(user)
await db_session.commit()
await db_session.refresh(user)
email_log = EmailLog(user_id=user.id, email_type="test", recipient=user.email, subject="S", status=EmailStatus.PENDING)
db_session.add(email_log)
await db_session.flush()
stuck_time = datetime.now(timezone.utc) - timedelta(minutes=10)
outbox = EmailOutbox(
email_log_id=email_log.id,
email_type="test",
payload_json={"to_email": user.email},
status=EmailOutboxStatus.PROCESSING,
locked_at=stuck_time
)
db_session.add(outbox)
await db_session.commit()
with patch("app.workers.email_tasks.SessionLocal") as mock_session_maker, \
patch("app.workers.email_tasks.send_email_task_v2.delay") as mock_delay:
import contextlib
@contextlib.asynccontextmanager
async def mock_sm():
yield db_session
mock_session_maker.side_effect = mock_sm
await _process_outbox_batch()
mock_delay.assert_called_once_with(str(outbox.id))
@pytest.mark.asyncio
async def test_admin_403_returns_response_envelope(async_client: AsyncClient):
response = await async_client.get("/api/v1/admin/email-logs", headers={"Authorization": "Bearer invalid"})
# Without valid token, should fail with standard wrapper
data = response.json()
assert data.get("success") is False
assert "error" in data
@pytest.mark.asyncio
async def test_idempotency_same_token_and_new_token(db_session: AsyncSession):
from app.workers.email_tasks import send_email_task_v2
from app.models.models import User
user = User(email="token_test@example.com", full_name="Test", is_active=True, hashed_password="pw")
db_session.add(user)
await db_session.commit()
await db_session.refresh(user)
user_id = user.id
user_email = user.email
# 1. First send: token1
email_log1 = EmailLog(user_id=user_id, email_type="password_reset", recipient=user_email, subject="Reset", status=EmailStatus.PENDING, created_at=datetime.now(timezone.utc))
db_session.add(email_log1)
await db_session.flush()
outbox1 = EmailOutbox(
email_log_id=email_log1.id,
email_type="password_reset",
payload_json={"to_email": user_email, "token": "token1"},
status=EmailOutboxStatus.PENDING,
created_at=datetime.now(timezone.utc),
next_attempt_at=datetime.now(timezone.utc)
)
db_session.add(outbox1)
await db_session.commit()
with patch("app.services.email_service.EmailService.send_password_reset_email", new_callable=AsyncMock) as mock_send, \
patch("app.workers.email_tasks.SessionLocal") as mock_session_maker:
import contextlib
@contextlib.asynccontextmanager
async def mock_sm():
yield db_session
mock_session_maker.side_effect = mock_sm
mock_send.return_value = True
db_session.expunge(outbox1)
db_session.expunge(email_log1)
original_get = db_session.get
async def mock_get(model, ident, **kwargs):
if model == EmailOutbox:
if str(ident) == str(outbox1.id): return outbox1
if str(ident) == str(outbox2.id): return outbox2
if str(ident) == str(outbox3.id): return outbox3
return await original_get(model, ident, **kwargs)
with patch.object(db_session, "get", side_effect=mock_get):
# Dispatch first
await asyncio.to_thread(send_email_task_v2, str(outbox1.id))
assert mock_send.call_count == 1
# Second send: duplicate token (same payload) but different outbox/log entry
# Because idempotency key is a hash of (type:email:token:hourly_bucket), this should collide!
email_log2 = EmailLog(user_id=user_id, email_type="password_reset", recipient=user_email, subject="Reset", status=EmailStatus.PENDING, created_at=datetime.now(timezone.utc))
db_session.add(email_log2)
await db_session.flush()
outbox2 = EmailOutbox(
email_log_id=email_log2.id,
email_type="password_reset",
payload_json={"to_email": user_email, "token": "token1"},
status=EmailOutboxStatus.PENDING,
created_at=datetime.now(timezone.utc),
next_attempt_at=datetime.now(timezone.utc)
)
db_session.add(outbox2)
await db_session.commit()
db_session.expunge(outbox2)
db_session.expunge(email_log2)
await asyncio.to_thread(send_email_task_v2, str(outbox2.id))
# Call count should still be 1 because of DB Unique constraint on idempotency_key
assert mock_send.call_count == 1
# Third send: NEW token
email_log3 = EmailLog(user_id=user_id, email_type="password_reset", recipient=user_email, subject="Reset", status=EmailStatus.PENDING, created_at=datetime.now(timezone.utc))
db_session.add(email_log3)
await db_session.flush()
outbox3 = EmailOutbox(
email_log_id=email_log3.id,
email_type="password_reset",
payload_json={"to_email": user_email, "token": "token2_new"},
status=EmailOutboxStatus.PENDING,
created_at=datetime.now(timezone.utc),
next_attempt_at=datetime.now(timezone.utc)
)
db_session.add(outbox3)
await db_session.commit()
db_session.expunge(outbox3)
db_session.expunge(email_log3)
await asyncio.to_thread(send_email_task_v2, str(outbox3.id))
# Should send successfully!
assert mock_send.call_count == 2
@pytest.mark.asyncio
async def test_smtp_success_celery_retry_smtp_called_once(db_session: AsyncSession):
from app.workers.email_tasks import send_email_task_v2
from app.models.models import User
user = User(email="celery_retry@example.com", full_name="Test", is_active=True, hashed_password="pw")
db_session.add(user)
await db_session.commit()
await db_session.refresh(user)
email_log = EmailLog(user_id=user.id, email_type="password_reset", recipient=user.email, subject="Reset", status=EmailStatus.PENDING, created_at=datetime.now(timezone.utc))
db_session.add(email_log)
await db_session.flush()
outbox = EmailOutbox(
email_log_id=email_log.id,
email_type="password_reset",
payload_json={"to_email": user.email, "token": "t_retry"},
status=EmailOutboxStatus.PENDING,
created_at=datetime.now(timezone.utc),
next_attempt_at=datetime.now(timezone.utc)
)
db_session.add(outbox)
await db_session.commit()
with patch("app.services.email_service.EmailService.send_password_reset_email", new_callable=AsyncMock) as mock_send, \
patch("app.workers.email_tasks.SessionLocal") as mock_session_maker:
import contextlib
@contextlib.asynccontextmanager
async def mock_sm():
yield db_session
mock_session_maker.side_effect = mock_sm
mock_send.return_value = True
db_session.expunge(outbox)
db_session.expunge(email_log)
original_get = db_session.get
async def mock_get(model, ident, **kwargs):
if model == EmailOutbox and str(ident) == str(outbox.id):
return outbox
return await original_get(model, ident, **kwargs)
commit_calls = 0
original_commit = db_session.commit
async def mock_commit(*args, **kwargs):
nonlocal commit_calls
commit_calls += 1
if commit_calls == 2: # The commit immediately AFTER SMTP dispatch
raise Exception("Simulated DB commit failure after SMTP")
await original_commit(*args, **kwargs)
with patch.object(db_session, "get", side_effect=mock_get), \
patch.object(db_session, "commit", side_effect=mock_commit):
# 1. Normal execution finishes successfully but commit fails
with pytest.raises(Exception):
await asyncio.to_thread(send_email_task_v2, str(outbox.id))
assert mock_send.call_count == 1
# 2. Celery Worker Reruns the task
await asyncio.to_thread(send_email_task_v2, str(outbox.id))
# 3. SMTP Should ONLY be called once because EmailLog status is SENDING
assert mock_send.call_count == 1