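"""Entry point for the crawler pipeline.

Discovers catalog pages, fetches the detail pages they link to, and exports the
collected catalog to Parquet/JSONL. Example invocations (the ``main.py`` file
name and the paths are illustrative, not fixed):

    python main.py --mode discover --max-discover 10
    python main.py --mode details
    python main.py --mode export --parquet data/catalog.parquet --jsonl data/catalog.jsonl
"""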
from __future__ import annotations
import argparse
import asyncio
import logging
import os

import structlog
from config import load_config
from crawler.export import export_catalog
from crawler.fetcher import PlaywrightFetcher
from crawler.parser_catalog import parse_catalog_page
from crawler.parser_detail import parse_detail_page
from crawler.robots import RobotsManager
from crawler.storage import (
PAGE_TYPE_CATALOG,
PAGE_TYPE_DETAIL,
PARSE_PENDING,
Storage,
)
from crawler.utils import RateLimiter
logger = structlog.get_logger(__name__)
def configure_logging(log_level: str = "INFO") -> None:
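    """Configure stdlib logging and structlog to emit ISO-timestamped JSON log lines at ``log_level``."""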
logging.basicConfig(level=getattr(logging, log_level.upper(), logging.INFO))
structlog.configure(
wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, log_level.upper(), logging.INFO)),
processors=[
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
)
async def crawl_catalog(
start_url: str,
storage: Storage,
fetcher: PlaywrightFetcher,
robots: RobotsManager,
max_discover: int | None = None,
allow_robots_bypass: bool = False,
) -> None:
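    """Breadth-first crawl of catalog pages starting at ``start_url``.

    Every fetched page is persisted via ``storage``, the detail URLs it exposes
    count toward ``max_discover``, and any follow-up catalog pages it links to
    are appended to the frontier. URLs disallowed by robots.txt are skipped
    unless ``allow_robots_bypass`` is set.
    """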
frontier = [start_url]
seen = set()
total_discovered = 0
while frontier:
url = frontier.pop(0)
if url in seen:
continue
seen.add(url)
allowed = allow_robots_bypass or robots.is_allowed(url)
if not allowed:
logger.warning("catalog.fetch.disallowed", url=url)
continue
if allow_robots_bypass:
logger.warning("catalog.fetch.disallowed.bypassed", url=url)
result = await fetcher.fetch(url, page_type=PAGE_TYPE_CATALOG)
storage.upsert_page(result.record)
if result.error or not result.html:
logger.error("catalog.fetch.failed", url=url, error=result.error)
continue
_, discovered_urls, next_pages = parse_catalog_page(result.html, source_url=url, storage=storage)
total_discovered += len(discovered_urls)
for next_url in next_pages:
if next_url not in seen:
frontier.append(next_url)
if max_discover and total_discovered >= max_discover:
logger.info("catalog.max_discover.reached", total=total_discovered, max=max_discover)
break
async def crawl_details(
storage: Storage,
fetcher: PlaywrightFetcher,
robots: RobotsManager,
allow_robots_bypass: bool = False,
) -> None:
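    """Fetch and parse every detail page still marked ``PARSE_PENDING`` in ``storage``.

    Pages disallowed by robots.txt are skipped unless ``allow_robots_bypass`` is
    set; failed fetches are logged and skipped.
    """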
pending = storage.get_pages_by_type(PAGE_TYPE_DETAIL, parse_status=PARSE_PENDING)
logger.info("detail.queue", pending=len(pending))
for page in pending:
url = page["url"]
allowed = allow_robots_bypass or robots.is_allowed(url)
if not allowed:
logger.warning("detail.fetch.disallowed", url=url)
continue
if allow_robots_bypass:
logger.warning("detail.fetch.disallowed.bypassed", url=url)
result = await fetcher.fetch(url, page_type=PAGE_TYPE_DETAIL)
storage.upsert_page(result.record)
if result.error or not result.html:
logger.error("detail.fetch.failed", url=url, error=result.error)
continue
parse_detail_page(result.html, url=url, storage=storage)
def main(argv: list[str] | None = None) -> None:
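    """Parse CLI arguments, wire up config, storage, robots, and the fetcher, then run the selected ``--mode``."""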
parser = argparse.ArgumentParser(description="Crawler pipeline")
parser.add_argument("--mode", choices=["crawl_all", "discover", "details", "export"], default="crawl_all")
parser.add_argument("--config", type=str, default=os.environ.get("CONFIG_PATH", "configs/config.yaml"))
parser.add_argument("--parquet", type=str, default="data/catalog.parquet")
parser.add_argument("--jsonl", type=str, default="data/catalog.jsonl")
parser.add_argument(
"--max-discover",
type=int,
default=None,
help="Limit number of detail URLs discovered (for smoke tests)",
)
parser.add_argument(
"--limit-export",
type=int,
default=None,
help="Limit number of rows exported (for smoke tests)",
)
parser.add_argument(
"--allow-robots-bypass",
action="store_true",
help="Bypass robots.txt disallow (for testing; use responsibly)",
)
args = parser.parse_args(argv)
config = load_config(args.config)
configure_logging(config.get("app", {}).get("log_level", "INFO"))
crawler_cfg = config.get("crawler", {})
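    # Runtime knobs come from the config file but can be overridden via environment
    # variables (REQUEST_DELAY_SECONDS, JITTER_SECONDS, USER_AGENT, START_URL, MAX_RETRIES).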
rate_limiter = RateLimiter(
base_delay=float(os.environ.get("REQUEST_DELAY_SECONDS", crawler_cfg.get("request_delay_seconds", 1.5))),
jitter=float(os.environ.get("JITTER_SECONDS", crawler_cfg.get("jitter_seconds", 0.5))),
)
user_agent = os.environ.get("USER_AGENT", crawler_cfg.get("user_agent"))
start_url = os.environ.get("START_URL", crawler_cfg.get("start_url"))
max_retries = int(os.environ.get("MAX_RETRIES", crawler_cfg.get("max_retries", 3)))
sqlite_path = crawler_cfg.get("sqlite_path", "data/crawler.db")
allow_bypass = args.allow_robots_bypass or os.environ.get("ALLOW_ROBOTS_BYPASS", "").lower() in {"1", "true", "yes"}
storage = Storage(sqlite_path)
robots = RobotsManager(robots_url="https://www.shl.com/robots.txt", user_agent=user_agent)
robots.load()
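    # Both crawl phases share a single Playwright fetcher session (and its rate limiter).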
async def _runner():
async with PlaywrightFetcher(user_agent=user_agent, rate_limiter=rate_limiter, max_retries=max_retries) as fetcher:
if args.mode in {"crawl_all", "discover"}:
await crawl_catalog(start_url, storage, fetcher, robots, max_discover=args.max_discover, allow_robots_bypass=allow_bypass)
if args.mode in {"crawl_all", "details"}:
await crawl_details(storage, fetcher, robots, allow_robots_bypass=allow_bypass)
if args.mode in {"crawl_all", "discover", "details"}:
asyncio.run(_runner())
if args.mode == "export":
export_catalog(
storage,
parquet_path=args.parquet,
jsonl_path=args.jsonl,
limit=args.limit_export,
min_count=1 if args.limit_export else 377,
)
if __name__ == "__main__":
main()