""" FIPI scraper focused on extracting real tasks instead of generic page text. """ from __future__ import annotations import asyncio from datetime import datetime import io import logging import math import os import re import ssl from typing import Dict, Iterable, List, Optional from urllib.parse import urljoin import zipfile from bs4 import BeautifulSoup, Tag import httpx import requests try: from pypdf import PdfReader except ImportError: # pragma: no cover - optional dependency for HF deploy PdfReader = None logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class FIPIScraper: """Collects task candidates from the FIPI bank and official demo archives.""" SUBJECT_CONFIG = { "russian": { "label": "Русский язык", "dynamic_sources": [ { "kind": "ege_bank", "base_url": "https://ege.fipi.ru/bank", "project_guid": "AF0ED3F2557F8FFC4C06F80B6803FD26", "project_name": "ЕГЭ. Русский язык", }, { "kind": "oge_bank", "base_url": "https://oge.fipi.ru/bank", "project_guid": "2F5EE3B12FE2A0EA40B06BF61A015416", "project_name": "ОГЭ. Русский язык", }, ], "official_demo_page": "https://fipi.ru/ege/demoversii-specifikacii-kodifikatory", "official_variant_page": "https://fipi.ru/ege/otkrytyy-bank-zadaniy-ege/otkrytyye-varianty-kim-ege", "archive_prefixes": ("ru_11_",), "variant_prefixes": ("rus_",), "title_keywords": ("русский язык",), } } TASK_TYPE_KEYWORDS = { "writing": ("сочинение", "эссе", "напишите", "сформулируйте", "прокомментируйте"), "test": ("выберите", "укажите", "ответ", "вариант", "расставьте", "определите"), "listening": ("аудио", "прослуш", "запись"), "reading": ("прочитайте", "текст", "абзац", "предложение"), } GENERIC_TITLE_PATTERNS = ( "открытый банк", "демоверсии", "спецификации", "кодификаторы", "федеральный институт", "фипи", "нормативно", "документы", "варианты ким", ) PDF_TASK_START_PATTERNS = ( "Прочитайте текст", "Самостоятельно подберите", "В тексте выделено", "Укажите", "В одном из", "Отредактируйте предложение", "Установите соответствие", "Расставьте", "Определите", "Найдите", "Подберите", ) PDF_NOISE_PATTERNS = ( "Инструкция по выполнению работы", "Пояснения к демонстрационному варианту", "Желаем успеха", "Все бланки ЕГЭ заполняются", "Баллы, полученные", "После завершения работы", "В демонстрационном варианте представлены", "Часть 1 содержит 26 заданий", "На выполнение экзаменационной работы", "Ответами к заданиям 1–26 являются", "Бланк", ) NOISE_PATTERNS = ( "федеральный институт педагогических измерений", "открытый банк тестовых заданий", "открытый банк заданий егэ", "открытый банк заданий огэ", "подбор заданий", "демоверсии, спецификации, кодификаторы", "для предметных комиссий", "аналитические и методические материалы", "видеоконсультации разработчиков ким", "скачать", "изменения в ким", ) def __init__(self, base_url: str = "https://fipi.ru"): self.base_url = base_url.rstrip("/") self.headers = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7", } self.page_size = max(1, int(os.getenv("SCRAPER_BANK_PAGE_SIZE", "10"))) self.max_bank_pages = max(1, int(os.getenv("SCRAPER_MAX_BANK_PAGES", "5"))) self.max_demo_archives = max(1, int(os.getenv("SCRAPER_MAX_DEMO_ARCHIVES", "2"))) self.max_demo_tasks = max(1, int(os.getenv("SCRAPER_MAX_DEMO_TASKS", "20"))) self.min_quality_score = max(1, int(os.getenv("SCRAPER_MIN_QUALITY_SCORE", "45"))) async def fetch_page(self, url: str) -> Optional[str]: response = await self._request("GET", url) return response.text if response else None async def fetch_bytes(self, url: str) -> Optional[bytes]: response = await self._request("GET", url) return response.content if response else None async def _request( self, method: str, url: str, *, data: Optional[Dict[str, str]] = None, ) -> Optional[httpx.Response]: ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE async with httpx.AsyncClient( headers=self.headers, timeout=45.0, verify=ssl_context, follow_redirects=True, trust_env=False, ) as client: try: response = await client.request(method, url, data=data) response.raise_for_status() return response except httpx.HTTPError as e: logger.error("Async request failed for %s: %r", url, e) return await self._request_with_requests_fallback( method=method, url=url, data=data, ) async def _request_with_requests_fallback( self, *, method: str, url: str, data: Optional[Dict[str, str]] = None, ) -> Optional[httpx.Response]: def do_request() -> Optional[httpx.Response]: session = requests.Session() session.trust_env = False try: response = session.request( method=method, url=url, data=data, headers=self.headers, timeout=45, verify=False, allow_redirects=True, ) response.raise_for_status() request = httpx.Request(method, url, headers=self.headers) return httpx.Response( status_code=response.status_code, headers=response.headers, content=response.content, request=request, ) except requests.RequestException as exc: logger.error("Requests fallback failed for %s: %r", url, exc) return None finally: session.close() return await asyncio.to_thread(do_request) async def scrape_tasks( self, subject: str = "russian", *, include_official_archives: bool = True, ) -> List[Dict]: config = self.SUBJECT_CONFIG.get(subject) if not config: logger.warning("Unknown subject %s, falling back to russian", subject) config = self.SUBJECT_CONFIG["russian"] candidates: List[Dict] = [] candidates.extend(await self.scrape_dynamic_bank(subject)) if include_official_archives: candidates.extend(await self.scrape_official_archives(subject)) validated = self._dedupe_candidates(self._filter_candidates(candidates)) logger.info("Accepted %s task candidates after filtering", len(validated)) return validated async def scrape_dynamic_bank(self, subject: str = "russian") -> List[Dict]: config = self.SUBJECT_CONFIG.get(subject, self.SUBJECT_CONFIG["russian"]) tasks: List[Dict] = [] for source in config["dynamic_sources"]: project_guid = source["project_guid"] questions_url = f"{source['base_url']}/questions.php" total_tasks = None for page_index in range(self.max_bank_pages): html = await self._fetch_bank_page( questions_url=questions_url, project_guid=project_guid, page_index=page_index, ) if not html: break if total_tasks is None: total_tasks = self._extract_total_count(html) if total_tasks: max_pages = math.ceil(total_tasks / self.page_size) logger.info( "Bank %s reports %s tasks, scraping up to %s pages", source["project_name"], total_tasks, min(max_pages, self.max_bank_pages), ) soup = BeautifulSoup(html, "lxml") blocks = soup.select("div.qblock") if not blocks: logger.warning( "No qblock nodes found for %s page=%s via primary fetch, retrying POST search", source["project_name"], page_index, ) html = await self._fetch_bank_page( questions_url=questions_url, project_guid=project_guid, page_index=page_index, force_post=True, ) if not html: break soup = BeautifulSoup(html, "lxml") blocks = soup.select("div.qblock") if not blocks: logger.warning( "No qblock nodes found for %s page=%s after retry", source["project_name"], page_index, ) break for block in blocks: task = self._parse_bank_question_block( block, project_guid=project_guid, source_name=source["project_name"], questions_url=questions_url, ) if task: tasks.append(task) if total_tasks is not None and (page_index + 1) * self.page_size >= total_tasks: break logger.info("Collected %s candidates from the dynamic bank", len(tasks)) return tasks async def _fetch_bank_page( self, *, questions_url: str, project_guid: str, page_index: int, force_post: bool = False, ) -> Optional[str]: page_url = ( f"{questions_url}?proj={project_guid}" f"&page={page_index}&pagesize={self.page_size}" ) if not force_post: html = await self.fetch_page(page_url) if html: return html return await self._post_bank_page( questions_url=questions_url, project_guid=project_guid, page_index=page_index, ) async def _post_bank_page( self, *, questions_url: str, project_guid: str, page_index: int, ) -> Optional[str]: response = await self._request( "POST", questions_url, data={ "search": "1", "pagesize": str(self.page_size), "proj": project_guid, "page": str(page_index), }, ) return response.text if response else None def _extract_total_count(self, html: str) -> Optional[int]: match = re.search(r"setQCount\((\d+)", html) return int(match.group(1)) if match else None def _parse_bank_question_block( self, block: Tag, *, project_guid: str, source_name: str, questions_url: str, ) -> Optional[Dict]: prompt_cell = block.select_one("td.cell_0") if not prompt_cell: return None content = self._clean_text(prompt_cell.get_text("\n", strip=True)) if not content: return None title = self._build_title_from_content(content, fallback=source_name) question_guid = self._extract_block_guid(block) variants = self._extract_variants_from_block(block) images = self._extract_images(prompt_cell, base_url=questions_url) return { "title": title, "content": content, "source_url": f"{questions_url}?proj={project_guid}&qid={question_guid}", "task_type": self._detect_task_type(title, content), "images": images, "variants": variants, "scraped_at": datetime.utcnow().isoformat(), "source_kind": "dynamic_bank", "task_guid": question_guid, } def _extract_block_guid(self, block: Tag) -> str: guid_input = block.select_one("form input[name='guid']") if guid_input and guid_input.get("value"): return guid_input["value"] return block.get("id", "").lstrip("q") def _extract_variants_from_block(self, block: Tag) -> List[str]: variants: List[str] = [] for label in block.find_all("label"): text = self._clean_text(label.get_text(" ", strip=True)) if text: variants.append(text) if not variants: for option in block.find_all("option"): text = self._clean_text(option.get_text(" ", strip=True)) if text and text.lower() != "выбор": variants.append(text) return variants[:10] async def scrape_official_archives(self, subject: str = "russian") -> List[Dict]: config = self.SUBJECT_CONFIG.get(subject, self.SUBJECT_CONFIG["russian"]) archive_links = await self._discover_official_archive_links(config) variant_links = await self._discover_official_variant_links(config) document_links = self._sort_document_links(archive_links + variant_links) tasks: List[Dict] = [] if not document_links: logger.warning("No official archive links found for %s", subject) return tasks if PdfReader is None: logger.warning("pypdf is not installed, skipping official PDF extraction") return tasks for document_url in document_links[: self.max_demo_archives]: document_bytes = await self.fetch_bytes(document_url) if not document_bytes: continue tasks.extend(self._extract_tasks_from_document_bytes(document_bytes, document_url)) logger.info("Collected %s candidates from official archives", len(tasks)) return tasks async def _discover_official_archive_links(self, config: Dict) -> List[str]: html = await self.fetch_page(config["official_demo_page"]) if not html: return [] soup = BeautifulSoup(html, "lxml") prefixes = config["archive_prefixes"] archive_links: List[str] = [] for link in soup.find_all("a", href=True): href = link["href"] absolute = href if href.startswith("http") else urljoin(config["official_demo_page"], href) href_lower = absolute.lower() if not href_lower.endswith(".zip"): continue if any(prefix in href_lower for prefix in prefixes): archive_links.append(absolute) def sort_key(url: str) -> int: match = re.search(r"/(20\d{2})/", url) return int(match.group(1)) if match else 0 archive_links.sort(key=sort_key, reverse=True) return archive_links async def _discover_official_variant_links(self, config: Dict) -> List[str]: variant_page = config.get("official_variant_page") if not variant_page: return [] html = await self.fetch_page(variant_page) if not html: return [] soup = BeautifulSoup(html, "lxml") prefixes = config.get("variant_prefixes", ()) links: List[str] = [] for link in soup.find_all("a", href=True): href = link["href"] absolute = href if href.startswith("http") else urljoin(variant_page, href) href_lower = absolute.lower() if not href_lower.endswith((".zip", ".pdf")): continue if "braille" in href_lower: continue filename = absolute.rsplit("/", 1)[-1].lower() if prefixes and not any(filename.startswith(prefix) for prefix in prefixes): continue links.append(absolute) return self._sort_document_links(links) def _sort_document_links(self, links: Iterable[str]) -> List[str]: def sort_key(url: str) -> tuple[int, str]: match = re.search(r"(20\d{2})", url) return (int(match.group(1)) if match else 0, url) return sorted(set(links), key=sort_key, reverse=True) def _extract_tasks_from_document_bytes(self, document_bytes: bytes, document_url: str) -> List[Dict]: if document_url.lower().endswith(".zip"): return self._extract_tasks_from_archive(document_bytes, document_url) if document_url.lower().endswith(".pdf"): return self._extract_tasks_from_pdf_document( document_bytes, document_url=document_url, document_name=document_url.rsplit("/", 1)[-1], ) return [] def _extract_tasks_from_archive(self, archive_bytes: bytes, archive_url: str) -> List[Dict]: tasks: List[Dict] = [] try: with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive: for member_name in archive.namelist(): if not member_name.lower().endswith(".pdf"): continue if "демо" not in member_name.lower() and "demo" not in member_name.lower(): continue text = self._extract_text_from_pdf_bytes(archive.read(member_name)) if not text: continue year_match = re.search(r"(20\d{2})", archive_url) year = year_match.group(1) if year_match else "unknown" tasks.extend( self._extract_tasks_from_demo_text( text, archive_url=archive_url, document_name=member_name, year=year, ) ) except zipfile.BadZipFile: logger.error("Invalid archive %s", archive_url) return tasks def _extract_text_from_pdf_bytes(self, pdf_bytes: bytes) -> str: if PdfReader is None: return "" try: reader = PdfReader(io.BytesIO(pdf_bytes)) except Exception as e: # pragma: no cover - parser-dependent logger.error("Failed to open PDF: %s", e) return "" pages: List[str] = [] for page in reader.pages: try: page_text = page.extract_text() or "" except Exception: # pragma: no cover - parser-dependent page_text = "" if page_text: pages.append(page_text) return self._clean_text("\n".join(pages)) def _extract_tasks_from_demo_text( self, text: str, *, archive_url: str, document_name: str, year: str, ) -> List[Dict]: tasks: List[Dict] = [] if not text: return tasks bounded_text = text if not bounded_text: return tasks pattern = re.compile( r"(?ms)(?:^|\n)(\d{1,2})[\.\)]\s*(.+?)(?=(?:\n\d{1,2}[\.\)])|(?:\nЧасть\s+\d)|\Z)" ) for match in pattern.finditer(bounded_text): task_number = int(match.group(1)) content = self._clean_text(match.group(2)) if len(content) < 80: continue title = f"Демоверсия ЕГЭ {year}. Задание {task_number}" tasks.append( { "title": title, "content": content, "source_url": f"{archive_url}#task-{task_number}", "task_type": self._detect_task_type(title, content), "images": [], "variants": self._extract_variants(content), "scraped_at": datetime.utcnow().isoformat(), "source_kind": "official_demo_pdf", "document_name": document_name, "task_number": task_number, } ) if len(tasks) >= self.max_demo_tasks: break return tasks def _slice_demo_section(self, text: str) -> str: start = re.search(r"(Часть\s*1|Ответами к заданиям)", text, re.IGNORECASE) if not start: return text end = re.search(r"(Система оценивания|Ключи|Ответы)", text[start.start() :], re.IGNORECASE) if not end: return text[start.start() :] return text[start.start() : start.start() + end.start()] def _extract_tasks_from_archive(self, archive_bytes: bytes, archive_url: str) -> List[Dict]: tasks: List[Dict] = [] try: with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive: for member_name in archive.namelist(): if not member_name.lower().endswith(".pdf"): continue if not self._should_parse_pdf_member(member_name, archive_url): continue tasks.extend( self._extract_tasks_from_pdf_document( archive.read(member_name), document_url=archive_url, document_name=member_name, ) ) except zipfile.BadZipFile: logger.error("Invalid archive %s", archive_url) return tasks def _should_parse_pdf_member(self, member_name: str, document_url: str) -> bool: member_lower = member_name.lower() if any(token in member_lower for token in ("спец", "кодиф", "критер", "ответ", "аудио")): return False if "otkrytyye-varianty-kim-ege" in document_url.lower(): return True return "демо" in member_lower or "demo" in member_lower def _extract_tasks_from_pdf_document( self, pdf_bytes: bytes, *, document_url: str, document_name: str, ) -> List[Dict]: text = self._extract_text_from_pdf_bytes(pdf_bytes) if not text: return [] year_match = re.search(r"(20\d{2})", document_url) year = year_match.group(1) if year_match else "unknown" return self._extract_tasks_from_demo_text( text, archive_url=document_url, document_name=document_name, year=year, source_kind=self._detect_document_source_kind(document_url), ) def _detect_document_source_kind(self, document_url: str) -> str: if "otkrytyye-varianty-kim-ege" in document_url.lower(): return "official_open_variant_pdf" return "official_demo_pdf" def _extract_tasks_from_demo_text( self, text: str, *, archive_url: str, document_name: str, year: str, source_kind: str = "official_demo_pdf", ) -> List[Dict]: tasks: List[Dict] = [] if not text: return tasks bounded_text = text if not bounded_text: return tasks for raw_block in self._split_pdf_into_task_blocks(bounded_text): content = self._cleanup_pdf_task_block(raw_block) content = self._trim_to_task_start(content) if not self._looks_like_official_task_block(content): continue task_number = len(tasks) + 1 document_label = "Открытый вариант ЕГЭ" if source_kind == "official_open_variant_pdf" else "Демоверсия ЕГЭ" title = f"{document_label} {year}. Задание {task_number}" tasks.append( { "title": title, "content": content, "source_url": f"{archive_url}#task-{task_number}", "task_type": self._detect_task_type(title, content), "images": [], "variants": self._extract_variants(content), "scraped_at": datetime.utcnow().isoformat(), "source_kind": source_kind, "document_name": document_name, "task_number": task_number, } ) if len(tasks) >= self.max_demo_tasks: break return tasks def _split_pdf_into_task_blocks(self, text: str) -> List[str]: answer_pattern = re.compile(r"(?:^|\n)\s*Ответ\s*:\s*[_\.\s]*", re.IGNORECASE) blocks: List[str] = [] last_pos = 0 for match in answer_pattern.finditer(text): block = text[last_pos:match.start()] if block.strip(): blocks.append(block) last_pos = match.end() return blocks def _cleanup_pdf_task_block(self, block: str) -> str: lines: List[str] = [] for raw_line in block.splitlines(): line = self._clean_text(raw_line) if not line: continue lower = line.lower() if line == "&%end_page&%": continue if re.fullmatch(r"\d{1,2}", line): continue if re.search(r"\d+\s*/\s*\d+$", line): continue if lower.startswith(("демонстрационный вариант егэ", "открытый вариант ким егэ", "единый государственный экзамен")): continue if lower.startswith("© "): continue lines.append(line) return self._clean_text("\n".join(lines)) def _trim_to_task_start(self, text: str) -> str: if not text: return text starts = [text.find(pattern) for pattern in self.PDF_TASK_START_PATTERNS if text.find(pattern) >= 0] if starts: return text[min(starts):].strip() return text.strip() def _looks_like_official_task_block(self, text: str) -> bool: if len(text) < 70 or len(text) > 6000: return False lower = text.lower() if any(pattern.lower() in lower for pattern in self.PDF_NOISE_PATTERNS): return False return any(pattern.lower() in lower for pattern in self.PDF_TASK_START_PATTERNS) def _slice_demo_section(self, text: str) -> str: start_matches = list(re.finditer(r"(?m)^\s*Часть\s*1\s*$", text, re.IGNORECASE)) if start_matches: start_pos = start_matches[-1].start() else: fallback = list(re.finditer(r"Ответами к заданиям", text, re.IGNORECASE)) if not fallback: return text start_pos = fallback[-1].start() end = re.search( r"(Часть\s*2|Задание\s*27|Система оценивания|Критерии оценивания|Ключи)", text[start_pos:], re.IGNORECASE, ) if not end: return text[start_pos:] return text[start_pos : start_pos + end.start()] def parse_task_page(self, html: str, url: str) -> Optional[Dict]: if not html: return None soup = BeautifulSoup(html, "lxml") for selector in ( "div.qblock", "article", "main article", ".field--name-body", ".content", "main", "body", ): container = soup.select_one(selector) if not container: continue candidate = self._build_candidate_from_container(container, url) if candidate: return candidate return None def _build_candidate_from_container(self, container: Tag, url: str) -> Optional[Dict]: cloned = BeautifulSoup(str(container), "lxml") root = cloned.find() if root is None: return None for element in root.find_all(["script", "style", "nav", "header", "footer", "form", "button", "aside"]): element.decompose() title_tag = root.find(["h1", "h2", "h3", "strong", "b"]) title = self._clean_text(title_tag.get_text(" ", strip=True)) if title_tag else "" content = self._clean_text(root.get_text("\n", strip=True)) if not title: title = self._build_title_from_content(content, fallback=url) images = self._extract_images(root, base_url=url) candidate = { "title": title, "content": content, "source_url": url, "task_type": self._detect_task_type(title, content), "images": images, "variants": self._extract_variants(content), "scraped_at": datetime.utcnow().isoformat(), "source_kind": "generic_html", } return candidate if self._passes_quality_gate(candidate) else None async def scrape_task_by_id(self, task_id: str) -> Optional[Dict]: config = self.SUBJECT_CONFIG["russian"]["dynamic_sources"][0] html = await self.fetch_page( f"{config['base_url']}/questions.php?proj={config['project_guid']}&qid={task_id}" ) if not html: return None soup = BeautifulSoup(html, "lxml") block = soup.select_one("div.qblock") if not block: return None return self._parse_bank_question_block( block, project_guid=config["project_guid"], source_name=config["project_name"], questions_url=f"{config['base_url']}/questions.php", ) async def search_tasks(self, query: str) -> List[Dict]: query_lower = query.lower().strip() tasks = await self.scrape_tasks(subject="russian") return [ task for task in tasks if query_lower in task.get("title", "").lower() or query_lower in task.get("content", "").lower() ] def _filter_candidates(self, candidates: Iterable[Dict]) -> List[Dict]: accepted: List[Dict] = [] for candidate in candidates: if self._passes_quality_gate(candidate): accepted.append(candidate) return accepted def _dedupe_candidates(self, candidates: Iterable[Dict]) -> List[Dict]: deduped: List[Dict] = [] seen_keys = set() for candidate in candidates: normalized = self._clean_text(candidate.get("content", ""))[:400] key = (candidate.get("source_url", ""), normalized) if key in seen_keys: continue seen_keys.add(key) deduped.append(candidate) return deduped def _passes_quality_gate(self, candidate: Dict) -> bool: score = self._score_candidate(candidate) candidate["quality_score"] = score return score >= self.min_quality_score def _score_candidate(self, candidate: Dict) -> int: title = candidate.get("title", "").lower() content = candidate.get("content", "").lower() source_kind = candidate.get("source_kind", "") length = len(content) score = 0 if source_kind == "dynamic_bank": score += 60 elif source_kind in {"official_demo_pdf", "official_open_variant_pdf"}: score += 50 else: score += 10 if 80 <= length <= 3500: score += 15 elif length > 5000: score -= 20 else: score -= 10 if any(keyword in content for keywords in self.TASK_TYPE_KEYWORDS.values() for keyword in keywords): score += 10 if any(pattern.lower() in content for pattern in self.PDF_TASK_START_PATTERNS): score += 10 if re.search(r"\b\d+\b", content): score += 5 if any(pattern in title for pattern in self.GENERIC_TITLE_PATTERNS): score -= 45 noise_hits = sum(1 for pattern in self.NOISE_PATTERNS if pattern in content[:1200]) score -= min(noise_hits * 8, 32) if content.count("\n") > 80: score -= 10 return score def _detect_task_type(self, title: str, content: str) -> str: text = f"{title} {content}".lower() for task_type, keywords in self.TASK_TYPE_KEYWORDS.items(): if any(keyword in text for keyword in keywords): return task_type return "other" def _extract_variants(self, content: str) -> List[str]: matches = re.findall(r"(?:^|\n)(?:[1-6]|[A-DА-Г])[.)]\s*([^\n]{2,200})", content) return [self._clean_text(match) for match in matches[:10]] def _extract_images(self, container: Tag, *, base_url: str) -> List[str]: images: List[str] = [] for img in container.find_all("img"): src = img.get("src") or img.get("data-src") if not src: continue images.append(src if src.startswith("http") else urljoin(base_url, src)) return images[:10] def _build_title_from_content(self, content: str, fallback: str) -> str: first_line = next((line.strip() for line in content.splitlines() if line.strip()), "") title = self._clean_text(first_line) if not title: title = fallback return title[:160] def _clean_text(self, text: str) -> str: text = text.replace("\xa0", " ") text = re.sub( r"\b(?:[A-Za-zА-Яа-яЁё]\s+){2,}[A-Za-zА-Яа-яЁё]\b", lambda match: match.group(0).replace(" ", ""), text, ) text = re.sub(r"[ \t]+", " ", text) text = re.sub(r"\n{3,}", "\n\n", text) return text.strip()