| import asyncio |
| import aiohttp |
| import logging |
| from typing import List, Dict, Any |
| from services.booking_service import BookingService |
| from models.requests import HotelQuery |
|
|
| logger = logging.getLogger(__name__) |
|
|
class HotelScraper:
    """Coordinates scraping of multiple hotels through :class:`BookingService`.

    Queries are processed in rate-limited batches over a single shared
    ``aiohttp`` session; each failure is captured as a per-query error dict
    rather than aborting the whole run.
    """

    def __init__(self):
        # One service instance is reused for every query.
        self.booking_service = BookingService()

    async def scrape_hotels(
        self,
        hotel_queries: List[HotelQuery],
        *,
        chunk_size: int = 1,
        delay: float = 2.0,
    ) -> List[Dict[str, Any]]:
        """Scrape hotels in batches and return one result dict per query.

        Args:
            hotel_queries: Queries to process; results preserve this order.
            chunk_size: Number of queries scraped concurrently per batch.
                Defaults to 1 (fully sequential — the historical behavior);
                raise it to actually overlap requests.
            delay: Seconds to sleep between batches to rate-limit the
                target site (skipped after the final batch).

        Returns:
            One dict per input query, in input order. When a query fails,
            its dict carries ``destination``/``hotel_name`` plus an
            ``"error"`` message instead of scraped data.

        Raises:
            ValueError: If ``chunk_size`` is less than 1.
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be >= 1")

        logger.info("Starting to scrape %d hotels", len(hotel_queries))

        # force_close + a small connection limit avoids hammering the
        # target with pooled keep-alive connections.
        connector = aiohttp.TCPConnector(
            limit=3,
            ttl_dns_cache=300,
            force_close=True,
            enable_cleanup_closed=True,
        )
        timeout = aiohttp.ClientTimeout(total=90)

        processed_results: List[Dict[str, Any]] = []

        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            for start in range(0, len(hotel_queries), chunk_size):
                chunk = hotel_queries[start:start + chunk_size]

                tasks = [
                    asyncio.create_task(self.booking_service.search_hotel(
                        session=session,
                        destination=query.destination,
                        hotel_name=query.hotel_name,
                    ))
                    for query in chunk
                ]

                # return_exceptions=True so one failed query cannot abort
                # the batch; exceptions are turned into error dicts below.
                chunk_results = await asyncio.gather(*tasks, return_exceptions=True)

                for query, result in zip(chunk, chunk_results):
                    if isinstance(result, Exception):
                        logger.error("Error scraping %s: %s", query.hotel_name, result)
                        processed_results.append({
                            "destination": query.destination,
                            "hotel_name": query.hotel_name,
                            "error": f"Scraping failed: {str(result)}",
                        })
                    else:
                        processed_results.append(result)

                # Pause between batches for politeness, but not after the last.
                if start + chunk_size < len(hotel_queries):
                    await asyncio.sleep(delay)

        return processed_results