ERUMESU / app /core /scraper.py
leave-everything's picture
Migrated from another account
643d1b5 verified
"""Core scraping logic using Playwright and BrightData"""
import asyncio
import urllib.parse
from typing import List, Dict, Optional
from playwright.async_api import async_playwright
import logging
from app.core.config import settings
logger = logging.getLogger(__name__)
class HermesScraper:
    """Hermes product scraper using BrightData Browser API.

    The scraper opens the Hermes (jp/ja) search page for a term in a remote
    browser, waits for the page to settle, loads additional results through
    the "Load More" button and scrolling, and returns the product tiles it
    finds as plain dictionaries.
    """

    # CSS selectors tried, in order, to locate product tiles. Shared by the
    # error-page check and the extraction step so both agree on what counts
    # as "the page has products".
    _PRODUCT_SELECTORS = (
        '.product-grid-list-item',
        '.product-item',
        '[class*="h-product"]',
        '[class*="grid-item"]',
        'article[class*="product"]',
    )

    # Pause between failed attempts, in seconds.
    _RETRY_DELAY_SECONDS = 5

    # Upper bound (ms) on waiting for Angular's whenStable callbacks, so a
    # page that never becomes stable cannot hang an attempt forever.
    _ANGULAR_STABLE_CAP_MS = 15000

    def __init__(self):
        """Read connection and retry configuration from ``settings``."""
        self.browser_url = settings.browser_url
        self.max_retries = settings.MAX_RETRIES
        # Passed straight through as Playwright's ``timeout=`` argument
        # (presumably milliseconds, as Playwright expects -- confirm in config).
        self.timeout = settings.DEFAULT_TIMEOUT

    @staticmethod
    def _safe_endpoint(url) -> str:
        """Return ``scheme://host[:port]`` for *url* with credentials removed.

        Browser endpoint URLs can carry ``user:password@`` userinfo, so only
        the non-secret parts are ever written to the log.
        """
        if not url:
            return "<not configured>"
        try:
            parts = urllib.parse.urlsplit(str(url))
            host = parts.hostname
            port = parts.port  # raises ValueError on a malformed port
        except ValueError:
            return "<unparseable URL>"
        if not host:
            return "<unparseable URL>"
        endpoint = f"{parts.scheme}://{host}"
        return f"{endpoint}:{port}" if port else endpoint

    async def extract_products(self, search_term: str, max_products: int = 200) -> Dict:
        """
        Extract products from Hermes search results

        Args:
            search_term: Search term for products
            max_products: Maximum number of products to extract

        Returns:
            Dictionary containing extraction results. On success it has the
            keys ``success`` (True), ``total_expected``, ``total_extracted``,
            ``extraction_rate``, ``products`` and ``error`` (None). On failure
            it has ``success`` (False), ``error`` (message) and ``products``
            (empty list). Exceptions raised during an attempt are caught and
            reported through the failure dictionary rather than propagated.
        """
        encoded_term = urllib.parse.quote(search_term)
        search_url = f"https://www.hermes.com/jp/ja/search/?s={encoded_term}#|"
        logger.info(f"Starting extraction for search term: {search_term}")
        # Never log the raw endpoint: it may embed credentials.
        logger.info(f"BrightData endpoint: {self._safe_endpoint(self.browser_url)}")
        logger.info(f"Target URL: {search_url}")

        for retry_count in range(self.max_retries):
            is_last_attempt = retry_count >= self.max_retries - 1
            try:
                logger.info(f"Attempt {retry_count + 1}/{self.max_retries} - Connecting to BrightData...")
                result = await self._run_attempt(search_url, max_products, retry_count + 1)
            except Exception as e:
                # Top-level boundary: any failure in an attempt is logged and
                # either retried or turned into a failure result.
                logger.error(f"Attempt {retry_count + 1} failed: {str(e)}")
                if is_last_attempt:
                    return {
                        'success': False,
                        'error': str(e),
                        'products': []
                    }
                await asyncio.sleep(self._RETRY_DELAY_SECONDS)
                continue

            if result is not None:
                return result

            # ``None`` means an error page without products was detected.
            if is_last_attempt:
                logger.error("Max retries reached on error page")
                return {
                    'success': False,
                    'error': 'Error page detected',
                    'products': []
                }
            logger.warning("Error page detected, retrying...")
            await asyncio.sleep(self._RETRY_DELAY_SECONDS)

        # Only reached when max_retries is zero or negative.
        return {
            'success': False,
            'error': 'Max retries exceeded',
            'products': []
        }

    async def _run_attempt(self, search_url: str, max_products: int, attempt_number: int) -> Optional[Dict]:
        """Run one complete scrape attempt.

        Returns the success dictionary, or ``None`` when the loaded page looks
        like an error page with no products (the caller decides whether to
        retry). The browser is always closed before returning or raising.
        """
        async with async_playwright() as pw:
            browser = await self._open_browser(pw)
            try:
                page = await browser.new_page()
                logger.info("New page created")

                # Navigate to search results
                logger.info(f"Navigating to: {search_url}")
                await page.goto(search_url, wait_until='domcontentloaded', timeout=self.timeout)

                # Wait for Angular to stabilize (critical from WSL version)
                logger.info("Waiting for Angular framework to stabilize...")
                await self._wait_for_angular(page, 3000)
                logger.info("Angular stabilized")

                # Additional wait for dynamic content
                await page.wait_for_timeout(3000)

                # Log page information for debugging
                current_url = page.url
                page_title = await page.title()
                logger.info(f"Current URL: {current_url}")
                logger.info(f"Page title: {page_title}")

                # Take screenshot for debugging (save to /tmp for HF Spaces).
                # A failed screenshot must not abort the scrape itself.
                screenshot_path = f"/tmp/debug_screenshot_{attempt_number}.png"
                try:
                    await page.screenshot(path=screenshot_path)
                    logger.info(f"Screenshot saved to: {screenshot_path}")
                except Exception as screenshot_error:
                    logger.warning(f"Could not save debug screenshot: {screenshot_error}")

                # Log page content snippet for debugging
                page_content = await page.content()
                logger.info(f"Page content snippet: {page_content[:500]}...")

                # Check for error page
                error_check = await self._check_for_error(page)
                if error_check['has_error'] and not error_check['has_products']:
                    return None

                # Extract total count; fall back to the requested maximum
                # when the page does not expose one.
                total_count = await self._extract_total_count(page)
                if not total_count:
                    total_count = max_products
                logger.info(f"Total products expected: {total_count}")

                # Get initial products
                products_info = await self._get_products_info(page)
                current_count = products_info['count']
                all_products = products_info['products']
                logger.info(f"Initial products found: {current_count}")
                logger.info(f"Selector used: {products_info.get('selector_used', 'unknown')}")
                if current_count > 0 and all_products:
                    logger.info(f"First product example: {all_products[0] if all_products else 'None'}")

                # Load more products if needed
                target_count = min(total_count, max_products)
                if current_count < target_count:
                    await self._load_more_products(
                        page,
                        current_count,
                        target_count,
                        all_products
                    )

                # Final extraction: re-read the DOM once everything is loaded.
                final_info = await self._get_products_info(page)
                final_products = final_info['products'][:max_products]

                extraction_rate = None
                if total_count:
                    extraction_rate = (len(final_products) / total_count * 100)
                return {
                    'success': True,
                    'total_expected': total_count,
                    'total_extracted': len(final_products),
                    'extraction_rate': extraction_rate,
                    'products': final_products,
                    'error': None
                }
            finally:
                await self._close_quietly(browser)

    async def _open_browser(self, pw):
        """Connect to a browser, preferring the remote CDP endpoint.

        Tries ``connect_over_cdp`` against ``self.browser_url`` first; if that
        fails for any reason, launches a local headless Chromium that goes
        through ``settings.proxy_settings``. A failure of the fallback launch
        propagates to the caller.
        """
        try:
            logger.info("Trying WebSocket CDP connection...")
            browser = await pw.chromium.connect_over_cdp(
                self.browser_url,
                timeout=self.timeout
            )
            logger.info("Successfully connected via WebSocket CDP")
            return browser
        except Exception as ws_error:
            logger.warning(f"WebSocket connection failed: {str(ws_error)[:100]}")

        logger.info("Trying alternative HTTP proxy connection...")
        browser = await pw.chromium.launch(
            proxy=settings.proxy_settings,
            headless=True,
            args=['--no-sandbox', '--disable-dev-shm-usage']
        )
        logger.info("Successfully connected via HTTP proxy")
        return browser

    @staticmethod
    async def _close_quietly(browser) -> None:
        """Close *browser*, logging (not raising) any error from the close.

        Cleanup must neither mask the exception that ended an attempt nor
        discard an already-computed result.
        """
        try:
            await browser.close()
        except Exception as close_error:
            logger.warning(f"Error while closing browser: {close_error}")

    async def _wait_for_angular(self, page, fallback_ms: int) -> None:
        """Wait until Angular reports the page as stable.

        If ``window.getAllAngularTestabilities`` exists, waits for every
        testability's ``whenStable`` callback, but never longer than
        ``_ANGULAR_STABLE_CAP_MS``. Otherwise simply waits ``fallback_ms``
        milliseconds.
        """
        await page.evaluate("""
            ([fallbackMs, capMs]) => new Promise(resolve => {
                if (window.getAllAngularTestabilities) {
                    const cap = setTimeout(resolve, capMs);
                    const testabilities = window.getAllAngularTestabilities();
                    Promise.all(
                        testabilities.map(t =>
                            new Promise(r => t.whenStable(r))
                        )
                    ).then(() => {
                        clearTimeout(cap);
                        resolve();
                    });
                } else {
                    setTimeout(resolve, fallbackMs);
                }
            })
        """, [fallback_ms, self._ANGULAR_STABLE_CAP_MS])

    async def _check_for_error(self, page) -> Dict:
        """Check if the page contains an error

        Returns a dict with ``has_error`` (the body text contains one of a few
        error-like substrings) and ``has_products`` (at least one of the
        product selectors matches). The text check is deliberately crude --
        e.g. a price containing "500" sets ``has_error`` -- so callers should
        treat the page as broken only when ``has_products`` is also false.
        """
        return await page.evaluate("""
            (productSelectors) => {
                const bodyText = (document.body && document.body.innerText) || '';
                const hasError =
                    bodyText.includes('error') ||
                    bodyText.includes('Error') ||
                    bodyText.includes('404') ||
                    bodyText.includes('500') ||
                    bodyText.includes('Not Found');
                const hasProducts = productSelectors.some(
                    selector => document.querySelectorAll(selector).length > 0
                );
                return {
                    has_error: hasError,
                    has_products: hasProducts
                };
            }
        """, list(self._PRODUCT_SELECTORS))

    async def _extract_total_count(self, page) -> Optional[int]:
        """Extract total product count from the page

        Searches the body text for a number in (ASCII or full-width)
        parentheses, preferring one that follows a «...» quoted phrase.
        Returns ``None`` when nothing matches.
        """
        return await page.evaluate("""
            () => {
                const patterns = [
                    /«.*?».*?[((](\\d+)[))]/,
                    /[((](\\d+)[))]/
                ];
                const bodyText = (document.body && document.body.innerText) || '';
                for (const pattern of patterns) {
                    const match = bodyText.match(pattern);
                    if (match) {
                        return parseInt(match[1], 10);
                    }
                }
                return null;
            }
        """)

    async def _get_products_info(self, page) -> Dict:
        """Get information about products on the page

        Returns a dict with:
            count: number of tiles matched by the first product selector
                that matches anything (0 if none do).
            products: one dict per tile that has a name or a price, with the
                keys ``id``, ``name``, ``price``, ``url`` and ``image_url``.
                May be shorter than ``count``.
            selector_used: the selector that matched, or ``'none'``.
        """
        # First log what selectors we can find (for debugging)
        selector_check = await page.evaluate("""
            () => {
                return {
                    product_grid: document.querySelectorAll('.product-grid-list-item').length,
                    product_item: document.querySelectorAll('.product-item').length,
                    h_product: document.querySelectorAll('[class*="h-product"]').length,
                    grid_item: document.querySelectorAll('[class*="grid-item"]').length,
                    any_product: document.querySelectorAll('[class*="product"]').length
                };
            }
        """)
        logger.info(f"Selector check: {selector_check}")

        return await page.evaluate("""
            (selectors) => {
                // Use the first selector that matches at least one element
                let items = [];
                let selectorUsed = 'none';
                for (const selector of selectors) {
                    const found = document.querySelectorAll(selector);
                    if (found.length > 0) {
                        console.log(`Found ${found.length} products with selector: ${selector}`);
                        items = found;
                        selectorUsed = selector;
                        break;
                    }
                }

                const products = [];
                items.forEach((item, index) => {
                    // Try multiple name selectors
                    const nameSelectors = [
                        '.product-item-name',
                        'h3',
                        '[class*="product-name"]',
                        '[class*="title"]',
                        'a[href*="/product/"] span',
                        'a[href*="/product/"]'
                    ];
                    let name = '';
                    for (const selector of nameSelectors) {
                        const el = item.querySelector(selector);
                        if (el && el.textContent) {
                            name = el.textContent.trim();
                            if (name) break;
                        }
                    }

                    // Try multiple price selectors; stop at the first text
                    // that contains a yen sign, otherwise keep the last
                    // candidate seen.
                    const priceSelectors = [
                        '.product-item-price',
                        '[class*="price"]',
                        'span[class*="price"]',
                        'div[class*="price"]'
                    ];
                    let priceText = '';
                    for (const selector of priceSelectors) {
                        const el = item.querySelector(selector);
                        if (el && el.textContent) {
                            priceText = el.textContent.trim();
                            if (priceText && priceText.includes('¥')) break;
                        }
                    }

                    const linkElement = item.querySelector('a[href*="/product/"], a[href]');
                    const url = linkElement ? linkElement.href : '';
                    const imgElement = item.querySelector('img[src], img[data-src]');
                    const imageUrl = imgElement ? (imgElement.src || imgElement.dataset.src) : '';

                    // Product id is the path segment after "product/", or a
                    // positional placeholder when the URL has none.
                    const urlMatch = url.match(/product\\/([^\\/]+)/);
                    const productId = urlMatch ? urlMatch[1] : `item_${index + 1}`;

                    if (name || priceText) {
                        products.push({
                            id: productId,
                            name: name,
                            price: priceText,
                            url: url,
                            image_url: imageUrl
                        });
                    }
                });

                return {
                    count: items.length,
                    products: products,
                    selector_used: selectorUsed
                };
            }
        """, list(self._PRODUCT_SELECTORS))

    async def _load_more_products(self, page, current_count: int, target_count: int, all_products: List):
        """Load more products using Load More button and scrolling

        Makes up to 10 attempts: the first 3 try the Load More button, later
        ones scroll. Stops early once ``target_count`` tiles are present or
        when a fallback scroll yields no new tiles.

        Args:
            page: Playwright page showing the search results.
            current_count: Number of product tiles already on the page.
            target_count: Number of tiles to aim for.
            all_products: List of product dicts; newly seen products are
                appended to it in place.
        """
        load_attempts = 0
        max_load_attempts = 10
        button_attempts = 3  # Try button click multiple times before scrolling

        while current_count < target_count and load_attempts < max_load_attempts:
            load_attempts += 1
            logger.info(f"Load attempt {load_attempts}: {current_count}/{target_count}")

            if load_attempts <= button_attempts:
                # Try Load More button (improved from WSL version)
                await self._click_load_more(page)
            else:
                # Scroll strategy (from WSL success pattern)
                await self._scroll_to_load(page)

            # Check for loading indicator (dots)
            is_loading = await page.evaluate("""
                () => {
                    const dots = document.querySelector('.dots, [class*="loading"], [class*="spinner"]');
                    return dots && dots.offsetParent !== null;
                }
            """)
            if is_loading:
                logger.info("Loading indicator detected, waiting...")
                await page.wait_for_timeout(3000)

            # Check for new products
            products_info = await self._get_products_info(page)
            new_count = products_info['count']
            if new_count > current_count:
                logger.info(f"Loaded {new_count - current_count} new products")
            else:
                # Try aggressive scroll
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                await page.wait_for_timeout(3000)
                products_info = await self._get_products_info(page)
                if products_info['count'] == current_count:
                    logger.info("No more products available")
                    break

            # Keep the counter and the caller's list in step on both paths.
            current_count = products_info['count']
            all_products.extend(products_info['products'][len(all_products):])

    async def _click_load_more(self, page) -> None:
        """Find the Load More button and click it, then wait for new content."""
        logger.info("Looking for Load More button...")
        # First check if button exists and log details.
        # NOTE(review): the ':has-text(...)' entries are Playwright selector
        # syntax, not CSS; document.querySelector rejects them and the
        # try/catch skips them, so they never match here.
        button_info = await page.evaluate("""
            () => {
                // Multiple possible selectors for Load More button
                const selectors = [
                    'button.h-btn_text', // WSL version success selector
                    '.h-btn_text',
                    'button[class*="btn_text"]',
                    '.grid-result-footer-wrapper button',
                    'button:has-text("Load More")',
                    'button:has-text("もっと見る")', // Japanese version
                    '[role="button"]:has-text("Load")',
                    'button.load-more'
                ];
                for (const selector of selectors) {
                    try {
                        const button = document.querySelector(selector);
                        if (button) {
                            return {
                                found: true,
                                selector: selector,
                                text: button.textContent,
                                visible: button.offsetParent !== null,
                                disabled: button.disabled,
                                classes: button.className
                            };
                        }
                    } catch (e) {
                        // Continue to next selector
                    }
                }
                return { found: false };
            }
        """)
        logger.info(f"Button info: {button_info}")

        selector = button_info.get('selector')
        if not (button_info.get('found') and selector):
            logger.info("Load More button not found, trying scroll...")
            return

        # Try to click the button. The selector is passed as an argument
        # instead of being spliced into the script source.
        button_clicked = await page.evaluate("""
            async (selector) => {
                const button = document.querySelector(selector);
                if (button && !button.disabled) {
                    // Scroll to button first
                    button.scrollIntoView({behavior: 'smooth', block: 'center'});
                    await new Promise(resolve => setTimeout(resolve, 1000));
                    // Try multiple click methods
                    try {
                        // Method 1: Direct click
                        button.click();
                        return true;
                    } catch (e1) {
                        try {
                            // Method 2: Dispatch click event
                            const event = new MouseEvent('click', {
                                view: window,
                                bubbles: true,
                                cancelable: true
                            });
                            button.dispatchEvent(event);
                            return true;
                        } catch (e2) {
                            console.error('All click methods failed');
                            return false;
                        }
                    }
                }
                return false;
            }
        """, selector)

        if button_clicked:
            logger.info("Load More button clicked successfully")
            await page.wait_for_timeout(5000)  # Wait longer for products to load
            # Wait for Angular to stabilize again
            await self._wait_for_angular(page, 2000)

    async def _scroll_to_load(self, page) -> None:
        """Scroll down in steps, then to the very bottom, to trigger loading."""
        logger.info("Attempting scroll-based loading...")
        # Smooth scroll to bottom
        for _ in range(3):
            await page.evaluate("""
                window.scrollBy({
                    top: 500,
                    behavior: 'smooth'
                })
            """)
            await page.wait_for_timeout(500)
        # Final scroll to absolute bottom
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
        await page.wait_for_timeout(3000)