Spaces:
Sleeping
Sleeping
| import logging | |
| import asyncio | |
| from playwright.async_api import Locator, Page | |
# NOTE(review): logger name 'fastapi_cli' presumably hooks into the host
# FastAPI app's logging configuration rather than using __name__ — confirm.
logger = logging.getLogger('fastapi_cli')
# XPaths for navigation links.
# Candidate "about"/company-information links: href heuristics, a
# case-insensitive text match for "about" (translate() lower-cases the
# letters A-B-O-U-T), and the Japanese link texts
# 会社概要 ("company overview") / 企業情報 ("corporate information").
INFO_PAGE_XPATHS = [
    "//a[contains(@href, 'about') or contains(@href, 'company') or contains(@href, 'info') or contains(translate(text(), 'ABOUT', 'about'), 'about')]",
    "//a[contains(text(), '会社概要') or contains(text(), '企業情報')]"
]
# Candidate links back to the home page: href heuristics (including a bare
# '/' href) or the link texts ホーム ("home") / HOME.
HOME_PAGE_XPATHS = [
    "//a[contains(@href, 'home') or contains(@href, 'index') or @href='/']",
    "//a[contains(text(), 'ホーム') or contains(text(), 'HOME')]"
]
def normalize_url(url: str, base_url: str) -> str:
    """Normalize a (possibly relative) URL to absolute form.

    Args:
        url: Raw href value; may be absolute, protocol-relative
            (``//host/path``), root-relative, relative, or empty.
        base_url: Absolute base URL of the site being crawled.

    Returns:
        An absolute URL (trailing slash stripped from absolute forms),
        or "" when *url* is empty.
    """
    if not url:
        return ""
    if url.startswith("//"):
        # Protocol-relative URL (e.g. "//cdn.example.com/x"): inherit the
        # scheme from base_url. Previously this fell through to the "/"
        # branch and produced the broken "https://base.com//cdn...".
        scheme = base_url.split("://", 1)[0] if "://" in base_url else "https"
        return (scheme + ":" + url).rstrip("/")
    if url.startswith("http"):
        return url.rstrip("/")
    if url.startswith("/"):
        # Root-relative path: join onto the site root.
        return base_url.rstrip("/") + url
    # Plain relative path: join with a single separating slash.
    return base_url.rstrip("/") + "/" + url
async def locate_info_page_links(page: Page) -> list[Locator]:
    """Locate all possible info/about page links in parallel.

    Args:
        page: The Playwright page to search (already navigated).

    Returns:
        Locators for every candidate link whose href is present and is
        not a fragment/javascript/mailto/tel pseudo-link, in XPath order.
    """
    tasks = [page.locator(xpath).all() for xpath in INFO_PAGE_XPATHS]
    results = await asyncio.gather(*tasks)
    links = [link for result in results for link in result]
    # Fetch all hrefs concurrently instead of awaiting them one at a time;
    # the original sequential loop contradicted the "in parallel" contract.
    hrefs = await asyncio.gather(*(link.get_attribute("href") for link in links))
    return [
        link
        for link, href in zip(links, hrefs)
        if href and not href.startswith(("#", "javascript:", "mailto:", "tel:"))
    ]
async def locate_home_page_link(page: Page) -> Locator | None:
    """Find the first usable home-page link, or None if there is none.

    All XPath queries run concurrently; candidates are then checked in
    order and the first one with a real (non-fragment, non-javascript,
    non-mailto, non-tel) href is returned.
    """
    gathered = await asyncio.gather(
        *(page.locator(xpath).all() for xpath in HOME_PAGE_XPATHS)
    )
    skip_prefixes = ("#", "javascript:", "mailto:", "tel:")
    for candidate in (link for group in gathered for link in group):
        href = await candidate.get_attribute("href")
        if href and not href.startswith(skip_prefixes):
            return candidate
    return None
async def scrape_page_content(page: Page) -> dict[str, str]:
    """Scrape relevant textual content from the page in parallel.

    Runs three independent steps (meta tags, main content, company info)
    concurrently via asyncio.gather. Each step writes into the shared
    ``content`` dict and logs-and-swallows its own errors so one failing
    step cannot break the others.

    Args:
        page: The Playwright page to scrape (already navigated).

    Returns:
        Dict with keys 'url', 'title', 'main_content', 'company_info',
        'meta_description', 'meta_keywords'; unscraped values stay ''.
    """
    content = {
        'url': page.url,
        'title': await page.title(),
        'main_content': '',
        'company_info': '',
        'meta_description': '',
        'meta_keywords': ''
    }
    async def get_meta_data() -> None:
        """Fetch meta description and keywords concurrently."""
        try:
            meta_desc_task = page.locator('meta[name="description"]').get_attribute('content')
            meta_keywords_task = page.locator('meta[name="keywords"]').get_attribute('content')
            meta_desc, meta_keywords = await asyncio.gather(meta_desc_task, meta_keywords_task)
            # get_attribute yields None when the attribute is absent.
            content['meta_description'] = meta_desc or ''
            content['meta_keywords'] = meta_keywords or ''
        except Exception as e:
            # NOTE(review): a page with no matching <meta> tag presumably
            # makes the locator time out and raise, landing here — confirm
            # against the project's Playwright timeout settings.
            logger.warning(f'Failed to get meta description and keywords: {e}')
    async def get_main_content() -> None:
        """Fetch the main page content concurrently."""
        # Ordered most- to least-specific; 'body' is the last-resort fallback.
        main_selectors = [
            "main", "article", ".main-content", "#main-content",
            ".content", "#content", "body"  # fallback when nothing else matches
        ]
        for selector in main_selectors:
            try:
                elements = await page.locator(selector).all()
                # Only visible elements; their inner_text() calls run concurrently.
                tasks = [element.inner_text() for element in elements if await element.is_visible()]
                texts = await asyncio.gather(*tasks)
                # Heuristic: skip short fragments (<= 100 chars) such as nav bars.
                valid_texts = [text for text in texts if text and len(text) > 100]
                if valid_texts:
                    # First substantial match wins; stop probing further selectors.
                    content['main_content'] = valid_texts[0]
                    break
            except Exception as e:
                logger.warning(f'Failed to scrape the main page content: {e}')
    async def get_company_info() -> None:
        """Fetch company information concurrently."""
        # CSS/text selectors for company-profile sections; the Japanese terms
        # are 会社概要 ("company overview") and 企業情報 ("corporate information").
        company_selectors = [
            ".company-info", "#company-info",
            "section:has-text('会社概要')", "div:has-text('企業情報')",
            "table:has-text('会社概要')", "table:has-text('企業情報')"
        ]
        for selector in company_selectors:
            try:
                elements = await page.locator(selector).all()
                tasks = [element.inner_text() for element in elements if await element.is_visible()]
                texts = await asyncio.gather(*tasks)
                # Unlike main content, any non-empty text is accepted here.
                valid_texts = [text for text in texts if text]
                if valid_texts:
                    content['company_info'] = valid_texts[0]
                    break
            except Exception as e:
                logger.warning(f'Failed to scrape company info: {e}')
    # Run all scraping tasks concurrently
    await asyncio.gather(get_meta_data(), get_main_content(), get_company_info())
    return content