File size: 2,587 Bytes
7e24b41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from concurrent.futures import ThreadPoolExecutor
from src.utils.exceptions import CustomException
from urllib.parse import urlparse, urljoin
from src.utils.functions import getConfig
from src.utils.functions import cleanText
from src.utils.logging import logger
from bs4 import BeautifulSoup
import time
import requests


class WebsiteCrawler:
    """Crawls a website one level deep and extracts visible page text."""

    # Per-request socket timeout in seconds. Without this, a hung server
    # would stall the crawl indefinitely (the original issued requests with
    # no timeout at all).
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # Crawl settings (e.g. the WEBCRAWLER/timeout budget used by
        # getLinks) are read from config.ini.
        self.config = getConfig(path = "config.ini")

    def _normaliseLinks(self, url: str, hrefs: list[str]) -> list[str]:
        """Filter and absolutise raw href values found on *url*.

        Keeps absolute links on the same host as-is, resolves relative links
        against *url*, and drops everything else: links to other hosts
        (``http``-prefixed), ``mailto:``/``tel:``/``javascript:``/``file:``
        pseudo-links, scheme-relative ``//`` links, and any candidate that
        contains a ``#`` fragment. Returns the de-duplicated survivors.
        """
        baseNetloc = urlparse(url).netloc
        skipPrefixes = ("//", "file", "javascript", "tel", "mailto", "http")
        links = set()  # set from the start: dedup once, not per iteration
        for href in hrefs:
            if urlparse(href).netloc == baseNetloc:
                candidate = href
            elif not href.startswith(skipPrefixes):
                # Relative link: resolve against the page URL. The appended
                # "/" keeps urljoin from clipping the last path segment.
                candidate = urljoin(url + "/", href)
            else:
                continue
            if "#" not in candidate:
                links.add(candidate)
        return list(links)

    def getLinksFromPage(self, url: str) -> list[str]:
        """Fetch *url* and return the unique same-site links found on it.

        Raises ``requests.HTTPError`` for non-2xx responses (consistent with
        extractTextFromUrl; callers in this class catch and log).
        """
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")
        hrefs = [a.attrs["href"] for a in soup.find_all("a") if "href" in a.attrs]
        return self._normaliseLinks(url, hrefs)

    def getLinks(self, url: str):
        """Crawl *url* one level deep and return a de-duplicated link list.

        Visits each link found on the start page and collects the links found
        there, stopping once the WEBCRAWLER/timeout budget (seconds) is
        exhausted. Trailing slashes are stripped so ".../a/" and ".../a"
        collapse to one entry. Returns None (after logging) if the start page
        itself cannot be fetched, preserving the original best-effort contract.
        """
        try:
            logger.info("fetching links from url")
            start = time.time()
            budget = self.config.getint("WEBCRAWLER", "timeout")
            uniqueLinks = set()
            for link in self.getLinksFromPage(url):
                if time.time() - start > budget:
                    break  # crawl budget exhausted
                try:
                    uniqueLinks.update(self.getLinksFromPage(link))
                except Exception as e:
                    # One unreachable page must not abort the whole crawl
                    # (the original bailed out on the first failure).
                    logger.error(CustomException(e))
            # endswith() is safe on empty strings, unlike the original
            # x[-1] indexing which raised IndexError; the set comprehension
            # re-deduplicates entries that collide after slash-stripping.
            return list({x[:-1] if x.endswith("/") else x for x in uniqueLinks})
        except Exception as e:
            logger.error(CustomException(e))

    def extractTextFromUrl(self, url: str) -> str:
        """Download *url* and return its cleaned visible text.

        Raises ``requests.HTTPError`` for non-2xx responses.
        """
        response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        return cleanText(text = soup.get_text(separator=' ', strip=True))

    def extractTextFromUrlList(self, urls: list[str]):
        """Fetch every URL concurrently and join the extracted texts with newlines.

        Returns None (after logging) if any fetch fails — preserved from the
        original contract; callers should handle a None result.
        """
        try:
            logger.info("extracting text from urls")
            with ThreadPoolExecutor() as executor:
                texts = list(executor.map(self.extractTextFromUrl, urls))
            return "\n".join(texts)
        except Exception as e:
            logger.error(CustomException(e))