import asyncio
import logging
import os
import random
import re
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import quote, urljoin

import aiohttp  # type: ignore
from bs4 import BeautifulSoup  # type: ignore
from dotenv import load_dotenv

from .utils.google_search_utils import fetch_hotel_images_from_google
from .utils.html_utils import extract_images_from_soup, extract_rating_from_element
from .utils.http_utils import fetch_page
from .utils.image_utils import filter_logo_images, is_logo_image

# Load environment variables (GOOGLE_SEARCH_API_KEY / GOOGLE_SEARCH_ENGINE_ID)
load_dotenv()

logger = logging.getLogger(__name__)


class BookingService:
    """Service for scraping hotel data from Booking.com.

    Searches for a hotel by name/destination, then scrapes the detail page
    for rating, images and amenities. Falls back to the Google Custom Search
    API for images when scraping yields none and credentials are configured.
    """

    def __init__(self):
        # Pool of diverse user agents; one is picked at random per request
        # (see get_page) to reduce the chance of bot-detection blocks.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (iPad; CPU OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/123.0.0.0 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/123.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko",
        ]

        # Base headers - User-Agent will be overridden in get_page
        self.headers = {
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "sec-ch-ua": '"Google Chrome";v="123", "Not:A-Brand";v="99"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
        }

        # Check if Google Search API credentials are available
        self.google_api_available = bool(
            os.getenv("GOOGLE_SEARCH_API_KEY") and os.getenv("GOOGLE_SEARCH_ENGINE_ID")
        )
        # FIX: previous message hard-coded a stale timestamp and author name;
        # the logging framework already stamps each record.
        logger.info("BookingService initialized")
        if self.google_api_available:
            logger.info("Google Custom Search API configured as fallback for hotel images")
        else:
            logger.warning("Google Custom Search API credentials not found - fallback will not be available")

    async def get_page(self, session: aiohttp.ClientSession, url: str) -> Optional[str]:
        """Wrapper for fetch_page with rotating user agents.

        Copies the base headers and overrides ``User-Agent`` with a random
        entry from the pool, then delegates to ``fetch_page``.
        """
        current_headers = self.headers.copy()
        current_headers["User-Agent"] = random.choice(self.user_agents)
        logger.debug(f"Using user agent: {current_headers['User-Agent'][:30]}...")
        return await fetch_page(session, url, current_headers)

    async def extract_amenities(self, session: aiohttp.ClientSession, hotel_element,
                                hotel_url: Optional[str] = None) -> List[str]:
        """Extract popular facilities from hotel detail page.

        Locates the "Most popular facilities" heading and walks up/sideways
        in the DOM looking for a container holding facility icons; falls back
        to a hard-coded class selector. Returns a de-duplicated list (order
        not guaranteed); empty when no URL is given or nothing is found.
        """
        unique_amenities = set()
        if hotel_url:
            try:
                html = await self.get_page(session, hotel_url)
                if html:
                    soup = BeautifulSoup(html, 'html.parser')
                    popular_heading = soup.find(
                        string=lambda text: text and text.strip() == "Most popular facilities"
                    )
                    if popular_heading:
                        current = popular_heading.parent
                        container = None
                        # Look for container with facility icons, walking at
                        # most 3 levels up from the heading.
                        for _ in range(3):
                            if not current:
                                break
                            if current.select("svg") or current.select("img"):
                                container = current
                                break
                            parent = current.parent
                            if parent and (parent.select("svg") or parent.select("img")):
                                container = parent
                                break
                            sibling = current.find_next_sibling()
                            if sibling and (sibling.select("svg") or sibling.select("img")):
                                container = sibling
                                break
                            current = parent
                        if not container:
                            # Last resort: take the heading's next sibling.
                            heading_parent = popular_heading.parent
                            if heading_parent:
                                container = heading_parent.find_next_sibling()
                        # Extract facility items; len < 30 filters out prose
                        # that is clearly not a short facility label.
                        if container:
                            facility_items = container.select("span") or container.select("div")
                            for item in facility_items:
                                text = item.get_text().strip()
                                if text and text != "Most popular facilities" and len(text) < 30:
                                    unique_amenities.add(text)
                    # Fallback method using a Booking.com class name (fragile;
                    # class names are auto-generated and may change).
                    if not unique_amenities:
                        try:
                            rows = (
                                soup.select(".f6b6d2a959")
                                or soup.select_one("div:-soup-contains('Most popular facilities')")
                                .parent.find_next_sibling()
                                .select("span")
                            )
                            for item in rows:
                                text = item.get_text().strip()
                                if text and text != "Most popular facilities" and len(text) < 30:
                                    unique_amenities.add(text)
                        except AttributeError:
                            # select_one returned None somewhere in the chain.
                            logger.debug("Could not find facilities using fallback selector")
            except Exception as e:
                logger.error(f"Error extracting amenities: {e}")
        return list(unique_amenities)

    async def get_room_images_from_detail_page(self, session: aiohttp.ClientSession,
                                               url: str) -> List[str]:
        """Get a mix of property and room images from hotel detail page.

        Tries known gallery selectors first, then scans all <img> tags until
        5 candidates are collected. Logos and tiny images are skipped.
        Returns at most 5 absolute URLs; partial results on error.
        """
        all_images = []
        try:
            html = await self.get_page(session, url)
            if html:
                soup = BeautifulSoup(html, 'html.parser')
                selectors = [
                    ".bui-carousel__item img",
                    ".bh-photo-grid img",
                    ".hp-gallery img",
                    ".hotel-photos img",
                    ".room-gallery img",
                    ".hotel-room-photographs-slides img",
                    "img.active-image",
                    ".gallery-mosaic img",
                    ".tour-360__image img",
                    "img[width='300'], img[width='350'], img[width='400'], img[width='500']",
                ]
                all_images = extract_images_from_soup(soup, url, selectors)
                if len(all_images) < 5:
                    for img in soup.select("img"):
                        width = img.get("width")
                        # FIX: guard int() — width attributes like "100%" or
                        # "auto" previously raised ValueError and aborted the
                        # whole scan via the broad except below.
                        if width and str(width).isdigit() and int(width) < 100:
                            continue  # skip tiny images (icons, sprites)
                        src = img.get("src") or img.get("data-src")
                        if src and not is_logo_image(src) and src not in all_images:
                            if not src.startswith("http"):
                                src = urljoin(url, src)
                            all_images.append(src)
                            if len(all_images) >= 5:
                                break
            return filter_logo_images(all_images)[:5]
        except Exception as e:
            logger.error(f"Error getting hotel images: {e}", exc_info=True)
            # Best effort: return whatever was collected before the failure.
            return all_images[:5] if all_images else []

    async def extract_rating_from_detail_page(self, session: aiohttp.ClientSession,
                                              url: str) -> Optional[float]:
        """Extract rating from hotel detail page.

        Tries several strategies in order: the "Guest reviews" section,
        known score-badge classes, then any text mentioning "Review score".
        Accepts both '.' and ',' decimal separators. Returns None when no
        rating is found or the page fails to load.
        """
        try:
            html = await self.get_page(session, url)
            if not html:
                return None
            soup = BeautifulSoup(html, 'html.parser')
            guest_reviews_section = soup.find("h2", string="Guest reviews")
            if guest_reviews_section:
                rating_div = (
                    soup.select_one("div[aria-label*='Scored'] strong")
                    or soup.select_one(".b5cd09854e")
                )
                if rating_div:
                    text = rating_div.get_text().strip()
                    match = re.search(r"(\d+[.,]\d+)", text)
                    if match:
                        return float(match.group(1).replace(',', '.'))
                # Scan divs near the heading for a bare "8.7"-style score.
                nearby_elements = guest_reviews_section.parent.select("div")
                for elem in nearby_elements:
                    text = elem.get_text().strip()
                    if re.match(r"^\d+[.,]\d+$", text):
                        return float(text.replace(',', '.'))
            score_elements = soup.select(".review-score-badge, .b5cd09854e")
            for elem in score_elements:
                text = elem.get_text().strip()
                match = re.search(r"(\d+[.,]\d+)", text)
                if match:
                    return float(match.group(1).replace(',', '.'))
            review_text = soup.find(string=lambda text: text and ("Review score" in text))
            if review_text:
                parent_text = review_text.parent.get_text() if review_text.parent else ""
                match = re.search(r"(\d+[.,]\d+)", parent_text)
                if match:
                    return float(match.group(1).replace(',', '.'))
        except Exception as e:
            logger.error(f"Error extracting rating: {e}")
        return None

    def extract_rating(self, hotel_element) -> Optional[float]:
        """Extract rating from hotel element (delegates to shared helper)."""
        return extract_rating_from_element(hotel_element)

    def is_name_similar(self, name1: str, name2: str) -> bool:
        """Check if two hotel names are similar enough.

        True when one name contains the other (case-insensitive) or when at
        least 50% of the shorter name's words appear in the other name.
        """
        if not name1 or not name2:
            return False
        name1 = name1.lower()
        name2 = name2.lower()
        if name1 in name2 or name2 in name1:
            return True
        # Compare words
        words1 = set(re.findall(r'\w+', name1))
        words2 = set(re.findall(r'\w+', name2))
        if not words1 or not words2:
            return False
        # Calculate word overlap relative to the shorter name.
        common_words = words1.intersection(words2)
        similarity = len(common_words) / min(len(words1), len(words2))
        return similarity >= 0.5  # 50% word overlap

    async def search_hotel(self, session: aiohttp.ClientSession, destination: str,
                           hotel_name: str) -> Dict[str, Any]:
        """Search for a specific hotel on Booking.com.

        Returns a dict with ``destination``, ``hotel_name`` and either an
        ``error`` key (search failed / nothing found) or a ``data`` key with
        name, rating, images, amenities and booking link.
        """
        search_query = f"{hotel_name} {destination}"
        search_url = f"https://www.booking.com/search.html?ss={quote(search_query)}"
        html = await self.get_page(session, search_url)
        if not html:
            return {
                "destination": destination,
                "hotel_name": hotel_name,
                "error": "Failed to retrieve search results"
            }

        soup = BeautifulSoup(html, 'html.parser')
        hotel_cards = soup.select("[data-testid='property-card'], .sr_property_block, .sr_item")
        if not hotel_cards:
            return {
                "destination": destination,
                "hotel_name": hotel_name,
                "error": "No hotels found"
            }

        # Find matching hotel card; fall back to the first result.
        hotel_card = None
        for card in hotel_cards:
            name_elem = card.select_one("[data-testid='title'], .sr-hotel__name, .hotel_name")
            if name_elem:
                card_hotel_name = name_elem.text.strip()
                if self.is_name_similar(card_hotel_name, hotel_name):
                    hotel_card = card
                    break
        if not hotel_card:
            hotel_card = hotel_cards[0]

        name_elem = hotel_card.select_one("[data-testid='title'], .sr-hotel__name, .hotel_name")
        name = name_elem.text.strip() if name_elem else hotel_name
        rating = self.extract_rating(hotel_card)

        link_elem = hotel_card.select_one("a[href*='hotel'], a.hotel_name_link")
        hotel_url = ""
        if link_elem and 'href' in link_elem.attrs:
            href = link_elem['href']
            hotel_url = urljoin("https://www.booking.com", href) if not href.startswith('http') else href

        if hotel_url:
            # Fetch rating, images and amenities from the detail page
            # concurrently; the search-card rating is only a fallback.
            tasks = [
                self.extract_rating_from_detail_page(session, hotel_url),
                self.get_room_images_from_detail_page(session, hotel_url),
                self.extract_amenities(session, hotel_card, hotel_url)
            ]
            detail_rating, images, amenities = await asyncio.gather(*tasks)
            if detail_rating is not None:
                rating = detail_rating
        else:
            images = []
            amenities = []

        # If scraping didn't return any images, use Google Custom Search API as fallback
        if not images and self.google_api_available:
            logger.info(f"No images found via scraping for {hotel_name} in {destination}. Using Google API as fallback.")
            images = await fetch_hotel_images_from_google(session, hotel_name, destination)

        return {
            "destination": destination,
            "hotel_name": hotel_name,
            "data": {
                "name": name,
                "rating": rating,
                "images": images,
                "amenities": amenities,
                "booking_link": hotel_url
            }
        }