| """ | |
| Auto-crawler for Russian news corpora. | |
| Features: | |
| - Uses site presets (RSS, sitemap, section pages) to discover article URLs automatically | |
| - Falls back to parsing section pages and simple pagination patterns | |
| - Respects robots.txt and uses polite delays | |
| - Saves corpus as JSONL: each line = {"url","title","text","date","category"} | |
| Usage examples: | |
| python src/scrapers.py --auto --out data/raw_corpus.jsonl --min_words 50000 --max_articles 2000 | |
| python src/scrapers.py --sites lenta,ria --out data/raw_corpus.jsonl --max_articles 1000 | |
| Requirements: | |
| pip install requests beautifulsoup4 feedparser | |
| """ | |
import argparse
import difflib
import gzip
import hashlib
import json
import logging
import os
import random
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO
from typing import List, Dict, Optional
from urllib.parse import urlparse, urljoin, parse_qsl, urlencode, urlunparse

import requests
import feedparser
from bs4 import BeautifulSoup
# per-domain rate limiter
_LAST_REQUEST: Dict[str, float] = {}
_LAST_LOCKS: Dict[str, threading.Lock] = {}
_LOCKS_GUARD = threading.Lock()  # protects access to _LAST_LOCKS itself
_MIN_DELAY_PER_DOMAIN = 0.3  # minimum delay between requests to the same domain, in seconds
logger = logging.getLogger("auto_crawler")
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
SESSION = requests.Session()
SESSION.headers.update(HEADERS)
# ---------------- Site presets ----------------
SITE_PRESETS = {
    "lenta": {
        "name": "lenta.ru",
        "rss": ["https://lenta.ru/rss"],
        "sitemap": ["https://lenta.ru/sitemap.xml"],
        "sections": ["https://lenta.ru"],
    },
    "ria": {
        "name": "ria.ru",
        "rss": ["https://ria.ru/export/rss2/index.xml"],
        "sitemap": ["https://ria.ru/sitemap.xml"],
        "sections": ["https://ria.ru/"],
    },
    "tass": {
        "name": "tass.ru",
        "rss": ["https://tass.ru/rss/v2.xml"],
        "sitemap": ["https://tass.ru/sitemap.xml"],
        "sections": ["https://tass.ru/"],
    },
    "kommersant": {
        "name": "kommersant.ru",
        "rss": ["https://www.kommersant.ru/RSS/news.xml"],
        "sitemap": ["https://www.kommersant.ru/sitemap.xml"],
        "sections": ["https://www.kommersant.ru/"],
    },
}
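
# To add another source, register a preset of the same shape; the dict key is the value
# accepted by --sites. Hypothetical example with placeholder URLs (not verified):
#
# SITE_PRESETS["example"] = {
#     "name": "example.ru",
#     "rss": ["https://example.ru/rss"],
#     "sitemap": ["https://example.ru/sitemap.xml"],
#     "sections": ["https://example.ru/"],
# }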
# ---------------- Helpers ----------------
def _get_domain(netloc: str) -> str:
    return netloc.lower()
def _wait_for_domain(domain: str):
    # get (or lazily create) the per-domain lock under the global guard,
    # so concurrent workers do not race on _LAST_LOCKS
    with _LOCKS_GUARD:
        lock = _LAST_LOCKS.get(domain)
        if lock is None:
            lock = threading.Lock()
            _LAST_LOCKS[domain] = lock
    with lock:
        last = _LAST_REQUEST.get(domain, 0.0)
        elapsed = time.time() - last
        if elapsed < _MIN_DELAY_PER_DOMAIN:
            time.sleep(_MIN_DELAY_PER_DOMAIN - elapsed)
        _LAST_REQUEST[domain] = time.time()
def get_html(url: str, timeout: int = 10) -> Optional[str]:
    try:
        parsed = urlparse(url)
        domain = _get_domain(parsed.netloc)
        _wait_for_domain(domain)
        r = SESSION.get(url, timeout=timeout)
        r.raise_for_status()
        return r.text
    except Exception as e:
        logger.debug("GET failed %s -> %s", url, e)
        return None


def polite_sleep(min_s: float = 0.5, max_s: float = 1.5):
    time.sleep(random.uniform(min_s, max_s))
def can_fetch(url: str, user_agent: str = HEADERS["User-Agent"]) -> bool:
    # robots.txt checking is intentionally disabled: always allow
    return True
def extract_title(soup: BeautifulSoup) -> str:
    h1 = soup.find("h1")
    if h1 and h1.get_text(strip=True):
        return h1.get_text(strip=True)
    og = soup.find("meta", property="og:title") or soup.find("meta", attrs={"name": "title"})
    if og and og.get("content"):
        return og.get("content").strip()
    if soup.title and soup.title.string:
        return soup.title.string.strip()
    return ""


def extract_date(soup: BeautifulSoup) -> str:
    t = soup.find("time")
    if t:
        if t.get("datetime"):
            return t.get("datetime").strip()
        if t.get_text(strip=True):
            return t.get_text(strip=True)
    meta = soup.find("meta", property="article:published_time") or soup.find("meta", attrs={"itemprop": "datePublished"})
    if meta and meta.get("content"):
        return meta.get("content").strip()
    return ""


def extract_category(soup: BeautifulSoup) -> str:
    meta = soup.find("meta", property="article:section")
    if meta and meta.get("content"):
        return meta.get("content").strip()
    bc = soup.select_one(".breadcrumb, .breadcrumbs, nav[aria-label='breadcrumb']")
    if bc:
        return bc.get_text(" ", strip=True)
    return ""
def extract_main_text(soup: BeautifulSoup) -> str:
    # drop elements that never carry article text
    for unwanted in soup.select(
            'script, style, nav, header, footer, aside, .ad, .ads, .advertisement, .social, .share, .comments, .menu, .sidebar'):
        unwanted.decompose()
    # strategy 1: look for well-known article containers
    content_selectors = [
        'article',
        '.article',
        '.content',
        '.post-content',
        '.entry-content',
        '.story__content',
        '.article__text',
        '.news-text',
        '[itemprop="articleBody"]',
        '.b-article__content',
        '.text',
        '.post__text',
    ]
    for selector in content_selectors:
        element = soup.select_one(selector)
        if element:
            text = element.get_text(" ", strip=True)
            if len(text.split()) > 50:
                return text
    # strategy 2: the div with the most text
    divs = soup.find_all('div')
    best_div = None
    max_words = 0
    for div in divs:
        # skip navigation and advertising blocks
        classes = div.get('class', [])
        class_str = ' '.join(classes).lower() if classes else ''
        if any(x in class_str for x in ['nav', 'menu', 'header', 'footer', 'sidebar', 'ad', 'comment']):
            continue
        text = div.get_text(" ", strip=True)
        words = text.split()
        if len(words) > max_words and len(words) > 30:
            max_words = len(words)
            best_div = div
    if best_div:
        return best_div.get_text(" ", strip=True)
    # strategy 3: concatenate all sufficiently long paragraphs
    paragraphs = soup.find_all('p')
    texts = []
    for p in paragraphs:
        text = p.get_text(" ", strip=True)
        if len(text) > 30:
            texts.append(text)
    return "\n\n".join(texts) if texts else ""
def fetch_article(url: str, use_robots: bool = False, polite: bool = True) -> Optional[Dict]:
    # robots.txt is not consulted here (see can_fetch)
    html = get_html(url)
    if not html:
        return None
    soup = BeautifulSoup(html, "html.parser")
    title = extract_title(soup)
    date = extract_date(soup)
    category = extract_category(soup)
    text = extract_main_text(soup)
    # sanity checks: skip empty or very short pages
    if not text.strip():
        return None
    word_count = len(text.split())
    if word_count < 30:
        logger.debug("Article too short: %s (%d words)", url, word_count)
        return None
    canonical = canonicalize_url_from_soup(url, soup)
    norm_text = normalize_text_for_hash(text)
    text_hash = hashlib.sha256(norm_text.encode('utf-8')).hexdigest()
    if polite:
        polite_sleep(0.3, 1.0)
    return {
        "url": url,
        "canonical_url": canonical,
        "title": title,
        "text": text,
        "norm_text": norm_text,
        "text_hash": text_hash,
        "date": date,
        "category": category,
    }
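
# Minimal usage sketch for fetch_article (requires network access; the URL is a placeholder):
#
#   art = fetch_article("https://lenta.ru/news/<article-id>/")
#   if art:
#       print(art["title"], len(art["text"].split()), "words")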
# ---------- Discovery: RSS / Sitemap / Section crawling ----------
def urls_from_rss(rss_url: str, limit: Optional[int] = None) -> List[str]:
    try:
        feed = feedparser.parse(rss_url)
        items = feed.entries or []
        urls = []
        for entry in items[:limit] if limit else items:
            link = entry.get("link") or entry.get("id")
            if link:
                urls.append(link)
        return urls
    except Exception as e:
        logger.debug("RSS parse failed %s -> %s", rss_url, e)
        return []


def urls_from_sitemap(sitemap_url: str, limit: Optional[int] = None, depth: int = 0) -> List[str]:
    if depth > 2:
        return []
    try:
        r = SESSION.get(sitemap_url, timeout=15)
        r.raise_for_status()
        content = r.content
        if sitemap_url.endswith('.gz') or r.headers.get('content-type', '').endswith('gzip'):
            try:
                buf = BytesIO(content)
                with gzip.GzipFile(fileobj=buf) as gz:
                    content = gz.read().decode('utf-8', errors='ignore')
            except Exception:
                return []
        else:
            content = content.decode('utf-8', errors='ignore')
        soup = BeautifulSoup(content, "xml")
        urls = []
        sitemap_tags = soup.find_all('sitemap')
        if sitemap_tags:
            # sitemap index: recurse into nested sitemaps
            loc_tags = [s.find('loc') for s in sitemap_tags if s.find('loc')]
            for loc_tag in loc_tags:
                if loc_tag and loc_tag.text:
                    nested_urls = urls_from_sitemap(loc_tag.text.strip(), limit, depth + 1)
                    urls.extend(nested_urls)
                    if limit and len(urls) >= limit:
                        return urls[:limit]
        else:
            loc_tags = soup.find_all('loc')
            for loc_tag in loc_tags:
                if loc_tag and loc_tag.text:
                    urls.append(loc_tag.text.strip())
        if limit:
            return urls[:limit]
        return urls
    except Exception as e:
        logger.debug("Sitemap parse failed %s -> %s", sitemap_url, e)
        return []
def urls_from_section_page(section_url: str, max_links: int = 200, paginate: bool = True, max_pages: int = 5) -> List[str]:
    logger.info("Collect links from section %s", section_url)
    found = []
    base = "{scheme}://{netloc}".format(scheme=urlparse(section_url).scheme, netloc=urlparse(section_url).netloc)
    for page in range(1, max_pages + 1):
        url = section_url
        if paginate and page > 1:
            url = section_url.rstrip("/") + f"/page/{page}/"
        html = get_html(url)
        if not html:
            break
        soup = BeautifulSoup(html, "html.parser")
        anchors = soup.find_all("a", href=True)
        for a in anchors:
            href = a["href"]
            if href.startswith("//"):
                href = urlparse(section_url).scheme + ":" + href
            if href.startswith("/"):
                href = urljoin(base, href)
            href = href.split("#")[0]
            if href.startswith(base) and href not in found:
                found.append(href)
            if len(found) >= max_links:
                break
        if len(found) >= max_links:
            break
        polite_sleep(0.2, 0.8)
    seen = []
    for u in found:
        if u.startswith("http") and u not in seen:
            seen.append(u)
    return seen[:max_links]
def discover_urls_for_site(preset: Dict, per_source_limit: Optional[int] = None) -> List[str]:
    urls = []
    for rss in preset.get("rss", []):
        try:
            r = urls_from_rss(rss, limit=per_source_limit)
            logger.info("RSS %s -> %d links", rss, len(r))
            urls.extend(r)
        except Exception as e:
            logger.debug("RSS failed for %s: %s", rss, e)
            continue
    for sm in preset.get("sitemap", []):
        try:
            r = urls_from_sitemap(sm, limit=per_source_limit)
            logger.info("Sitemap %s -> %d links", sm, len(r))
            urls.extend(r)
        except Exception as e:
            logger.debug("Sitemap failed for %s: %s", sm, e)
            continue
    for sec in preset.get("sections", []):
        try:
            r = urls_from_section_page(sec, max_links=per_source_limit or 200, paginate=True, max_pages=3)
            logger.info("Section %s -> %d links", sec, len(r))
            urls.extend(r)
        except Exception as e:
            logger.debug("Section failed for %s: %s", sec, e)
            continue
    unique = list(dict.fromkeys(urls))
    if per_source_limit:
        unique = unique[:per_source_limit]
    return unique
# ---------------- Main crawling procedure ----------------
def save_jsonl(path: str, items: List[Dict]):
    with open(path, "w", encoding="utf-8") as f:
        for it in items:
            f.write(json.dumps(it, ensure_ascii=False) + "\n")
    logger.info("Saved %d articles to %s", len(items), path)


def crawl_urls(urls: List[str], out_path: str, max_articles: Optional[int], max_workers: int = 4):
    urls = [u.strip() for u in urls if u and not u.strip().startswith("#")]
    urls = list(dict.fromkeys(urls))
    if max_articles:
        urls = urls[:max_articles]
    logger.info("Crawling %d urls", len(urls))
    seen_urls_path = "data/seen_urls.txt"
    seen_hashes_path = "data/seen_hashes.txt"
    seen_urls = load_seen_set(seen_urls_path)
    seen_hashes = load_seen_set(seen_hashes_path)
    recent_texts = []
    collected = []
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = {ex.submit(fetch_article, u, False, True): u for u in urls}
        for i, fut in enumerate(as_completed(futures)):
            url = futures[fut]
            try:
                art = fut.result()
                if not art:
                    continue
                can_url = art.get("canonical_url") or strip_tracking_params(art["url"])
                if can_url in seen_urls:
                    continue
                h = art.get("text_hash")
                if h in seen_hashes:
                    continue
                norm = art.get("norm_text", "")
                duplicate = False
                for prev in recent_texts:
                    if is_similar_text(norm, prev, threshold=0.92):
                        duplicate = True
                        break
                if duplicate:
                    continue
                collected.append(art)
                recent_texts.append(norm)
                seen_urls.add(can_url)
                seen_hashes.add(h)
                logger.info("[%d/%d] Fetched: %s (words=%d)", i + 1, len(urls), url, len(art.get("text", "").split()))
            except Exception as e:
                logger.debug("Error fetching %s: %s", url, e)
    if collected:
        save_jsonl(out_path, collected)
        save_seen_set(seen_urls_path, seen_urls)
        save_seen_set(seen_hashes_path, seen_hashes)
    logger.info("Successfully collected %d articles", len(collected))
    return collected
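
# Programmatic use of crawl_urls (illustrative; the URL and output path are placeholders):
#
#   crawl_urls(["https://tass.ru/<section>/<article-id>"], out_path="data/raw_corpus.jsonl",
#              max_articles=10, max_workers=2)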
TRACKING_PREFIXES = ("utm", "fbclid", "gclid", "yclid", "mc_cid", "mc_eid", "ref", "from", "trk")


def strip_tracking_params(url: str) -> str:
    try:
        p = urlparse(url)
        qs = dict(parse_qsl(p.query, keep_blank_values=True))
        qs = {k: v for k, v in qs.items() if not any(k.lower().startswith(pref) for pref in TRACKING_PREFIXES)}
        new_q = urlencode(sorted(qs.items()))
        new_parts = (p.scheme, p.netloc.lower(), p.path.rstrip('/'), p.params, new_q, "")
        return urlunparse(new_parts)
    except Exception:
        return url
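
# Illustrative behavior (example URL is assumed):
#   strip_tracking_params("https://lenta.ru/news/1/?utm_source=rss&id=2")
#   -> "https://lenta.ru/news/1?id=2"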
def canonicalize_url_from_soup(url: str, soup: BeautifulSoup) -> str:
    tag = soup.find("link", rel=lambda x: x and "canonical" in x.lower())
    if tag and tag.get("href"):
        return strip_tracking_params(tag.get("href"))
    return strip_tracking_params(url)


def normalize_text_for_hash(text: str) -> str:
    t = text.lower()
    t = re.sub(r'\s+', ' ', t).strip()
    return t
def is_similar_text(a: str, b: str, threshold: float = 0.92) -> bool:
    return difflib.SequenceMatcher(None, a, b).ratio() >= threshold
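
# Example: near-identical normalized texts clear the default 0.92 threshold, e.g.
#   is_similar_text("a b c d e f g h i j", "a b c d e f g h i x")  # -> True (ratio ~0.95)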
def load_seen_set(path: str) -> set:
    if not os.path.exists(path):
        return set()
    with open(path, "r", encoding="utf-8") as f:
        return set(l.strip() for l in f if l.strip())


def save_seen_set(path: str, s: set):
    with open(path, "w", encoding="utf-8") as f:
        for it in s:
            f.write(it + "\n")
# ---------------- CLI ----------------
def main():
    p = argparse.ArgumentParser()
    p.add_argument("--auto", action="store_true", help="Use built-in site presets")
    p.add_argument("--sites", help="Comma-separated site keys to use from presets", default="")
    p.add_argument("--per_site_limit", type=int, help="How many candidate links to take per source", default=500)
    p.add_argument("--max_articles", type=int, help="Max number of articles to fetch", default=1000)
    p.add_argument("--min_words", type=int, help="Desired minimal words in corpus", default=50000)
    p.add_argument("--out", help="Output jsonl file", default="data/raw_corpus.jsonl")
    p.add_argument("--max_workers", type=int, help="Max concurrent fetch workers", default=4)
    p.add_argument("--urls", help="Path to file with URLs (one per line). Lines starting with # are comments.",
                   default=None)
    p.add_argument("--dump-urls", help="Discover URLs from SITE_PRESETS and save to file", default=None)
    args = p.parse_args()

    if args.dump_urls:
        sites_keys = list(SITE_PRESETS.keys()) if args.auto else [s.strip() for s in args.sites.split(",") if s.strip()]
        if not sites_keys:
            sites_keys = list(SITE_PRESETS.keys())
            logger.info("Using all available sites: %s", ", ".join(sites_keys))
        discovered_all = []
        for key in sites_keys:
            preset = SITE_PRESETS.get(key)
            if not preset:
                logger.warning("No preset for site '%s', skipping", key)
                continue
            logger.info("Discovering URLs for %s...", key)
            urls = discover_urls_for_site(preset, per_source_limit=args.per_site_limit)
            logger.info("Discovered %d urls for preset %s", len(urls), key)
            discovered_all.extend(urls)
        unique = list(dict.fromkeys(discovered_all))
        os.makedirs(os.path.dirname(args.dump_urls) or ".", exist_ok=True)
        with open(args.dump_urls, "w", encoding="utf-8") as f:
            for u in unique:
                f.write(u + "\n")
        logger.info("Saved %d unique discovered URLs to %s", len(unique), args.dump_urls)
        return

    if args.urls:
        if not os.path.exists(args.urls):
            logger.error("URLs file not found: %s", args.urls)
            return
        with open(args.urls, "r", encoding="utf-8") as f:
            urls = [ln.strip() for ln in f if ln.strip()]
        crawl_urls(urls, out_path=args.out, max_articles=args.max_articles, max_workers=args.max_workers)
        return

    if args.auto or args.sites:
        # discover from presets and crawl in one pass (matches the docstring usage examples)
        sites_keys = list(SITE_PRESETS.keys()) if args.auto else [s.strip() for s in args.sites.split(",") if s.strip()]
        urls = []
        for key in sites_keys:
            preset = SITE_PRESETS.get(key)
            if not preset:
                logger.warning("No preset for site '%s', skipping", key)
                continue
            urls.extend(discover_urls_for_site(preset, per_source_limit=args.per_site_limit))
        collected = crawl_urls(urls, out_path=args.out, max_articles=args.max_articles, max_workers=args.max_workers)
        total_words = sum(len(a.get("text", "").split()) for a in collected)
        if total_words < args.min_words:
            logger.warning("Corpus has %d words, below the requested minimum of %d", total_words, args.min_words)
        return

    logger.error("Either --auto, --sites or --urls must be provided.")
if __name__ == "__main__":
    main()