| import aiohttp |
| from aiohttp import ClientSession |
| import asyncio |
| import requests |
| from bs4 import BeautifulSoup |
| from urllib.parse import urljoin, urldefrag |
from modules.config.constants import TIMEOUT


class WebpageCrawler:
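    """Asynchronously crawl a website and collect all child pages of a base URL."""
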
| def __init__(self): |
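        # Tracks URLs already discovered so each page is queued only once.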
| self.dict_href_links = {} |
|
|
| async def fetch(self, session: ClientSession, url: str) -> str: |
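        """Fetch ``url`` as text, falling back to latin-1 if default decoding fails."""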
| async with session.get(url) as response: |
| try: |
| return await response.text() |
| except UnicodeDecodeError: |
| return await response.text(encoding="latin1") |
|
|
    def url_exists(self, url: str) -> bool:
        """Check ``url`` with a HEAD request; only a 200 response counts."""
        try:
            response = requests.head(url, timeout=TIMEOUT)
            return response.status_code == 200
        except requests.RequestException:
            # Covers connection errors, timeouts, and other request failures.
            return False
|
|
    async def get_links(
        self, session: ClientSession, website_link: str, base_url: str
    ) -> list:
        """Extract, normalize, and validate every child link found on ``website_link``."""
| html_data = await self.fetch(session, website_link) |
| soup = BeautifulSoup(html_data, "html.parser") |
| list_links = [] |
| for link in soup.find_all("a", href=True): |
| href = link["href"].strip() |
| full_url = urljoin(base_url, href) |
| normalized_url = self.normalize_url(full_url) |
| if ( |
| normalized_url not in self.dict_href_links |
| and self.is_child_url(normalized_url, base_url) |
| and self.url_exists(normalized_url) |
| ): |
| self.dict_href_links[normalized_url] = None |
                list_links.append(normalized_url)

        return list_links
|
|
| async def get_subpage_links( |
| self, session: ClientSession, urls: list, base_url: str |
| ): |
| tasks = [self.get_links(session, url, base_url) for url in urls] |
| results = await asyncio.gather(*tasks) |
| all_links = [link for sublist in results for link in sublist] |
| return all_links |
|
|
    async def get_all_pages(self, url: str, base_url: str) -> list:
        """Breadth-first crawl from ``url``; return every checked page under ``base_url``."""
        async with aiohttp.ClientSession() as session:
            dict_links = {url: "Not-checked"}
            while True:
                unchecked_links = [
                    link
                    for link, status in dict_links.items()
                    if status == "Not-checked"
                ]
                if not unchecked_links:
                    break
                # Crawl every unchecked page concurrently and collect new child links.
                new_links = await self.get_subpage_links(
                    session, unchecked_links, base_url
                )
                for link in unchecked_links:
                    dict_links[link] = "Checked"
                dict_links.update(
                    {
                        link: "Not-checked"
                        for link in new_links
                        if link not in dict_links
                    }
                )

            checked_urls = [
                url for url, status in dict_links.items() if status == "Checked"
            ]
            return checked_urls
|
|
| def is_webpage(self, url: str) -> bool: |
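        """Return True when a HEAD request reports an HTML Content-Type for ``url``."""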
| try: |
| response = requests.head(url, allow_redirects=True, timeout=TIMEOUT) |
| content_type = response.headers.get("Content-Type", "").lower() |
| return "text/html" in content_type |
| except requests.RequestException: |
| return False |
|
|
    def clean_url_list(self, urls):
        """Split ``urls`` into non-HTML file links and HTML webpage links."""
        files, webpages = [], []

        for url in urls:
            if self.is_webpage(url):
                webpages.append(url)
            else:
                files.append(url)

        return files, webpages
|
|
    def is_child_url(self, url: str, base_url: str) -> bool:
        """Return True if ``url`` falls under ``base_url``."""
        return url.startswith(base_url)
|
|
    def normalize_url(self, url: str) -> str:
        """Strip the fragment (``#...``) from ``url``."""
        defragged_url, _ = urldefrag(url)
        return defragged_url
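

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module): crawl every page
    # under a base URL and split the results into webpages and file links.
    # The URL below is a placeholder assumption; replace it with a real site.
    async def _demo():
        crawler = WebpageCrawler()
        base_url = "https://example.com/docs/"  # hypothetical starting point
        pages = await crawler.get_all_pages(base_url, base_url)
        files, webpages = crawler.clean_url_list(pages)
        print(f"Found {len(webpages)} webpages and {len(files)} file links")

    asyncio.run(_demo())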
|
|