| | import json |
| | from pydantic import BaseModel |
| | from typing import List, Set, Tuple |
| | from crawl4ai import ( |
| | AsyncWebCrawler, |
| | BrowserConfig, |
| | CacheMode, |
| | CrawlerRunConfig, |
| | LLMExtractionStrategy, |
| | ) |
| | from utils import is_duplicated |
| | from config import LLM_MODEL, API_TOKEN |
| |
|
| |
|
def get_browser_config() -> BrowserConfig:
    """
    Build the browser configuration used by the crawler.

    Returns:
        BrowserConfig: Chromium engine, headless, with verbose logging.
    """
    browser_config = BrowserConfig(
        browser_type="chromium",  # Playwright browser engine to launch
        headless=True,            # run without a visible browser window
        verbose=True,             # detailed crawler logging
    )
    return browser_config
| |
|
| |
|
def get_llm_strategy(llm_instructions: str, output_format: type[BaseModel]) -> LLMExtractionStrategy:
    """
    Build the language-model extraction strategy for the crawler.

    Args:
        llm_instructions (str): Prompt instructions telling the LLM what to
            extract from each page.
        output_format (type[BaseModel]): Pydantic model *class* describing the
            desired output schema; its JSON schema is passed to the LLM.

    Returns:
        LLMExtractionStrategy: The settings for how to extract data using LLM.
    """
    return LLMExtractionStrategy(
        provider=LLM_MODEL,       # model name/provider from config
        api_token=API_TOKEN,      # credentials from config
        schema=output_format.model_json_schema(),  # pydantic v2 class-level schema
        extraction_type="schema",
        instruction=llm_instructions,
        input_format="markdown",  # feed the page's markdown rendering to the LLM
        verbose=True,
    )
| |
|
async def check_no_results(
    crawler: AsyncWebCrawler,
    url: str,
    session_id: str,
) -> bool:
    """
    Check whether the page at *url* shows the "No Results Found" message.

    Args:
        crawler (AsyncWebCrawler): The web crawler instance.
        url (str): The URL to check.
        session_id (str): The session identifier.

    Returns:
        bool: True if "No Results Found" message is found, False otherwise.
    """
    run_config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,  # always fetch a fresh copy
        session_id=session_id,
    )
    result = await crawler.arun(url=url, config=run_config)

    # A failed fetch is reported but treated as "results may exist".
    if not result.success:
        print(
            f"Error fetching page for 'No Results Found' check: {result.error_message}"
        )
        return False

    return "No Results Found" in result.cleaned_html
| |
|
| |
|
async def fetch_and_process_page(
    crawler: AsyncWebCrawler,
    page_number: int,
    base_url: str,
    css_selector: str,
    llm_strategy: LLMExtractionStrategy,
    session_id: str,
    seen_names: Set[str],
) -> Tuple[List[dict], bool]:
    """
    Fetches and processes a single page from yellowpages.

    Args:
        crawler (AsyncWebCrawler): The web crawler instance.
        page_number (int): The page number to fetch.
        base_url (str): The base URL; must contain a ``{page_number}`` placeholder.
        css_selector (str): The CSS selector to target the content.
        llm_strategy (LLMExtractionStrategy): The LLM extraction strategy.
        session_id (str): The session identifier (reuses the browser session).
        seen_names (Set[str]): Business names already seen; used for
            de-duplication and mutated in place.

    Returns:
        Tuple[List[dict], bool]:
            - List[dict]: A list of processed businesses from the page.
            - bool: A flag indicating if the "No Results Found" message was encountered.
    """
    url = base_url.format(page_number=page_number)
    print(f"Loading page {page_number}...")

    # Stop early when the site reports no results for this page.
    no_results = await check_no_results(crawler, url, session_id)
    if no_results:
        return [], True

    result = await crawler.arun(
        url=url,
        config=CrawlerRunConfig(
            cache_mode=CacheMode.BYPASS,  # always fetch fresh content
            extraction_strategy=llm_strategy,
            css_selector=css_selector,
            session_id=session_id,
        ),
    )
    print("----------------------------- Result-----------------------------")
    print(result.extracted_content)

    if not (result.success and result.extracted_content):
        print(f"Error fetching page {page_number}: {result.error_message}")
        return [], False

    # The LLM strategy returns a JSON string; guard against malformed output
    # instead of letting JSONDecodeError propagate.
    try:
        extracted_data = json.loads(result.extracted_content)
    except json.JSONDecodeError as exc:
        print(f"Invalid JSON on page {page_number}: {exc}")
        return [], False

    print("----------------------------Extracted Data----------------------------")
    print(extracted_data)
    if not extracted_data:
        print(f"No businesses found on page {page_number}.")
        return [], False

    print("Extracted data:", extracted_data)

    all_businesses = []
    for business in extracted_data:
        print("Processing business:", business)

        # crawl4ai attaches an "error" flag to each extracted item: drop the
        # flag on successful items, and skip items the LLM marked as failed
        # (previously they were kept, polluting the results).
        if business.get("error") is False:
            business.pop("error", None)
        elif business.get("error"):
            print(f"Skipping item flagged with an extraction error: {business}")
            continue

        # Skip malformed items missing the mandatory name instead of raising KeyError.
        name = business.get("name")
        if not name:
            print(f"Skipping item without a name: {business}")
            continue

        if is_duplicated(name, seen_names):
            print(f"Duplicate business '{name}' found. Skipping.")
            continue

        seen_names.add(name)
        all_businesses.append(business)

    if not all_businesses:
        print(f"No complete businesses found on page {page_number}.")
        return [], False

    print(f"Extracted {len(all_businesses)} businesses from page {page_number}.")
    return all_businesses, False
| |
|