import csv
import re

import httpx
import requests
from bs4 import BeautifulSoup
from httpx_socks import SyncProxyTransport

from .DatabaseDataSaver import save_product


class UltaScraper:
    """Scraper for ulta.com product listings.

    Walks every listing page of each configured category, follows each
    product link, extracts (title, product id, ingredients) and persists
    every record through ``save_product``.
    """

    # Number of products ulta.com renders per listing page — used to
    # derive the page count from the total result count.
    PAGE_SIZE = 96

    # NOTE(security): proxy credentials are hard-coded in source; move them
    # to environment variables or a secrets store before sharing this file.
    PROXY_URL = (
        "http://W4a8IruR4dkhNGb6:Hesj0mkBfnJ1n95M"
        "_country-us@geo.iproyal.com:12321"
    )

    def __init__(self):
        self.headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/87.0.4280.141 Safari/537.36"
            )
        }
        self.base_url = "https://www.ulta.com/"
        self.categories = [
            'men', 'gifts', 'luxury-beauty', 'tools-brushes', 'fragrance',
            'body-care', 'hair', 'skin-care', 'make-up'
        ]
        try:
            self.session = self.create_session()
        except Exception as e:
            # One retry: transient proxy/DNS failures are common here.
            print(e)
            self.session = self.create_session()

    def create_session(self):
        """Return a new httpx client routed through the residential proxy."""
        transport = SyncProxyTransport.from_url(self.PROXY_URL)
        return httpx.Client(transport=transport)

    def _get(self, url):
        """GET *url*, recreating the session once on failure.

        Returns the response object, or ``None`` when both attempts fail.
        Centralizes the retry logic that was previously duplicated in
        every fetching method.
        """
        try:
            return self.session.get(url, headers=self.headers)
        except Exception:
            try:
                self.session = self.create_session()
                return self.session.get(url, headers=self.headers)
            except Exception:
                return None

    def product_detail(self, url):
        """Scrape a single product page.

        Returns ``[title, product_id, ingredients]`` on success, or
        ``False`` when the page cannot be fetched or parsed (callers
        check truthiness before saving).
        """
        response = self._get(url)
        if response is None:
            return False
        soup = BeautifulSoup(response.text, 'html.parser')
        try:
            product_name = soup.find('h1').find(
                'span', {'class': 'Text-ds--title-5'}).text
            # Product id is the digit run immediately before the '?' in the URL.
            product_id = re.search(r'(\d+)\?', url).group(1)
        except AttributeError:
            # Unexpected page layout or URL shape — skip this product
            # instead of crashing the whole crawl.
            return False
        try:
            ingredients_heading = soup.find(id="Ingredients")
            ingredients = ingredients_heading.find_next_sibling().text
        except AttributeError:
            # Many products legitimately have no ingredients section.
            ingredients = ''
        return [product_name, product_id, ingredients]

    def get_number_of_results(self, category):
        """Return the total product count for *category*, or ``False``."""
        url = f"{self.base_url}shop/{category}/all"
        print(url)
        response = self._get(url)
        if response is None:
            return False
        soup = BeautifulSoup(response.text, "html.parser")
        label = soup.find(
            "div", {"class": "ProductListingWrapper__resultslabel"})
        if label is None:
            return False
        match = re.search(r'\d+', label.text)
        return int(match.group()) if match else False

    def get_product_urls(self, category, total_results):
        """Walk every listing page of *category*, saving each product found.

        Stops early when a page yields no product links (past the last
        real page). Returns ``False`` when a listing page cannot be
        fetched at all.
        """
        total_pages = total_results // self.PAGE_SIZE + 1
        for page in range(1, total_pages + 1):
            url = f"{self.base_url}shop/{category}/all?page={page}"
            print(url)
            response = self._get(url)
            if response is None:
                return False
            if response.status_code != 200:
                continue
            soup = BeautifulSoup(response.text, "html.parser")
            products_ul = soup.find(
                "ul", {"class": "ProductListingResults__productList"})
            if products_ul is None:
                break
            page_urls = []
            for link in products_ul.find_all('a', href=True):
                href = link['href']
                if not href.startswith('https://www.ulta.com/p/'):
                    continue
                print(href)
                product = self.product_detail(href)
                print(product)
                if product:
                    save_product({
                        'title': product[0],
                        'product_id': product[1],
                        'ingredients': product[2],
                        'url': href,
                        'store_name': 'Ulta'
                    })
                page_urls.append(href)
            if not page_urls:
                # An empty page means we ran past the last real page.
                break

    def save_product_to_csv(self, product):
        """Append *product* to the CSV unless its id is already recorded.

        Returns ``False`` (without writing) when a row with the same
        product id already exists.
        """
        with open('wallmart.csv', 'a+', encoding='utf-8', newline='') as file:
            file.seek(0)  # 'a+' opens at EOF; rewind to scan existing rows
            reader = csv.reader(file)
            # Guard len(row) > 1 so blank/short rows don't raise IndexError.
            product_ids = [row[1] for row in reader if len(row) > 1]
            if product[1] in product_ids:
                return False
            csv.writer(file).writerow(product)

    def run(self):
        """Scrape every configured category end to end."""
        for category in self.categories:
            total_results = self.get_number_of_results(category)
            if not total_results:
                # Previously False fell through to `False // 96 + 1`,
                # silently scraping page 1 of a failed category.
                print(f'Skipping {category}: could not read result count')
                continue
            self.get_product_urls(category, total_results)
            print(f'Finished {category}')