| from __future__ import annotations | |
| import re | |
| from typing import Dict, Iterable, List, Optional | |
| import structlog | |
| from bs4 import BeautifulSoup, Tag | |
| from crawler.storage import PARSE_PARSED, Storage | |
| from crawler.utils import canonicalize_url, now_iso | |
| logger = structlog.get_logger(__name__) | |
| ALLOWED_TEST_TYPES = {"A", "B", "C", "D", "E", "K", "P", "S"} | |
| STOP_LABELS = [ | |
| "Job levels", | |
| "Job level", | |
| "Languages", | |
| "Language", | |
| "Assessment length", | |
| "Assessment Length", | |
| "Test Type", | |
| "Remote Testing", | |
| "Adaptive/IRT", | |
| "Adaptive", | |
| "Downloads", | |
| ] | |
| STOP_LABELS_LOWER = [s.lower() for s in STOP_LABELS] | |
| TEST_TYPE_LABELS = { | |
| "A": "Ability & Aptitude", | |
| "B": "Biodata & Situational Judgement", | |
| "C": "Competencies", | |
| "D": "Development & 360", | |
| "E": "Assessment Exercises", | |
| "K": "Knowledge & Skills", | |
| "P": "Personality & Behavior", | |
| "S": "Simulations", | |
| } | |
| def _normalize(text: str) -> str: | |
| return re.sub(r"\s+", " ", (text or "")).strip() | |
| def _extract_text(soup: BeautifulSoup, selector: str) -> Optional[str]: | |
| node = soup.select_one(selector) | |
| if not node: | |
| return None | |
| text = _normalize(node.get_text(" ", strip=True)) | |
| return text or None | |
| def _find_label_node(soup: BeautifulSoup, label: str) -> Optional[Tag]: | |
| label_l = label.lower() | |
| candidates = soup.find_all(["h1", "h2", "h3", "h4", "h5", "h6", "p", "div", "span", "strong", "dt", "th", "li"]) | |
| for node in candidates: | |
| txt = _normalize(node.get_text(" ", strip=True)).lower() | |
| if txt == label_l or txt.startswith(label_l + ":") or txt.startswith(label_l): | |
| return node | |
| for node in candidates: | |
| txt = _normalize(node.get_text(" ", strip=True)).lower() | |
| if re.search(rf"\b{re.escape(label_l)}\b", txt): | |
| return node | |
| return None | |
| def _extract_section_until(soup: BeautifulSoup, start_label: str, stop_labels: Iterable[str]) -> Optional[str]: | |
| start = _find_label_node(soup, start_label) | |
| if not start: | |
| return None | |
| chunks: List[str] = [] | |
| start_txt = _normalize(start.get_text(" ", strip=True)) | |
| if re.match(rf"^{re.escape(start_label)}\s*:", start_txt, flags=re.I): | |
| after = re.split(rf"^{re.escape(start_label)}\s*:\s*", start_txt, flags=re.I)[-1] | |
| if after: | |
| chunks.append(after) | |
| for node in start.find_all_next(): | |
| if node == start: | |
| continue | |
| if not isinstance(node, Tag): | |
| continue | |
| node_txt = _normalize(node.get_text(" ", strip=True)) | |
| if not node_txt: | |
| continue | |
| for stop in stop_labels: | |
| if re.match(rf"^{re.escape(stop)}\b", node_txt, flags=re.I): | |
| return _normalize(" ".join(chunks)) or None | |
| if node.name in {"p", "li"}: | |
| chunks.append(node_txt) | |
| elif node.name in {"div", "span"} and len(node_txt) > 40: | |
| chunks.append(node_txt) | |
| return _normalize(" ".join(chunks)) or None | |
| def _extract_segment(text: str, label: str, stop_labels: Iterable[str]) -> Optional[str]: | |
| """Extract substring after a label up to the next stop label in raw text.""" | |
| text_norm = _normalize(text) | |
| lower = text_norm.lower() | |
| label_l = label.lower() | |
| start = lower.find(label_l) | |
| if start == -1: | |
| return None | |
| start = start + len(label_l) | |
| while start < len(text_norm) and text_norm[start] in " :": | |
| start += 1 | |
| stop_pos = len(text_norm) | |
| for stop in stop_labels: | |
| pos = lower.find(stop, start) | |
| if pos != -1 and pos < stop_pos: | |
| stop_pos = pos | |
| segment = text_norm[start:stop_pos].strip(" :-") | |
| return segment or None | |
| def _extract_kv_value(soup: BeautifulSoup, label: str) -> Optional[str]: | |
| node = _find_label_node(soup, label) | |
| if not node: | |
| return None | |
| txt = _normalize(node.get_text(" ", strip=True)) | |
| m = re.match(rf"^{re.escape(label)}\s*:\s*(.+)$", txt, flags=re.I) | |
| if m: | |
| return m.group(1).strip() or None | |
| remainder = re.sub(rf"^{re.escape(label)}\s*", "", txt, flags=re.I).strip(" :-") | |
| if remainder and remainder.lower() != label.lower(): | |
| return remainder | |
| for sib in node.next_siblings: | |
| if isinstance(sib, Tag): | |
| v = _normalize(sib.get_text(" ", strip=True)) | |
| if v: | |
| return v | |
| parent = node.parent if isinstance(node.parent, Tag) else None | |
| if parent: | |
| parent_txt = _normalize(parent.get_text(" ", strip=True)) | |
| parent_remainder = re.sub(rf"\b{re.escape(label)}\b", "", parent_txt, flags=re.I).strip(" :-") | |
| if parent_remainder: | |
| return parent_remainder | |
| for sib in parent.find_next_siblings(): | |
| v = _normalize(sib.get_text(" ", strip=True)) | |
| if v: | |
| return v | |
| return None | |
| def _extract_duration_minutes(soup: BeautifulSoup) -> Optional[int]: | |
| text = _normalize(soup.get_text(" ", strip=True)) | |
| patterns = [ | |
| r"minutes?\s*=\s*(\d+)", | |
| r"(\d+)\s*(?:minute|min)\b", | |
| r"completion time.*?(\d+)\s*(?:minute|min)\b", | |
| ] | |
| for pat in patterns: | |
| m = re.search(pat, text, flags=re.I) | |
| if m: | |
| try: | |
| return int(m.group(1)) | |
| except Exception: | |
| continue | |
| return None | |
| def _extract_test_type_from_meta(soup: BeautifulSoup) -> Optional[str]: | |
| label = _find_label_node(soup, "Test Type") | |
| scope = label.parent if label and isinstance(label.parent, Tag) else label or soup | |
| tokens: List[str] = [] | |
| for el in scope.find_all(["span", "button", "a"], limit=30): | |
| t = _normalize(el.get_text("", strip=True)) | |
| if len(t) == 1 and t in ALLOWED_TEST_TYPES: | |
| tokens.append(t) | |
| if not tokens: | |
| for el in label.find_all_next(["span", "button", "a"], limit=30) if label else []: | |
| t = _normalize(el.get_text("", strip=True)) | |
| if len(t) == 1 and t in ALLOWED_TEST_TYPES: | |
| tokens.append(t) | |
| if not tokens: | |
| return None | |
| out = [] | |
| seen = set() | |
| for t in tokens: | |
| if t not in seen: | |
| seen.add(t) | |
| out.append(t) | |
| return ",".join(out) | |
| def _map_test_types_full(test_type: Optional[str]) -> Optional[str]: | |
| if not test_type: | |
| return None | |
| parts = [] | |
| for token in test_type.split(","): | |
| token = token.strip() | |
| if not token: | |
| continue | |
| full = TEST_TYPE_LABELS.get(token) | |
| if full: | |
| parts.append(full) | |
| return ", ".join(parts) if parts else None | |
| def _split_list(value: Optional[str]) -> Optional[list[str]]: | |
| if not value: | |
| return None | |
| parts = [p.strip() for p in value.replace(";", ",").split(",") if p.strip()] | |
| return parts or None | |
| def _is_positive_indicator(node: Tag) -> bool: | |
| if not node: | |
| return False | |
| attrs = " ".join( | |
| [ | |
| " ".join(node.get("class", [])) if isinstance(node.get("class"), list) else str(node.get("class") or ""), | |
| str(node.get("aria-label") or ""), | |
| str(node.get("title") or ""), | |
| str(node.get("style") or ""), | |
| ] | |
| ).lower() | |
| positive_tokens = ["green", "yes", "true", "available", "supported", "active", "enabled", "tick", "check", "on"] | |
| return any(tok in attrs for tok in positive_tokens) | |
| def _extract_boolean_from_meta(soup: BeautifulSoup, label_text: str) -> Optional[bool]: | |
| label = _find_label_node(soup, label_text) | |
| if not label: | |
| return None | |
| container = label.parent if isinstance(label.parent, Tag) else label | |
| for el in container.find_all(["span", "i", "svg", "img"], limit=20): | |
| if _is_positive_indicator(el): | |
| return True | |
| for el in label.find_all_next(["span", "i", "svg", "img"], limit=20): | |
| if _is_positive_indicator(el): | |
| return True | |
| return False | |
| def extract_detail_fields(html: str) -> Dict: | |
| soup = BeautifulSoup(html, "lxml") | |
| title = _extract_text(soup, "h1") or _extract_text(soup, "title") | |
| full_text = _normalize(soup.get_text(" ", strip=True)) | |
| description = _extract_segment(full_text, "description", STOP_LABELS_LOWER) | |
| if not description: | |
| description = _extract_section_until(soup, "Description", STOP_LABELS) | |
| job_levels_raw = _extract_kv_value(soup, "Job levels") or _extract_segment(full_text, "job levels", STOP_LABELS_LOWER) | |
| job_levels = _split_list(job_levels_raw) | |
| languages_raw = _extract_kv_value(soup, "Languages") or _extract_segment(full_text, "languages", STOP_LABELS_LOWER) | |
| languages = _split_list(languages_raw) | |
| duration = _extract_duration_minutes(soup) | |
| if duration is None: | |
| segment = _extract_segment(full_text, "assessment length", STOP_LABELS_LOWER) | |
| if segment: | |
| match = re.search(r"(\d+)\s*(?:minute|min)", segment, flags=re.I) | |
| if match: | |
| try: | |
| duration = int(match.group(1)) | |
| except Exception: | |
| duration = None | |
| test_type = _extract_test_type_from_meta(soup) | |
| test_type_full = _map_test_types_full(test_type) | |
| remote_support = _extract_boolean_from_meta(soup, "Remote Testing") | |
| adaptive_support = _extract_boolean_from_meta(soup, "Adaptive/IRT") | |
| if adaptive_support is None: | |
| adaptive_support = _extract_boolean_from_meta(soup, "Adaptive") | |
| if adaptive_support is None: | |
| adaptive_support = _extract_boolean_from_meta(soup, "Adaptive/IRT Testing") | |
| downloads = [] | |
| downloads_label = _find_label_node(soup, "Downloads") | |
| scope = downloads_label.parent if downloads_label and isinstance(downloads_label.parent, Tag) else soup | |
| for link in scope.find_all("a", href=True): | |
| text = _normalize(link.get_text(" ", strip=True)) | |
| href = link["href"] | |
| if text and any(keyword in text.lower() for keyword in ["report", "fact sheet", "sample", "pdf", "download", "brochure"]): | |
| downloads.append({"text": text, "url": href}) | |
| return { | |
| "name": title, | |
| "description": description, | |
| "test_type": test_type, | |
| "test_type_full": test_type_full, | |
| "remote_support": remote_support, | |
| "adaptive_support": adaptive_support, | |
| "duration_minutes": duration, | |
| "job_levels": job_levels, | |
| "languages": languages, | |
| "downloads": downloads or None, | |
| } | |
| def parse_detail_page(html: str, url: str, storage: Storage) -> Dict: | |
| fields = extract_detail_fields(html) | |
| storage.upsert_assessment( | |
| { | |
| "url": canonicalize_url(url), | |
| **fields, | |
| "last_updated_at": now_iso(), | |
| } | |
| ) | |
| storage.update_parse_status(url, PARSE_PARSED) | |
| logger.info("detail.parse.success", url=url, name=fields.get("name")) | |
| return fields | |