# shops-scraper / scraper/utils/UltaScraper.py
# Last updated by mumer119131 in commit 7e72686
# ("Update scraper/utils/UltaScraper.py")
import requests
from bs4 import BeautifulSoup
import re
import csv
from .DatabaseDataSaver import save_product
import httpx
from httpx_socks import SyncProxyTransport
class UltaScraper:
    """Scrape product names, ids and ingredient lists from ulta.com.

    Walks every category in ``self.categories``, pages through the
    product listings, fetches each product detail page, and persists the
    result through ``save_product``. All HTTP traffic goes through a
    SOCKS/HTTP proxy via an ``httpx`` client.
    """

    # Number of products Ulta renders per listing page (used for paging).
    PRODUCTS_PER_PAGE = 96

    def __init__(self):
        # Desktop Chrome UA so Ulta serves the regular HTML pages.
        self.headers = {
            "User-Agent":
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36"
        }
        self.base_url = "https://www.ulta.com/"
        self.categories = ['men', 'gifts', 'luxury-beauty', 'tools-brushes',
                           'fragrance', 'body-care', 'hair', 'skin-care',
                           'make-up']
        try:
            self.session = self.create_session()
        except Exception as e:
            # One retry on session creation; if this second attempt also
            # raises, let it propagate — without a session the scraper
            # cannot do anything useful anyway.
            print(e)
            self.session = self.create_session()

    def create_session(self):
        """Return an ``httpx.Client`` routed through the residential proxy.

        SECURITY NOTE(review): the proxy URL embeds live credentials.
        Move them to an environment variable / secret store instead of
        committing them to source control.
        """
        transport = SyncProxyTransport.from_url(
            "http://W4a8IruR4dkhNGb6:Hesj0mkBfnJ1n95M_country-us@geo.iproyal.com:12321")
        return httpx.Client(transport=transport)

    def _get(self, url):
        """GET ``url`` with the shared session and headers.

        On any failure the session is rebuilt once and the request
        retried. Returns the response, or ``None`` when both attempts
        fail. (Replaces three copy-pasted try/retry blocks.)
        """
        try:
            return self.session.get(url, headers=self.headers)
        except Exception:
            try:
                self.session = self.create_session()
                return self.session.get(url, headers=self.headers)
            except Exception:
                return None

    def product_detail(self, url):
        """Fetch one product page.

        Returns ``[name, product_id, ingredients]`` on success, or
        ``False`` when the page cannot be fetched or parsed (the
        original crashed with AttributeError when ``find`` returned
        ``None`` or the url carried no ``<digits>?`` id).
        """
        response = self._get(url)
        if response is None:
            return False
        soup = BeautifulSoup(response.text, 'html.parser')
        heading = soup.find('h1')
        name_span = (heading.find('span', {'class': 'Text-ds--title-5'})
                     if heading else None)
        # Product id is the digit run just before the '?' in the url.
        id_match = re.search(r'(\d+)\?', url)
        if name_span is None or id_match is None:
            return False
        try:
            ingredients_heading = soup.find(id="Ingredients")
            ingredients = ingredients_heading.find_next_sibling().text
        except Exception:
            # Many products legitimately list no ingredients.
            ingredients = ''
        return [name_span.text, id_match.group(1), ingredients]

    def get_number_of_results(self, category):
        """Return the total product count for ``category``, or ``False``
        when the request or the parse fails.
        """
        url = f"{self.base_url}shop/{category}/all"
        print(url)
        response = self._get(url)
        if response is None:
            return False
        soup = BeautifulSoup(response.text, "html.parser")
        label = soup.find(
            "div", {"class": "ProductListingWrapper__resultslabel"})
        match = re.search(r'\d+', label.text) if label else None
        if match is None:
            return False
        return int(match.group())

    def get_product_urls(self, category, total_results):
        """Walk the listing pages of ``category``, scraping and saving
        every product found. Returns ``False`` when a page fetch fails.
        """
        # Ceil division; the original `// 96 + 1` requested a spurious
        # extra page whenever total_results was an exact multiple of 96.
        total_pages = max(1, -(-total_results // self.PRODUCTS_PER_PAGE))
        for page in range(1, total_pages + 1):
            url = f"{self.base_url}shop/{category}/all?page={page}"
            print(url)
            response = self._get(url)
            if response is None:
                return False
            if response.status_code != 200:
                continue
            soup = BeautifulSoup(response.text, "html.parser")
            products_ul = soup.find(
                "ul", {"class": "ProductListingResults__productList"})
            if products_ul is None:
                # Layout changed or empty page — nothing more to scrape.
                break
            urls = []
            for link in products_ul.find_all('a', href=True):
                href = link['href']
                if not href.startswith('https://www.ulta.com/p/'):
                    continue
                print(href)
                product = self.product_detail(href)
                print(product)
                if product:
                    save_product({
                        'title': product[0],
                        'product_id': product[1],
                        'ingredients': product[2],
                        'url': href,
                        'store_name': 'Ulta'
                    })
                urls.append(href)
            if not urls:
                # A page with no product links means we ran off the end.
                break

    def save_product_to_csv(self, product):
        """Append ``product`` ([name, id, ingredients]) to wallmart.csv.

        Returns ``False`` (without writing) when a row with the same
        product id already exists; returns ``None`` after a write.
        """
        with open('wallmart.csv', 'a+', encoding='utf-8', newline='') as file:
            file.seek(0)  # 'a+' opens positioned at EOF; rewind to scan
            # Guard len(row) > 1: blank/short rows used to raise IndexError.
            existing_ids = [row[1] for row in csv.reader(file)
                            if len(row) > 1]
            if product[1] in existing_ids:
                return False
            csv.writer(file).writerow(product)

    def run(self):
        """Scrape every configured category end to end."""
        for category in self.categories:
            total_results = self.get_number_of_results(category)
            if not total_results:
                # Count fetch failed (False) or zero products — skip
                # rather than paging on a bogus total.
                print(f'Skipping {category}: could not read result count')
                continue
            self.get_product_urls(category, total_results)
            print(f'Finished {category}')