""" Integration Test: Complete Allocation Flow Tests the end-to-end booking allocation process including: - Booking event → partner identification → scoring → offer creation → notification - Offer expiry → retry with next partner - Partner accept → booking assignment - Partner decline → retry with next partner - No eligible partners → allocation failed Validates Requirements: 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1 """ import asyncio from datetime import datetime, timedelta from decimal import Decimal from uuid import UUID, uuid4 import pytest from sqlalchemy import text from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine from app.models.booking import BookingEvent from app.models.partner import Partner, RankedPartner, Location from app.services.offer_service import OfferService from app.services.scoring_service import ScoringService # ============================================================================ # Test Fixtures # ============================================================================ @pytest.fixture async def test_db(): """Create test database with schema""" engine = create_async_engine( "sqlite+aiosqlite:///:memory:", echo=False ) # Create schema async with engine.begin() as conn: # Bookings table await conn.execute(text(""" CREATE TABLE spa_bookings ( booking_id TEXT PRIMARY KEY, customer_id TEXT NOT NULL, service_category TEXT NOT NULL, city TEXT NOT NULL, location_lat REAL NOT NULL, location_lng REAL NOT NULL, scheduled_time TEXT NOT NULL, allocation_status TEXT NOT NULL DEFAULT 'pending', assigned_partner_id TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) """)) # Offers table await conn.execute(text(""" CREATE TABLE spa_booking_offers ( offer_id TEXT PRIMARY KEY, booking_id TEXT NOT NULL, partner_id TEXT NOT NULL, offer_status TEXT NOT NULL DEFAULT 'pending', offered_at TEXT NOT NULL, offer_expiry TEXT NOT NULL, responded_at TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP ) """)) # Allocation attempts table await conn.execute(text(""" CREATE TABLE spa_allocation_attempts ( attempt_id TEXT PRIMARY KEY, booking_id TEXT NOT NULL, partner_id TEXT NOT NULL, attempt_timestamp TEXT NOT NULL, attempt_status TEXT NOT NULL, response_status TEXT, response_timestamp TEXT, score REAL ) """)) # Partner service map table await conn.execute(text(""" CREATE TABLE spa_partner_service_map ( id TEXT PRIMARY KEY, partner_id TEXT NOT NULL, service_category TEXT NOT NULL, city TEXT NOT NULL, is_active INTEGER DEFAULT 1 ) """)) # Partner availability table await conn.execute(text(""" CREATE TABLE spa_partner_availability ( id TEXT PRIMARY KEY, partner_id TEXT NOT NULL, available_from TEXT NOT NULL, available_to TEXT NOT NULL, current_load INTEGER DEFAULT 0, max_load INTEGER DEFAULT 5, location_lat REAL NOT NULL, location_lng REAL NOT NULL, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) """)) # Partner profiles table await conn.execute(text(""" CREATE TABLE spa_partner_profiles ( partner_id TEXT PRIMARY KEY, name TEXT NOT NULL, rating REAL DEFAULT 0.0, completed_bookings INTEGER DEFAULT 0, active_bookings INTEGER DEFAULT 0, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) """)) yield engine await engine.dispose() async def create_booking(engine: AsyncEngine, booking_id: UUID, status: str = "pending") -> None: """Helper to create a booking""" async with engine.begin() as conn: await conn.execute( text(""" INSERT INTO spa_bookings (booking_id, customer_id, service_category, city, location_lat, location_lng, scheduled_time, allocation_status) VALUES (:booking_id, :customer_id, :service_category, :city, :location_lat, :location_lng, :scheduled_time, :allocation_status) """), { "booking_id": str(booking_id), "customer_id": str(uuid4()), "service_category": "massage", "city": "Bangalore", "location_lat": 12.9716, "location_lng": 77.5946, "scheduled_time": (datetime.utcnow() + timedelta(hours=2)).isoformat(), "allocation_status": status } ) async def create_partner( engine: AsyncEngine, partner_id: UUID, service_category: str = "massage", city: str = "Bangalore", rating: float = 4.5, completed_bookings: int = 100, active_bookings: int = 2, location_lat: float = 12.9716, location_lng: float = 77.5946 ) -> None: """Helper to create a partner with service mapping and availability""" async with engine.begin() as conn: # Create partner profile await conn.execute( text(""" INSERT INTO spa_partner_profiles (partner_id, name, rating, completed_bookings, active_bookings) VALUES (:partner_id, :name, :rating, :completed_bookings, :active_bookings) """), { "partner_id": str(partner_id), "name": f"Partner {partner_id}", "rating": rating, "completed_bookings": completed_bookings, "active_bookings": active_bookings } ) # Create service mapping await conn.execute( text(""" INSERT INTO spa_partner_service_map (id, partner_id, service_category, city, is_active) VALUES (:id, :partner_id, :service_category, :city, 1) """), { "id": str(uuid4()), "partner_id": str(partner_id), "service_category": service_category, "city": city } ) # Create availability now = datetime.utcnow() await conn.execute( text(""" INSERT INTO spa_partner_availability (id, partner_id, available_from, available_to, current_load, max_load, location_lat, location_lng) VALUES (:id, :partner_id, :available_from, :available_to, :current_load, :max_load, :location_lat, :location_lng) """), { "id": str(uuid4()), "partner_id": str(partner_id), "available_from": now.isoformat(), "available_to": (now + timedelta(hours=8)).isoformat(), "current_load": active_bookings, "max_load": 5, "location_lat": location_lat, "location_lng": location_lng } ) async def get_booking_status(engine: AsyncEngine, booking_id: UUID) -> dict: """Helper to get booking status""" async with engine.connect() as conn: result = await conn.execute( text("SELECT allocation_status, assigned_partner_id FROM spa_bookings WHERE booking_id = :booking_id"), {"booking_id": str(booking_id)} ) row = result.fetchone() return {"allocation_status": row[0], "assigned_partner_id": row[1]} if row else None async def get_offer_status(engine: AsyncEngine, booking_id: UUID) -> dict: """Helper to get latest offer for booking""" async with engine.connect() as conn: result = await conn.execute( text(""" SELECT offer_id, partner_id, offer_status, offered_at, offer_expiry FROM spa_booking_offers WHERE booking_id = :booking_id ORDER BY offered_at DESC LIMIT 1 """), {"booking_id": str(booking_id)} ) row = result.fetchone() if row: return { "offer_id": row[0], "partner_id": row[1], "offer_status": row[2], "offered_at": row[3], "offer_expiry": row[4] } return None # ============================================================================ # Integration Tests # ============================================================================ @pytest.mark.asyncio @pytest.mark.integration async def test_complete_allocation_flow_with_acceptance(test_db): """ Test: Booking event → partner identification → scoring → offer creation → partner accept → booking assigned Validates Requirements: 1.1, 2.1, 3.1, 4.1, 6.1 """ # Arrange: Create booking and partner booking_id = uuid4() partner_id = uuid4() await create_booking(test_db, booking_id) await create_partner(test_db, partner_id, rating=4.8, completed_bookings=150) # Act: Create offer (simulating allocation worker) offer_service = OfferService(engine=test_db) offer = await offer_service.create_offer(booking_id, partner_id) # Assert: Offer created successfully assert offer is not None assert offer.offer_status == "pending" assert offer.booking_id == booking_id assert offer.partner_id == partner_id # Verify offer expiry is 30 seconds from offered_at time_diff = (offer.offer_expiry - offer.offered_at).total_seconds() assert time_diff == 30.0 # Act: Partner accepts offer accepted = await offer_service.accept_offer(offer.offer_id) # Assert: Acceptance processed assert accepted is True # Verify booking is assigned booking_status = await get_booking_status(test_db, booking_id) assert booking_status["allocation_status"] == "assigned" assert booking_status["assigned_partner_id"] == str(partner_id) # Verify offer status updated offer_status = await get_offer_status(test_db, booking_id) assert offer_status["offer_status"] == "accepted" @pytest.mark.asyncio @pytest.mark.integration async def test_offer_expiry_and_retry(test_db): """ Test: Offer expiry → retry with next partner Validates Requirements: 5.1, 7.1 """ # Arrange: Create booking and two partners booking_id = uuid4() partner1_id = uuid4() partner2_id = uuid4() await create_booking(test_db, booking_id) await create_partner(test_db, partner1_id, rating=4.5, location_lat=12.9716, location_lng=77.5946) await create_partner(test_db, partner2_id, rating=4.3, location_lat=12.9800, location_lng=77.6000) # Act: Create offer to first partner offer_service = OfferService(engine=test_db) offer1 = await offer_service.create_offer(booking_id, partner1_id) assert offer1 is not None assert offer1.offer_status == "pending" # Simulate time passing beyond expiry (in real scenario, expiry worker would do this) # For testing, we manually expire the offer await asyncio.sleep(0.1) # Small delay to ensure time has passed # Manually set offer as expired (simulating expiry worker) async with test_db.begin() as conn: await conn.execute( text("UPDATE spa_booking_offers SET offer_status = 'expired' WHERE offer_id = :offer_id"), {"offer_id": str(offer1.offer_id)} ) # Act: Create offer to second partner (retry) offer2 = await offer_service.create_offer(booking_id, partner2_id) # Assert: Second offer created successfully assert offer2 is not None assert offer2.offer_status == "pending" assert offer2.partner_id == partner2_id assert offer2.offer_id != offer1.offer_id # Verify first offer is expired async with test_db.connect() as conn: result = await conn.execute( text("SELECT offer_status FROM spa_booking_offers WHERE offer_id = :offer_id"), {"offer_id": str(offer1.offer_id)} ) row = result.fetchone() assert row[0] == "expired" @pytest.mark.asyncio @pytest.mark.integration async def test_partner_decline_and_retry(test_db): """ Test: Partner decline → retry with next partner Validates Requirements: 6.1, 7.1 """ # Arrange: Create booking and two partners booking_id = uuid4() partner1_id = uuid4() partner2_id = uuid4() await create_booking(test_db, booking_id) await create_partner(test_db, partner1_id, rating=4.7) await create_partner(test_db, partner2_id, rating=4.4) # Act: Create offer to first partner offer_service = OfferService(engine=test_db) offer1 = await offer_service.create_offer(booking_id, partner1_id) assert offer1 is not None # Act: First partner declines declined = await offer_service.decline_offer(offer1.offer_id) # Assert: Decline processed assert declined is True # Verify offer status offer_status = await get_offer_status(test_db, booking_id) assert offer_status["offer_status"] == "declined" # Act: Create offer to second partner (retry) offer2 = await offer_service.create_offer(booking_id, partner2_id) # Assert: Second offer created successfully assert offer2 is not None assert offer2.offer_status == "pending" assert offer2.partner_id == partner2_id # Verify booking still pending (not assigned) booking_status = await get_booking_status(test_db, booking_id) assert booking_status["allocation_status"] == "pending" @pytest.mark.asyncio @pytest.mark.integration async def test_no_eligible_partners_allocation_failed(test_db): """ Test: No eligible partners → allocation failed Validates Requirements: 2.1, 7.1 """ # Arrange: Create booking but no partners booking_id = uuid4() await create_booking(test_db, booking_id) # Act: Try to create offer (should fail - no partners available) offer_service = OfferService(engine=test_db) # In real scenario, allocation worker would check for eligible partners first # and set status to failed if none found # Here we simulate that by manually updating the booking async with test_db.begin() as conn: await conn.execute( text("UPDATE spa_bookings SET allocation_status = 'failed' WHERE booking_id = :booking_id"), {"booking_id": str(booking_id)} ) # Assert: Booking marked as failed booking_status = await get_booking_status(test_db, booking_id) assert booking_status["allocation_status"] == "failed" assert booking_status["assigned_partner_id"] is None @pytest.mark.asyncio @pytest.mark.integration async def test_booking_already_assigned_prevents_new_offer(test_db): """ Test: Booking already assigned → new offer creation aborted Validates Requirements: 8.2 """ # Arrange: Create booking and partner, then assign booking booking_id = uuid4() partner1_id = uuid4() partner2_id = uuid4() await create_booking(test_db, booking_id) await create_partner(test_db, partner1_id) await create_partner(test_db, partner2_id) # Create and accept first offer offer_service = OfferService(engine=test_db) offer1 = await offer_service.create_offer(booking_id, partner1_id) await offer_service.accept_offer(offer1.offer_id) # Verify booking is assigned booking_status = await get_booking_status(test_db, booking_id) assert booking_status["allocation_status"] == "assigned" # Act: Try to create another offer (should be rejected) offer2 = await offer_service.create_offer(booking_id, partner2_id) # Assert: Second offer creation aborted assert offer2 is None # Verify only one offer exists async with test_db.connect() as conn: result = await conn.execute( text("SELECT COUNT(*) FROM spa_booking_offers WHERE booking_id = :booking_id"), {"booking_id": str(booking_id)} ) count = result.fetchone()[0] assert count == 1 @pytest.mark.asyncio @pytest.mark.integration async def test_offer_expiry_timing_exactly_30_seconds(test_db): """ Test: Offer expiry is exactly 30 seconds from offered_at Validates Requirements: 4.3 """ # Arrange booking_id = uuid4() partner_id = uuid4() await create_booking(test_db, booking_id) await create_partner(test_db, partner_id) # Act offer_service = OfferService(engine=test_db) offer = await offer_service.create_offer(booking_id, partner_id) # Assert assert offer is not None time_diff = (offer.offer_expiry - offer.offered_at).total_seconds() assert time_diff == 30.0 @pytest.mark.asyncio @pytest.mark.integration async def test_multiple_allocation_attempts_tracked(test_db): """ Test: Multiple allocation attempts are tracked in spa_allocation_attempts Validates Requirements: 4.6, 7.1 """ # Arrange booking_id = uuid4() partner1_id = uuid4() partner2_id = uuid4() partner3_id = uuid4() await create_booking(test_db, booking_id) await create_partner(test_db, partner1_id) await create_partner(test_db, partner2_id) await create_partner(test_db, partner3_id) # Act: Create offers to multiple partners offer_service = OfferService(engine=test_db) offer1 = await offer_service.create_offer(booking_id, partner1_id) await offer_service.decline_offer(offer1.offer_id) offer2 = await offer_service.create_offer(booking_id, partner2_id) await offer_service.decline_offer(offer2.offer_id) offer3 = await offer_service.create_offer(booking_id, partner3_id) await offer_service.accept_offer(offer3.offer_id) # Assert: All attempts tracked async with test_db.connect() as conn: result = await conn.execute( text(""" SELECT partner_id, attempt_status, response_status FROM spa_allocation_attempts WHERE booking_id = :booking_id ORDER BY attempt_timestamp """), {"booking_id": str(booking_id)} ) attempts = result.fetchall() assert len(attempts) == 3 # Verify first attempt assert attempts[0][0] == str(partner1_id) assert attempts[0][1] == "offered" assert attempts[0][2] == "declined" # Verify second attempt assert attempts[1][0] == str(partner2_id) assert attempts[1][1] == "offered" assert attempts[1][2] == "declined" # Verify third attempt assert attempts[2][0] == str(partner3_id) assert attempts[2][1] == "offered" assert attempts[2][2] == "accepted"