# scraper/scraper.py
# NOTE: Hugging Face upload-page metadata ("greeta's picture", "Upload scraper.py",
# "90649c5 verified") was captured here as bare text; kept as comments so the module parses.
"""
FIPI scraper focused on extracting real tasks instead of generic page text.
"""
from __future__ import annotations
import asyncio
from datetime import datetime
import io
import logging
import math
import os
import re
import ssl
from typing import Dict, Iterable, List, Optional
from urllib.parse import urljoin
import zipfile
from bs4 import BeautifulSoup, Tag
import httpx
import requests
try:
from pypdf import PdfReader
except ImportError: # pragma: no cover - optional dependency for HF deploy
PdfReader = None
# Configure logging once at import time; the module-level logger is reused by all methods.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class FIPIScraper:
    """Collects task candidates from the FIPI bank and official demo archives."""

    # Per-subject scraping configuration: dynamic question-bank endpoints plus the
    # static pages where official demo/variant archives are published.
    SUBJECT_CONFIG = {
        "russian": {
            "label": "Русский язык",
            "dynamic_sources": [
                {
                    "kind": "ege_bank",
                    "base_url": "https://ege.fipi.ru/bank",
                    "project_guid": "AF0ED3F2557F8FFC4C06F80B6803FD26",
                    "project_name": "ЕГЭ. Русский язык",
                },
                {
                    "kind": "oge_bank",
                    "base_url": "https://oge.fipi.ru/bank",
                    "project_guid": "2F5EE3B12FE2A0EA40B06BF61A015416",
                    "project_name": "ОГЭ. Русский язык",
                },
            ],
            "official_demo_page": "https://fipi.ru/ege/demoversii-specifikacii-kodifikatory",
            "official_variant_page": "https://fipi.ru/ege/otkrytyy-bank-zadaniy-ege/otkrytyye-varianty-kim-ege",
            "archive_prefixes": ("ru_11_",),
            "variant_prefixes": ("rus_",),
            "title_keywords": ("русский язык",),
        }
    }
    # Keyword buckets used by _detect_task_type; iteration order matters — the
    # first bucket containing a matching keyword wins.
    TASK_TYPE_KEYWORDS = {
        "writing": ("сочинение", "эссе", "напишите", "сформулируйте", "прокомментируйте"),
        "test": ("выберите", "укажите", "ответ", "вариант", "расставьте", "определите"),
        "listening": ("аудио", "прослуш", "запись"),
        "reading": ("прочитайте", "текст", "абзац", "предложение"),
    }
    # Title substrings marking a generic portal page rather than an actual task;
    # heavily penalized in _score_candidate.
    GENERIC_TITLE_PATTERNS = (
        "открытый банк",
        "демоверсии",
        "спецификации",
        "кодификаторы",
        "федеральный институт",
        "фипи",
        "нормативно",
        "документы",
        "варианты ким",
    )
    # Phrases that mark the start of a real task statement inside official PDFs;
    # used both to trim preambles and to validate candidate blocks.
    PDF_TASK_START_PATTERNS = (
        "Прочитайте текст",
        "Самостоятельно подберите",
        "В тексте выделено",
        "Укажите",
        "В одном из",
        "Отредактируйте предложение",
        "Установите соответствие",
        "Расставьте",
        "Определите",
        "Найдите",
        "Подберите",
    )
    # Boilerplate phrases from demo PDFs; any hit rejects a PDF task block.
    PDF_NOISE_PATTERNS = (
        "Инструкция по выполнению работы",
        "Пояснения к демонстрационному варианту",
        "Желаем успеха",
        "Все бланки ЕГЭ заполняются",
        "Баллы, полученные",
        "После завершения работы",
        "В демонстрационном варианте представлены",
        "Часть 1 содержит 26 заданий",
        "На выполнение экзаменационной работы",
        "Ответами к заданиям 1–26 являются",
        "Бланк",
    )
    # Site-chrome phrases; occurrences near the top of a candidate's content
    # subtract points in _score_candidate.
    NOISE_PATTERNS = (
        "федеральный институт педагогических измерений",
        "открытый банк тестовых заданий",
        "открытый банк заданий егэ",
        "открытый банк заданий огэ",
        "подбор заданий",
        "демоверсии, спецификации, кодификаторы",
        "для предметных комиссий",
        "аналитические и методические материалы",
        "видеоконсультации разработчиков ким",
        "скачать",
        "изменения в ким",
    )
def __init__(self, base_url: str = "https://fipi.ru"):
self.base_url = base_url.rstrip("/")
self.headers = {
"User-Agent": (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7",
}
self.page_size = max(1, int(os.getenv("SCRAPER_BANK_PAGE_SIZE", "10")))
self.max_bank_pages = max(1, int(os.getenv("SCRAPER_MAX_BANK_PAGES", "5")))
self.max_demo_archives = max(1, int(os.getenv("SCRAPER_MAX_DEMO_ARCHIVES", "2")))
self.max_demo_tasks = max(1, int(os.getenv("SCRAPER_MAX_DEMO_TASKS", "20")))
self.min_quality_score = max(1, int(os.getenv("SCRAPER_MIN_QUALITY_SCORE", "45")))
async def fetch_page(self, url: str) -> Optional[str]:
    """Fetch *url* and return its body text, or None when the request fails."""
    response = await self._request("GET", url)
    if not response:
        return None
    return response.text
async def fetch_bytes(self, url: str) -> Optional[bytes]:
    """Fetch *url* and return the raw response body, or None when the request fails."""
    response = await self._request("GET", url)
    if not response:
        return None
    return response.content
async def _request(
    self,
    method: str,
    url: str,
    *,
    data: Optional[Dict[str, str]] = None,
) -> Optional[httpx.Response]:
    """Perform one HTTP request via httpx, retrying through `requests` on failure.

    Returns the successful response, or None when both transports fail.
    Raises nothing: all transport errors are logged and absorbed.
    """
    # NOTE(review): TLS certificate verification is deliberately disabled for
    # both transports. This is a security trade-off — confirm it is acceptable
    # for this deployment before reuse elsewhere.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    async with httpx.AsyncClient(
        headers=self.headers,
        timeout=45.0,
        verify=ssl_context,
        follow_redirects=True,
        trust_env=False,  # ignore system proxy environment variables
    ) as client:
        try:
            response = await client.request(method, url, data=data)
            response.raise_for_status()
            return response
        except httpx.HTTPError as e:
            logger.error("Async request failed for %s: %r", url, e)
            # Retry once through the synchronous `requests` transport.
            return await self._request_with_requests_fallback(
                method=method,
                url=url,
                data=data,
            )
async def _request_with_requests_fallback(
    self,
    *,
    method: str,
    url: str,
    data: Optional[Dict[str, str]] = None,
) -> Optional[httpx.Response]:
    """Blocking `requests` retry executed in a worker thread.

    The result is re-wrapped as an httpx.Response so callers see a single
    response type regardless of which transport succeeded. Returns None when
    this transport fails too.
    """
    def do_request() -> Optional[httpx.Response]:
        session = requests.Session()
        session.trust_env = False  # mirror the async client: ignore proxy env vars
        try:
            response = session.request(
                method=method,
                url=url,
                data=data,
                headers=self.headers,
                timeout=45,
                verify=False,  # mirrors the disabled TLS verification in _request
                allow_redirects=True,
            )
            response.raise_for_status()
            # Re-wrap as httpx.Response so both code paths share one interface.
            request = httpx.Request(method, url, headers=self.headers)
            return httpx.Response(
                status_code=response.status_code,
                headers=response.headers,
                content=response.content,
                request=request,
            )
        except requests.RequestException as exc:
            logger.error("Requests fallback failed for %s: %r", url, exc)
            return None
        finally:
            session.close()

    return await asyncio.to_thread(do_request)
async def scrape_tasks(
    self,
    subject: str = "russian",
    *,
    include_official_archives: bool = True,
) -> List[Dict]:
    """Collect, filter and dedupe task candidates for *subject*.

    Combines the dynamic question bank with (optionally) official demo/variant
    PDF archives, then drops low-quality and duplicate candidates. Unknown
    subjects fall back to "russian".
    """
    config = self.SUBJECT_CONFIG.get(subject)
    if not config:
        logger.warning("Unknown subject %s, falling back to russian", subject)
        config = self.SUBJECT_CONFIG["russian"]
    # NOTE(review): `config` only drives the warning — the helpers below
    # re-resolve the subject themselves.
    candidates: List[Dict] = []
    candidates.extend(await self.scrape_dynamic_bank(subject))
    if include_official_archives:
        candidates.extend(await self.scrape_official_archives(subject))
    validated = self._dedupe_candidates(self._filter_candidates(candidates))
    logger.info("Accepted %s task candidates after filtering", len(validated))
    return validated
async def scrape_dynamic_bank(self, subject: str = "russian") -> List[Dict]:
    """Scrape question blocks from the dynamic EGE/OGE bank endpoints.

    Pages through each configured source's questions.php (up to
    ``max_bank_pages`` pages), parsing every ``div.qblock`` into a candidate
    dict. When a page yields no blocks, one retry is made via POST search.
    """
    config = self.SUBJECT_CONFIG.get(subject, self.SUBJECT_CONFIG["russian"])
    tasks: List[Dict] = []
    for source in config["dynamic_sources"]:
        project_guid = source["project_guid"]
        questions_url = f"{source['base_url']}/questions.php"
        total_tasks = None  # total reported by the site; read from the first page
        for page_index in range(self.max_bank_pages):
            html = await self._fetch_bank_page(
                questions_url=questions_url,
                project_guid=project_guid,
                page_index=page_index,
            )
            if not html:
                break
            if total_tasks is None:
                total_tasks = self._extract_total_count(html)
                if total_tasks:
                    max_pages = math.ceil(total_tasks / self.page_size)
                    logger.info(
                        "Bank %s reports %s tasks, scraping up to %s pages",
                        source["project_name"],
                        total_tasks,
                        min(max_pages, self.max_bank_pages),
                    )
            soup = BeautifulSoup(html, "lxml")
            blocks = soup.select("div.qblock")
            if not blocks:
                # Retry via POST search when the primary fetch produced no blocks.
                logger.warning(
                    "No qblock nodes found for %s page=%s via primary fetch, retrying POST search",
                    source["project_name"],
                    page_index,
                )
                html = await self._fetch_bank_page(
                    questions_url=questions_url,
                    project_guid=project_guid,
                    page_index=page_index,
                    force_post=True,
                )
                if not html:
                    break
                soup = BeautifulSoup(html, "lxml")
                blocks = soup.select("div.qblock")
            if not blocks:
                logger.warning(
                    "No qblock nodes found for %s page=%s after retry",
                    source["project_name"],
                    page_index,
                )
                break
            for block in blocks:
                task = self._parse_bank_question_block(
                    block,
                    project_guid=project_guid,
                    source_name=source["project_name"],
                    questions_url=questions_url,
                )
                if task:
                    tasks.append(task)
            # Stop early once the reported total has been covered.
            if total_tasks is not None and (page_index + 1) * self.page_size >= total_tasks:
                break
    logger.info("Collected %s candidates from the dynamic bank", len(tasks))
    return tasks
async def _fetch_bank_page(
    self,
    *,
    questions_url: str,
    project_guid: str,
    page_index: int,
    force_post: bool = False,
) -> Optional[str]:
    """Fetch one bank page, preferring GET unless *force_post* is set.

    Falls through to the POST search form when GET fails or is skipped.
    """
    if not force_post:
        page_url = (
            f"{questions_url}?proj={project_guid}"
            f"&page={page_index}&pagesize={self.page_size}"
        )
        html = await self.fetch_page(page_url)
        if html:
            return html
    return await self._post_bank_page(
        questions_url=questions_url,
        project_guid=project_guid,
        page_index=page_index,
    )
async def _post_bank_page(
    self,
    *,
    questions_url: str,
    project_guid: str,
    page_index: int,
) -> Optional[str]:
    """Request one bank page through the site's POST search form."""
    payload = {
        "search": "1",
        "pagesize": str(self.page_size),
        "proj": project_guid,
        "page": str(page_index),
    }
    response = await self._request("POST", questions_url, data=payload)
    if not response:
        return None
    return response.text
def _extract_total_count(self, html: str) -> Optional[int]:
match = re.search(r"setQCount\((\d+)", html)
return int(match.group(1)) if match else None
def _parse_bank_question_block(
    self,
    block: Tag,
    *,
    project_guid: str,
    source_name: str,
    questions_url: str,
) -> Optional[Dict]:
    """Convert one ``div.qblock`` element into a task-candidate dict.

    Returns None when the block has no prompt cell or no usable text.
    """
    prompt_cell = block.select_one("td.cell_0")
    if not prompt_cell:
        return None
    content = self._clean_text(prompt_cell.get_text("\n", strip=True))
    if not content:
        return None
    title = self._build_title_from_content(content, fallback=source_name)
    question_guid = self._extract_block_guid(block)
    variants = self._extract_variants_from_block(block)
    images = self._extract_images(prompt_cell, base_url=questions_url)
    return {
        "title": title,
        "content": content,
        # Deep link back to the exact question in the bank UI.
        "source_url": f"{questions_url}?proj={project_guid}&qid={question_guid}",
        "task_type": self._detect_task_type(title, content),
        "images": images,
        "variants": variants,
        "scraped_at": datetime.utcnow().isoformat(),
        "source_kind": "dynamic_bank",
        "task_guid": question_guid,
    }
def _extract_block_guid(self, block: Tag) -> str:
    """Return the question GUID for a bank block.

    Prefers the hidden form input named ``guid``; falls back to the block's
    DOM id with one leading ``q`` removed.
    """
    guid_input = block.select_one("form input[name='guid']")
    if guid_input and guid_input.get("value"):
        return guid_input["value"]
    # Bug fix: str.lstrip("q") strips *every* leading "q", corrupting ids
    # that legitimately start with more than one; remove exactly one prefix.
    block_id = block.get("id", "")
    return block_id[1:] if block_id.startswith("q") else block_id
def _extract_variants_from_block(self, block: Tag) -> List[str]:
    """Collect up to 10 answer-option strings from a bank question block."""

    def option_texts(tag_name: str):
        # Yield cleaned, non-empty text for every element of *tag_name*.
        for element in block.find_all(tag_name):
            cleaned = self._clean_text(element.get_text(" ", strip=True))
            if cleaned:
                yield cleaned

    # Radio/checkbox answers appear as <label> elements.
    variants = list(option_texts("label"))
    if not variants:
        # Dropdown questions use <option>; drop the placeholder entry.
        variants = [text for text in option_texts("option") if text.lower() != "выбор"]
    return variants[:10]
async def scrape_official_archives(self, subject: str = "russian") -> List[Dict]:
    """Extract tasks from official demo/variant ZIP and PDF downloads.

    Discovers document links on the FIPI pages, keeps the newest
    ``max_demo_archives`` of them, and parses each into candidates.
    Requires pypdf; returns [] when it is unavailable or no links are found.
    """
    config = self.SUBJECT_CONFIG.get(subject, self.SUBJECT_CONFIG["russian"])
    archive_links = await self._discover_official_archive_links(config)
    variant_links = await self._discover_official_variant_links(config)
    document_links = self._sort_document_links(archive_links + variant_links)
    tasks: List[Dict] = []
    if not document_links:
        logger.warning("No official archive links found for %s", subject)
        return tasks
    if PdfReader is None:
        logger.warning("pypdf is not installed, skipping official PDF extraction")
        return tasks
    for document_url in document_links[: self.max_demo_archives]:
        document_bytes = await self.fetch_bytes(document_url)
        if not document_bytes:
            continue
        tasks.extend(self._extract_tasks_from_document_bytes(document_bytes, document_url))
    logger.info("Collected %s candidates from official archives", len(tasks))
    return tasks
async def _discover_official_archive_links(self, config: Dict) -> List[str]:
    """Find subject ZIP archives on the demo page, newest year first."""
    html = await self.fetch_page(config["official_demo_page"])
    if not html:
        return []
    soup = BeautifulSoup(html, "lxml")
    prefixes = config["archive_prefixes"]
    archive_links: List[str] = []
    for link in soup.find_all("a", href=True):
        href = link["href"]
        absolute = href if href.startswith("http") else urljoin(config["official_demo_page"], href)
        href_lower = absolute.lower()
        if not href_lower.endswith(".zip"):
            continue
        # Keep only archives matching the subject's filename prefixes (e.g. "ru_11_").
        if any(prefix in href_lower for prefix in prefixes):
            archive_links.append(absolute)

    def sort_key(url: str) -> int:
        # Prefer URLs containing a /20XX/ year path segment; others sort last.
        match = re.search(r"/(20\d{2})/", url)
        return int(match.group(1)) if match else 0

    archive_links.sort(key=sort_key, reverse=True)
    return archive_links
async def _discover_official_variant_links(self, config: Dict) -> List[str]:
    """Find open-variant ZIP/PDF links for the subject, newest year first.

    Returns [] when no variant page is configured or the page cannot be
    fetched. Braille editions are skipped.
    """
    variant_page = config.get("official_variant_page")
    if not variant_page:
        return []
    html = await self.fetch_page(variant_page)
    if not html:
        return []
    soup = BeautifulSoup(html, "lxml")
    prefixes = config.get("variant_prefixes", ())
    links: List[str] = []
    for link in soup.find_all("a", href=True):
        href = link["href"]
        absolute = href if href.startswith("http") else urljoin(variant_page, href)
        href_lower = absolute.lower()
        if not href_lower.endswith((".zip", ".pdf")):
            continue
        if "braille" in href_lower:
            continue
        # Filter by filename prefix (e.g. "rus_") when the subject configures one.
        filename = absolute.rsplit("/", 1)[-1].lower()
        if prefixes and not any(filename.startswith(prefix) for prefix in prefixes):
            continue
        links.append(absolute)
    return self._sort_document_links(links)
def _sort_document_links(self, links: Iterable[str]) -> List[str]:
def sort_key(url: str) -> tuple[int, str]:
match = re.search(r"(20\d{2})", url)
return (int(match.group(1)) if match else 0, url)
return sorted(set(links), key=sort_key, reverse=True)
def _extract_tasks_from_document_bytes(self, document_bytes: bytes, document_url: str) -> List[Dict]:
if document_url.lower().endswith(".zip"):
return self._extract_tasks_from_archive(document_bytes, document_url)
if document_url.lower().endswith(".pdf"):
return self._extract_tasks_from_pdf_document(
document_bytes,
document_url=document_url,
document_name=document_url.rsplit("/", 1)[-1],
)
return []
# Dead code removed: this earlier definition of `_extract_tasks_from_archive`
# was shadowed by a later definition of the same name in this class (Python
# keeps only the last binding in a class body), so it could never be called.
def _extract_text_from_pdf_bytes(self, pdf_bytes: bytes) -> str:
    """Extract and normalize text from every page of a PDF; "" on any failure."""
    if PdfReader is None:
        return ""
    try:
        reader = PdfReader(io.BytesIO(pdf_bytes))
    except Exception as e:  # pragma: no cover - parser-dependent
        logger.error("Failed to open PDF: %s", e)
        return ""
    extracted: List[str] = []
    for page in reader.pages:
        # Individual pages may fail to extract; skip them silently.
        try:
            chunk = page.extract_text()
        except Exception:  # pragma: no cover - parser-dependent
            chunk = None
        if chunk:
            extracted.append(chunk)
    return self._clean_text("\n".join(extracted))
# Dead code removed: this earlier definition of `_extract_tasks_from_demo_text`
# was shadowed by a later definition of the same name (which additionally
# accepts a `source_kind` keyword), so this version could never be called.
# Dead code removed: this earlier definition of `_slice_demo_section` was
# shadowed by a later definition of the same name further down the class,
# so it could never be called.
def _extract_tasks_from_archive(self, archive_bytes: bytes, archive_url: str) -> List[Dict]:
    """Extract tasks from every relevant PDF inside a downloaded ZIP archive.

    Membership is decided by _should_parse_pdf_member; a corrupt archive is
    logged and yields [].
    """
    tasks: List[Dict] = []
    try:
        with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
            for member_name in archive.namelist():
                if not member_name.lower().endswith(".pdf"):
                    continue
                if not self._should_parse_pdf_member(member_name, archive_url):
                    continue
                tasks.extend(
                    self._extract_tasks_from_pdf_document(
                        archive.read(member_name),
                        document_url=archive_url,
                        document_name=member_name,
                    )
                )
    except zipfile.BadZipFile:
        logger.error("Invalid archive %s", archive_url)
    return tasks
def _should_parse_pdf_member(self, member_name: str, document_url: str) -> bool:
member_lower = member_name.lower()
if any(token in member_lower for token in ("спец", "кодиф", "критер", "ответ", "аудио")):
return False
if "otkrytyye-varianty-kim-ege" in document_url.lower():
return True
return "демо" in member_lower or "demo" in member_lower
def _extract_tasks_from_pdf_document(
    self,
    pdf_bytes: bytes,
    *,
    document_url: str,
    document_name: str,
) -> List[Dict]:
    """Turn one official PDF into task candidates (empty list if unreadable)."""
    extracted = self._extract_text_from_pdf_bytes(pdf_bytes)
    if not extracted:
        return []
    # The exam year is inferred from the first 20XX token in the URL.
    year_found = re.search(r"(20\d{2})", document_url)
    return self._extract_tasks_from_demo_text(
        extracted,
        archive_url=document_url,
        document_name=document_name,
        year=year_found.group(1) if year_found else "unknown",
        source_kind=self._detect_document_source_kind(document_url),
    )
def _detect_document_source_kind(self, document_url: str) -> str:
if "otkrytyye-varianty-kim-ege" in document_url.lower():
return "official_open_variant_pdf"
return "official_demo_pdf"
def _extract_tasks_from_demo_text(
    self,
    text: str,
    *,
    archive_url: str,
    document_name: str,
    year: str,
    source_kind: str = "official_demo_pdf",
) -> List[Dict]:
    """Split extracted PDF text into task candidates.

    Blocks are delimited by "Ответ:" markers, cleaned of page furniture,
    trimmed to the first task-start phrase, then gated by a plausibility
    check. At most ``max_demo_tasks`` candidates are produced; task numbers
    are sequential acceptance positions, not the numbering inside the PDF.
    """
    tasks: List[Dict] = []
    if not text:
        return tasks
    # NOTE(review): _slice_demo_section exists but is not applied here;
    # the full text is scanned. Confirm that is intentional.
    bounded_text = text
    if not bounded_text:
        return tasks
    for raw_block in self._split_pdf_into_task_blocks(bounded_text):
        content = self._cleanup_pdf_task_block(raw_block)
        content = self._trim_to_task_start(content)
        if not self._looks_like_official_task_block(content):
            continue
        task_number = len(tasks) + 1
        document_label = "Открытый вариант ЕГЭ" if source_kind == "official_open_variant_pdf" else "Демоверсия ЕГЭ"
        title = f"{document_label} {year}. Задание {task_number}"
        tasks.append(
            {
                "title": title,
                "content": content,
                "source_url": f"{archive_url}#task-{task_number}",
                "task_type": self._detect_task_type(title, content),
                "images": [],
                "variants": self._extract_variants(content),
                "scraped_at": datetime.utcnow().isoformat(),
                "source_kind": source_kind,
                "document_name": document_name,
                "task_number": task_number,
            }
        )
        if len(tasks) >= self.max_demo_tasks:
            break
    return tasks
def _split_pdf_into_task_blocks(self, text: str) -> List[str]:
answer_pattern = re.compile(r"(?:^|\n)\s*Ответ\s*:\s*[_\.\s]*", re.IGNORECASE)
blocks: List[str] = []
last_pos = 0
for match in answer_pattern.finditer(text):
block = text[last_pos:match.start()]
if block.strip():
blocks.append(block)
last_pos = match.end()
return blocks
def _cleanup_pdf_task_block(self, block: str) -> str:
lines: List[str] = []
for raw_line in block.splitlines():
line = self._clean_text(raw_line)
if not line:
continue
lower = line.lower()
if line == "&%end_page&%":
continue
if re.fullmatch(r"\d{1,2}", line):
continue
if re.search(r"\d+\s*/\s*\d+$", line):
continue
if lower.startswith(("демонстрационный вариант егэ", "открытый вариант ким егэ", "единый государственный экзамен")):
continue
if lower.startswith("© "):
continue
lines.append(line)
return self._clean_text("\n".join(lines))
def _trim_to_task_start(self, text: str) -> str:
if not text:
return text
starts = [text.find(pattern) for pattern in self.PDF_TASK_START_PATTERNS if text.find(pattern) >= 0]
if starts:
return text[min(starts):].strip()
return text.strip()
def _looks_like_official_task_block(self, text: str) -> bool:
if len(text) < 70 or len(text) > 6000:
return False
lower = text.lower()
if any(pattern.lower() in lower for pattern in self.PDF_NOISE_PATTERNS):
return False
return any(pattern.lower() in lower for pattern in self.PDF_TASK_START_PATTERNS)
def _slice_demo_section(self, text: str) -> str:
start_matches = list(re.finditer(r"(?m)^\s*Часть\s*1\s*$", text, re.IGNORECASE))
if start_matches:
start_pos = start_matches[-1].start()
else:
fallback = list(re.finditer(r"Ответами к заданиям", text, re.IGNORECASE))
if not fallback:
return text
start_pos = fallback[-1].start()
end = re.search(
r"(Часть\s*2|Задание\s*27|Система оценивания|Критерии оценивания|Ключи)",
text[start_pos:],
re.IGNORECASE,
)
if not end:
return text[start_pos:]
return text[start_pos : start_pos + end.start()]
def parse_task_page(self, html: str, url: str) -> Optional[Dict]:
    """Parse a standalone task page, trying selectors from most to least specific."""
    if not html:
        return None
    soup = BeautifulSoup(html, "lxml")
    selectors = (
        "div.qblock",
        "article",
        "main article",
        ".field--name-body",
        ".content",
        "main",
        "body",
    )
    for selector in selectors:
        container = soup.select_one(selector)
        if not container:
            continue
        candidate = self._build_candidate_from_container(container, url)
        if candidate:
            return candidate
    return None
def _build_candidate_from_container(self, container: Tag, url: str) -> Optional[Dict]:
    """Build a task candidate from an arbitrary HTML container.

    Operates on a re-parsed copy so stripping chrome elements never mutates
    the caller's tree. Returns None when the result fails the quality gate.
    """
    cloned = BeautifulSoup(str(container), "lxml")
    root = cloned.find()
    if root is None:
        return None
    # Remove non-content chrome before extracting text.
    for element in root.find_all(["script", "style", "nav", "header", "footer", "form", "button", "aside"]):
        element.decompose()
    title_tag = root.find(["h1", "h2", "h3", "strong", "b"])
    title = self._clean_text(title_tag.get_text(" ", strip=True)) if title_tag else ""
    content = self._clean_text(root.get_text("\n", strip=True))
    if not title:
        title = self._build_title_from_content(content, fallback=url)
    images = self._extract_images(root, base_url=url)
    candidate = {
        "title": title,
        "content": content,
        "source_url": url,
        "task_type": self._detect_task_type(title, content),
        "images": images,
        "variants": self._extract_variants(content),
        "scraped_at": datetime.utcnow().isoformat(),
        "source_kind": "generic_html",
    }
    return candidate if self._passes_quality_gate(candidate) else None
async def scrape_task_by_id(self, task_id: str) -> Optional[Dict]:
    """Fetch and parse a single EGE-bank question by GUID; None when not found."""
    source = self.SUBJECT_CONFIG["russian"]["dynamic_sources"][0]
    questions_url = f"{source['base_url']}/questions.php"
    html = await self.fetch_page(
        f"{questions_url}?proj={source['project_guid']}&qid={task_id}"
    )
    if not html:
        return None
    block = BeautifulSoup(html, "lxml").select_one("div.qblock")
    if not block:
        return None
    return self._parse_bank_question_block(
        block,
        project_guid=source["project_guid"],
        source_name=source["project_name"],
        questions_url=questions_url,
    )
async def search_tasks(self, query: str) -> List[Dict]:
    """Return scraped russian-subject tasks whose title or content contains *query*."""
    needle = query.lower().strip()
    matched: List[Dict] = []
    for task in await self.scrape_tasks(subject="russian"):
        haystacks = (task.get("title", "").lower(), task.get("content", "").lower())
        if any(needle in haystack for haystack in haystacks):
            matched.append(task)
    return matched
def _filter_candidates(self, candidates: Iterable[Dict]) -> List[Dict]:
accepted: List[Dict] = []
for candidate in candidates:
if self._passes_quality_gate(candidate):
accepted.append(candidate)
return accepted
def _dedupe_candidates(self, candidates: Iterable[Dict]) -> List[Dict]:
deduped: List[Dict] = []
seen_keys = set()
for candidate in candidates:
normalized = self._clean_text(candidate.get("content", ""))[:400]
key = (candidate.get("source_url", ""), normalized)
if key in seen_keys:
continue
seen_keys.add(key)
deduped.append(candidate)
return deduped
def _passes_quality_gate(self, candidate: Dict) -> bool:
score = self._score_candidate(candidate)
candidate["quality_score"] = score
return score >= self.min_quality_score
def _score_candidate(self, candidate: Dict) -> int:
    """Compute a heuristic quality score for a candidate (higher is better).

    Trusted provenance, task-sized text and task-phrase hits add points;
    generic portal titles, site-chrome noise and very long or ragged text
    subtract. The threshold is applied by _passes_quality_gate.
    """
    title = candidate.get("title", "").lower()
    content = candidate.get("content", "").lower()
    source_kind = candidate.get("source_kind", "")
    length = len(content)
    score = 0
    # Base trust by provenance.
    if source_kind == "dynamic_bank":
        score += 60
    elif source_kind in {"official_demo_pdf", "official_open_variant_pdf"}:
        score += 50
    else:
        score += 10
    # Reward task-sized text; penalize very long or very short content.
    if 80 <= length <= 3500:
        score += 15
    elif length > 5000:
        score -= 20
    else:
        score -= 10
    if any(keyword in content for keywords in self.TASK_TYPE_KEYWORDS.values() for keyword in keywords):
        score += 10
    if any(pattern.lower() in content for pattern in self.PDF_TASK_START_PATTERNS):
        score += 10
    if re.search(r"\b\d+\b", content):
        score += 5
    # Generic portal-page titles are a strong negative signal.
    if any(pattern in title for pattern in self.GENERIC_TITLE_PATTERNS):
        score -= 45
    # Site chrome near the top of the text; penalty capped at 32 points.
    noise_hits = sum(1 for pattern in self.NOISE_PATTERNS if pattern in content[:1200])
    score -= min(noise_hits * 8, 32)
    if content.count("\n") > 80:
        score -= 10
    return score
def _detect_task_type(self, title: str, content: str) -> str:
text = f"{title} {content}".lower()
for task_type, keywords in self.TASK_TYPE_KEYWORDS.items():
if any(keyword in text for keyword in keywords):
return task_type
return "other"
def _extract_variants(self, content: str) -> List[str]:
matches = re.findall(r"(?:^|\n)(?:[1-6]|[A-DА-Г])[.)]\s*([^\n]{2,200})", content)
return [self._clean_text(match) for match in matches[:10]]
def _extract_images(self, container: Tag, *, base_url: str) -> List[str]:
    """Resolve up to 10 image URLs (src or lazy data-src) to absolute form."""
    resolved: List[str] = []
    for img in container.find_all("img"):
        src = img.get("src") or img.get("data-src")
        if src:
            resolved.append(src if src.startswith("http") else urljoin(base_url, src))
    return resolved[:10]
def _build_title_from_content(self, content: str, fallback: str) -> str:
first_line = next((line.strip() for line in content.splitlines() if line.strip()), "")
title = self._clean_text(first_line)
if not title:
title = fallback
return title[:160]
def _clean_text(self, text: str) -> str:
text = text.replace("\xa0", " ")
text = re.sub(
r"\b(?:[A-Za-zА-Яа-яЁё]\s+){2,}[A-Za-zА-Яа-яЁё]\b",
lambda match: match.group(0).replace(" ", ""),
text,
)
text = re.sub(r"[ \t]+", " ", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()