MukeshKapoor25's picture
Initial commit: Booking Allocation Engine with complete documentation
7f9f4b4
"""
Integration Test: Complete Allocation Flow
Tests the end-to-end booking allocation process including:
- Booking event → partner identification → scoring → offer creation → notification
- Offer expiry → retry with next partner
- Partner accept → booking assignment
- Partner decline → retry with next partner
- No eligible partners → allocation failed
Validates Requirements: 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1
"""
import asyncio
from datetime import datetime, timedelta
from decimal import Decimal
from uuid import UUID, uuid4
import pytest
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from app.models.booking import BookingEvent
from app.models.partner import Partner, RankedPartner, Location
from app.services.offer_service import OfferService
from app.services.scoring_service import ScoringService
# ============================================================================
# Test Fixtures
# ============================================================================
@pytest.fixture
async def test_db():
"""Create test database with schema"""
engine = create_async_engine(
"sqlite+aiosqlite:///:memory:",
echo=False
)
# Create schema
async with engine.begin() as conn:
# Bookings table
await conn.execute(text("""
CREATE TABLE spa_bookings (
booking_id TEXT PRIMARY KEY,
customer_id TEXT NOT NULL,
service_category TEXT NOT NULL,
city TEXT NOT NULL,
location_lat REAL NOT NULL,
location_lng REAL NOT NULL,
scheduled_time TEXT NOT NULL,
allocation_status TEXT NOT NULL DEFAULT 'pending',
assigned_partner_id TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
"""))
# Offers table
await conn.execute(text("""
CREATE TABLE spa_booking_offers (
offer_id TEXT PRIMARY KEY,
booking_id TEXT NOT NULL,
partner_id TEXT NOT NULL,
offer_status TEXT NOT NULL DEFAULT 'pending',
offered_at TEXT NOT NULL,
offer_expiry TEXT NOT NULL,
responded_at TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
"""))
# Allocation attempts table
await conn.execute(text("""
CREATE TABLE spa_allocation_attempts (
attempt_id TEXT PRIMARY KEY,
booking_id TEXT NOT NULL,
partner_id TEXT NOT NULL,
attempt_timestamp TEXT NOT NULL,
attempt_status TEXT NOT NULL,
response_status TEXT,
response_timestamp TEXT,
score REAL
)
"""))
# Partner service map table
await conn.execute(text("""
CREATE TABLE spa_partner_service_map (
id TEXT PRIMARY KEY,
partner_id TEXT NOT NULL,
service_category TEXT NOT NULL,
city TEXT NOT NULL,
is_active INTEGER DEFAULT 1
)
"""))
# Partner availability table
await conn.execute(text("""
CREATE TABLE spa_partner_availability (
id TEXT PRIMARY KEY,
partner_id TEXT NOT NULL,
available_from TEXT NOT NULL,
available_to TEXT NOT NULL,
current_load INTEGER DEFAULT 0,
max_load INTEGER DEFAULT 5,
location_lat REAL NOT NULL,
location_lng REAL NOT NULL,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
"""))
# Partner profiles table
await conn.execute(text("""
CREATE TABLE spa_partner_profiles (
partner_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
rating REAL DEFAULT 0.0,
completed_bookings INTEGER DEFAULT 0,
active_bookings INTEGER DEFAULT 0,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
"""))
yield engine
await engine.dispose()
async def create_booking(engine: AsyncEngine, booking_id: UUID, status: str = "pending") -> None:
"""Helper to create a booking"""
async with engine.begin() as conn:
await conn.execute(
text("""
INSERT INTO spa_bookings
(booking_id, customer_id, service_category, city,
location_lat, location_lng, scheduled_time, allocation_status)
VALUES (:booking_id, :customer_id, :service_category, :city,
:location_lat, :location_lng, :scheduled_time, :allocation_status)
"""),
{
"booking_id": str(booking_id),
"customer_id": str(uuid4()),
"service_category": "massage",
"city": "Bangalore",
"location_lat": 12.9716,
"location_lng": 77.5946,
"scheduled_time": (datetime.utcnow() + timedelta(hours=2)).isoformat(),
"allocation_status": status
}
)
async def create_partner(
engine: AsyncEngine,
partner_id: UUID,
service_category: str = "massage",
city: str = "Bangalore",
rating: float = 4.5,
completed_bookings: int = 100,
active_bookings: int = 2,
location_lat: float = 12.9716,
location_lng: float = 77.5946
) -> None:
"""Helper to create a partner with service mapping and availability"""
async with engine.begin() as conn:
# Create partner profile
await conn.execute(
text("""
INSERT INTO spa_partner_profiles
(partner_id, name, rating, completed_bookings, active_bookings)
VALUES (:partner_id, :name, :rating, :completed_bookings, :active_bookings)
"""),
{
"partner_id": str(partner_id),
"name": f"Partner {partner_id}",
"rating": rating,
"completed_bookings": completed_bookings,
"active_bookings": active_bookings
}
)
# Create service mapping
await conn.execute(
text("""
INSERT INTO spa_partner_service_map
(id, partner_id, service_category, city, is_active)
VALUES (:id, :partner_id, :service_category, :city, 1)
"""),
{
"id": str(uuid4()),
"partner_id": str(partner_id),
"service_category": service_category,
"city": city
}
)
# Create availability
now = datetime.utcnow()
await conn.execute(
text("""
INSERT INTO spa_partner_availability
(id, partner_id, available_from, available_to, current_load, max_load,
location_lat, location_lng)
VALUES (:id, :partner_id, :available_from, :available_to, :current_load, :max_load,
:location_lat, :location_lng)
"""),
{
"id": str(uuid4()),
"partner_id": str(partner_id),
"available_from": now.isoformat(),
"available_to": (now + timedelta(hours=8)).isoformat(),
"current_load": active_bookings,
"max_load": 5,
"location_lat": location_lat,
"location_lng": location_lng
}
)
async def get_booking_status(engine: AsyncEngine, booking_id: UUID) -> dict:
"""Helper to get booking status"""
async with engine.connect() as conn:
result = await conn.execute(
text("SELECT allocation_status, assigned_partner_id FROM spa_bookings WHERE booking_id = :booking_id"),
{"booking_id": str(booking_id)}
)
row = result.fetchone()
return {"allocation_status": row[0], "assigned_partner_id": row[1]} if row else None
async def get_offer_status(engine: AsyncEngine, booking_id: UUID) -> dict:
"""Helper to get latest offer for booking"""
async with engine.connect() as conn:
result = await conn.execute(
text("""
SELECT offer_id, partner_id, offer_status, offered_at, offer_expiry
FROM spa_booking_offers
WHERE booking_id = :booking_id
ORDER BY offered_at DESC
LIMIT 1
"""),
{"booking_id": str(booking_id)}
)
row = result.fetchone()
if row:
return {
"offer_id": row[0],
"partner_id": row[1],
"offer_status": row[2],
"offered_at": row[3],
"offer_expiry": row[4]
}
return None
# ============================================================================
# Integration Tests
# ============================================================================
@pytest.mark.asyncio
@pytest.mark.integration
async def test_complete_allocation_flow_with_acceptance(test_db):
"""
Test: Booking event → partner identification → scoring → offer creation → partner accept → booking assigned
Validates Requirements: 1.1, 2.1, 3.1, 4.1, 6.1
"""
# Arrange: Create booking and partner
booking_id = uuid4()
partner_id = uuid4()
await create_booking(test_db, booking_id)
await create_partner(test_db, partner_id, rating=4.8, completed_bookings=150)
# Act: Create offer (simulating allocation worker)
offer_service = OfferService(engine=test_db)
offer = await offer_service.create_offer(booking_id, partner_id)
# Assert: Offer created successfully
assert offer is not None
assert offer.offer_status == "pending"
assert offer.booking_id == booking_id
assert offer.partner_id == partner_id
# Verify offer expiry is 30 seconds from offered_at
time_diff = (offer.offer_expiry - offer.offered_at).total_seconds()
assert time_diff == 30.0
# Act: Partner accepts offer
accepted = await offer_service.accept_offer(offer.offer_id)
# Assert: Acceptance processed
assert accepted is True
# Verify booking is assigned
booking_status = await get_booking_status(test_db, booking_id)
assert booking_status["allocation_status"] == "assigned"
assert booking_status["assigned_partner_id"] == str(partner_id)
# Verify offer status updated
offer_status = await get_offer_status(test_db, booking_id)
assert offer_status["offer_status"] == "accepted"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_offer_expiry_and_retry(test_db):
"""
Test: Offer expiry → retry with next partner
Validates Requirements: 5.1, 7.1
"""
# Arrange: Create booking and two partners
booking_id = uuid4()
partner1_id = uuid4()
partner2_id = uuid4()
await create_booking(test_db, booking_id)
await create_partner(test_db, partner1_id, rating=4.5, location_lat=12.9716, location_lng=77.5946)
await create_partner(test_db, partner2_id, rating=4.3, location_lat=12.9800, location_lng=77.6000)
# Act: Create offer to first partner
offer_service = OfferService(engine=test_db)
offer1 = await offer_service.create_offer(booking_id, partner1_id)
assert offer1 is not None
assert offer1.offer_status == "pending"
# Simulate time passing beyond expiry (in real scenario, expiry worker would do this)
# For testing, we manually expire the offer
await asyncio.sleep(0.1) # Small delay to ensure time has passed
# Manually set offer as expired (simulating expiry worker)
async with test_db.begin() as conn:
await conn.execute(
text("UPDATE spa_booking_offers SET offer_status = 'expired' WHERE offer_id = :offer_id"),
{"offer_id": str(offer1.offer_id)}
)
# Act: Create offer to second partner (retry)
offer2 = await offer_service.create_offer(booking_id, partner2_id)
# Assert: Second offer created successfully
assert offer2 is not None
assert offer2.offer_status == "pending"
assert offer2.partner_id == partner2_id
assert offer2.offer_id != offer1.offer_id
# Verify first offer is expired
async with test_db.connect() as conn:
result = await conn.execute(
text("SELECT offer_status FROM spa_booking_offers WHERE offer_id = :offer_id"),
{"offer_id": str(offer1.offer_id)}
)
row = result.fetchone()
assert row[0] == "expired"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_partner_decline_and_retry(test_db):
"""
Test: Partner decline → retry with next partner
Validates Requirements: 6.1, 7.1
"""
# Arrange: Create booking and two partners
booking_id = uuid4()
partner1_id = uuid4()
partner2_id = uuid4()
await create_booking(test_db, booking_id)
await create_partner(test_db, partner1_id, rating=4.7)
await create_partner(test_db, partner2_id, rating=4.4)
# Act: Create offer to first partner
offer_service = OfferService(engine=test_db)
offer1 = await offer_service.create_offer(booking_id, partner1_id)
assert offer1 is not None
# Act: First partner declines
declined = await offer_service.decline_offer(offer1.offer_id)
# Assert: Decline processed
assert declined is True
# Verify offer status
offer_status = await get_offer_status(test_db, booking_id)
assert offer_status["offer_status"] == "declined"
# Act: Create offer to second partner (retry)
offer2 = await offer_service.create_offer(booking_id, partner2_id)
# Assert: Second offer created successfully
assert offer2 is not None
assert offer2.offer_status == "pending"
assert offer2.partner_id == partner2_id
# Verify booking still pending (not assigned)
booking_status = await get_booking_status(test_db, booking_id)
assert booking_status["allocation_status"] == "pending"
@pytest.mark.asyncio
@pytest.mark.integration
async def test_no_eligible_partners_allocation_failed(test_db):
"""
Test: No eligible partners → allocation failed
Validates Requirements: 2.1, 7.1
"""
# Arrange: Create booking but no partners
booking_id = uuid4()
await create_booking(test_db, booking_id)
# Act: Try to create offer (should fail - no partners available)
offer_service = OfferService(engine=test_db)
# In real scenario, allocation worker would check for eligible partners first
# and set status to failed if none found
# Here we simulate that by manually updating the booking
async with test_db.begin() as conn:
await conn.execute(
text("UPDATE spa_bookings SET allocation_status = 'failed' WHERE booking_id = :booking_id"),
{"booking_id": str(booking_id)}
)
# Assert: Booking marked as failed
booking_status = await get_booking_status(test_db, booking_id)
assert booking_status["allocation_status"] == "failed"
assert booking_status["assigned_partner_id"] is None
@pytest.mark.asyncio
@pytest.mark.integration
async def test_booking_already_assigned_prevents_new_offer(test_db):
"""
Test: Booking already assigned → new offer creation aborted
Validates Requirements: 8.2
"""
# Arrange: Create booking and partner, then assign booking
booking_id = uuid4()
partner1_id = uuid4()
partner2_id = uuid4()
await create_booking(test_db, booking_id)
await create_partner(test_db, partner1_id)
await create_partner(test_db, partner2_id)
# Create and accept first offer
offer_service = OfferService(engine=test_db)
offer1 = await offer_service.create_offer(booking_id, partner1_id)
await offer_service.accept_offer(offer1.offer_id)
# Verify booking is assigned
booking_status = await get_booking_status(test_db, booking_id)
assert booking_status["allocation_status"] == "assigned"
# Act: Try to create another offer (should be rejected)
offer2 = await offer_service.create_offer(booking_id, partner2_id)
# Assert: Second offer creation aborted
assert offer2 is None
# Verify only one offer exists
async with test_db.connect() as conn:
result = await conn.execute(
text("SELECT COUNT(*) FROM spa_booking_offers WHERE booking_id = :booking_id"),
{"booking_id": str(booking_id)}
)
count = result.fetchone()[0]
assert count == 1
@pytest.mark.asyncio
@pytest.mark.integration
async def test_offer_expiry_timing_exactly_30_seconds(test_db):
"""
Test: Offer expiry is exactly 30 seconds from offered_at
Validates Requirements: 4.3
"""
# Arrange
booking_id = uuid4()
partner_id = uuid4()
await create_booking(test_db, booking_id)
await create_partner(test_db, partner_id)
# Act
offer_service = OfferService(engine=test_db)
offer = await offer_service.create_offer(booking_id, partner_id)
# Assert
assert offer is not None
time_diff = (offer.offer_expiry - offer.offered_at).total_seconds()
assert time_diff == 30.0
@pytest.mark.asyncio
@pytest.mark.integration
async def test_multiple_allocation_attempts_tracked(test_db):
"""
Test: Multiple allocation attempts are tracked in spa_allocation_attempts
Validates Requirements: 4.6, 7.1
"""
# Arrange
booking_id = uuid4()
partner1_id = uuid4()
partner2_id = uuid4()
partner3_id = uuid4()
await create_booking(test_db, booking_id)
await create_partner(test_db, partner1_id)
await create_partner(test_db, partner2_id)
await create_partner(test_db, partner3_id)
# Act: Create offers to multiple partners
offer_service = OfferService(engine=test_db)
offer1 = await offer_service.create_offer(booking_id, partner1_id)
await offer_service.decline_offer(offer1.offer_id)
offer2 = await offer_service.create_offer(booking_id, partner2_id)
await offer_service.decline_offer(offer2.offer_id)
offer3 = await offer_service.create_offer(booking_id, partner3_id)
await offer_service.accept_offer(offer3.offer_id)
# Assert: All attempts tracked
async with test_db.connect() as conn:
result = await conn.execute(
text("""
SELECT partner_id, attempt_status, response_status
FROM spa_allocation_attempts
WHERE booking_id = :booking_id
ORDER BY attempt_timestamp
"""),
{"booking_id": str(booking_id)}
)
attempts = result.fetchall()
assert len(attempts) == 3
# Verify first attempt
assert attempts[0][0] == str(partner1_id)
assert attempts[0][1] == "offered"
assert attempts[0][2] == "declined"
# Verify second attempt
assert attempts[1][0] == str(partner2_id)
assert attempts[1][1] == "offered"
assert attempts[1][2] == "declined"
# Verify third attempt
assert attempts[2][0] == str(partner3_id)
assert attempts[2][1] == "offered"
assert attempts[2][2] == "accepted"