# File: app/services/scraper_service.py (repository: llm-ready-data, revision f02b0c0)
from __future__ import annotations
import asyncio
import logging
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)


class ScraperService:
    """
    Generic web scraping service using the Scrapling library.

    Supports three fetcher modes:

    - ``http`` — lightweight HTTP requests (AsyncFetcher)
    - ``dynamic`` — Playwright-based full browser (DynamicFetcher)
    - ``stealth`` — Undetectable headless browser (StealthyFetcher)

    Every method is a ``classmethod`` or ``staticmethod``; the class is used
    as a namespace and keeps no per-instance state.
    """

    # Modes reported importable by the most recent ``health()`` call.
    # Empty until ``health()`` has run at least once.
    FETCHERS_AVAILABLE: List[str] = []

    @staticmethod
    async def _check_fetchers() -> List[str]:
        """Return the fetcher modes whose Scrapling classes can be imported.

        Each fetcher is probed on its own so that a missing optional
        dependency for one of them does not hide the others.

        Returns:
            A list containing any of ``"http"``, ``"stealth"`` and
            ``"dynamic"``, in that order.
        """
        available: List[str] = []

        try:
            from scrapling.fetchers import AsyncFetcher  # noqa: F401

            available.append("http")
        except ImportError:
            pass

        try:
            from scrapling.fetchers import StealthyFetcher  # noqa: F401

            available.append("stealth")
        except ImportError:
            pass

        try:
            from scrapling.fetchers import DynamicFetcher  # noqa: F401

            available.append("dynamic")
        except ImportError:
            pass

        return available

    @classmethod
    async def _fetch_page(
        cls,
        url: str,
        fetcher_type: str,
        proxy: Optional[str],
        network_idle: bool,
    ) -> Any:
        """Fetch ``url`` with the fetcher selected by ``fetcher_type``.

        Only the fetcher that is actually needed is imported, so the ``http``
        mode keeps working when the browser-based fetchers cannot be imported
        (the situation ``_check_fetchers`` reports as partial availability).

        Args:
            url: Address of the page to fetch.
            fetcher_type: ``"stealth"``, ``"dynamic"`` or ``"http"``. Any
                other value falls back to the HTTP fetcher and logs a warning.
            proxy: Proxy passed through to the fetcher, or ``None``.
            network_idle: Passed through to the browser-based fetchers; not
                used by the HTTP fetcher.

        Returns:
            Whatever the selected Scrapling fetcher returns.

        Raises:
            ImportError: If the selected fetcher class cannot be imported.
            Exception: Anything raised by the fetcher itself propagates.
        """
        if fetcher_type == "stealth":
            from scrapling.fetchers import StealthyFetcher

            logger.info("fetcher=stealth url=%s", url)
            # Run the fetch call in a worker thread instead of on the
            # event-loop thread.
            return await asyncio.to_thread(
                StealthyFetcher.fetch,
                url,
                headless=True,
                network_idle=network_idle,
                proxy=proxy,
            )

        if fetcher_type == "dynamic":
            from scrapling.fetchers import DynamicFetcher

            logger.info("fetcher=dynamic url=%s", url)
            return await asyncio.to_thread(
                DynamicFetcher.fetch,
                url,
                headless=True,
                network_idle=network_idle,
                proxy=proxy,
            )

        if fetcher_type != "http":
            # Keep the historical fallback, but make a mistyped mode visible.
            logger.warning(
                "unknown fetcher_type=%r url=%s; falling back to http",
                fetcher_type,
                url,
            )

        from scrapling.fetchers import AsyncFetcher

        logger.info("fetcher=http url=%s", url)
        return await AsyncFetcher.get(url, proxy=proxy)

    @staticmethod
    def _run_selector(page: Any, rule: Dict[str, Any]) -> Any:
        """Apply the selector described by ``rule`` to ``page``.

        Keys read from ``rule``:

        - ``selector`` (required): the selector expression.
        - ``selector_type``: ``"xpath"`` uses ``page.xpath``; any other value
          (default ``"css"``) uses ``page.css``.
        - ``auto_save``: forwarded as the ``auto_save`` keyword (default False).
        - ``auto_match``: forwarded as the ``adaptive`` keyword (default False).

        Raises:
            KeyError: If ``rule`` has no ``selector`` key.
        """
        kwargs: Dict[str, Any] = {
            "auto_save": rule.get("auto_save", False),
            "adaptive": rule.get("auto_match", False),
        }
        selector = rule["selector"]
        if rule.get("selector_type", "css") == "xpath":
            return page.xpath(selector, **kwargs)
        return page.css(selector, **kwargs)

    @staticmethod
    def _unpack(elements: Any, extract_all: bool) -> Any:
        """Convert a selector result into plain value(s).

        Args:
            elements: Result of ``_run_selector``; may be ``None``.
            extract_all: When true return every match as a list, otherwise
                return only the first match.

        Returns:
            With ``extract_all``: ``elements.getall()`` when available, else
            ``str()`` of each element; ``[]`` when ``elements`` is ``None``.
            Without ``extract_all``: ``elements.get()`` when available, else
            ``str()`` of the first element, or ``None`` when there is no match.
        """
        if extract_all:
            if elements is None:
                # No result at all is reported as "no matches", not an error.
                return []
            if hasattr(elements, "getall"):
                return elements.getall()
            return [str(e) for e in elements]

        if hasattr(elements, "get"):
            return elements.get()
        if elements:
            return str(elements[0])
        return None

    @classmethod
    async def extract(
        cls,
        url: str,
        fetcher_type: str,
        rules: List[Dict[str, Any]],
        proxy: Optional[str] = None,
        network_idle: bool = False,
    ) -> Dict[str, Any]:
        """Fetch ``url`` and extract one value per rule.

        Args:
            url: Address of the page to scrape.
            fetcher_type: Fetcher mode, see ``_fetch_page``.
            rules: Extraction rules. Each rule is read for ``field_name``
                (default ``"unknown"``), ``extract_all`` (default False) and
                the selector keys documented on ``_run_selector``.
            proxy: Optional proxy forwarded to the fetcher.
            network_idle: Forwarded to the browser-based fetchers.

        Returns:
            ``{"error": "..."}`` when the page could not be fetched. Otherwise
            a dict mapping each rule's field name to its extracted value, or
            to ``None`` when that rule failed. Rules sharing a field name
            (including several rules without ``field_name``) overwrite each
            other; the last one wins.
        """
        try:
            page = await cls._fetch_page(url, fetcher_type, proxy, network_idle)
        except Exception as exc:
            # Service boundary: report the failure to the caller instead of
            # raising, and keep the traceback in the log.
            logger.exception("fetch_failed url=%s", url)
            return {"error": f"Failed to fetch {url}: {exc}"}

        result: Dict[str, Any] = {}
        for rule in rules:
            field = rule.get("field_name", "unknown")
            try:
                elements = cls._run_selector(page, rule)
                result[field] = cls._unpack(elements, rule.get("extract_all", False))
            except Exception as exc:
                # One bad rule must not discard the other fields.
                logger.warning(
                    "parse_failed field=%s selector=%s error=%s",
                    field,
                    rule.get("selector"),
                    exc,
                )
                result[field] = None
        return result

    @classmethod
    async def health(cls) -> Dict[str, Any]:
        """Report which fetchers are importable and the Scrapling version.

        Also stores the list of available modes on ``FETCHERS_AVAILABLE``.

        Returns:
            ``{"available": [...], "version": ...}`` where ``version`` is
            ``scrapling.__version__``, ``"0.4+"`` when the package has no
            ``__version__`` attribute, or ``"not installed"`` when it cannot
            be imported.
        """
        available = await cls._check_fetchers()
        cls.FETCHERS_AVAILABLE = available

        try:
            import scrapling

            version = getattr(scrapling, "__version__", "0.4+")
        except ImportError:
            version = "not installed"

        return {
            "available": available,
            "version": version,
        }