import asyncio
import os
import random
import re
import time
from datetime import datetime
from typing import Dict, List, Optional
from urllib.parse import quote

import httpx

# Import bypass utilities
from rate_limit_bypass import (
    smart_requester,
    get_random_user_agent,
    generate_user_agent,
    cache,
    RequestDelayer,
)
from scraper_health import scraper_metrics


class PatchrightAirbnbScraper:
    """Search Airbnb listings through a chain of scraping strategies.

    Strategies are tried in order ("curl" via a direct httpx request, then
    "firecrawl" via the Firecrawl scrape API). Every attempt is reported to
    ``scraper_metrics``. If no strategy yields deals, a single hard-coded
    fallback deal is returned so callers always receive a non-empty list.
    """

    # Place names that get a country suffix when the region has no comma.
    _GERMAN_PLACES = ("hamburg", "berlin", "münchen", "munich", "köln", "cologne")
    _DUTCH_PLACES = ("amsterdam", "rotterdam", "utrecht", "zandvoort", "texel", "zeeland")

    # Headings that introduce listings which do not match the searched dates.
    _CUTOFF_MARKERS = (
        "Results for other dates",
        "Ergebnisse für andere Daten",
        "Suggested results",
        "Vorgeschlagene Ergebnisse",
        "Try adjusting your search",
        "Versuche es mit anderen Filtern",
    )

    # Placeholder used while no listing name has been found.
    _NAME_PLACEHOLDER = "[DEBUG: NAME FEHLT]"

    # Compiled once instead of once per listing.
    _ROOM_ID_RE = re.compile(r'rooms/(\d+)')
    _IMAGE_URL_RE = re.compile(r'!\[.*?\]\((https://[^)]+)\)')
    _IMAGE_MARKDOWN_RE = re.compile(r'!\[.*?\]\(.*?\)')
    _LISTING_TYPE_RE = re.compile(
        r'(Apartment|Home|Condo|Villa|House|Guest suite|Cottage|Loft|Room|Private room) in ([A-Za-z\s,\-]+)',
        re.I,
    )
    _TOTAL_PRICE_RE = re.compile(
        r'([$€£])\s*([\d,\.]+).*?for\s+(\d+)\s+nights',
        re.DOTALL | re.IGNORECASE,
    )
    _NIGHTLY_PRICE_RE = re.compile(
        r'([$€£])\s*([\d,\.]+)\s*(per night|night|Nacht)',
        re.IGNORECASE,
    )
    _ANY_PRICE_RE = re.compile(r'[$€£]\s*([\d,\.]+)')
    _RATING_RE = re.compile(r'([\d\.]+)\s*out of 5')
    # Accepts "141 reviews", "1,234 reviews" and the singular "1 review".
    _REVIEWS_RE = re.compile(r'(\d{1,3}(?:[.,]\d{3})+|\d+)\s*reviews?')

    def __init__(self):
        # The key is looked up under both the upper- and lower-case variable name.
        self.firecrawl_key = os.getenv("FIRECRAWL_API_KEY") or os.getenv("firecrawl_api_key")
        self.cache = cache
        self.delayer = RequestDelayer(min_delay=5, max_delay=15)

    async def search_airbnb(self, region: str, checkin: str, checkout: str,
                            adults: int = 4, children: int = 0, pets: int = 1,
                            budget_max: int = 500) -> List[Dict]:
        """Search Airbnb, trying each strategy in turn and falling back to static data.

        Args:
            region: Free-text search region. Some known city names without a
                comma get a country appended (see ``_qualify_region``).
            checkin: Check-in date formatted ``YYYY-MM-DD``.
            checkout: Check-out date formatted ``YYYY-MM-DD``.
            adults: Number of adults.
            children: Number of children.
            pets: Number of pets.
            budget_max: Value sent as the ``price_max`` query parameter.

        Returns:
            The deals of the first strategy that produced any, otherwise the
            result of ``_get_fallback_data``.

        Raises:
            ValueError: If ``checkin`` or ``checkout`` is not ``YYYY-MM-DD``.
        """
        search_region = self._qualify_region(region)

        # Number of nights, needed to turn total prices into nightly prices.
        d1 = datetime.strptime(checkin, "%Y-%m-%d")
        d2 = datetime.strptime(checkout, "%Y-%m-%d")
        nights = max(1, (d2 - d1).days)

        strategies = [
            ("curl", self._search_curl),
            ("firecrawl", self._search_firecrawl),
        ]

        for name, strategy in strategies:
            started = time.perf_counter()
            try:
                print(f" [Scraper] Trying {name} strategy for {search_region}...")
                deals = await strategy(search_region, checkin, checkout, adults,
                                       children, pets, budget_max, nights)
                duration = time.perf_counter() - started
                scraper_metrics.record(
                    source="airbnb",
                    strategy=name,
                    success=bool(deals),
                    duration=duration,
                    result_count=len(deals) if deals else 0,
                    error=None if deals else "no_results",
                )
                if deals:
                    print(f" ✅ {name} strategy succeeded: {len(deals)} deals")
                    return deals
            except Exception as e:
                # Any failure is recorded and the next strategy is tried.
                duration = time.perf_counter() - started
                scraper_metrics.record(
                    source="airbnb",
                    strategy=name,
                    success=False,
                    duration=duration,
                    result_count=0,
                    error=str(e),
                )
                err_short = self._truncate_text(str(e), 100)
                print(f" ❌ {name} strategy failed: {err_short}")
                continue

        fallback_started = time.perf_counter()
        fallback_deals = self._get_fallback_data(search_region, nights)
        fallback_duration = time.perf_counter() - fallback_started
        scraper_metrics.record(
            source="airbnb",
            strategy="fallback",
            success=bool(fallback_deals),
            duration=fallback_duration,
            result_count=len(fallback_deals),
            error=None if fallback_deals else "no_results",
        )
        return fallback_deals

    @classmethod
    def _qualify_region(cls, region: str) -> str:
        """Append a country to known city names so the search does not land in e.g. "Hamburg, NY".

        Regions that already contain a comma are returned unchanged.
        """
        if "," in region:
            return region
        low_region = region.lower()
        if any(place in low_region for place in cls._GERMAN_PLACES):
            return f"{region}, Germany"
        if any(place in low_region for place in cls._DUTCH_PLACES):
            return f"{region}, Netherlands"
        return region

    @staticmethod
    def _build_search_url(region: str, checkin: str, checkout: str, adults: int,
                          children: int, pets: int, budget_max: int) -> str:
        """Build the Airbnb search URL shared by all strategies."""
        return (
            f"https://www.airbnb.com/s/{quote(region)}/homes"
            f"?checkin={checkin}&checkout={checkout}&adults={adults}"
            f"&children={children}&pets={pets}&price_max={budget_max}"
        )

    async def _search_curl(self, region: str, checkin: str, checkout: str,
                           adults: int, children: int, pets: int,
                           budget_max: int, nights: int) -> List[Dict]:
        """Fast first attempt: a direct httpx request with a random User-Agent.

        HTML parsing is not implemented, so a successful response still
        returns an empty list, which makes the caller move on to the next
        strategy. The request is only useful for detecting blocks early.

        Raises:
            Exception: On a bot-detection page, HTTP 429, or any other
                non-200 status.
        """
        await self.delayer.wait()

        url = self._build_search_url(region, checkin, checkout, adults,
                                     children, pets, budget_max)
        headers = {
            "User-Agent": get_random_user_agent(),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "DNT": "1",
            "Upgrade-Insecure-Requests": "1",
        }

        async with httpx.AsyncClient(headers=headers, timeout=30.0,
                                     follow_redirects=True) as client:
            response = await client.get(url)

            if response.status_code == 429:
                raise Exception("429 Too Many Requests")
            if response.status_code != 200:
                raise Exception(f"HTTP Error {response.status_code}")

            # Basic check for a block page served with status 200.
            if "dropped her ice cream" in response.text or "unusual activity" in response.text:
                raise Exception("429 Blocked by Airbnb (Ice Cream/Bot detection)")

            # Placeholder: raw HTML is not parsed; defer to the next strategy.
            return []

    async def _search_firecrawl(self, region: str, checkin: str, checkout: str,
                                adults: int, children: int, pets: int,
                                budget_max: int, nights: int) -> List[Dict]:
        """Scrape the search page through the Firecrawl API and parse its markdown.

        Raises:
            Exception: If no API key is configured, Firecrawl answers with a
                non-200 status, or no deals could be parsed from the result.
        """
        if not self.firecrawl_key:
            raise Exception("Firecrawl API key missing")

        url = self._build_search_url(region, checkin, checkout, adults,
                                     children, pets, budget_max)

        async def make_firecrawl_call():
            async with httpx.AsyncClient(timeout=120.0) as client:
                payload = {
                    "url": url,
                    "formats": ["markdown"],
                    "waitFor": 8000,
                    "actions": [
                        {"type": "scroll", "direction": "down", "amount": 500},
                        {"type": "wait", "milliseconds": 2000},
                    ],
                }
                return await client.post(
                    "https://api.firecrawl.dev/v1/scrape",
                    headers={"Authorization": f"Bearer {self.firecrawl_key}"},
                    json=payload,
                )

        response = await smart_requester.request(make_firecrawl_call)

        if response.status_code != 200:
            raise Exception(f"Firecrawl API Error: {response.status_code}")

        # `or` also covers a present-but-null "data" / "markdown" field.
        data = response.json().get('data') or {}
        markdown = data.get('markdown') or ''

        # Only the markdown format is requested above, so markdown is the
        # single source of listings; it is parsed exactly once.
        deals = self._parse_markdown(markdown, region, nights) if markdown else []

        if not deals:
            raise Exception("Airbnb blocked or no results found")
        return deals

    def _get_fallback_data(self, region: str, nights: int, *args, **kwargs) -> List[Dict]:
        """Return one static placeholder deal for use when all scraping fails."""
        print(f" ⚠️ Using fallback data for {region}")
        return [
            {
                "name": f"Gemütliches Haus in {region} (Fallback)",
                "location": region,
                "price_per_night": 120,
                "rating": 4.5,
                "reviews": 10,
                "pet_friendly": True,
                "source": "fallback",
                "url": "https://www.airbnb.com",
                "image_url": "https://images.unsplash.com/photo-1518780664697-55e3ad937233?auto=format&fit=crop&q=80&w=720",
            }
        ]

    def _parse_markdown(self, text: str, region: str, searched_nights: int) -> List[Dict]:
        """Extract deals from the markdown of an Airbnb search result page.

        The text is cut at the earliest "other dates / suggestions" heading,
        split into one block per distinct ``rooms/<id>`` link, and each block
        is mined for images, name, price and rating.

        Args:
            text: Markdown of the search page.
            region: Region stored as each deal's ``location``.
            searched_nights: Stay length used when only a total price is found.

        Returns:
            One dict per listing for which a positive nightly price was found.
        """
        if not text:
            return []

        # 0. Drop everything from the EARLIEST marker onwards, regardless of
        #    the order in which markers are listed.
        cut_positions = [
            pos for pos in (text.find(marker) for marker in self._CUTOFF_MARKERS)
            if pos != -1
        ]
        clean_text = text[:min(cut_positions)] if cut_positions else text

        # 1. First position of every room ID, in order of first appearance
        #    (dicts keep insertion order).
        first_positions: Dict[str, int] = {}
        for match in self._ROOM_ID_RE.finditer(clean_text):
            first_positions.setdefault(match.group(1), match.start())
        unique_matches = list(first_positions.items())

        deals = []
        for i, (room_id, pos) in enumerate(unique_matches):
            # The block runs from the previous listing's ID (or at most 2000
            # characters before this ID) up to the next listing's ID.
            prev_pos = unique_matches[i - 1][1] if i > 0 else 0
            start_search = max(prev_pos, pos - 2000)
            if i + 1 < len(unique_matches):
                end_search = unique_matches[i + 1][1]
            else:
                end_search = len(clean_text)
            block = self._substring(clean_text, start_search, end_search)

            price_per_night = self._extract_price_per_night(block, searched_nights)
            # Without a price the listing is not a valid deal for these dates.
            if price_per_night <= 0:
                continue

            images = self._extract_images(block)
            rating, reviews = self._extract_rating_and_reviews(block)
            deals.append({
                "name": self._extract_name(block, region),
                "location": region,
                "price_per_night": price_per_night,
                "rating": rating,
                "reviews": reviews,
                "pet_friendly": True,
                "source": "airbnb (cloud)",
                "url": f"https://www.airbnb.com/rooms/{room_id}",
                "image_url": images[0] if images else "",
                "images": images,
            })

        return deals

    def _extract_images(self, block: str) -> List[str]:
        """Return up to five distinct image URLs, each normalised to ``?im_w=720``."""
        images: List[str] = []
        for img_url in self._IMAGE_URL_RE.findall(block):
            full_url = img_url.split('?')[0] + "?im_w=720"
            if full_url not in images:
                images.append(full_url)
            if len(images) >= 5:
                break
        return images

    def _extract_name(self, block: str, region: str) -> str:
        """Pick a display name for the listing described by ``block``."""
        name = self._NAME_PLACEHOLDER

        # Remove image markdown from the block to avoid noise.
        clean_block = self._IMAGE_MARKDOWN_RE.sub('', block)
        lines = [ln.strip() for ln in clean_block.split('\n') if ln.strip()]

        for idx, line in enumerate(lines):
            if not self._LISTING_TYPE_RE.search(line):
                continue
            # The name is usually the line after the "Type in Location" line.
            if idx + 1 < len(lines):
                candidate = lines[idx + 1]
                # Reject rating lines and lines holding another room link.
                if "stars" not in candidate.lower() and "rooms/" not in candidate:
                    name = candidate
                    break
            # Otherwise use the type line itself minus the matched prefix.
            name = self._LISTING_TYPE_RE.sub('', line).strip()
            if not name:
                name = "Airbnb Stay"
            break

        if name == self._NAME_PLACEHOLDER or len(name) < 3:
            # Fallback: first line that is neither a link nor a rating line.
            for ln in lines:
                lowered = ln.lower()
                if ("rooms/" not in ln and "rating" not in lowered
                        and "review" not in lowered and len(ln) > 5):
                    name = ln
                    break

        # Strip markdown punctuation; never return an empty name.
        name = name.strip('*,# ') or "Airbnb Stay"
        if name.lower() == region.lower():
            # A name equal to the region is a bad parse.
            name = f"Stay in {region}"
        return name

    @staticmethod
    def _parse_amount(raw: str) -> Optional[int]:
        """Convert a captured amount such as ``1,350``, ``1.234`` or ``99.50`` to an int.

        A final separator followed by one or two digits is read as a decimal
        part; every other separator is a thousands separator. The result is
        rounded to whole currency units. Returns ``None`` without any digits.
        """
        cleaned = raw.strip(",.")
        if not cleaned:
            return None
        last_sep = max(cleaned.rfind(","), cleaned.rfind("."))
        fraction = ""
        whole_part = cleaned
        if last_sep != -1 and len(cleaned) - last_sep - 1 in (1, 2):
            whole_part, fraction = cleaned[:last_sep], cleaned[last_sep + 1:]
        digits = re.sub(r'[^\d]', '', whole_part)
        if not digits and not fraction:
            return None
        try:
            return round(float(f"{digits or '0'}.{fraction or '0'}"))
        except ValueError:
            return None

    def _extract_price_per_night(self, block: str, searched_nights: int) -> int:
        """Return the nightly price found in ``block``, or 0 if none can be derived."""
        # Preferred: "$1,350 ... for 5 nights" -> total divided by nights.
        total_match = self._TOTAL_PRICE_RE.search(block)
        if total_match:
            amount = self._parse_amount(total_match.group(2))
            nights_found = int(total_match.group(3))
            if amount is not None and nights_found > 0:
                return round(amount / nights_found)
            return 0

        # Next: an amount directly followed by "per night" / "night" / "Nacht".
        nightly_match = self._NIGHTLY_PRICE_RE.search(block)
        if nightly_match:
            amount = self._parse_amount(nightly_match.group(2))
            return amount if amount is not None else 0

        # Last resort: take the smallest price in the block; treat it as a
        # total when it is above 1000, otherwise as a nightly price.
        candidates = [
            value
            for value in (self._parse_amount(p) for p in self._ANY_PRICE_RE.findall(block))
            if value is not None
        ]
        if not candidates:
            return 0
        best_guess = min(candidates)
        if best_guess > 1000:
            # Guard against a zero/negative night count from direct callers.
            return round(best_guess / max(1, searched_nights))
        return best_guess

    def _extract_rating_and_reviews(self, block: str):
        """Return ``(rating, reviews)``; defaults are 4.8 and 20 when not found."""
        rating = 4.8
        reviews = 20

        # e.g. "4.32 out of 5 average rating, 141 reviews"
        rating_match = self._RATING_RE.search(block)
        if rating_match:
            try:
                rating = float(rating_match.group(1))
            except ValueError:
                # The pattern also matches strings like "." - keep the default.
                pass

        rev_match = self._REVIEWS_RE.search(block)
        if rev_match:
            try:
                reviews = int(re.sub(r'[^\d]', '', rev_match.group(1)))
            except ValueError:
                pass

        return rating, reviews

    def _truncate_text(self, value: object, limit: int = 120) -> str:
        """Return ``str(value)`` cut to at most ``limit`` characters."""
        text = str(value)
        if len(text) <= limit:
            return text
        # A negative limit yields an empty string rather than a tail slice.
        return text[:max(0, limit)]

    def _substring(self, text: str, start: int, end: int) -> str:
        """Return ``text[start:end]`` with both bounds clamped to ``[0, len(text)]``.

        Negative indices are clamped to 0 instead of counting from the end,
        and ``end < start`` yields an empty string.
        """
        safe_start = max(0, start)
        safe_end = min(max(safe_start, end), len(text))
        return text[safe_start:safe_end]


SmartAirbnbScraper = PatchrightAirbnbScraper