Roman190928 committed on
Commit
378a0c0
·
verified ·
1 Parent(s): 14c0877

Update crawler: incomplete shard upload + UI options

Browse files
Files changed (4) hide show
  1. app.py +127 -6
  2. crawler/config.py +7 -1
  3. crawler/engine.py +14 -0
  4. crawler/shards.py +32 -1
app.py CHANGED
@@ -6,6 +6,7 @@ import contextlib
6
  import inspect
7
  import threading
8
  import traceback
 
9
  from dataclasses import dataclass
10
  from datetime import datetime, timezone
11
  from html import escape
@@ -111,6 +112,61 @@ APP_CSS = """
111
  --border: #2a5d36;
112
  }
113
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
114
  .gradio-container {
115
  background:
116
  radial-gradient(1200px 550px at 8% 0%, color-mix(in srgb, var(--accent) 18%, transparent), transparent),
@@ -404,6 +460,8 @@ def build_crawler_config(
404
  max_response_bytes: int,
405
  shard_size_rows: int,
406
  enable_hf_upload: bool,
 
 
407
  hf_repo_id: str,
408
  hf_token: str,
409
  hf_private_repo: bool,
@@ -421,6 +479,8 @@ def build_crawler_config(
421
  shard_size_rows=int(shard_size_rows),
422
  output_dir=Path(__file__).resolve().parent / "shards",
423
  enable_hf_upload=bool(enable_hf_upload),
 
 
424
  hf_repo_id=hf_repo_id.strip(),
425
  hf_token=hf_token.strip(),
426
  hf_private_repo=bool(hf_private_repo),
@@ -446,7 +506,7 @@ class CrawlerRunManager:
446
  self._loop: asyncio.AbstractEventLoop | None = None
447
  self._crawler: AsyncCrawler | None = None
448
  self._state = RunState()
449
- self._logs: list[str] = []
450
  self._last_snapshot: dict[str, Any] | None = None
451
 
452
  def start(self, config: CrawlerConfig) -> str:
@@ -594,7 +654,7 @@ class CrawlerRunManager:
594
  "record_queue": 0,
595
  "stop_event": False,
596
  }
597
- logs_text = "\n".join(self._logs[-500:])
598
 
599
  status_lines = [
600
  "### Crawler Status",
@@ -638,6 +698,8 @@ def _start_crawl(
638
  max_response_bytes: int,
639
  shard_size_rows: int,
640
  enable_hf_upload: bool,
 
 
641
  hf_repo_id: str,
642
  hf_token: str,
643
  hf_private_repo: bool,
@@ -651,6 +713,8 @@ def _start_crawl(
651
  max_response_bytes=max_response_bytes,
652
  shard_size_rows=shard_size_rows,
653
  enable_hf_upload=enable_hf_upload,
 
 
654
  hf_repo_id=hf_repo_id,
655
  hf_token=hf_token,
656
  hf_private_repo=hf_private_repo,
@@ -672,6 +736,8 @@ def start_crawl_standard(
672
  max_response_bytes: int,
673
  shard_size_rows: int,
674
  enable_hf_upload: bool,
 
 
675
  hf_repo_id: str,
676
  hf_token: str,
677
  hf_private_repo: bool,
@@ -685,6 +751,8 @@ def start_crawl_standard(
685
  max_response_bytes=max_response_bytes,
686
  shard_size_rows=shard_size_rows,
687
  enable_hf_upload=enable_hf_upload,
 
 
688
  hf_repo_id=hf_repo_id,
689
  hf_token=hf_token,
690
  hf_private_repo=hf_private_repo,
@@ -699,6 +767,8 @@ def start_crawl_super(
699
  max_response_bytes: int,
700
  shard_size_rows: int,
701
  enable_hf_upload: bool,
 
 
702
  hf_repo_id: str,
703
  hf_token: str,
704
  hf_private_repo: bool,
@@ -712,6 +782,8 @@ def start_crawl_super(
712
  max_response_bytes=max_response_bytes,
713
  shard_size_rows=shard_size_rows,
714
  enable_hf_upload=enable_hf_upload,
 
 
715
  hf_repo_id=hf_repo_id,
716
  hf_token=hf_token,
717
  hf_private_repo=hf_private_repo,
@@ -730,9 +802,16 @@ def poll_dashboard() -> tuple[str, str, str, str]:
730
  return _format_dashboard_response(status, snapshot, logs)
731
 
732
 
733
- def toggle_hf_fields(enable_hf_upload: bool) -> tuple[Any, Any, Any, Any]:
734
  update = gr.update(visible=enable_hf_upload)
735
- return update, update, update, update
 
 
 
 
 
 
 
736
 
737
 
738
  def build_ui() -> gr.Blocks:
@@ -752,7 +831,18 @@ def build_ui() -> gr.Blocks:
752
 
753
  with gr.Row():
754
  theme_name = gr.Dropdown(
755
- choices=["red", "blue", "light", "dark", "green"],
 
 
 
 
 
 
 
 
 
 
 
756
  value="dark",
757
  label="Theme",
758
  interactive=True,
@@ -838,6 +928,19 @@ def build_ui() -> gr.Blocks:
838
  value="crawl_shards",
839
  visible=False,
840
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
841
 
842
  with gr.Row():
843
  start_button = gr.Button("Start Crawl (12 Threads)", variant="primary")
@@ -856,6 +959,8 @@ def build_ui() -> gr.Blocks:
856
  max_response_bytes,
857
  shard_size_rows,
858
  enable_hf_upload,
 
 
859
  hf_repo_id,
860
  hf_token,
861
  hf_private_repo,
@@ -871,7 +976,23 @@ def build_ui() -> gr.Blocks:
871
  enable_hf_upload.change(
872
  toggle_hf_fields,
873
  inputs=enable_hf_upload,
874
- outputs=[hf_repo_id, hf_token, hf_private_repo, hf_path_prefix],
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
875
  )
876
 
877
  seed_urls_table.change(
 
6
  import inspect
7
  import threading
8
  import traceback
9
+ from collections import deque
10
  from dataclasses import dataclass
11
  from datetime import datetime, timezone
12
  from html import escape
 
112
  --border: #2a5d36;
113
  }
114
 
115
+ :root[data-crawler-theme="sunset"] {
116
+ --bg-main: #1c0f0b;
117
+ --bg-surface: #2f1810;
118
+ --bg-panel: #422015;
119
+ --text-main: #ffeede;
120
+ --text-muted: #d9b59d;
121
+ --accent: #ff7f3f;
122
+ --accent-2: #ff4f81;
123
+ --border: #6e3525;
124
+ }
125
+
126
+ :root[data-crawler-theme="ocean"] {
127
+ --bg-main: #04131d;
128
+ --bg-surface: #092230;
129
+ --bg-panel: #0e3144;
130
+ --text-main: #e8fbff;
131
+ --text-muted: #9fc3cf;
132
+ --accent: #2cd9ff;
133
+ --accent-2: #38ffcb;
134
+ --border: #1b5062;
135
+ }
136
+
137
+ :root[data-crawler-theme="amber"] {
138
+ --bg-main: #171104;
139
+ --bg-surface: #2a1d07;
140
+ --bg-panel: #3a2a0a;
141
+ --text-main: #fff6dc;
142
+ --text-muted: #d2bd84;
143
+ --accent: #ffb300;
144
+ --accent-2: #ffd54f;
145
+ --border: #6b4d12;
146
+ }
147
+
148
+ :root[data-crawler-theme="graphite"] {
149
+ --bg-main: #0f1012;
150
+ --bg-surface: #1a1d21;
151
+ --bg-panel: #242a30;
152
+ --text-main: #f1f3f6;
153
+ --text-muted: #adb5bf;
154
+ --accent: #8fa0b7;
155
+ --accent-2: #d7dce2;
156
+ --border: #3a424c;
157
+ }
158
+
159
+ :root[data-crawler-theme="mint"] {
160
+ --bg-main: #06140f;
161
+ --bg-surface: #0b2319;
162
+ --bg-panel: #123125;
163
+ --text-main: #e8fff4;
164
+ --text-muted: #a4d8bf;
165
+ --accent: #3dffb2;
166
+ --accent-2: #7df9d1;
167
+ --border: #256347;
168
+ }
169
+
170
  .gradio-container {
171
  background:
172
  radial-gradient(1200px 550px at 8% 0%, color-mix(in srgb, var(--accent) 18%, transparent), transparent),
 
460
  max_response_bytes: int,
461
  shard_size_rows: int,
462
  enable_hf_upload: bool,
463
+ upload_incomplete_shards: bool,
464
+ incomplete_shard_flush_seconds: float,
465
  hf_repo_id: str,
466
  hf_token: str,
467
  hf_private_repo: bool,
 
479
  shard_size_rows=int(shard_size_rows),
480
  output_dir=Path(__file__).resolve().parent / "shards",
481
  enable_hf_upload=bool(enable_hf_upload),
482
+ upload_incomplete_shards=bool(upload_incomplete_shards),
483
+ incomplete_shard_flush_seconds=float(incomplete_shard_flush_seconds),
484
  hf_repo_id=hf_repo_id.strip(),
485
  hf_token=hf_token.strip(),
486
  hf_private_repo=bool(hf_private_repo),
 
506
  self._loop: asyncio.AbstractEventLoop | None = None
507
  self._crawler: AsyncCrawler | None = None
508
  self._state = RunState()
509
+ self._logs: deque[str] = deque(maxlen=600)
510
  self._last_snapshot: dict[str, Any] | None = None
511
 
512
  def start(self, config: CrawlerConfig) -> str:
 
654
  "record_queue": 0,
655
  "stop_event": False,
656
  }
657
+ logs_text = "\n".join(self._logs)
658
 
659
  status_lines = [
660
  "### Crawler Status",
 
698
  max_response_bytes: int,
699
  shard_size_rows: int,
700
  enable_hf_upload: bool,
701
+ upload_incomplete_shards: bool,
702
+ incomplete_shard_flush_seconds: float,
703
  hf_repo_id: str,
704
  hf_token: str,
705
  hf_private_repo: bool,
 
713
  max_response_bytes=max_response_bytes,
714
  shard_size_rows=shard_size_rows,
715
  enable_hf_upload=enable_hf_upload,
716
+ upload_incomplete_shards=upload_incomplete_shards,
717
+ incomplete_shard_flush_seconds=incomplete_shard_flush_seconds,
718
  hf_repo_id=hf_repo_id,
719
  hf_token=hf_token,
720
  hf_private_repo=hf_private_repo,
 
736
  max_response_bytes: int,
737
  shard_size_rows: int,
738
  enable_hf_upload: bool,
739
+ upload_incomplete_shards: bool,
740
+ incomplete_shard_flush_seconds: float,
741
  hf_repo_id: str,
742
  hf_token: str,
743
  hf_private_repo: bool,
 
751
  max_response_bytes=max_response_bytes,
752
  shard_size_rows=shard_size_rows,
753
  enable_hf_upload=enable_hf_upload,
754
+ upload_incomplete_shards=upload_incomplete_shards,
755
+ incomplete_shard_flush_seconds=incomplete_shard_flush_seconds,
756
  hf_repo_id=hf_repo_id,
757
  hf_token=hf_token,
758
  hf_private_repo=hf_private_repo,
 
767
  max_response_bytes: int,
768
  shard_size_rows: int,
769
  enable_hf_upload: bool,
770
+ upload_incomplete_shards: bool,
771
+ incomplete_shard_flush_seconds: float,
772
  hf_repo_id: str,
773
  hf_token: str,
774
  hf_private_repo: bool,
 
782
  max_response_bytes=max_response_bytes,
783
  shard_size_rows=shard_size_rows,
784
  enable_hf_upload=enable_hf_upload,
785
+ upload_incomplete_shards=upload_incomplete_shards,
786
+ incomplete_shard_flush_seconds=incomplete_shard_flush_seconds,
787
  hf_repo_id=hf_repo_id,
788
  hf_token=hf_token,
789
  hf_private_repo=hf_private_repo,
 
802
  return _format_dashboard_response(status, snapshot, logs)
803
 
804
 
805
+ def toggle_hf_fields(enable_hf_upload: bool) -> tuple[Any, Any, Any, Any, Any]:
806
  update = gr.update(visible=enable_hf_upload)
807
+ return update, update, update, update, update
808
+
809
+
810
+ def toggle_incomplete_flush_field(
811
+ enable_hf_upload: bool,
812
+ upload_incomplete_shards: bool,
813
+ ) -> Any:
814
+ return gr.update(visible=bool(enable_hf_upload and upload_incomplete_shards))
815
 
816
 
817
  def build_ui() -> gr.Blocks:
 
831
 
832
  with gr.Row():
833
  theme_name = gr.Dropdown(
834
+ choices=[
835
+ "red",
836
+ "blue",
837
+ "light",
838
+ "dark",
839
+ "green",
840
+ "sunset",
841
+ "ocean",
842
+ "amber",
843
+ "graphite",
844
+ "mint",
845
+ ],
846
  value="dark",
847
  label="Theme",
848
  interactive=True,
 
928
  value="crawl_shards",
929
  visible=False,
930
  )
931
+ upload_incomplete_shards = gr.Checkbox(
932
+ label="Upload incomplete shard buffers",
933
+ value=False,
934
+ visible=False,
935
+ )
936
+ incomplete_shard_flush_seconds = gr.Slider(
937
+ label="Incomplete Upload Flush Interval (seconds)",
938
+ minimum=5,
939
+ maximum=300,
940
+ step=1,
941
+ value=int(defaults.incomplete_shard_flush_seconds),
942
+ visible=False,
943
+ )
944
 
945
  with gr.Row():
946
  start_button = gr.Button("Start Crawl (12 Threads)", variant="primary")
 
959
  max_response_bytes,
960
  shard_size_rows,
961
  enable_hf_upload,
962
+ upload_incomplete_shards,
963
+ incomplete_shard_flush_seconds,
964
  hf_repo_id,
965
  hf_token,
966
  hf_private_repo,
 
976
  enable_hf_upload.change(
977
  toggle_hf_fields,
978
  inputs=enable_hf_upload,
979
+ outputs=[
980
+ hf_repo_id,
981
+ hf_token,
982
+ hf_private_repo,
983
+ hf_path_prefix,
984
+ upload_incomplete_shards,
985
+ ],
986
+ )
987
+ enable_hf_upload.change(
988
+ toggle_incomplete_flush_field,
989
+ inputs=[enable_hf_upload, upload_incomplete_shards],
990
+ outputs=[incomplete_shard_flush_seconds],
991
+ )
992
+ upload_incomplete_shards.change(
993
+ toggle_incomplete_flush_field,
994
+ inputs=[enable_hf_upload, upload_incomplete_shards],
995
+ outputs=[incomplete_shard_flush_seconds],
996
  )
997
 
998
  seed_urls_table.change(
crawler/config.py CHANGED
@@ -51,6 +51,8 @@ class CrawlerConfig:
51
  parquet_compression_level: int = 9
52
 
53
  enable_hf_upload: bool = False
 
 
54
  hf_repo_id: str = ""
55
  hf_token: str = ""
56
  hf_repo_type: str = "dataset"
@@ -59,7 +61,8 @@ class CrawlerConfig:
59
 
60
  total_workers: int = NORMAL_TOTAL_WORKERS
61
  request_delay_global_seconds: float = 0.02
62
- request_delay_per_domain_seconds: float = 2.0
 
63
 
64
  robots_cache_ttl_seconds: float = 3600.0
65
  robots_fail_closed: bool = True
@@ -89,6 +92,9 @@ class CrawlerConfig:
89
  self.hf_repo_id = self.hf_repo_id.strip()
90
  self.hf_token = self.hf_token.strip()
91
  self.hf_path_prefix = self.hf_path_prefix.strip() or "crawl_shards"
 
 
 
92
 
93
  if self.enable_hf_upload:
94
  if not self.hf_repo_id:
 
51
  parquet_compression_level: int = 9
52
 
53
  enable_hf_upload: bool = False
54
+ upload_incomplete_shards: bool = False
55
+ incomplete_shard_flush_seconds: float = 30.0
56
  hf_repo_id: str = ""
57
  hf_token: str = ""
58
  hf_repo_type: str = "dataset"
 
61
 
62
  total_workers: int = NORMAL_TOTAL_WORKERS
63
  request_delay_global_seconds: float = 0.02
64
+ request_delay_per_domain_seconds: float = 0.0
65
+ same_site_delay_per_worker_seconds: float = 0.5
66
 
67
  robots_cache_ttl_seconds: float = 3600.0
68
  robots_fail_closed: bool = True
 
92
  self.hf_repo_id = self.hf_repo_id.strip()
93
  self.hf_token = self.hf_token.strip()
94
  self.hf_path_prefix = self.hf_path_prefix.strip() or "crawl_shards"
95
+ self.incomplete_shard_flush_seconds = float(self.incomplete_shard_flush_seconds)
96
+ if self.incomplete_shard_flush_seconds <= 0:
97
+ raise ValueError("incomplete_shard_flush_seconds must be > 0.")
98
 
99
  if self.enable_hf_upload:
100
  if not self.hf_repo_id:
crawler/engine.py CHANGED
@@ -4,6 +4,7 @@ import asyncio
4
  import contextlib
5
  from collections import deque
6
  from typing import Any
 
7
 
8
  import aiohttp
9
 
@@ -185,6 +186,9 @@ class AsyncCrawler:
185
  del worker_id
186
  assert self.rate_limiter is not None
187
  assert self.robots_policy is not None
 
 
 
188
 
189
  while True:
190
  url = await self.fetch_queue.get()
@@ -199,6 +203,16 @@ class AsyncCrawler:
199
 
200
  self.active_fetchers += 1
201
  try:
 
 
 
 
 
 
 
 
 
 
202
  outcome = await fetch_url(
203
  session,
204
  url,
 
4
  import contextlib
5
  from collections import deque
6
  from typing import Any
7
+ from urllib.parse import urlsplit
8
 
9
  import aiohttp
10
 
 
186
  del worker_id
187
  assert self.rate_limiter is not None
188
  assert self.robots_policy is not None
189
+ loop = asyncio.get_running_loop()
190
+ last_domain = ""
191
+ last_request_started = 0.0
192
 
193
  while True:
194
  url = await self.fetch_queue.get()
 
203
 
204
  self.active_fetchers += 1
205
  try:
206
+ requested_domain = (urlsplit(url).hostname or "").lower().strip(".")
207
+ if requested_domain and requested_domain == last_domain:
208
+ elapsed = loop.time() - last_request_started
209
+ wait = self.config.same_site_delay_per_worker_seconds - elapsed
210
+ if wait > 0:
211
+ await asyncio.sleep(wait)
212
+ if requested_domain:
213
+ last_domain = requested_domain
214
+ last_request_started = loop.time()
215
+
216
  outcome = await fetch_url(
217
  session,
218
  url,
crawler/shards.py CHANGED
@@ -52,8 +52,33 @@ class ParquetShardWriter:
52
  await self.uploader.initialize()
53
 
54
  async def consume(self, record_queue: asyncio.Queue[dict[str, Any] | None]) -> None:
 
 
 
 
 
55
  while True:
56
- item = await record_queue.get()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
  if item is None:
58
  record_queue.task_done()
59
  break
@@ -62,6 +87,12 @@ class ParquetShardWriter:
62
  self.buffer.append(item)
63
  if len(self.buffer) >= self.config.shard_size_rows:
64
  await self.flush()
 
 
 
 
 
 
65
  finally:
66
  record_queue.task_done()
67
 
 
52
  await self.uploader.initialize()
53
 
54
  async def consume(self, record_queue: asyncio.Queue[dict[str, Any] | None]) -> None:
55
+ allow_incomplete_upload = (
56
+ self.config.enable_hf_upload and self.config.upload_incomplete_shards
57
+ )
58
+ loop = asyncio.get_running_loop()
59
+ last_flush_started = loop.time()
60
  while True:
61
+ try:
62
+ if allow_incomplete_upload:
63
+ timeout = self.config.incomplete_shard_flush_seconds
64
+ if self.buffer:
65
+ elapsed = loop.time() - last_flush_started
66
+ timeout = max(
67
+ 0.0,
68
+ self.config.incomplete_shard_flush_seconds - elapsed,
69
+ )
70
+ item = await asyncio.wait_for(
71
+ record_queue.get(),
72
+ timeout=timeout,
73
+ )
74
+ else:
75
+ item = await record_queue.get()
76
+ except asyncio.TimeoutError:
77
+ if self.buffer:
78
+ await self.flush()
79
+ last_flush_started = loop.time()
80
+ continue
81
+
82
  if item is None:
83
  record_queue.task_done()
84
  break
 
87
  self.buffer.append(item)
88
  if len(self.buffer) >= self.config.shard_size_rows:
89
  await self.flush()
90
+ last_flush_started = loop.time()
91
+ elif allow_incomplete_upload and self.buffer:
92
+ elapsed = loop.time() - last_flush_started
93
+ if elapsed >= self.config.incomplete_shard_flush_seconds:
94
+ await self.flush()
95
+ last_flush_started = loop.time()
96
  finally:
97
  record_queue.task_done()
98