Karim Shoair committed
Commit 7145885 · 1 Parent(s): aa7527f

feat(spiders): Add a stream mode to collect items in realtime

scrapling/spiders/engine.py CHANGED
@@ -1,14 +1,14 @@
 import json
 
 import anyio
-from anyio import create_task_group, CapacityLimiter
+from anyio import create_task_group, CapacityLimiter, create_memory_object_stream, EndOfStream
 
 from scrapling.core.utils import log
 from scrapling.spiders.request import Request
 from scrapling.spiders.result import CrawlStats, ItemList
 from scrapling.spiders.scheduler import Scheduler
 from scrapling.spiders.session import SessionManager
-from scrapling.core._types import Dict, TYPE_CHECKING, Any
+from scrapling.core._types import Dict, TYPE_CHECKING, Any, AsyncGenerator
 
 if TYPE_CHECKING:
     from scrapling.spiders.spider import Spider
@@ -34,6 +34,7 @@ class CrawlerEngine:
         self._active_tasks: int = 0
         self._running: bool = False
         self._items: ItemList = ItemList()
+        self._item_stream: Any = None
 
     def _is_domain_allowed(self, request: Request) -> bool:
         """Check if the request's domain is in allowed_domains."""
@@ -102,6 +103,8 @@ class CrawlerEngine:
                 log.debug(f"Filtered offsite request to: {result.url}")
             elif isinstance(result, dict):
                 await self._handle_item(result)
+                if self._item_stream:
+                    await self._item_stream.send(result)
             log.debug(f"Scraped from {str(response)}\n{result}")
         except Exception as e:
             await self.spider.on_error(request, e)
@@ -164,3 +167,25 @@ class CrawlerEngine:
     def items(self) -> ItemList:
         """Access scraped items."""
         return self._items
+
+    def __aiter__(self) -> AsyncGenerator[dict, None]:
+        return self._stream()
+
+    async def _stream(self) -> AsyncGenerator[dict, None]:
+        """Async generator that runs the crawl and yields items."""
+        send, recv = create_memory_object_stream[dict](100)
+        self._item_stream = send
+
+        async def run():
+            try:
+                await self.crawl()
+            finally:
+                await send.aclose()
+
+        async with create_task_group() as tg:
+            tg.start_soon(run)
+            try:
+                async for item in recv:
+                    yield item
+            except EndOfStream:
+                pass
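
The streaming engine above is a standard anyio producer/consumer setup: `crawl()` runs in a background task and pushes each scraped dict into the send side of a bounded memory object stream, while the `_stream` generator yields from the receive side. The sketch below shows the same pattern in isolation, assuming anyio >= 4 (for the `create_memory_object_stream[dict](...)` subscript form the commit uses); `producer` and `main` are illustrative names, not part of this commit.

import anyio
from anyio import create_task_group, create_memory_object_stream


async def producer(send) -> None:
    # Stand-in for CrawlerEngine.crawl(): emit items, then close the send side
    # in `finally` so the consumer's `async for` ends even if the crawl raises.
    try:
        for i in range(3):
            await send.send({"item": i})
    finally:
        await send.aclose()


async def main() -> None:
    # The bounded buffer (100 in the engine) applies backpressure: the producer
    # blocks on send() once the consumer falls that many items behind.
    send, recv = create_memory_object_stream[dict](100)
    async with create_task_group() as tg:
        tg.start_soon(producer, send)
        async for item in recv:  # iteration stops once `send` is closed
            print(item)


anyio.run(main)

Note that iterating a receive stream with `async for` already converts `EndOfStream` into the end of iteration, so the engine's explicit `except EndOfStream` guard is defensive rather than required.
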
scrapling/spiders/spider.py CHANGED
@@ -1,14 +1,14 @@
 import logging
-from pathlib import Path
 from abc import ABC
+from pathlib import Path
 
 import anyio
 
 from scrapling.spiders.request import Request
-from scrapling.spiders.result import CrawlResult
 from scrapling.spiders.engine import CrawlerEngine
 from scrapling.spiders.session import SessionManager
 from scrapling.core.utils import set_logger, reset_logger
+from scrapling.spiders.result import CrawlResult, CrawlStats
 from scrapling.core._types import Set, Any, Dict, Optional, TYPE_CHECKING, AsyncGenerator
 
 BLOCKED_CODES = {401, 403, 407, 429, 444, 500, 502, 503, 504}
@@ -108,6 +108,8 @@ class Spider(ABC):
             self.logger.addHandler(file_handler)
 
         self._session_manager = SessionManager()
+        self._stream_engine: CrawlerEngine | None = None
+
        try:
            self.configure_sessions(self._session_manager)
        except Exception as e:
@@ -211,3 +213,29 @@ class Spider(ABC):
         if use_uvloop:
             backend_options.update({"use_uvloop": True})
         return anyio.run(self.__run, backend="asyncio", backend_options=backend_options)
+
+    async def stream(self) -> AsyncGenerator[Dict[str, Any], None]:
+        """Stream items as they're scraped. Ideal for long-running spiders or for building applications on top of spiders.
+
+        Must be called from an async context. Yields items one by one as they are scraped.
+        Access `spider.stats` during iteration for real-time statistics.
+        """
+        token = set_logger(self.logger)
+        try:
+            self._stream_engine = CrawlerEngine(self, self._session_manager)
+            async for item in self._stream_engine:
+                yield item
+        finally:
+            self._stream_engine = None
+            reset_logger(token)
+            if self.log_file:
+                for handler in self.logger.handlers:
+                    if isinstance(handler, logging.FileHandler):
+                        handler.close()
+
+    @property
+    def stats(self) -> CrawlStats:
+        """Access current crawl stats (works during streaming)."""
+        if self._stream_engine:
+            return self._stream_engine.stats
+        raise RuntimeError("No active crawl. Use this property inside `async for item in spider.stream():`")
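
Taken together, the two files give consumers an async-iteration API. A minimal usage sketch, assuming a hypothetical `MySpider` subclass of `Spider` with a no-argument constructor (neither is defined in this commit):

import anyio


async def main() -> None:
    spider = MySpider()  # hypothetical Spider subclass, defined elsewhere
    async for item in spider.stream():  # items arrive as soon as they are scraped
        print(item)
        print(spider.stats)  # real-time CrawlStats, valid only while streaming


anyio.run(main)

Once the generator is exhausted, `stream()` resets `_stream_engine` to `None`, so reading `spider.stats` after the loop raises the `RuntimeError` shown in the property.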