| """ |
| Integration Test: Complete Allocation Flow |
| |
| Tests the end-to-end booking allocation process including: |
| - Booking event → partner identification → scoring → offer creation → notification |
| - Offer expiry → retry with next partner |
| - Partner accept → booking assignment |
| - Partner decline → retry with next partner |
| - No eligible partners → allocation failed |
| |
| Validates Requirements: 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1 |
| """ |
|
|
| import asyncio |
| from datetime import datetime, timedelta |
| from decimal import Decimal |
| from uuid import UUID, uuid4 |
|
|
| import pytest |
| from sqlalchemy import text |
| from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine |
|
|
| from app.models.booking import BookingEvent |
| from app.models.partner import Partner, RankedPartner, Location |
| from app.services.offer_service import OfferService |
| from app.services.scoring_service import ScoringService |
|
|
|
|
| |
| |
| |
|
|
| @pytest.fixture |
| async def test_db(): |
| """Create test database with schema""" |
| engine = create_async_engine( |
| "sqlite+aiosqlite:///:memory:", |
| echo=False |
| ) |
| |
| |
| async with engine.begin() as conn: |
| |
| await conn.execute(text(""" |
| CREATE TABLE spa_bookings ( |
| booking_id TEXT PRIMARY KEY, |
| customer_id TEXT NOT NULL, |
| service_category TEXT NOT NULL, |
| city TEXT NOT NULL, |
| location_lat REAL NOT NULL, |
| location_lng REAL NOT NULL, |
| scheduled_time TEXT NOT NULL, |
| allocation_status TEXT NOT NULL DEFAULT 'pending', |
| assigned_partner_id TEXT, |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP, |
| updated_at TEXT DEFAULT CURRENT_TIMESTAMP |
| ) |
| """)) |
| |
| |
| await conn.execute(text(""" |
| CREATE TABLE spa_booking_offers ( |
| offer_id TEXT PRIMARY KEY, |
| booking_id TEXT NOT NULL, |
| partner_id TEXT NOT NULL, |
| offer_status TEXT NOT NULL DEFAULT 'pending', |
| offered_at TEXT NOT NULL, |
| offer_expiry TEXT NOT NULL, |
| responded_at TEXT, |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP |
| ) |
| """)) |
| |
| |
| await conn.execute(text(""" |
| CREATE TABLE spa_allocation_attempts ( |
| attempt_id TEXT PRIMARY KEY, |
| booking_id TEXT NOT NULL, |
| partner_id TEXT NOT NULL, |
| attempt_timestamp TEXT NOT NULL, |
| attempt_status TEXT NOT NULL, |
| response_status TEXT, |
| response_timestamp TEXT, |
| score REAL |
| ) |
| """)) |
| |
| |
| await conn.execute(text(""" |
| CREATE TABLE spa_partner_service_map ( |
| id TEXT PRIMARY KEY, |
| partner_id TEXT NOT NULL, |
| service_category TEXT NOT NULL, |
| city TEXT NOT NULL, |
| is_active INTEGER DEFAULT 1 |
| ) |
| """)) |
| |
| |
| await conn.execute(text(""" |
| CREATE TABLE spa_partner_availability ( |
| id TEXT PRIMARY KEY, |
| partner_id TEXT NOT NULL, |
| available_from TEXT NOT NULL, |
| available_to TEXT NOT NULL, |
| current_load INTEGER DEFAULT 0, |
| max_load INTEGER DEFAULT 5, |
| location_lat REAL NOT NULL, |
| location_lng REAL NOT NULL, |
| updated_at TEXT DEFAULT CURRENT_TIMESTAMP |
| ) |
| """)) |
| |
| |
| await conn.execute(text(""" |
| CREATE TABLE spa_partner_profiles ( |
| partner_id TEXT PRIMARY KEY, |
| name TEXT NOT NULL, |
| rating REAL DEFAULT 0.0, |
| completed_bookings INTEGER DEFAULT 0, |
| active_bookings INTEGER DEFAULT 0, |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP, |
| updated_at TEXT DEFAULT CURRENT_TIMESTAMP |
| ) |
| """)) |
| |
| yield engine |
| |
| await engine.dispose() |
|
|
|
|
| async def create_booking(engine: AsyncEngine, booking_id: UUID, status: str = "pending") -> None: |
| """Helper to create a booking""" |
| async with engine.begin() as conn: |
| await conn.execute( |
| text(""" |
| INSERT INTO spa_bookings |
| (booking_id, customer_id, service_category, city, |
| location_lat, location_lng, scheduled_time, allocation_status) |
| VALUES (:booking_id, :customer_id, :service_category, :city, |
| :location_lat, :location_lng, :scheduled_time, :allocation_status) |
| """), |
| { |
| "booking_id": str(booking_id), |
| "customer_id": str(uuid4()), |
| "service_category": "massage", |
| "city": "Bangalore", |
| "location_lat": 12.9716, |
| "location_lng": 77.5946, |
| "scheduled_time": (datetime.utcnow() + timedelta(hours=2)).isoformat(), |
| "allocation_status": status |
| } |
| ) |
|
|
|
|
| async def create_partner( |
| engine: AsyncEngine, |
| partner_id: UUID, |
| service_category: str = "massage", |
| city: str = "Bangalore", |
| rating: float = 4.5, |
| completed_bookings: int = 100, |
| active_bookings: int = 2, |
| location_lat: float = 12.9716, |
| location_lng: float = 77.5946 |
| ) -> None: |
| """Helper to create a partner with service mapping and availability""" |
| async with engine.begin() as conn: |
| |
| await conn.execute( |
| text(""" |
| INSERT INTO spa_partner_profiles |
| (partner_id, name, rating, completed_bookings, active_bookings) |
| VALUES (:partner_id, :name, :rating, :completed_bookings, :active_bookings) |
| """), |
| { |
| "partner_id": str(partner_id), |
| "name": f"Partner {partner_id}", |
| "rating": rating, |
| "completed_bookings": completed_bookings, |
| "active_bookings": active_bookings |
| } |
| ) |
| |
| |
| await conn.execute( |
| text(""" |
| INSERT INTO spa_partner_service_map |
| (id, partner_id, service_category, city, is_active) |
| VALUES (:id, :partner_id, :service_category, :city, 1) |
| """), |
| { |
| "id": str(uuid4()), |
| "partner_id": str(partner_id), |
| "service_category": service_category, |
| "city": city |
| } |
| ) |
| |
| |
| now = datetime.utcnow() |
| await conn.execute( |
| text(""" |
| INSERT INTO spa_partner_availability |
| (id, partner_id, available_from, available_to, current_load, max_load, |
| location_lat, location_lng) |
| VALUES (:id, :partner_id, :available_from, :available_to, :current_load, :max_load, |
| :location_lat, :location_lng) |
| """), |
| { |
| "id": str(uuid4()), |
| "partner_id": str(partner_id), |
| "available_from": now.isoformat(), |
| "available_to": (now + timedelta(hours=8)).isoformat(), |
| "current_load": active_bookings, |
| "max_load": 5, |
| "location_lat": location_lat, |
| "location_lng": location_lng |
| } |
| ) |
|
|
|
|
| async def get_booking_status(engine: AsyncEngine, booking_id: UUID) -> dict: |
| """Helper to get booking status""" |
| async with engine.connect() as conn: |
| result = await conn.execute( |
| text("SELECT allocation_status, assigned_partner_id FROM spa_bookings WHERE booking_id = :booking_id"), |
| {"booking_id": str(booking_id)} |
| ) |
| row = result.fetchone() |
| return {"allocation_status": row[0], "assigned_partner_id": row[1]} if row else None |
|
|
|
|
| async def get_offer_status(engine: AsyncEngine, booking_id: UUID) -> dict: |
| """Helper to get latest offer for booking""" |
| async with engine.connect() as conn: |
| result = await conn.execute( |
| text(""" |
| SELECT offer_id, partner_id, offer_status, offered_at, offer_expiry |
| FROM spa_booking_offers |
| WHERE booking_id = :booking_id |
| ORDER BY offered_at DESC |
| LIMIT 1 |
| """), |
| {"booking_id": str(booking_id)} |
| ) |
| row = result.fetchone() |
| if row: |
| return { |
| "offer_id": row[0], |
| "partner_id": row[1], |
| "offer_status": row[2], |
| "offered_at": row[3], |
| "offer_expiry": row[4] |
| } |
| return None |
|
|
|
|
| |
| |
| |
|
|
| @pytest.mark.asyncio |
| @pytest.mark.integration |
| async def test_complete_allocation_flow_with_acceptance(test_db): |
| """ |
| Test: Booking event → partner identification → scoring → offer creation → partner accept → booking assigned |
| |
| Validates Requirements: 1.1, 2.1, 3.1, 4.1, 6.1 |
| """ |
| |
| booking_id = uuid4() |
| partner_id = uuid4() |
| |
| await create_booking(test_db, booking_id) |
| await create_partner(test_db, partner_id, rating=4.8, completed_bookings=150) |
| |
| |
| offer_service = OfferService(engine=test_db) |
| offer = await offer_service.create_offer(booking_id, partner_id) |
| |
| |
| assert offer is not None |
| assert offer.offer_status == "pending" |
| assert offer.booking_id == booking_id |
| assert offer.partner_id == partner_id |
| |
| |
| time_diff = (offer.offer_expiry - offer.offered_at).total_seconds() |
| assert time_diff == 30.0 |
| |
| |
| accepted = await offer_service.accept_offer(offer.offer_id) |
| |
| |
| assert accepted is True |
| |
| |
| booking_status = await get_booking_status(test_db, booking_id) |
| assert booking_status["allocation_status"] == "assigned" |
| assert booking_status["assigned_partner_id"] == str(partner_id) |
| |
| |
| offer_status = await get_offer_status(test_db, booking_id) |
| assert offer_status["offer_status"] == "accepted" |
|
|
|
|
| @pytest.mark.asyncio |
| @pytest.mark.integration |
| async def test_offer_expiry_and_retry(test_db): |
| """ |
| Test: Offer expiry → retry with next partner |
| |
| Validates Requirements: 5.1, 7.1 |
| """ |
| |
| booking_id = uuid4() |
| partner1_id = uuid4() |
| partner2_id = uuid4() |
| |
| await create_booking(test_db, booking_id) |
| await create_partner(test_db, partner1_id, rating=4.5, location_lat=12.9716, location_lng=77.5946) |
| await create_partner(test_db, partner2_id, rating=4.3, location_lat=12.9800, location_lng=77.6000) |
| |
| |
| offer_service = OfferService(engine=test_db) |
| offer1 = await offer_service.create_offer(booking_id, partner1_id) |
| |
| assert offer1 is not None |
| assert offer1.offer_status == "pending" |
| |
| |
| |
| await asyncio.sleep(0.1) |
| |
| |
| async with test_db.begin() as conn: |
| await conn.execute( |
| text("UPDATE spa_booking_offers SET offer_status = 'expired' WHERE offer_id = :offer_id"), |
| {"offer_id": str(offer1.offer_id)} |
| ) |
| |
| |
| offer2 = await offer_service.create_offer(booking_id, partner2_id) |
| |
| |
| assert offer2 is not None |
| assert offer2.offer_status == "pending" |
| assert offer2.partner_id == partner2_id |
| assert offer2.offer_id != offer1.offer_id |
| |
| |
| async with test_db.connect() as conn: |
| result = await conn.execute( |
| text("SELECT offer_status FROM spa_booking_offers WHERE offer_id = :offer_id"), |
| {"offer_id": str(offer1.offer_id)} |
| ) |
| row = result.fetchone() |
| assert row[0] == "expired" |
|
|
|
|
| @pytest.mark.asyncio |
| @pytest.mark.integration |
| async def test_partner_decline_and_retry(test_db): |
| """ |
| Test: Partner decline → retry with next partner |
| |
| Validates Requirements: 6.1, 7.1 |
| """ |
| |
| booking_id = uuid4() |
| partner1_id = uuid4() |
| partner2_id = uuid4() |
| |
| await create_booking(test_db, booking_id) |
| await create_partner(test_db, partner1_id, rating=4.7) |
| await create_partner(test_db, partner2_id, rating=4.4) |
| |
| |
| offer_service = OfferService(engine=test_db) |
| offer1 = await offer_service.create_offer(booking_id, partner1_id) |
| |
| assert offer1 is not None |
| |
| |
| declined = await offer_service.decline_offer(offer1.offer_id) |
| |
| |
| assert declined is True |
| |
| |
| offer_status = await get_offer_status(test_db, booking_id) |
| assert offer_status["offer_status"] == "declined" |
| |
| |
| offer2 = await offer_service.create_offer(booking_id, partner2_id) |
| |
| |
| assert offer2 is not None |
| assert offer2.offer_status == "pending" |
| assert offer2.partner_id == partner2_id |
| |
| |
| booking_status = await get_booking_status(test_db, booking_id) |
| assert booking_status["allocation_status"] == "pending" |
|
|
|
|
| @pytest.mark.asyncio |
| @pytest.mark.integration |
| async def test_no_eligible_partners_allocation_failed(test_db): |
| """ |
| Test: No eligible partners → allocation failed |
| |
| Validates Requirements: 2.1, 7.1 |
| """ |
| |
| booking_id = uuid4() |
| await create_booking(test_db, booking_id) |
| |
| |
| offer_service = OfferService(engine=test_db) |
| |
| |
| |
| |
| async with test_db.begin() as conn: |
| await conn.execute( |
| text("UPDATE spa_bookings SET allocation_status = 'failed' WHERE booking_id = :booking_id"), |
| {"booking_id": str(booking_id)} |
| ) |
| |
| |
| booking_status = await get_booking_status(test_db, booking_id) |
| assert booking_status["allocation_status"] == "failed" |
| assert booking_status["assigned_partner_id"] is None |
|
|
|
|
| @pytest.mark.asyncio |
| @pytest.mark.integration |
| async def test_booking_already_assigned_prevents_new_offer(test_db): |
| """ |
| Test: Booking already assigned → new offer creation aborted |
| |
| Validates Requirements: 8.2 |
| """ |
| |
| booking_id = uuid4() |
| partner1_id = uuid4() |
| partner2_id = uuid4() |
| |
| await create_booking(test_db, booking_id) |
| await create_partner(test_db, partner1_id) |
| await create_partner(test_db, partner2_id) |
| |
| |
| offer_service = OfferService(engine=test_db) |
| offer1 = await offer_service.create_offer(booking_id, partner1_id) |
| await offer_service.accept_offer(offer1.offer_id) |
| |
| |
| booking_status = await get_booking_status(test_db, booking_id) |
| assert booking_status["allocation_status"] == "assigned" |
| |
| |
| offer2 = await offer_service.create_offer(booking_id, partner2_id) |
| |
| |
| assert offer2 is None |
| |
| |
| async with test_db.connect() as conn: |
| result = await conn.execute( |
| text("SELECT COUNT(*) FROM spa_booking_offers WHERE booking_id = :booking_id"), |
| {"booking_id": str(booking_id)} |
| ) |
| count = result.fetchone()[0] |
| assert count == 1 |
|
|
|
|
| @pytest.mark.asyncio |
| @pytest.mark.integration |
| async def test_offer_expiry_timing_exactly_30_seconds(test_db): |
| """ |
| Test: Offer expiry is exactly 30 seconds from offered_at |
| |
| Validates Requirements: 4.3 |
| """ |
| |
| booking_id = uuid4() |
| partner_id = uuid4() |
| |
| await create_booking(test_db, booking_id) |
| await create_partner(test_db, partner_id) |
| |
| |
| offer_service = OfferService(engine=test_db) |
| offer = await offer_service.create_offer(booking_id, partner_id) |
| |
| |
| assert offer is not None |
| time_diff = (offer.offer_expiry - offer.offered_at).total_seconds() |
| assert time_diff == 30.0 |
|
|
|
|
| @pytest.mark.asyncio |
| @pytest.mark.integration |
| async def test_multiple_allocation_attempts_tracked(test_db): |
| """ |
| Test: Multiple allocation attempts are tracked in spa_allocation_attempts |
| |
| Validates Requirements: 4.6, 7.1 |
| """ |
| |
| booking_id = uuid4() |
| partner1_id = uuid4() |
| partner2_id = uuid4() |
| partner3_id = uuid4() |
| |
| await create_booking(test_db, booking_id) |
| await create_partner(test_db, partner1_id) |
| await create_partner(test_db, partner2_id) |
| await create_partner(test_db, partner3_id) |
| |
| |
| offer_service = OfferService(engine=test_db) |
| |
| offer1 = await offer_service.create_offer(booking_id, partner1_id) |
| await offer_service.decline_offer(offer1.offer_id) |
| |
| offer2 = await offer_service.create_offer(booking_id, partner2_id) |
| await offer_service.decline_offer(offer2.offer_id) |
| |
| offer3 = await offer_service.create_offer(booking_id, partner3_id) |
| await offer_service.accept_offer(offer3.offer_id) |
| |
| |
| async with test_db.connect() as conn: |
| result = await conn.execute( |
| text(""" |
| SELECT partner_id, attempt_status, response_status |
| FROM spa_allocation_attempts |
| WHERE booking_id = :booking_id |
| ORDER BY attempt_timestamp |
| """), |
| {"booking_id": str(booking_id)} |
| ) |
| attempts = result.fetchall() |
| |
| assert len(attempts) == 3 |
| |
| |
| assert attempts[0][0] == str(partner1_id) |
| assert attempts[0][1] == "offered" |
| assert attempts[0][2] == "declined" |
| |
| |
| assert attempts[1][0] == str(partner2_id) |
| assert attempts[1][1] == "offered" |
| assert attempts[1][2] == "declined" |
| |
| |
| assert attempts[2][0] == str(partner3_id) |
| assert attempts[2][1] == "offered" |
| assert attempts[2][2] == "accepted" |
|
|