Soham Waghmare committed
Commit 9635653 · 1 Parent(s): 9155a62

feat: concurrent ws conns, parallel scraping, better image extraction

Files changed (4)
  1. backend/app.py +38 -10
  2. backend/crawl_ai.py +31 -18
  3. backend/knet.py +5 -2
  4. backend/scraper.py +46 -38
backend/app.py CHANGED
@@ -1,7 +1,8 @@
-# pip install asyncio eventlet
+# pip install asyncio eventlet
 # pip install google-genai beautifulsoup4 selenium newspaper3k lxml_html_clean
 import json
 import logging
+from typing import Dict
 
 import socketio
 from dotenv import load_dotenv
@@ -20,26 +21,49 @@ logger = logging.getLogger(__name__)
 
 app = FastAPI()
 app.add_middleware(
-    CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"]
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
 )
 
 sio = socketio.AsyncServer(cors_allowed_origins="*", ping_timeout=60, ping_interval=10, async_mode="asgi")
 app.mount("/", socketio.ASGIApp(sio))
 
-# Initialize the scraper and KNet
-scraper_instance = CrawlForAIScraper()
-# scraper_instance = WebScraper()
-knet = KNet(scraper_instance)
+
+class SessionManager:
+    def __init__(self):
+        self.sessions: Dict[str, tuple[KNet, CrawlForAIScraper]] = {}
+
+    async def get_or_create_session(self, sid: str) -> tuple[KNet, CrawlForAIScraper]:
+        if sid not in self.sessions:
+            scraper = CrawlForAIScraper()
+            await scraper.start()
+            knet = KNet(scraper)
+            self.sessions[sid] = (knet, scraper)
+        return self.sessions[sid]
+
+    async def cleanup_session(self, sid: str):
+        if sid in self.sessions:
+            _, scraper = self.sessions[sid]
+            await scraper.close()
+            del self.sessions[sid]
+
+
+session_manager = SessionManager()
 
 
 @sio.event
-def connect(sid, environ, auth):
+async def connect(sid, environ, auth):
     logger.info(f"Client connected: {sid}")
+    await session_manager.get_or_create_session(sid)
 
 
 @sio.event
-def disconnect(sid, reason):
+async def disconnect(sid, reason):
     logger.info(f"Client disconnected: {sid}")
+    await session_manager.cleanup_session(sid)
 
 
 @sio.event
@@ -50,6 +74,8 @@ async def health_check(sid, data):
 
 @sio.event
 async def start_research(sid, data):
+    knet, scraper = await session_manager.get_or_create_session(sid)
+
     try:
         data = json.loads(data) if type(data) != dict else data
         topic = data.get("topic")
@@ -60,7 +86,9 @@ async def start_research(sid, data):
         try:
             logger.debug(f"Progress update: {status['progress']}% - {status['message']}")
             await sio.emit(
-                "status", {"message": status["message"], "progress": status["progress"]}, room=session_id
+                "status",
+                {"message": status["message"], "progress": status["progress"]},
+                room=session_id,
             )
         except Exception as e:
             logger.error(f"Error in progress callback: {str(e)}")
@@ -77,11 +105,11 @@ async def start_research(sid, data):
 
 @sio.event
 async def test(sid, data):
+    knet, scraper = await session_manager.get_or_create_session(sid)
     print("Testing...")
    data = json.loads(data) if type(data) != dict else data
    res = await knet.scraper._scrape_page(data["url"])
    print(json.dumps(res, indent=2))
-    await scraper_instance.close()
    await sio.emit("test", res, room=sid)
 
 
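For reference, a minimal client sketch against the events app.py now exposes. The start_research, status, and test event names and the topic/message/progress keys come from the diff above; the server address and the crude wait are assumptions.

# Hypothetical python-socketio client; host/port and wait strategy are assumptions.
import asyncio

import socketio

sio = socketio.AsyncClient()


@sio.on("status")
async def on_status(data):
    # The server emits {"message": ..., "progress": ...} while research runs.
    print(f"[{data['progress']}%] {data['message']}")


async def main():
    await sio.connect("http://localhost:8000")  # assumed address of the ASGI app
    await sio.emit("start_research", {"topic": "quantum computing"})
    await asyncio.sleep(120)  # crude wait; a real client would listen for a completion event
    await sio.disconnect()


if __name__ == "__main__":
    asyncio.run(main())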
backend/crawl_ai.py CHANGED
@@ -1,14 +1,10 @@
 import asyncio
 import json
-import sys
 
 from crawl4ai import AsyncWebCrawler, BrowserConfig, CacheMode
 
 
-# from base64 import b64decode
-
-
-async def main():
+async def main(urls):
     base_browser = BrowserConfig(
         browser_type="chromium",
         headless=True,
@@ -20,18 +16,28 @@ async def main():
     # Create an instance of AsyncWebCrawler
     async with AsyncWebCrawler(config=base_browser) as crawler:
         # Run the crawler on a URL
-        result = await crawler.arun(url=sys.argv[1], screenshot=False, cache_mode=CacheMode.BYPASS)
-        # Print the extracted content
-        hr = lambda: print(("-" * 80) * 2)
-        hr()
-        print(result.markdown)
-        hr()
-        print(json.dumps(result.media, indent=2))
-        hr()
-        print(json.dumps(result.links, indent=2))
-        hr()
-        print(json.dumps(result.downloaded_files, indent=2))
-        hr()
+        results = await crawler.arun_many(
+            urls=urls,
+            screenshot=False,
+            cache_mode=CacheMode.BYPASS,
+            scan_full_page=True,
+            semaphore_count=3,
+            wait_for_images=True,
+        )
+        with open("output.json", "w") as f:
+            f.write("")
+        for result in results:
+            if result.success:
+                dump_result = {
+                    "url": result.url,
+                    "markdown": result.markdown,
+                }
+                with open("output.json", "a") as f:
+                    json.dump(dump_result, f)
+                # Print the extracted content
+                hr = lambda n=1: print(("-" * 80) * 2 * n)
+                print("[OK] URL:", result.url)
+                hr()
 
         # if result.success:
         #     # Save screenshot
@@ -50,4 +56,11 @@ async def main():
 
 
 if __name__ == "__main__":
-    asyncio.run(main())
+    urls = [
+        "https://www.google.com",
+        "https://www.amazon.com",
+        "https://www.facebook.com",
+        "https://www.twitter.com",
+        "https://www.instagram.com",
+    ]
+    asyncio.run(main(urls))
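One note on the loop above: output.json is truncated once and each dump_result is then appended with its own json.dump call, so the file ends up as several JSON objects concatenated back to back rather than one parseable document. A small sketch of an alternative that collects the results and writes them once; the write_results helper name is hypothetical, and it assumes result.markdown is a plain string, as the script treats it.

import json
from typing import Any, List


def write_results(results: List[Any], path: str = "output.json") -> None:
    # Collect successful crawl results and write them as a single JSON array.
    dump_results = [
        {"url": r.url, "markdown": r.markdown}
        for r in results
        if r.success
    ]
    with open(path, "w") as f:
        json.dump(dump_results, f, indent=2)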
backend/knet.py CHANGED
@@ -82,6 +82,7 @@ class KNet:
         self.logger = logging.getLogger(__name__)
         self.max_depth = 2
         self.max_breadth = 3
+        self.num_sites_per_query = 5
 
         self.search_prompt = """Generate 3-5 specific search queries to research the following topic: {topic}
 
@@ -152,7 +153,7 @@ class KNet:
             if node.data:
                 findings = ("\n" + "-" * 10 + "Next data" + "-" * 10 + "\n").join(
                     [json.dumps(d, indent=2) for d in node.data]
-                )
+                )
                 response = self.llm.generate_content(
                     f"Extract key findings from the following data related to the topic '{topic}':\n{findings}"
                 )
@@ -181,6 +182,8 @@ class KNet:
             raise e
 
     async def conduct_research(self, topic: str, progress_callback=None) -> Dict[str, Any]:
+        self.ctx_researcher = []
+        self.ctx_manager = []
         self.token_count = 0
         progress = ResearchProgress(progress_callback)
         self.logger.info(f"Starting research on topic: {topic}")
@@ -203,7 +206,7 @@
 
             # Search and scrape
             current_node.data = await self.scraper.search_and_scrape(
-                current_node.query, 3
+                current_node.query, self.num_sites_per_query
             )  # node -> data = [{url:...}, {url:...}, ...]
             self.ctx_researcher.append(json.dumps(current_node.data, indent=2))
             explored_queries.add(current_node.query)
backend/scraper.py CHANGED
@@ -179,31 +179,25 @@ class CrawlForAIScraper:
         await self.crawler.close()
         self._is_started = False
 
-    async def search_and_scrape(self, query: str, num_sites: int = 3) -> List[Dict[str, Any]]:
+    async def search_and_scrape(self, query: str, num_sites: int = 10) -> List[Dict[str, Any]]:
         await self.start()
         self.logger.info(f"Starting search for: {query}")
 
-        # Perform a Google search to get a list of webpages
-        search_results = await self._google_search(query, num_sites)
+        # Perform a search to get a list of webpages
+        search_results = await self._search(query, num_sites)
         self.logger.info(f"Found {len(search_results)} search results")
 
         # Scrape each webpage
         scraped_data = []
-        for idx, url in enumerate(search_results):
-            try:
-                self.logger.info(f"Scraping [{idx + 1}/{len(search_results)}]: {url}")
-                data = await self._scrape_page(url)
-                if data:
-                    scraped_data.append(data)
-                self.logger.info(f"Successfully scraped: {url}")
-            except Exception as e:
-                self.logger.error(f"Error scraping {url}: {str(e)}")
-                continue
+        self.logger.info(f"Scraping {len(search_results)} sites...")
+        data = await self._scrape_pages(search_results)
+        if data:
+            scraped_data.extend(data)
 
         self.logger.info(f"Completed scraping {len(scraped_data)} sites")
         return scraped_data
 
-    async def _google_search(self, query: str, num_results: int) -> List[str]:
+    async def _search(self, query: str, num_results: int) -> List[str]:
         self.logger.info("Performing Google search...")
         try:
             encoded_query = quote_plus(query)
@@ -214,7 +208,6 @@ class CrawlForAIScraper:
                 screenshot=False,
                 cache_mode=CacheMode.BYPASS,
                 delay_before_return_html=2,
-                page_timeout=25000,
                 scan_full_page=True,
             )
 
@@ -239,33 +232,36 @@
             self.logger.error(f"Google search error: {str(e)}")
             return []
 
-    async def _scrape_page(self, url: str) -> Dict[str, Any]:
+    async def _scrape_pages(self, urls: str) -> Dict[str, Any]:
         await self.start()
 
         try:
             # Run the crawler on a URL
-            result = await self.crawler.arun(
-                url=url,
+            results = await self.crawler.arun_many(
+                urls=urls,
                 screenshot=False,
                 cache_mode=CacheMode.BYPASS,
-                delay_before_return_html=2,
-                page_timeout=25000,
                 scan_full_page=True,
+                semaphore_count=4,
+                wait_for_images=True,
+                page_timeout=25000,
             )
-            soup = BeautifulSoup(result.html, "html.parser")
-            data = {
-                "url": url,
-                "text": result.markdown,
-                "images": self._extract_images(soup, result.url),
-                "videos": self._extract_videos(soup),
-                "links": result.links["external"],
-            }
-
-            return data
+            scraped_sites = []
+            for result in results:
+                if result.success:
+                    soup = BeautifulSoup(result.html, "html.parser")
+                    data = {
+                        "url": result.url,
+                        "text": result.markdown,
+                        "images": self._extract_images(soup, result.url),
+                        "videos": self._extract_videos(soup),
+                        "links": result.links["external"],
+                    }
+                    scraped_sites.append(data)
+            return scraped_sites
 
         except Exception as e:
-            self.logger.error(f"Scraping error for {url}: {str(e)}")
-            # raise e
+            self.logger.error(f"Scraping error while {urls}: {str(e)}")
             return {}
 
     def _extract_images(self, soup: BeautifulSoup, url: str) -> List[str]:
@@ -274,9 +270,14 @@
         for img in soup.find_all("img"):
             if "src" in img.attrs:
                 src = img["src"]
-                # remove px or any characters from width and height
-                width = int("".join(filter(str.isdigit, img.get("width", "0"))))
-                height = int("".join(filter(str.isdigit, img.get("height", "0"))))
+                if not "width" or not "height" in img.attrs:
+                    continue
+                if "width" in img.attrs and img.get("width").lower() == "auto":
+                    images.append((src, 999, 0))
+                # Remove units from width and height: get start of the entity till the first non-digit character
+                width = "".join([i for i in img.get("width", "0") if i.isdigit() or i == "."])
+                height = "".join([i for i in img.get("height", "0") if i.isdigit() or i == "."])
+                width, height = float(width), float(height)
                 if width > 300 and height > 300 and "pixel" not in src and "icon" not in src:
                     images.append((src, width, height))
         images = sorted(images, key=lambda img: -1 * (img[1] * img[2]))
@@ -310,14 +311,21 @@
 
 if __name__ == "__main__":
     import sys
 
-    url = "https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview"
+    urls = [
+        "https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview",
+        "https://docs.crawl4ai.com/advanced/multi-url-crawling/",
+        "https://github.com/SesameAILabs/csm",
+        "https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview",
+        "https://docs.crawl4ai.com/advanced/multi-url-crawling/",
+        "https://github.com/SesameAILabs/csm",
+    ]
     if len(sys.argv) > 1:
-        url = sys.argv[1]
+        urls = sys.argv[1:]
 
     async def main():
         scraper = CrawlForAIScraper()
         await scraper.start()
-        data = await scraper.search_and_scrape("what is ai")
+        data = await scraper.search_and_scrape("quantum computing")
         await scraper.close()
         with open("output.json", "w") as f:
             f.write(json.dumps(data, indent=2))
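On the new size check in _extract_images: the condition not "width" or not "height" in img.attrs parses as (not "width") or (not ("height" in img.attrs)), so only a missing height attribute is skipped, and a width of "auto" is appended as (src, 999, 0) but still falls through to float(""), which raises ValueError. A hedged sketch of an equivalent filter follows; the extract_large_images name and the choice to skip "auto" sizes are assumptions, while the 300 px threshold and the area sort mirror the diff.

import re
from typing import List, Tuple

from bs4 import BeautifulSoup


def extract_large_images(soup: BeautifulSoup, min_px: float = 300) -> List[str]:
    # Sketch: keep only images that declare both dimensions above min_px,
    # stripping unit suffixes such as "px" before converting to a number.
    images: List[Tuple[str, float, float]] = []
    for img in soup.find_all("img"):
        src = img.get("src")
        if not src or "pixel" in src or "icon" in src:
            continue
        width_attr = img.get("width", "")
        height_attr = img.get("height", "")
        if not width_attr or not height_attr:
            continue  # skip images without both width and height attributes
        if "auto" in (width_attr.lower(), height_attr.lower()):
            continue  # "auto" carries no usable size information
        width_str = re.sub(r"[^0-9.]", "", width_attr)
        height_str = re.sub(r"[^0-9.]", "", height_attr)
        try:
            width, height = float(width_str), float(height_str)
        except ValueError:
            continue  # e.g. empty string or malformed value
        if width > min_px and height > min_px:
            images.append((src, width, height))
    images.sort(key=lambda item: item[1] * item[2], reverse=True)  # largest area first
    return [src for src, _, _ in images]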