Roman190928 committed on
Commit
f1fd68b
·
verified ·
1 Parent(s): 811f1c4

Delete crawler

Browse files
crawler/__init__.py DELETED
@@ -1,19 +0,0 @@
1
- from .config import (
2
- MAX_SHARD_ROWS,
3
- MAX_SHARDS,
4
- NORMAL_TOTAL_WORKERS,
5
- SUPER_TOTAL_WORKERS,
6
- CrawlerConfig,
7
- compute_worker_split,
8
- )
9
- from .engine import AsyncCrawler
10
-
11
- __all__ = [
12
- "AsyncCrawler",
13
- "CrawlerConfig",
14
- "MAX_SHARD_ROWS",
15
- "MAX_SHARDS",
16
- "NORMAL_TOTAL_WORKERS",
17
- "SUPER_TOTAL_WORKERS",
18
- "compute_worker_split",
19
- ]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
crawler/config.py DELETED
@@ -1,97 +0,0 @@
1
- from __future__ import annotations
2
-
3
- from dataclasses import dataclass, field
4
- from pathlib import Path
5
-
6
- NORMAL_TOTAL_WORKERS = 12
7
- SUPER_TOTAL_WORKERS = 24
8
- MAX_SHARD_ROWS = 15_000
9
- MAX_SHARDS = 10
10
-
11
-
12
- def validate_total_workers(total_workers: int) -> int:
13
- value = int(total_workers)
14
- if value not in {NORMAL_TOTAL_WORKERS, SUPER_TOTAL_WORKERS}:
15
- raise ValueError(
16
- f"total_workers must be {NORMAL_TOTAL_WORKERS} or {SUPER_TOTAL_WORKERS}, got {value}."
17
- )
18
- return value
19
-
20
-
21
- def compute_worker_split(total_workers: int) -> tuple[int, int]:
22
- total = validate_total_workers(total_workers)
23
- fetch_workers = (total * 5) // 6
24
- parser_workers = total - fetch_workers
25
- if fetch_workers < 1 or parser_workers < 1:
26
- raise ValueError(f"Invalid worker split for total_workers={total}.")
27
- return fetch_workers, parser_workers
28
-
29
-
30
@dataclass
class CrawlerConfig:
    """Validated runtime configuration for the async crawler.

    Instantiation runs ``__post_init__`` validation: seed URLs are trimmed,
    the worker total is checked against the two supported tiers (and split
    into fetch/parser pools), shard limits are clamped to the module caps,
    and Hugging Face upload settings are verified when uploading is enabled.
    Raises ValueError on any invalid field.
    """

    # Starting points for the crawl frontier; at least one non-blank URL required.
    seed_urls: list[str]

    # Per-page / per-request fetch limits.
    max_links_per_page: int = 250
    request_timeout_seconds: float = 18.0
    max_response_bytes: int = 3_000_000
    user_agent: str = "HFDBContCrawler/1.0 (+https://huggingface.co/datasets)"
    # Bounded dedup cache of already-seen URLs (0 disables deduplication).
    seen_url_cache_size: int = 2_000_000

    # Backpressure limits for the fetch -> parse -> write pipeline queues.
    fetch_queue_size: int = 100_000
    parse_queue_size: int = 25_000
    record_queue_size: int = 50_000
    report_every_seconds: float = 5.0

    # Parquet shard output; defaults to <package parent>/shards.
    output_dir: Path = field(
        default_factory=lambda: Path(__file__).resolve().parents[1] / "shards"
    )
    shard_size_rows: int = 10_000
    max_shards: int = MAX_SHARDS
    parquet_compression: str = "zstd"
    parquet_compression_level: int = 9

    # Optional upload of finished shards to the Hugging Face Hub.
    enable_hf_upload: bool = False
    hf_repo_id: str = ""
    hf_token: str = ""
    hf_repo_type: str = "dataset"
    hf_private_repo: bool = False
    hf_path_prefix: str = "crawl_shards"

    # Concurrency and politeness settings.
    total_workers: int = NORMAL_TOTAL_WORKERS
    request_delay_global_seconds: float = 0.02
    request_delay_per_domain_seconds: float = 2.0

    # robots.txt handling.
    robots_cache_ttl_seconds: float = 3600.0
    robots_fail_closed: bool = True
    robots_max_bytes: int = 300_000

    # Derived in __post_init__ from total_workers; not constructor arguments.
    fetch_workers: int = field(init=False)
    parser_workers: int = field(init=False)

    def __post_init__(self) -> None:
        """Normalize and validate all fields; raises ValueError on bad input."""
        # Drop empty/whitespace-only seeds and trim the rest.
        self.seed_urls = [u.strip() for u in self.seed_urls if u and u.strip()]
        if not self.seed_urls:
            raise ValueError("At least one seed URL is required.")

        # Validate the tier first, then derive the 5:1 fetch/parser split.
        self.total_workers = validate_total_workers(self.total_workers)
        self.fetch_workers, self.parser_workers = compute_worker_split(self.total_workers)

        self.shard_size_rows = int(self.shard_size_rows)
        if self.shard_size_rows < 1 or self.shard_size_rows > MAX_SHARD_ROWS:
            raise ValueError(f"shard_size_rows must be between 1 and {MAX_SHARD_ROWS}.")

        self.max_shards = int(self.max_shards)
        if self.max_shards < 1 or self.max_shards > MAX_SHARDS:
            raise ValueError(f"max_shards must be between 1 and {MAX_SHARDS}.")

        self.output_dir = Path(self.output_dir).expanduser()

        self.hf_repo_id = self.hf_repo_id.strip()
        self.hf_token = self.hf_token.strip()
        # A blank prefix falls back to the default rather than uploading to repo root.
        self.hf_path_prefix = self.hf_path_prefix.strip() or "crawl_shards"

        if self.enable_hf_upload:
            if not self.hf_repo_id:
                raise ValueError("hf_repo_id is required when enable_hf_upload=True.")
            if not self.hf_token:
                raise ValueError("hf_token is required when enable_hf_upload=True.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
crawler/engine.py DELETED
@@ -1,302 +0,0 @@
1
- from __future__ import annotations
2
-
3
- import asyncio
4
- import contextlib
5
- from collections import deque
6
- from typing import Any
7
-
8
- import aiohttp
9
-
10
- from .config import CrawlerConfig
11
- from .fetch import fetch_url
12
- from .models import CrawlStats, FetchResult
13
- from .parse import parse_page
14
- from .rate_limit import RequestRateLimiter
15
- from .robots import RobotsPolicy
16
- from .shards import ParquetShardWriter, ShardLimitReached
17
- from .utils import has_binary_extension, normalize_url
18
-
19
-
20
class AsyncCrawler:
    """Queue-based async crawler: fetcher pool -> parser pool -> shard writer.

    Fetcher tasks pull URLs from ``fetch_queue``, parser tasks turn fetched
    HTML into records plus new frontier links, and a single writer task
    persists records as Parquet shards. ``None`` values act as per-worker
    shutdown sentinels in every queue. The crawl ends when the frontier is
    exhausted, a stop is requested, or the shard cap is reached.
    """

    def __init__(self, config: CrawlerConfig):
        self.config = config
        self.stats = CrawlStats()
        # Set to request shutdown; stop_reason records why (first writer wins).
        self.stop_event = asyncio.Event()
        self.stop_reason = ""

        # Bounded queues provide backpressure between pipeline stages.
        self.fetch_queue: asyncio.Queue[str | None] = asyncio.Queue(
            maxsize=config.fetch_queue_size
        )
        self.parse_queue: asyncio.Queue[FetchResult | None] = asyncio.Queue(
            maxsize=config.parse_queue_size
        )
        self.record_queue: asyncio.Queue[dict[str, Any] | None] = asyncio.Queue(
            maxsize=config.record_queue_size
        )

        # Bounded dedup cache: set for O(1) membership, deque for FIFO eviction.
        self.seen_urls: set[str] = set()
        self.seen_order: deque[str] = deque()
        self.seen_lock = asyncio.Lock()
        self.counter_lock = asyncio.Lock()

        # Workers currently processing an item; used for idle detection.
        self.active_fetchers = 0
        self.active_parsers = 0

        self.writer = ParquetShardWriter(config=config, stats=self.stats)
        # Created inside run() once the aiohttp session exists.
        self.rate_limiter: RequestRateLimiter | None = None
        self.robots_policy: RobotsPolicy | None = None

    async def run(self) -> None:
        """Run the crawl to completion (frontier exhausted, stop, or shard cap)."""
        await self.writer.initialize()

        for seed in self.config.seed_urls:
            await self.try_enqueue(seed)

        connector = aiohttp.TCPConnector(
            limit=max(200, self.config.fetch_workers * 4),
            ttl_dns_cache=300,
        )
        timeout = aiohttp.ClientTimeout(total=self.config.request_timeout_seconds)

        async with aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={"User-Agent": self.config.user_agent},
        ) as session:
            self.rate_limiter = RequestRateLimiter(
                global_interval_seconds=self.config.request_delay_global_seconds,
                per_domain_interval_seconds=self.config.request_delay_per_domain_seconds,
            )
            self.robots_policy = RobotsPolicy(
                session=session,
                user_agent=self.config.user_agent,
                cache_ttl_seconds=self.config.robots_cache_ttl_seconds,
                fail_closed=self.config.robots_fail_closed,
                max_bytes=self.config.robots_max_bytes,
            )

            fetchers = [
                asyncio.create_task(self.fetcher_worker(worker_id=i, session=session))
                for i in range(self.config.fetch_workers)
            ]
            parsers = [
                asyncio.create_task(self.parser_worker(worker_id=i))
                for i in range(self.config.parser_workers)
            ]
            writer_task = asyncio.create_task(self.writer.consume(self.record_queue))
            reporter_task = asyncio.create_task(self.progress_reporter())

            try:
                await self.wait_until_complete(writer_task)
                await self._graceful_shutdown(fetchers, parsers, writer_task)
            except ShardLimitReached:
                # Shard cap is a hard stop: cancel workers instead of draining.
                self.stop_reason = "shard_cap_reached"
                self.stop_event.set()
                await self._hard_shutdown(fetchers, parsers, writer_task)
            finally:
                reporter_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await reporter_task

    def request_stop(self, reason: str = "user_requested_stop") -> None:
        """Ask the crawl to wind down; only the first recorded reason is kept."""
        if not self.stop_reason:
            self.stop_reason = reason
        self.stop_event.set()

    async def wait_until_complete(self, writer_task: asyncio.Task[None]) -> None:
        """Poll until the writer finishes, a stop drains, or the frontier empties.

        Re-raises the writer's exception (e.g. ShardLimitReached) if it failed.
        """
        while True:
            if writer_task.done():
                exc = writer_task.exception()
                if exc is not None:
                    raise exc
                return

            if self.stop_event.is_set():
                if not self.stop_reason:
                    self.stop_reason = "stop_event_set"
                # After a stop, wait for in-flight work to drain before returning.
                if self._is_pipeline_idle():
                    return
                await asyncio.sleep(0.2)
                continue

            if self._is_pipeline_idle():
                self.stop_reason = "frontier_exhausted"
                return

            await asyncio.sleep(0.5)

    async def _graceful_shutdown(
        self,
        fetchers: list[asyncio.Task[None]],
        parsers: list[asyncio.Task[None]],
        writer_task: asyncio.Task[None],
    ) -> None:
        """Stop each stage in pipeline order by feeding one sentinel per worker."""
        for _ in fetchers:
            await self.fetch_queue.put(None)
        await asyncio.gather(*fetchers, return_exceptions=True)

        for _ in parsers:
            await self.parse_queue.put(None)
        await asyncio.gather(*parsers, return_exceptions=True)

        await self.record_queue.put(None)
        await writer_task

    async def _hard_shutdown(
        self,
        fetchers: list[asyncio.Task[None]],
        parsers: list[asyncio.Task[None]],
        writer_task: asyncio.Task[None],
    ) -> None:
        """Cancel all workers immediately; used when the shard cap is hit."""
        for task in fetchers + parsers:
            task.cancel()
        await asyncio.gather(*fetchers, *parsers, return_exceptions=True)

        if not writer_task.done():
            writer_task.cancel()
        await asyncio.gather(writer_task, return_exceptions=True)

    async def progress_reporter(self) -> None:
        """Print a one-line stats snapshot every report_every_seconds (until cancelled)."""
        while True:
            await asyncio.sleep(self.config.report_every_seconds)
            print(
                "[stats]"
                f" workers={self.config.total_workers}"
                f" split={self.config.fetch_workers}/{self.config.parser_workers}"
                f" queued={self.stats.queued_urls}"
                f" fetched={self.stats.fetch_reserved}"
                f" fetch_ok={self.stats.fetch_succeeded}"
                f" fetch_fail={self.stats.fetch_failed}"
                f" parsed={self.stats.parsed_pages}"
                f" parse_fail={self.stats.parse_failed}"
                f" robots_blocked={self.stats.robots_blocked}"
                f" rows={self.stats.stored_rows}"
                f" shards={self.stats.written_shards}/{self.config.max_shards}"
                f" tok_shards={self.stats.tokenized_shards}"
                f" tok_rows={self.stats.tokenized_rows}"
                f" tok_total={self.stats.tokenized_tokens}"
                f" uploaded={self.stats.uploaded_shards}"
                f" fetch_q={self.fetch_queue.qsize()}"
                f" parse_q={self.parse_queue.qsize()}"
                f" record_q={self.record_queue.qsize()}"
            )

    async def fetcher_worker(self, worker_id: int, session: aiohttp.ClientSession) -> None:
        """Consume URLs from fetch_queue; push fetched HTML pages to parse_queue."""
        del worker_id
        assert self.rate_limiter is not None
        assert self.robots_policy is not None

        while True:
            url = await self.fetch_queue.get()
            if url is None:
                self.fetch_queue.task_done()
                return

            slot_reserved = await self.reserve_fetch_slot()
            if not slot_reserved:
                # Stop requested: keep draining the queue without fetching.
                self.fetch_queue.task_done()
                continue

            self.active_fetchers += 1
            try:
                outcome = await fetch_url(
                    session,
                    url,
                    config=self.config,
                    mark_seen=self._mark_seen,
                    rate_limiter=self.rate_limiter,
                    robots_policy=self.robots_policy,
                )
                if outcome.robots_blocked:
                    self.stats.robots_blocked += 1
                if outcome.result is not None:
                    self.stats.fetch_succeeded += 1
                    # Non-HTML fetches succeed but carry no body to parse.
                    if outcome.result.html:
                        await self.parse_queue.put(outcome.result)
                else:
                    self.stats.fetch_failed += 1
            finally:
                self.active_fetchers -= 1
                self.fetch_queue.task_done()

    async def parser_worker(self, worker_id: int) -> None:
        """Consume fetched pages; emit records to record_queue and enqueue new links."""
        del worker_id

        while True:
            item = await self.parse_queue.get()
            if item is None:
                self.parse_queue.task_done()
                return

            self.active_parsers += 1
            try:
                record, links = parse_page(item)
                if record is not None:
                    await self.record_queue.put(record)
                    self.stats.parsed_pages += 1

                # Cap counts only links actually accepted into the frontier.
                extracted = 0
                for link in links:
                    if extracted >= self.config.max_links_per_page:
                        break
                    if await self.try_enqueue(link):
                        extracted += 1
                self.stats.extracted_links += extracted
            except Exception:
                # A single malformed page must not kill the worker.
                self.stats.parse_failed += 1
            finally:
                self.active_parsers -= 1
                self.parse_queue.task_done()

    async def reserve_fetch_slot(self) -> bool:
        """Atomically count a fetch attempt; refuse when a stop is in progress."""
        async with self.counter_lock:
            if self.stop_event.is_set():
                return False
            self.stats.fetch_reserved += 1
            return True

    async def try_enqueue(self, raw_url: str) -> bool:
        """Normalize, filter and dedup *raw_url*; enqueue it if new. Returns success."""
        if self.stop_event.is_set():
            return False

        normalized = normalize_url(raw_url)
        if not normalized:
            self.stats.dropped_urls += 1
            return False

        if has_binary_extension(normalized):
            self.stats.dropped_urls += 1
            return False

        async with self.seen_lock:
            if self.config.seen_url_cache_size > 0 and normalized in self.seen_urls:
                return False
            self._remember_seen_locked(normalized)
            self.stats.queued_urls += 1

        # put() may block on a full queue; done outside seen_lock to avoid stalls.
        await self.fetch_queue.put(normalized)
        return True

    async def _mark_seen(self, url: str) -> None:
        """Record a post-redirect URL as seen (callback passed to fetch_url)."""
        async with self.seen_lock:
            self._remember_seen_locked(url)

    def _remember_seen_locked(self, url: str) -> None:
        """Insert *url* into the bounded seen-cache. Caller must hold seen_lock."""
        if self.config.seen_url_cache_size <= 0:
            return
        if url in self.seen_urls:
            return

        self.seen_urls.add(url)
        self.seen_order.append(url)
        # FIFO eviction keeps the cache within its configured bound.
        while len(self.seen_order) > self.config.seen_url_cache_size:
            expired = self.seen_order.popleft()
            self.seen_urls.discard(expired)

    def _is_pipeline_idle(self) -> bool:
        """True when no URLs are queued and no worker is mid-item.

        NOTE(review): record_queue is intentionally not checked here — the
        writer drains it independently and is handled via writer_task.
        """
        return (
            self.fetch_queue.empty()
            and self.parse_queue.empty()
            and self.active_fetchers == 0
            and self.active_parsers == 0
        )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
crawler/fetch.py DELETED
@@ -1,87 +0,0 @@
1
- from __future__ import annotations
2
-
3
- from dataclasses import dataclass
4
- from datetime import datetime, timezone
5
- from typing import Awaitable, Callable
6
- from urllib.parse import urlsplit
7
-
8
- import aiohttp
9
-
10
- from .config import CrawlerConfig
11
- from .models import FetchResult
12
- from .rate_limit import RequestRateLimiter
13
- from .robots import RobotsPolicy
14
- from .utils import is_html_response, normalize_url
15
-
16
-
17
@dataclass
class FetchOutcome:
    """Envelope for one fetch attempt.

    ``result`` is None whenever the fetch produced nothing usable;
    ``robots_blocked`` marks that the miss came from a robots.txt disallow
    rather than a network or HTTP error.
    """

    # Fetched payload, or None on any failure.
    result: FetchResult | None
    # True when robots.txt denied the requested or post-redirect URL.
    robots_blocked: bool = False
21
-
22
-
23
async def fetch_url(
    session: aiohttp.ClientSession,
    url: str,
    *,
    config: CrawlerConfig,
    mark_seen: Callable[[str], Awaitable[None]],
    rate_limiter: RequestRateLimiter,
    robots_policy: RobotsPolicy,
) -> FetchOutcome:
    """Fetch one URL politely and return a FetchOutcome.

    Checks robots.txt for both the requested and the post-redirect URL,
    applies per-domain rate limiting, truncates oversized bodies, and never
    raises: any transport/decoding failure yields ``FetchOutcome(result=None)``.
    """
    fetched_at = datetime.now(timezone.utc).isoformat()
    requested_domain = (urlsplit(url).hostname or "").lower().strip(".")
    if not requested_domain:
        return FetchOutcome(result=None)

    if not await robots_policy.can_fetch(url):
        return FetchOutcome(result=None, robots_blocked=True)

    # Politeness delay is taken on the *requested* domain, before the request.
    await rate_limiter.acquire(requested_domain)

    try:
        async with session.get(url, allow_redirects=True) as response:
            content_type = response.headers.get("content-type", "").lower()
            final_url = normalize_url(str(response.url))
            if not final_url:
                return FetchOutcome(result=None)

            final_domain = (urlsplit(final_url).hostname or "").lower().strip(".")
            if not final_domain:
                return FetchOutcome(result=None)

            # Redirects may land on a different origin; re-check robots there.
            if not await robots_policy.can_fetch(final_url):
                return FetchOutcome(result=None, robots_blocked=True)

            # Record the final URL so redirect targets are deduplicated too.
            await mark_seen(final_url)

            if response.status >= 400:
                return FetchOutcome(result=None)

            # Non-HTML responses count as successful fetches but carry no body,
            # so the parser stage skips them.
            if not is_html_response(content_type, final_url):
                return FetchOutcome(
                    result=FetchResult(
                        url=final_url,
                        status=response.status,
                        fetched_at=fetched_at,
                        content_type=content_type,
                        html="",
                    )
                )

            # Read one byte past the cap to detect, then trim, oversize bodies.
            raw = await response.content.read(config.max_response_bytes + 1)
            if len(raw) > config.max_response_bytes:
                raw = raw[: config.max_response_bytes]
            html = raw.decode(response.charset or "utf-8", errors="ignore")

            return FetchOutcome(
                result=FetchResult(
                    url=final_url,
                    status=response.status,
                    fetched_at=fetched_at,
                    content_type=content_type,
                    html=html,
                )
            )
    except Exception:
        # Best-effort crawler: any transport/timeout/decoding error is a failed fetch.
        return FetchOutcome(result=None)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
crawler/models.py DELETED
@@ -1,31 +0,0 @@
1
- from __future__ import annotations
2
-
3
- from dataclasses import dataclass
4
-
5
-
6
@dataclass
class FetchResult:
    """One fetched HTTP response, as handed from the fetcher to the parser."""

    # Normalized final URL after redirects.
    url: str
    # HTTP status code of the final response.
    status: int
    # ISO-8601 UTC timestamp taken before the request was sent.
    fetched_at: str
    # Lowercased Content-Type header value (may be empty).
    content_type: str
    # Decoded (possibly truncated) body; empty string for non-HTML responses.
    html: str
13
-
14
-
15
@dataclass
class CrawlStats:
    """Mutable counters shared across the crawler pipeline (one instance per crawl)."""

    # Frontier counters.
    queued_urls: int = 0
    dropped_urls: int = 0

    # Fetch stage.
    fetch_reserved: int = 0
    fetch_succeeded: int = 0
    fetch_failed: int = 0
    robots_blocked: int = 0

    # Parse stage.
    parsed_pages: int = 0
    parse_failed: int = 0
    extracted_links: int = 0

    # Shard writer.
    stored_rows: int = 0
    written_shards: int = 0
    uploaded_shards: int = 0

    # Live tokenization of written shards.
    tokenized_shards: int = 0
    tokenized_rows: int = 0
    tokenized_tokens: int = 0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
crawler/parse.py DELETED
@@ -1,38 +0,0 @@
1
- from __future__ import annotations
2
-
3
- from typing import Any
4
- from urllib.parse import urljoin, urlsplit
5
-
6
- from bs4 import BeautifulSoup
7
-
8
- from .models import FetchResult
9
-
10
-
11
def parse_page(item: FetchResult) -> tuple[dict[str, Any] | None, list[str]]:
    """Extract visible text and outbound links from a fetched HTML page.

    Returns ``(record, links)``: *record* is a dict with keys
    ``text``/``url``/``domain``/``timestamp`` (or None when the page has no
    usable text), and *links* are absolute URLs resolved against the page URL.
    """
    if not item.html:
        return None, []

    soup = BeautifulSoup(item.html, "lxml")

    # Remove tags that never contribute human-readable content.
    for junk in soup(["script", "style", "noscript", "svg", "iframe", "canvas"]):
        junk.decompose()

    text = soup.get_text(" ", strip=True)
    if not text:
        return None, []

    # Resolve every non-empty href against the page URL.
    raw_hrefs = (a.get("href", "").strip() for a in soup.find_all("a", href=True))
    links = [urljoin(item.url, href) for href in raw_hrefs if href]

    host = (urlsplit(item.url).hostname or "").lower().strip(".")
    record = {
        "text": text,
        "url": item.url,
        "domain": host,
        "timestamp": item.fetched_at,
    }
    return record, links
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
crawler/rate_limit.py DELETED
@@ -1,66 +0,0 @@
1
- from __future__ import annotations
2
-
3
- import asyncio
4
- import time
5
- from typing import Awaitable, Callable
6
-
7
-
8
- class RequestRateLimiter:
9
- def __init__(
10
- self,
11
- global_interval_seconds: float,
12
- per_domain_interval_seconds: float,
13
- *,
14
- clock: Callable[[], float] | None = None,
15
- sleep: Callable[[float], Awaitable[None]] | None = None,
16
- ) -> None:
17
- self.global_interval_seconds = max(0.0, float(global_interval_seconds))
18
- self.per_domain_interval_seconds = max(0.0, float(per_domain_interval_seconds))
19
- self._clock = clock or time.monotonic
20
- self._sleep = sleep or asyncio.sleep
21
-
22
- self._global_lock = asyncio.Lock()
23
- self._global_last: float | None = None
24
-
25
- self._domain_guard = asyncio.Lock()
26
- self._domain_locks: dict[str, asyncio.Lock] = {}
27
- self._domain_last: dict[str, float] = {}
28
-
29
- async def acquire(self, domain: str) -> None:
30
- normalized = domain.lower().strip(".")
31
- await self._acquire_global()
32
- await self._acquire_domain(normalized)
33
-
34
- async def _acquire_global(self) -> None:
35
- if self.global_interval_seconds <= 0:
36
- return
37
-
38
- async with self._global_lock:
39
- now = self._clock()
40
- if self._global_last is not None:
41
- wait = self.global_interval_seconds - (now - self._global_last)
42
- if wait > 0:
43
- await self._sleep(wait)
44
- self._global_last = self._clock()
45
-
46
- async def _acquire_domain(self, domain: str) -> None:
47
- if not domain or self.per_domain_interval_seconds <= 0:
48
- return
49
-
50
- lock = await self._get_domain_lock(domain)
51
- async with lock:
52
- now = self._clock()
53
- last = self._domain_last.get(domain)
54
- if last is not None:
55
- wait = self.per_domain_interval_seconds - (now - last)
56
- if wait > 0:
57
- await self._sleep(wait)
58
- self._domain_last[domain] = self._clock()
59
-
60
- async def _get_domain_lock(self, domain: str) -> asyncio.Lock:
61
- async with self._domain_guard:
62
- lock = self._domain_locks.get(domain)
63
- if lock is None:
64
- lock = asyncio.Lock()
65
- self._domain_locks[domain] = lock
66
- return lock
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
crawler/robots.py DELETED
@@ -1,96 +0,0 @@
1
- from __future__ import annotations
2
-
3
- import asyncio
4
- import time
5
- from dataclasses import dataclass
6
- from urllib.parse import urlsplit
7
- from urllib.robotparser import RobotFileParser
8
-
9
- import aiohttp
10
-
11
-
12
@dataclass
class _RobotsCacheEntry:
    """Cached robots.txt parser for one origin, with a monotonic-clock expiry."""

    # None records a failed download; the policy then applies fail_closed.
    parser: RobotFileParser | None
    # time.monotonic() deadline after which the entry must be refreshed.
    expires_at: float
16
-
17
-
18
class RobotsPolicy:
    """Per-origin robots.txt cache and permission checker.

    robots.txt is downloaded at most once per origin per TTL; concurrent
    lookups for the same origin are serialized with a per-origin lock so
    the file is fetched only once. A failed download is cached as None and
    interpreted according to ``fail_closed``.
    """

    def __init__(
        self,
        session: aiohttp.ClientSession,
        user_agent: str,
        *,
        cache_ttl_seconds: float = 3600.0,
        fail_closed: bool = True,
        max_bytes: int = 300_000,
    ) -> None:
        self.session = session
        self.user_agent = user_agent
        # TTL is floored at one second so entries always expire eventually.
        self.cache_ttl_seconds = max(1.0, float(cache_ttl_seconds))
        # When True, an unavailable robots.txt blocks the URL.
        self.fail_closed = bool(fail_closed)
        self.max_bytes = int(max_bytes)

        self._cache: dict[str, _RobotsCacheEntry] = {}
        self._cache_lock = asyncio.Lock()
        # One lock per origin so only one coroutine downloads its robots.txt.
        self._origin_locks: dict[str, asyncio.Lock] = {}

    async def can_fetch(self, url: str) -> bool:
        """Return True when *url* may be fetched under its origin's robots.txt."""
        parts = urlsplit(url)
        host = (parts.hostname or "").lower().strip(".")
        scheme = parts.scheme.lower()
        if scheme not in {"http", "https"} or not host:
            return False

        origin = f"{scheme}://{host}"
        parser = await self._get_parser(origin)
        if parser is None:
            # Download failed: allow only when configured to fail open.
            return not self.fail_closed
        return parser.can_fetch(self.user_agent, url)

    async def _get_parser(self, origin: str) -> RobotFileParser | None:
        """Return the cached parser for *origin*, refreshing it when expired."""
        now = time.monotonic()
        async with self._cache_lock:
            cached = self._cache.get(origin)
            if cached and cached.expires_at > now:
                return cached.parser

            # The origin lock is created while holding _cache_lock, so two
            # coroutines can never race to create different locks.
            lock = self._origin_locks.get(origin)
            if lock is None:
                lock = asyncio.Lock()
                self._origin_locks[origin] = lock

        async with lock:
            # Double-check: another coroutine may have refreshed while we waited.
            now = time.monotonic()
            async with self._cache_lock:
                cached = self._cache.get(origin)
                if cached and cached.expires_at > now:
                    return cached.parser

            parser = await self._download_and_parse(origin)
            async with self._cache_lock:
                self._cache[origin] = _RobotsCacheEntry(
                    parser=parser,
                    expires_at=time.monotonic() + self.cache_ttl_seconds,
                )
            return parser

    async def _download_and_parse(self, origin: str) -> RobotFileParser | None:
        """Fetch and parse ``<origin>/robots.txt``; None on any failure or >=400."""
        robots_url = f"{origin}/robots.txt"
        try:
            async with self.session.get(robots_url, allow_redirects=True) as response:
                if response.status >= 400:
                    return None

                # Truncate oversized robots files instead of failing outright.
                raw = await response.content.read(self.max_bytes + 1)
                if len(raw) > self.max_bytes:
                    raw = raw[: self.max_bytes]
                charset = response.charset or "utf-8"
                text = raw.decode(charset, errors="ignore")
        except Exception:
            return None

        parser = RobotFileParser()
        parser.set_url(robots_url)
        parser.parse(text.splitlines())
        return parser
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
crawler/shards.py DELETED
@@ -1,128 +0,0 @@
1
- from __future__ import annotations
2
-
3
- import asyncio
4
- from datetime import datetime, timezone
5
- from pathlib import Path
6
- from typing import Any
7
-
8
- import pyarrow as pa
9
- import pyarrow.parquet as pq
10
-
11
- from .config import CrawlerConfig
12
- from .models import CrawlStats
13
- from .tokenizer import LiveShardTokenizer
14
- from .upload import HfShardUploader
15
-
16
-
17
class ShardLimitReached(RuntimeError):
    """Raised by the shard writer once the configured shard cap has been hit."""
19
-
20
-
21
# Fixed four-string-column schema for every crawl shard.
PARQUET_SCHEMA = pa.schema(
    [
        ("text", pa.string()),
        ("url", pa.string()),
        ("domain", pa.string()),
        ("timestamp", pa.string()),
    ]
)


class ParquetShardWriter:
    """Buffers crawl records and persists them as capped Parquet shards.

    ``consume`` drains the record queue until a ``None`` sentinel arrives,
    flushing a shard whenever the buffer reaches ``shard_size_rows``. Each
    flushed shard is tokenized (for stats) and optionally uploaded to the
    Hugging Face Hub. Raises ShardLimitReached once ``max_shards`` shards
    have been written.
    """

    def __init__(self, config: CrawlerConfig, stats: CrawlStats):
        self.config = config
        self.stats = stats
        # Records accumulated since the last flush.
        self.buffer: list[dict[str, Any]] = []
        # Number of shards written so far; also used in shard file names.
        self.shard_index = 0
        # Created in initialize() only when HF upload is enabled.
        self.uploader: HfShardUploader | None = None
        self.live_tokenizer = LiveShardTokenizer()

    async def initialize(self) -> None:
        """Create the output directory and, when configured, the HF uploader."""
        self.config.output_dir.mkdir(parents=True, exist_ok=True)
        if not self.config.enable_hf_upload:
            return

        self.uploader = HfShardUploader(
            repo_id=self.config.hf_repo_id,
            token=self.config.hf_token,
            repo_type=self.config.hf_repo_type,
            private_repo=self.config.hf_private_repo,
            path_prefix=self.config.hf_path_prefix,
        )
        await self.uploader.initialize()

    async def consume(self, record_queue: asyncio.Queue[dict[str, Any] | None]) -> None:
        """Drain *record_queue* until the ``None`` sentinel, flushing full shards."""
        while True:
            item = await record_queue.get()
            if item is None:
                record_queue.task_done()
                break

            try:
                self.buffer.append(item)
                if len(self.buffer) >= self.config.shard_size_rows:
                    await self.flush()
            finally:
                # task_done() even when flush() raises, so queue joins stay sound.
                record_queue.task_done()

        # Write whatever partial shard remains after the sentinel.
        if self.buffer:
            await self.flush()

    async def flush(self) -> None:
        """Write the buffered records as one Parquet shard; tokenize and upload it.

        Raises ShardLimitReached when a flush is attempted at the cap, or
        immediately after the flush that reaches the cap.
        """
        if not self.buffer:
            return

        if self.shard_index >= self.config.max_shards:
            raise ShardLimitReached(f"Reached shard cap of {self.config.max_shards}.")

        # Swap the buffer out first so new records keep accumulating safely.
        rows = self.buffer
        self.buffer = []
        # Coerce all fields to str and drop rows with no text at all.
        normalized_rows = [
            {
                "text": str(row.get("text", "")),
                "url": str(row.get("url", "")),
                "domain": str(row.get("domain", "")),
                "timestamp": str(row.get("timestamp", "")),
            }
            for row in rows
            if row.get("text")
        ]
        if not normalized_rows:
            return

        timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
        shard_name = f"shard-{timestamp}-{self.shard_index:04d}.parquet"
        shard_path = self.config.output_dir / shard_name

        table = pa.Table.from_pylist(normalized_rows, schema=PARQUET_SCHEMA)
        # Parquet writing is blocking; run it off the event loop.
        await asyncio.to_thread(
            pq.write_table,
            table,
            shard_path,
            compression=self.config.parquet_compression,
            compression_level=self.config.parquet_compression_level,
            use_dictionary=True,
        )

        self.shard_index += 1
        self.stats.written_shards = self.shard_index
        self.stats.stored_rows += len(normalized_rows)
        # Token counting is CPU-bound; also pushed to a thread.
        token_rows, token_count = await asyncio.to_thread(
            self.live_tokenizer.tokenize_shard_text, shard_path
        )
        self.stats.tokenized_shards += 1
        self.stats.tokenized_rows += token_rows
        self.stats.tokenized_tokens += token_count

        if self.config.enable_hf_upload:
            ok = await self._upload_and_delete(shard_path, rows=len(normalized_rows))
            if ok:
                self.stats.uploaded_shards += 1

        if self.shard_index >= self.config.max_shards:
            raise ShardLimitReached(f"Reached shard cap of {self.config.max_shards}.")

    async def _upload_and_delete(self, shard_path: Path, rows: int) -> bool:
        """Delegate shard upload (and local deletion on success) to the uploader."""
        if self.uploader is None:
            raise RuntimeError("Uploader not initialized.")
        return await self.uploader.upload_and_delete(shard_path, rows)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
crawler/tokenizer.py DELETED
@@ -1,26 +0,0 @@
1
- from __future__ import annotations
2
-
3
- from pathlib import Path
4
-
5
- import pyarrow.parquet as pq
6
- import tiktoken
7
-
8
-
9
class LiveShardTokenizer:
    """Counts tiktoken tokens over the ``text`` column of a written shard."""

    def __init__(self, encoding_name: str = "cl100k_base") -> None:
        # The encoding is loaded once and reused for every shard.
        self.encoding = tiktoken.get_encoding(encoding_name)

    def tokenize_shard_text(self, shard_path: Path) -> tuple[int, int]:
        """Return ``(rows_tokenized, total_tokens)`` for the shard at *shard_path*."""
        table = pq.read_table(shard_path, columns=["text"])
        # Defensive guard; read_table(columns=["text"]) normally guarantees this.
        if "text" not in table.column_names:
            return 0, 0

        rows = 0
        total_tokens = 0
        encode = self.encoding.encode  # hoist the attribute lookup out of the loop
        for cell in table.column("text").to_pylist():
            if cell is None:
                continue
            rows += 1
            # disallowed_special=() treats special-token text as plain text.
            total_tokens += len(encode(str(cell), disallowed_special=()))
        return rows, total_tokens
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
crawler/upload.py DELETED
@@ -1,60 +0,0 @@
1
- from __future__ import annotations
2
-
3
- import asyncio
4
- import contextlib
5
- from pathlib import Path
6
-
7
- from huggingface_hub import HfApi
8
-
9
-
10
class HfShardUploader:
    """Pushes finished Parquet shards to a Hugging Face repo, deleting them locally.

    ``initialize()`` must run (and create the repo) before any upload;
    ``upload_and_delete`` is best-effort and reports success as a bool.
    """

    def __init__(
        self,
        *,
        repo_id: str,
        token: str,
        repo_type: str = "dataset",
        private_repo: bool = False,
        path_prefix: str = "crawl_shards",
    ) -> None:
        self.repo_id = repo_id.strip()
        self.token = token.strip()
        self.repo_type = repo_type
        self.private_repo = bool(private_repo)
        # Stored without surrounding slashes so remote path joins stay clean.
        self.path_prefix = path_prefix.strip("/")
        # Built lazily in initialize(); uploading before that is a programming error.
        self.api: HfApi | None = None

    async def initialize(self) -> None:
        """Build the HfApi client and ensure the target repo exists (idempotent)."""
        self.api = HfApi(token=self.token or None)
        await asyncio.to_thread(
            self.api.create_repo,
            repo_id=self.repo_id,
            repo_type=self.repo_type,
            private=self.private_repo,
            exist_ok=True,
        )

    async def upload_and_delete(self, shard_path: Path, rows: int) -> bool:
        """Upload *shard_path*; remove the local file on success. Returns success."""
        if self.api is None:
            raise RuntimeError("Uploader was not initialized.")

        if self.path_prefix:
            remote_name = f"{self.path_prefix}/{shard_path.name}"
        else:
            remote_name = shard_path.name

        try:
            # upload_file is blocking; run it off the event loop.
            await asyncio.to_thread(
                self.api.upload_file,
                path_or_fileobj=str(shard_path),
                path_in_repo=remote_name,
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                commit_message=f"Add crawl shard {shard_path.name} ({rows} rows)",
            )
        except Exception:
            # A failed upload keeps the local shard around for retry/inspection.
            return False

        with contextlib.suppress(FileNotFoundError):
            shard_path.unlink()
        return True
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
crawler/utils.py DELETED
@@ -1,124 +0,0 @@
1
- from __future__ import annotations
2
-
3
- import re
4
- from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
5
-
6
# URL suffixes that almost certainly point at non-HTML payloads; links with
# these extensions are dropped before they ever reach the fetch queue.
BINARY_EXTENSIONS = {
    ".7z",
    ".avi",
    ".bin",
    ".bz2",
    ".csv",
    ".doc",
    ".docx",
    ".epub",
    ".gif",
    ".gz",
    ".ico",
    ".jpeg",
    ".jpg",
    ".json",
    ".m4a",
    ".m4v",
    ".mov",
    ".mp3",
    ".mp4",
    ".mpeg",
    ".ogg",
    ".pdf",
    ".png",
    ".ppt",
    ".pptx",
    ".rar",
    ".svg",
    ".tar",
    ".tgz",
    ".tif",
    ".tiff",
    ".wav",
    ".webm",
    ".webp",
    ".xls",
    ".xlsx",
    ".xml",
    ".xz",
    ".zip",
}

# Query-string keys that only carry marketing/click tracking; removed during
# normalization so equivalent pages deduplicate to one canonical URL.
TRACKING_QUERY_KEYS = {
    "fbclid",
    "gclid",
    "mc_cid",
    "mc_eid",
    "ref",
    "source",
    "spm",
    "yclid",
}


def normalize_url(raw_url: str) -> str | None:
    """Return a canonical form of *raw_url*, or None when it is not crawlable.

    Canonicalization: lowercases scheme and host, strips default ports,
    collapses duplicate slashes in the path, removes tracking query keys
    (``utm_*`` and TRACKING_QUERY_KEYS) and drops the fragment.
    """
    try:
        parts = urlsplit(raw_url.strip())
    except ValueError:
        return None

    scheme = parts.scheme.lower()
    if scheme not in {"http", "https"}:
        return None

    host = (parts.hostname or "").lower().strip(".")
    if not host:
        return None

    try:
        port = parts.port
    except ValueError:
        # Non-numeric or out-of-range port.
        return None

    # Default ports are dropped entirely from the canonical netloc.
    is_default_port = (scheme, port) in {("http", 80), ("https", 443)}
    netloc = host if (is_default_port or not port) else f"{host}:{port}"

    # Collapse runs of slashes; an empty path becomes "/".
    path = re.sub(r"/{2,}", "/", parts.path or "/")

    kept_pairs = [
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if not key.lower().startswith("utm_") and key.lower() not in TRACKING_QUERY_KEYS
    ]
    query = urlencode(kept_pairs, doseq=True)

    return urlunsplit((scheme, netloc, path, query, ""))


def has_binary_extension(url: str) -> bool:
    """True when the URL path ends in a known binary/non-HTML file extension."""
    path = urlsplit(url).path.lower()
    if not path:
        return False

    dot = path.rfind(".")
    return dot != -1 and path[dot:] in BINARY_EXTENSIONS


def is_html_response(content_type: str, final_url: str) -> bool:
    """Heuristic: is this response worth handing to the HTML parser?

    A binary file extension on the final URL always vetoes parsing; a
    missing Content-Type is optimistically treated as HTML.
    """
    if has_binary_extension(final_url):
        return False

    if not content_type:
        return True

    lowered = content_type.lower()
    return any(
        marker in lowered
        for marker in ("text/html", "application/xhtml+xml", "text/plain")
    )