| """ |
| FIPI scraper focused on extracting real tasks instead of generic page text. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| from datetime import datetime |
| import io |
| import logging |
| import math |
| import os |
| import re |
| import ssl |
| from typing import Dict, Iterable, List, Optional |
| from urllib.parse import urljoin |
| import zipfile |
|
|
| from bs4 import BeautifulSoup, Tag |
| import httpx |
| import requests |
|
|
# pypdf is optional: when it is missing, PDF extraction is skipped gracefully
# (scrape_official_archives and _extract_text_from_pdf_bytes check for None).
try:
    from pypdf import PdfReader
except ImportError:
    PdfReader = None


# Module-level logger; basicConfig here assumes this module is imported early
# enough to configure logging for the process.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
class FIPIScraper:
    """Collects task candidates from the FIPI bank and official demo archives."""

    # Per-subject scraping configuration: dynamic question-bank endpoints plus
    # the official pages where demo/variant archives are published.
    SUBJECT_CONFIG = {
        "russian": {
            "label": "Русский язык",
            "dynamic_sources": [
                {
                    "kind": "ege_bank",
                    "base_url": "https://ege.fipi.ru/bank",
                    "project_guid": "AF0ED3F2557F8FFC4C06F80B6803FD26",
                    "project_name": "ЕГЭ. Русский язык",
                },
                {
                    "kind": "oge_bank",
                    "base_url": "https://oge.fipi.ru/bank",
                    "project_guid": "2F5EE3B12FE2A0EA40B06BF61A015416",
                    "project_name": "ОГЭ. Русский язык",
                },
            ],
            "official_demo_page": "https://fipi.ru/ege/demoversii-specifikacii-kodifikatory",
            "official_variant_page": "https://fipi.ru/ege/otkrytyy-bank-zadaniy-ege/otkrytyye-varianty-kim-ege",
            # File-name prefixes used to select relevant .zip archives / variant files.
            "archive_prefixes": ("ru_11_",),
            "variant_prefixes": ("rus_",),
            "title_keywords": ("русский язык",),
        }
    }

    # Keyword families used to classify a task; _detect_task_type checks them
    # in declaration order and the first matching family wins.
    TASK_TYPE_KEYWORDS = {
        "writing": ("сочинение", "эссе", "напишите", "сформулируйте", "прокомментируйте"),
        "test": ("выберите", "укажите", "ответ", "вариант", "расставьте", "определите"),
        "listening": ("аудио", "прослуш", "запись"),
        "reading": ("прочитайте", "текст", "абзац", "предложение"),
    }

    # Title substrings that mark a page as generic/navigational rather than a task
    # (heavily penalized in _score_candidate).
    GENERIC_TITLE_PATTERNS = (
        "открытый банк",
        "демоверсии",
        "спецификации",
        "кодификаторы",
        "федеральный институт",
        "фипи",
        "нормативно",
        "документы",
        "варианты ким",
    )

    # Phrases that open a real exam task inside text extracted from official PDFs.
    PDF_TASK_START_PATTERNS = (
        "Прочитайте текст",
        "Самостоятельно подберите",
        "В тексте выделено",
        "Укажите",
        "В одном из",
        "Отредактируйте предложение",
        "Установите соответствие",
        "Расставьте",
        "Определите",
        "Найдите",
        "Подберите",
    )

    # Boilerplate phrases whose presence disqualifies a PDF block from being a task.
    PDF_NOISE_PATTERNS = (
        "Инструкция по выполнению работы",
        "Пояснения к демонстрационному варианту",
        "Желаем успеха",
        "Все бланки ЕГЭ заполняются",
        "Баллы, полученные",
        "После завершения работы",
        "В демонстрационном варианте представлены",
        "Часть 1 содержит 26 заданий",
        "На выполнение экзаменационной работы",
        "Ответами к заданиям 1–26 являются",
        "Бланк",
    )

    # Phrases indicating navigational noise in scraped HTML content
    # (each hit near the start of the content costs score points).
    NOISE_PATTERNS = (
        "федеральный институт педагогических измерений",
        "открытый банк тестовых заданий",
        "открытый банк заданий егэ",
        "открытый банк заданий огэ",
        "подбор заданий",
        "демоверсии, спецификации, кодификаторы",
        "для предметных комиссий",
        "аналитические и методические материалы",
        "видеоконсультации разработчиков ким",
        "скачать",
        "изменения в ким",
    )
|
| def __init__(self, base_url: str = "https://fipi.ru"): |
| self.base_url = base_url.rstrip("/") |
| self.headers = { |
| "User-Agent": ( |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " |
| "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" |
| ), |
| "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", |
| "Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7", |
| } |
| self.page_size = max(1, int(os.getenv("SCRAPER_BANK_PAGE_SIZE", "10"))) |
| self.max_bank_pages = max(1, int(os.getenv("SCRAPER_MAX_BANK_PAGES", "5"))) |
| self.max_demo_archives = max(1, int(os.getenv("SCRAPER_MAX_DEMO_ARCHIVES", "2"))) |
| self.max_demo_tasks = max(1, int(os.getenv("SCRAPER_MAX_DEMO_TASKS", "20"))) |
| self.min_quality_score = max(1, int(os.getenv("SCRAPER_MIN_QUALITY_SCORE", "45"))) |
|
|
| async def fetch_page(self, url: str) -> Optional[str]: |
| response = await self._request("GET", url) |
| return response.text if response else None |
|
|
| async def fetch_bytes(self, url: str) -> Optional[bytes]: |
| response = await self._request("GET", url) |
| return response.content if response else None |
|
|
    async def _request(
        self,
        method: str,
        url: str,
        *,
        data: Optional[Dict[str, str]] = None,
    ) -> Optional[httpx.Response]:
        """Perform an HTTP request via httpx, retrying through `requests` on failure.

        Returns the successful httpx.Response, or None when both the async
        attempt and the synchronous fallback fail.
        """
        # NOTE(review): certificate verification is disabled on purpose here
        # (and mirrored in the fallback) — presumably the target host's TLS
        # chain does not validate; responses are therefore not authenticated.
        # Confirm before reusing this client elsewhere.
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE

        # A fresh client per call: no connection pooling, but no shared state
        # between concurrent requests either.
        async with httpx.AsyncClient(
            headers=self.headers,
            timeout=45.0,
            verify=ssl_context,
            follow_redirects=True,
            trust_env=False,  # ignore proxy-related environment variables
        ) as client:
            try:
                response = await client.request(method, url, data=data)
                response.raise_for_status()
                return response
            except httpx.HTTPError as e:
                logger.error("Async request failed for %s: %r", url, e)

        # Any httpx failure above falls through to the synchronous fallback.
        return await self._request_with_requests_fallback(
            method=method,
            url=url,
            data=data,
        )
|
|
    async def _request_with_requests_fallback(
        self,
        *,
        method: str,
        url: str,
        data: Optional[Dict[str, str]] = None,
    ) -> Optional[httpx.Response]:
        """Retry a failed request with the synchronous `requests` library.

        Runs in a worker thread (asyncio.to_thread) so the event loop is not
        blocked, and repackages the result as an httpx.Response so callers
        only ever see one response type.  Returns None on failure.
        """
        def do_request() -> Optional[httpx.Response]:
            session = requests.Session()
            session.trust_env = False  # ignore proxy environment variables

            try:
                # NOTE(review): verify=False mirrors the disabled TLS checks in
                # _request — responses are not authenticated.
                response = session.request(
                    method=method,
                    url=url,
                    data=data,
                    headers=self.headers,
                    timeout=45,
                    verify=False,
                    allow_redirects=True,
                )
                response.raise_for_status()

                # Re-wrap as httpx.Response so both transports look identical.
                request = httpx.Request(method, url, headers=self.headers)
                return httpx.Response(
                    status_code=response.status_code,
                    headers=response.headers,
                    content=response.content,
                    request=request,
                )
            except requests.RequestException as exc:
                logger.error("Requests fallback failed for %s: %r", url, exc)
                return None
            finally:
                session.close()

        return await asyncio.to_thread(do_request)
|
|
| async def scrape_tasks( |
| self, |
| subject: str = "russian", |
| *, |
| include_official_archives: bool = True, |
| ) -> List[Dict]: |
| config = self.SUBJECT_CONFIG.get(subject) |
| if not config: |
| logger.warning("Unknown subject %s, falling back to russian", subject) |
| config = self.SUBJECT_CONFIG["russian"] |
|
|
| candidates: List[Dict] = [] |
| candidates.extend(await self.scrape_dynamic_bank(subject)) |
| if include_official_archives: |
| candidates.extend(await self.scrape_official_archives(subject)) |
| validated = self._dedupe_candidates(self._filter_candidates(candidates)) |
| logger.info("Accepted %s task candidates after filtering", len(validated)) |
| return validated |
|
|
    async def scrape_dynamic_bank(self, subject: str = "russian") -> List[Dict]:
        """Page through every configured question bank and parse each qblock.

        Unknown subjects silently fall back to the russian configuration.
        Paging stops at self.max_bank_pages, or earlier once the bank's own
        reported total has been covered.
        """
        config = self.SUBJECT_CONFIG.get(subject, self.SUBJECT_CONFIG["russian"])
        tasks: List[Dict] = []

        for source in config["dynamic_sources"]:
            project_guid = source["project_guid"]
            questions_url = f"{source['base_url']}/questions.php"
            total_tasks = None  # reported by the bank on the first page, if at all

            for page_index in range(self.max_bank_pages):
                html = await self._fetch_bank_page(
                    questions_url=questions_url,
                    project_guid=project_guid,
                    page_index=page_index,
                )
                if not html:
                    break

                # The first page embeds the total count in a setQCount(...) call.
                if total_tasks is None:
                    total_tasks = self._extract_total_count(html)
                    if total_tasks:
                        max_pages = math.ceil(total_tasks / self.page_size)
                        logger.info(
                            "Bank %s reports %s tasks, scraping up to %s pages",
                            source["project_name"],
                            total_tasks,
                            min(max_pages, self.max_bank_pages),
                        )

                soup = BeautifulSoup(html, "lxml")
                blocks = soup.select("div.qblock")
                if not blocks:
                    # GET sometimes returns an empty shell; retry via the POST search form.
                    logger.warning(
                        "No qblock nodes found for %s page=%s via primary fetch, retrying POST search",
                        source["project_name"],
                        page_index,
                    )
                    html = await self._fetch_bank_page(
                        questions_url=questions_url,
                        project_guid=project_guid,
                        page_index=page_index,
                        force_post=True,
                    )
                    if not html:
                        break

                    soup = BeautifulSoup(html, "lxml")
                    blocks = soup.select("div.qblock")
                    if not blocks:
                        logger.warning(
                            "No qblock nodes found for %s page=%s after retry",
                            source["project_name"],
                            page_index,
                        )
                        break

                for block in blocks:
                    task = self._parse_bank_question_block(
                        block,
                        project_guid=project_guid,
                        source_name=source["project_name"],
                        questions_url=questions_url,
                    )
                    if task:
                        tasks.append(task)

                # Stop early once the reported total has been exhausted.
                if total_tasks is not None and (page_index + 1) * self.page_size >= total_tasks:
                    break

        logger.info("Collected %s candidates from the dynamic bank", len(tasks))
        return tasks
|
|
| async def _fetch_bank_page( |
| self, |
| *, |
| questions_url: str, |
| project_guid: str, |
| page_index: int, |
| force_post: bool = False, |
| ) -> Optional[str]: |
| page_url = ( |
| f"{questions_url}?proj={project_guid}" |
| f"&page={page_index}&pagesize={self.page_size}" |
| ) |
|
|
| if not force_post: |
| html = await self.fetch_page(page_url) |
| if html: |
| return html |
|
|
| return await self._post_bank_page( |
| questions_url=questions_url, |
| project_guid=project_guid, |
| page_index=page_index, |
| ) |
|
|
| async def _post_bank_page( |
| self, |
| *, |
| questions_url: str, |
| project_guid: str, |
| page_index: int, |
| ) -> Optional[str]: |
| response = await self._request( |
| "POST", |
| questions_url, |
| data={ |
| "search": "1", |
| "pagesize": str(self.page_size), |
| "proj": project_guid, |
| "page": str(page_index), |
| }, |
| ) |
| return response.text if response else None |
|
|
| def _extract_total_count(self, html: str) -> Optional[int]: |
| match = re.search(r"setQCount\((\d+)", html) |
| return int(match.group(1)) if match else None |
|
|
| def _parse_bank_question_block( |
| self, |
| block: Tag, |
| *, |
| project_guid: str, |
| source_name: str, |
| questions_url: str, |
| ) -> Optional[Dict]: |
| prompt_cell = block.select_one("td.cell_0") |
| if not prompt_cell: |
| return None |
|
|
| content = self._clean_text(prompt_cell.get_text("\n", strip=True)) |
| if not content: |
| return None |
|
|
| title = self._build_title_from_content(content, fallback=source_name) |
| question_guid = self._extract_block_guid(block) |
| variants = self._extract_variants_from_block(block) |
| images = self._extract_images(prompt_cell, base_url=questions_url) |
|
|
| return { |
| "title": title, |
| "content": content, |
| "source_url": f"{questions_url}?proj={project_guid}&qid={question_guid}", |
| "task_type": self._detect_task_type(title, content), |
| "images": images, |
| "variants": variants, |
| "scraped_at": datetime.utcnow().isoformat(), |
| "source_kind": "dynamic_bank", |
| "task_guid": question_guid, |
| } |
|
|
| def _extract_block_guid(self, block: Tag) -> str: |
| guid_input = block.select_one("form input[name='guid']") |
| if guid_input and guid_input.get("value"): |
| return guid_input["value"] |
| return block.get("id", "").lstrip("q") |
|
|
| def _extract_variants_from_block(self, block: Tag) -> List[str]: |
| variants: List[str] = [] |
|
|
| for label in block.find_all("label"): |
| text = self._clean_text(label.get_text(" ", strip=True)) |
| if text: |
| variants.append(text) |
|
|
| if not variants: |
| for option in block.find_all("option"): |
| text = self._clean_text(option.get_text(" ", strip=True)) |
| if text and text.lower() != "выбор": |
| variants.append(text) |
|
|
| return variants[:10] |
|
|
    async def scrape_official_archives(self, subject: str = "russian") -> List[Dict]:
        """Download official demo/variant documents and mine tasks from them.

        At most self.max_demo_archives documents (newest first) are fetched.
        Requires pypdf; returns an empty list when it is not installed.
        """
        config = self.SUBJECT_CONFIG.get(subject, self.SUBJECT_CONFIG["russian"])
        archive_links = await self._discover_official_archive_links(config)
        variant_links = await self._discover_official_variant_links(config)
        document_links = self._sort_document_links(archive_links + variant_links)
        tasks: List[Dict] = []

        if not document_links:
            logger.warning("No official archive links found for %s", subject)
            return tasks

        if PdfReader is None:
            logger.warning("pypdf is not installed, skipping official PDF extraction")
            return tasks

        for document_url in document_links[: self.max_demo_archives]:
            document_bytes = await self.fetch_bytes(document_url)
            if not document_bytes:
                continue  # best effort: skip unreachable documents
            tasks.extend(self._extract_tasks_from_document_bytes(document_bytes, document_url))

        logger.info("Collected %s candidates from official archives", len(tasks))
        return tasks
|
|
| async def _discover_official_archive_links(self, config: Dict) -> List[str]: |
| html = await self.fetch_page(config["official_demo_page"]) |
| if not html: |
| return [] |
|
|
| soup = BeautifulSoup(html, "lxml") |
| prefixes = config["archive_prefixes"] |
| archive_links: List[str] = [] |
|
|
| for link in soup.find_all("a", href=True): |
| href = link["href"] |
| absolute = href if href.startswith("http") else urljoin(config["official_demo_page"], href) |
| href_lower = absolute.lower() |
| if not href_lower.endswith(".zip"): |
| continue |
| if any(prefix in href_lower for prefix in prefixes): |
| archive_links.append(absolute) |
|
|
| def sort_key(url: str) -> int: |
| match = re.search(r"/(20\d{2})/", url) |
| return int(match.group(1)) if match else 0 |
|
|
| archive_links.sort(key=sort_key, reverse=True) |
| return archive_links |
|
|
| async def _discover_official_variant_links(self, config: Dict) -> List[str]: |
| variant_page = config.get("official_variant_page") |
| if not variant_page: |
| return [] |
|
|
| html = await self.fetch_page(variant_page) |
| if not html: |
| return [] |
|
|
| soup = BeautifulSoup(html, "lxml") |
| prefixes = config.get("variant_prefixes", ()) |
| links: List[str] = [] |
|
|
| for link in soup.find_all("a", href=True): |
| href = link["href"] |
| absolute = href if href.startswith("http") else urljoin(variant_page, href) |
| href_lower = absolute.lower() |
| if not href_lower.endswith((".zip", ".pdf")): |
| continue |
| if "braille" in href_lower: |
| continue |
| filename = absolute.rsplit("/", 1)[-1].lower() |
| if prefixes and not any(filename.startswith(prefix) for prefix in prefixes): |
| continue |
| links.append(absolute) |
|
|
| return self._sort_document_links(links) |
|
|
| def _sort_document_links(self, links: Iterable[str]) -> List[str]: |
| def sort_key(url: str) -> tuple[int, str]: |
| match = re.search(r"(20\d{2})", url) |
| return (int(match.group(1)) if match else 0, url) |
|
|
| return sorted(set(links), key=sort_key, reverse=True) |
|
|
| def _extract_tasks_from_document_bytes(self, document_bytes: bytes, document_url: str) -> List[Dict]: |
| if document_url.lower().endswith(".zip"): |
| return self._extract_tasks_from_archive(document_bytes, document_url) |
| if document_url.lower().endswith(".pdf"): |
| return self._extract_tasks_from_pdf_document( |
| document_bytes, |
| document_url=document_url, |
| document_name=document_url.rsplit("/", 1)[-1], |
| ) |
| return [] |
|
|
    # NOTE(review): dead code — this method is shadowed by a later definition
    # of _extract_tasks_from_archive in this class (Python keeps only the last
    # definition in a class body).  Kept verbatim; consider deleting it.
    def _extract_tasks_from_archive(self, archive_bytes: bytes, archive_url: str) -> List[Dict]:
        """Superseded ZIP extractor: pulls numbered tasks from demo PDFs only."""
        tasks: List[Dict] = []

        try:
            with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
                for member_name in archive.namelist():
                    if not member_name.lower().endswith(".pdf"):
                        continue
                    # Only members whose name marks them as a demo version.
                    if "демо" not in member_name.lower() and "demo" not in member_name.lower():
                        continue

                    text = self._extract_text_from_pdf_bytes(archive.read(member_name))
                    if not text:
                        continue

                    year_match = re.search(r"(20\d{2})", archive_url)
                    year = year_match.group(1) if year_match else "unknown"
                    tasks.extend(
                        self._extract_tasks_from_demo_text(
                            text,
                            archive_url=archive_url,
                            document_name=member_name,
                            year=year,
                        )
                    )
        except zipfile.BadZipFile:
            logger.error("Invalid archive %s", archive_url)

        return tasks
|
|
| def _extract_text_from_pdf_bytes(self, pdf_bytes: bytes) -> str: |
| if PdfReader is None: |
| return "" |
|
|
| try: |
| reader = PdfReader(io.BytesIO(pdf_bytes)) |
| except Exception as e: |
| logger.error("Failed to open PDF: %s", e) |
| return "" |
|
|
| pages: List[str] = [] |
| for page in reader.pages: |
| try: |
| page_text = page.extract_text() or "" |
| except Exception: |
| page_text = "" |
| if page_text: |
| pages.append(page_text) |
|
|
| return self._clean_text("\n".join(pages)) |
|
|
    # NOTE(review): dead code — this method is shadowed by the later definition
    # of _extract_tasks_from_demo_text below (Python keeps the last definition
    # in the class body).  Kept verbatim; consider deleting it.
    def _extract_tasks_from_demo_text(
        self,
        text: str,
        *,
        archive_url: str,
        document_name: str,
        year: str,
    ) -> List[Dict]:
        """Superseded numbered-task splitter for demo PDF text."""
        tasks: List[Dict] = []
        if not text:
            return tasks

        # NOTE(review): this reassignment and re-check are redundant — possibly
        # _slice_demo_section was meant to be applied here; confirm intent.
        bounded_text = text
        if not bounded_text:
            return tasks

        # Matches "N. ..." / "N) ..." blocks up to the next numbered item,
        # a "Часть N" header, or end of text.
        pattern = re.compile(
            r"(?ms)(?:^|\n)(\d{1,2})[\.\)]\s*(.+?)(?=(?:\n\d{1,2}[\.\)])|(?:\nЧасть\s+\d)|\Z)"
        )

        for match in pattern.finditer(bounded_text):
            task_number = int(match.group(1))
            content = self._clean_text(match.group(2))
            if len(content) < 80:  # too short to be a real task
                continue

            title = f"Демоверсия ЕГЭ {year}. Задание {task_number}"
            tasks.append(
                {
                    "title": title,
                    "content": content,
                    "source_url": f"{archive_url}#task-{task_number}",
                    "task_type": self._detect_task_type(title, content),
                    "images": [],
                    "variants": self._extract_variants(content),
                    "scraped_at": datetime.utcnow().isoformat(),
                    "source_kind": "official_demo_pdf",
                    "document_name": document_name,
                    "task_number": task_number,
                }
            )

            if len(tasks) >= self.max_demo_tasks:
                break

        return tasks
|
|
    # NOTE(review): dead code — shadowed by the later _slice_demo_section
    # definition below; Python keeps only the last one.  Kept verbatim.
    def _slice_demo_section(self, text: str) -> str:
        """Superseded Part-1 slicer for demo documents."""
        start = re.search(r"(Часть\s*1|Ответами к заданиям)", text, re.IGNORECASE)
        if not start:
            return text

        end = re.search(r"(Система оценивания|Ключи|Ответы)", text[start.start() :], re.IGNORECASE)
        if not end:
            return text[start.start() :]

        return text[start.start() : start.start() + end.start()]
|
|
| def _extract_tasks_from_archive(self, archive_bytes: bytes, archive_url: str) -> List[Dict]: |
| tasks: List[Dict] = [] |
|
|
| try: |
| with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive: |
| for member_name in archive.namelist(): |
| if not member_name.lower().endswith(".pdf"): |
| continue |
| if not self._should_parse_pdf_member(member_name, archive_url): |
| continue |
| tasks.extend( |
| self._extract_tasks_from_pdf_document( |
| archive.read(member_name), |
| document_url=archive_url, |
| document_name=member_name, |
| ) |
| ) |
| except zipfile.BadZipFile: |
| logger.error("Invalid archive %s", archive_url) |
|
|
| return tasks |
|
|
| def _should_parse_pdf_member(self, member_name: str, document_url: str) -> bool: |
| member_lower = member_name.lower() |
| if any(token in member_lower for token in ("спец", "кодиф", "критер", "ответ", "аудио")): |
| return False |
| if "otkrytyye-varianty-kim-ege" in document_url.lower(): |
| return True |
| return "демо" in member_lower or "demo" in member_lower |
|
|
| def _extract_tasks_from_pdf_document( |
| self, |
| pdf_bytes: bytes, |
| *, |
| document_url: str, |
| document_name: str, |
| ) -> List[Dict]: |
| text = self._extract_text_from_pdf_bytes(pdf_bytes) |
| if not text: |
| return [] |
|
|
| year_match = re.search(r"(20\d{2})", document_url) |
| year = year_match.group(1) if year_match else "unknown" |
| return self._extract_tasks_from_demo_text( |
| text, |
| archive_url=document_url, |
| document_name=document_name, |
| year=year, |
| source_kind=self._detect_document_source_kind(document_url), |
| ) |
|
|
| def _detect_document_source_kind(self, document_url: str) -> str: |
| if "otkrytyye-varianty-kim-ege" in document_url.lower(): |
| return "official_open_variant_pdf" |
| return "official_demo_pdf" |
|
|
| def _extract_tasks_from_demo_text( |
| self, |
| text: str, |
| *, |
| archive_url: str, |
| document_name: str, |
| year: str, |
| source_kind: str = "official_demo_pdf", |
| ) -> List[Dict]: |
| tasks: List[Dict] = [] |
| if not text: |
| return tasks |
|
|
| bounded_text = text |
| if not bounded_text: |
| return tasks |
|
|
| for raw_block in self._split_pdf_into_task_blocks(bounded_text): |
| content = self._cleanup_pdf_task_block(raw_block) |
| content = self._trim_to_task_start(content) |
| if not self._looks_like_official_task_block(content): |
| continue |
|
|
| task_number = len(tasks) + 1 |
| document_label = "Открытый вариант ЕГЭ" if source_kind == "official_open_variant_pdf" else "Демоверсия ЕГЭ" |
| title = f"{document_label} {year}. Задание {task_number}" |
| tasks.append( |
| { |
| "title": title, |
| "content": content, |
| "source_url": f"{archive_url}#task-{task_number}", |
| "task_type": self._detect_task_type(title, content), |
| "images": [], |
| "variants": self._extract_variants(content), |
| "scraped_at": datetime.utcnow().isoformat(), |
| "source_kind": source_kind, |
| "document_name": document_name, |
| "task_number": task_number, |
| } |
| ) |
|
|
| if len(tasks) >= self.max_demo_tasks: |
| break |
|
|
| return tasks |
|
|
| def _split_pdf_into_task_blocks(self, text: str) -> List[str]: |
| answer_pattern = re.compile(r"(?:^|\n)\s*Ответ\s*:\s*[_\.\s]*", re.IGNORECASE) |
| blocks: List[str] = [] |
| last_pos = 0 |
|
|
| for match in answer_pattern.finditer(text): |
| block = text[last_pos:match.start()] |
| if block.strip(): |
| blocks.append(block) |
| last_pos = match.end() |
|
|
| return blocks |
|
|
| def _cleanup_pdf_task_block(self, block: str) -> str: |
| lines: List[str] = [] |
| for raw_line in block.splitlines(): |
| line = self._clean_text(raw_line) |
| if not line: |
| continue |
| lower = line.lower() |
| if line == "&%end_page&%": |
| continue |
| if re.fullmatch(r"\d{1,2}", line): |
| continue |
| if re.search(r"\d+\s*/\s*\d+$", line): |
| continue |
| if lower.startswith(("демонстрационный вариант егэ", "открытый вариант ким егэ", "единый государственный экзамен")): |
| continue |
| if lower.startswith("© "): |
| continue |
| lines.append(line) |
|
|
| return self._clean_text("\n".join(lines)) |
|
|
| def _trim_to_task_start(self, text: str) -> str: |
| if not text: |
| return text |
|
|
| starts = [text.find(pattern) for pattern in self.PDF_TASK_START_PATTERNS if text.find(pattern) >= 0] |
| if starts: |
| return text[min(starts):].strip() |
| return text.strip() |
|
|
| def _looks_like_official_task_block(self, text: str) -> bool: |
| if len(text) < 70 or len(text) > 6000: |
| return False |
|
|
| lower = text.lower() |
| if any(pattern.lower() in lower for pattern in self.PDF_NOISE_PATTERNS): |
| return False |
|
|
| return any(pattern.lower() in lower for pattern in self.PDF_TASK_START_PATTERNS) |
|
|
| def _slice_demo_section(self, text: str) -> str: |
| start_matches = list(re.finditer(r"(?m)^\s*Часть\s*1\s*$", text, re.IGNORECASE)) |
| if start_matches: |
| start_pos = start_matches[-1].start() |
| else: |
| fallback = list(re.finditer(r"Ответами к заданиям", text, re.IGNORECASE)) |
| if not fallback: |
| return text |
| start_pos = fallback[-1].start() |
|
|
| end = re.search( |
| r"(Часть\s*2|Задание\s*27|Система оценивания|Критерии оценивания|Ключи)", |
| text[start_pos:], |
| re.IGNORECASE, |
| ) |
| if not end: |
| return text[start_pos:] |
|
|
| return text[start_pos : start_pos + end.start()] |
|
|
| def parse_task_page(self, html: str, url: str) -> Optional[Dict]: |
| if not html: |
| return None |
|
|
| soup = BeautifulSoup(html, "lxml") |
| for selector in ( |
| "div.qblock", |
| "article", |
| "main article", |
| ".field--name-body", |
| ".content", |
| "main", |
| "body", |
| ): |
| container = soup.select_one(selector) |
| if not container: |
| continue |
|
|
| candidate = self._build_candidate_from_container(container, url) |
| if candidate: |
| return candidate |
|
|
| return None |
|
|
    def _build_candidate_from_container(self, container: Tag, url: str) -> Optional[Dict]:
        """Build a quality-gated task candidate from an HTML container.

        The container is re-parsed into a private soup so the destructive
        decompose() calls below never mutate the caller's tree.  Returns None
        when the candidate does not pass the quality gate.
        """
        cloned = BeautifulSoup(str(container), "lxml")
        root = cloned.find()
        if root is None:
            return None

        # Strip chrome and interactive elements before extracting text.
        for element in root.find_all(["script", "style", "nav", "header", "footer", "form", "button", "aside"]):
            element.decompose()

        title_tag = root.find(["h1", "h2", "h3", "strong", "b"])
        title = self._clean_text(title_tag.get_text(" ", strip=True)) if title_tag else ""
        content = self._clean_text(root.get_text("\n", strip=True))
        if not title:
            # No heading element: fall back to the first content line.
            title = self._build_title_from_content(content, fallback=url)

        images = self._extract_images(root, base_url=url)
        candidate = {
            "title": title,
            "content": content,
            "source_url": url,
            "task_type": self._detect_task_type(title, content),
            "images": images,
            "variants": self._extract_variants(content),
            "scraped_at": datetime.utcnow().isoformat(),
            "source_kind": "generic_html",
        }
        # _passes_quality_gate also annotates the dict with "quality_score".
        return candidate if self._passes_quality_gate(candidate) else None
|
|
| async def scrape_task_by_id(self, task_id: str) -> Optional[Dict]: |
| config = self.SUBJECT_CONFIG["russian"]["dynamic_sources"][0] |
| html = await self.fetch_page( |
| f"{config['base_url']}/questions.php?proj={config['project_guid']}&qid={task_id}" |
| ) |
| if not html: |
| return None |
|
|
| soup = BeautifulSoup(html, "lxml") |
| block = soup.select_one("div.qblock") |
| if not block: |
| return None |
|
|
| return self._parse_bank_question_block( |
| block, |
| project_guid=config["project_guid"], |
| source_name=config["project_name"], |
| questions_url=f"{config['base_url']}/questions.php", |
| ) |
|
|
| async def search_tasks(self, query: str) -> List[Dict]: |
| query_lower = query.lower().strip() |
| tasks = await self.scrape_tasks(subject="russian") |
| return [ |
| task |
| for task in tasks |
| if query_lower in task.get("title", "").lower() |
| or query_lower in task.get("content", "").lower() |
| ] |
|
|
| def _filter_candidates(self, candidates: Iterable[Dict]) -> List[Dict]: |
| accepted: List[Dict] = [] |
| for candidate in candidates: |
| if self._passes_quality_gate(candidate): |
| accepted.append(candidate) |
| return accepted |
|
|
| def _dedupe_candidates(self, candidates: Iterable[Dict]) -> List[Dict]: |
| deduped: List[Dict] = [] |
| seen_keys = set() |
|
|
| for candidate in candidates: |
| normalized = self._clean_text(candidate.get("content", ""))[:400] |
| key = (candidate.get("source_url", ""), normalized) |
| if key in seen_keys: |
| continue |
| seen_keys.add(key) |
| deduped.append(candidate) |
|
|
| return deduped |
|
|
| def _passes_quality_gate(self, candidate: Dict) -> bool: |
| score = self._score_candidate(candidate) |
| candidate["quality_score"] = score |
| return score >= self.min_quality_score |
|
|
    def _score_candidate(self, candidate: Dict) -> int:
        """Heuristically score how much a candidate looks like a real exam task.

        The weights are hand-tuned against self.min_quality_score (default 45);
        change them together with that threshold.
        """
        title = candidate.get("title", "").lower()
        content = candidate.get("content", "").lower()
        source_kind = candidate.get("source_kind", "")
        length = len(content)

        score = 0

        # Provenance: the dynamic bank is the most trusted source.
        if source_kind == "dynamic_bank":
            score += 60
        elif source_kind in {"official_demo_pdf", "official_open_variant_pdf"}:
            score += 50
        else:
            score += 10

        # Reasonable task length; both very short and very long text is penalized.
        if 80 <= length <= 3500:
            score += 15
        elif length > 5000:
            score -= 20
        else:
            score -= 10

        # Reward task-like vocabulary from any keyword family.
        if any(keyword in content for keywords in self.TASK_TYPE_KEYWORDS.values() for keyword in keywords):
            score += 10

        # Reward canonical task openers seen in official PDFs.
        if any(pattern.lower() in content for pattern in self.PDF_TASK_START_PATTERNS):
            score += 10

        # Real tasks usually reference numbered items.
        if re.search(r"\b\d+\b", content):
            score += 5

        # Heavily penalize generic/navigational titles.
        if any(pattern in title for pattern in self.GENERIC_TITLE_PATTERNS):
            score -= 45

        # Penalize navigational noise near the start of the content (capped at 32).
        noise_hits = sum(1 for pattern in self.NOISE_PATTERNS if pattern in content[:1200])
        score -= min(noise_hits * 8, 32)

        # Very many lines usually means a scraped page, not a single task.
        if content.count("\n") > 80:
            score -= 10

        return score
|
|
| def _detect_task_type(self, title: str, content: str) -> str: |
| text = f"{title} {content}".lower() |
|
|
| for task_type, keywords in self.TASK_TYPE_KEYWORDS.items(): |
| if any(keyword in text for keyword in keywords): |
| return task_type |
|
|
| return "other" |
|
|
| def _extract_variants(self, content: str) -> List[str]: |
| matches = re.findall(r"(?:^|\n)(?:[1-6]|[A-DА-Г])[.)]\s*([^\n]{2,200})", content) |
| return [self._clean_text(match) for match in matches[:10]] |
|
|
| def _extract_images(self, container: Tag, *, base_url: str) -> List[str]: |
| images: List[str] = [] |
| for img in container.find_all("img"): |
| src = img.get("src") or img.get("data-src") |
| if not src: |
| continue |
| images.append(src if src.startswith("http") else urljoin(base_url, src)) |
| return images[:10] |
|
|
| def _build_title_from_content(self, content: str, fallback: str) -> str: |
| first_line = next((line.strip() for line in content.splitlines() if line.strip()), "") |
| title = self._clean_text(first_line) |
| if not title: |
| title = fallback |
| return title[:160] |
|
|
| def _clean_text(self, text: str) -> str: |
| text = text.replace("\xa0", " ") |
| text = re.sub( |
| r"\b(?:[A-Za-zА-Яа-яЁё]\s+){2,}[A-Za-zА-Яа-яЁё]\b", |
| lambda match: match.group(0).replace(" ", ""), |
| text, |
| ) |
| text = re.sub(r"[ \t]+", " ", text) |
| text = re.sub(r"\n{3,}", "\n\n", text) |
| return text.strip() |
|
|