Spaces:
Runtime error
Runtime error
File size: 5,292 Bytes
0277ad1 9f9307c 0277ad1 9f9307c 0277ad1 9f9307c 0277ad1 9f9307c 0277ad1 9f9307c 0277ad1 9f9307c 0277ad1 9f9307c 0277ad1 9f9307c 0277ad1 7e72686 0277ad1 9f9307c 0277ad1 9f9307c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
import requests
from bs4 import BeautifulSoup
import re
import csv
from .DatabaseDataSaver import save_product
import httpx
from httpx_socks import SyncProxyTransport
class UltaScraper:
    """Scrape product data (name, id, ingredients) from ulta.com category listings.

    Requests are routed through a proxied httpx session; each product found is
    persisted via ``save_product``.
    """

    # Ulta listing pages show up to this many products per page.
    PRODUCTS_PER_PAGE = 96

    def __init__(self):
        self.headers = {
            "User-Agent":
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36"
        }
        self.base_url = "https://www.ulta.com/"
        self.categories = ['men', 'gifts', 'luxury-beauty', 'tools-brushes',
                           'fragrance', 'body-care', 'hair', 'skin-care', 'make-up']
        try:
            self.session = self.create_session()
        except Exception as e:
            # One retry: transient proxy/DNS failures happen on startup.
            print(e)
            self.session = self.create_session()

    def create_session(self):
        """Return an httpx client whose traffic goes through the proxy.

        SECURITY NOTE: proxy credentials are hard-coded here; they should be
        moved to environment variables or a secrets store.
        """
        transport = SyncProxyTransport.from_url(
            "http://W4a8IruR4dkhNGb6:Hesj0mkBfnJ1n95M_country-us@geo.iproyal.com:12321")
        return httpx.Client(transport=transport)

    def _fetch(self, url):
        """GET *url*, recreating the session once if the first attempt fails.

        Returns the response, or None when both attempts fail.  Centralizes
        the retry logic that was previously copy-pasted into three methods.
        """
        try:
            return self.session.get(url, headers=self.headers)
        except Exception:
            try:
                self.session = self.create_session()
                return self.session.get(url, headers=self.headers)
            except Exception:
                return None

    def product_detail(self, url):
        """Scrape a single product page.

        Returns ``[product_name, product_id, ingredients]`` on success, or
        False when the fetch fails or the expected markup is missing.
        """
        response = self._fetch(url)
        if response is None:
            return False
        soup = BeautifulSoup(response.text, 'html.parser')
        # Product name lives in the <h1> title span; guard against layout
        # changes instead of crashing with AttributeError.
        heading = soup.find('h1')
        name_span = heading.find('span', {'class': 'Text-ds--title-5'}) if heading else None
        # The numeric product id precedes the '?' in the product URL.
        id_match = re.search(r'(\d+)\?', url)
        if name_span is None or id_match is None:
            return False
        # The ingredient list is the sibling of the "Ingredients" anchor;
        # many products simply don't publish one, so default to ''.
        ingredients = ''
        anchor = soup.find(id="Ingredients")
        if anchor is not None:
            sibling = anchor.find_next_sibling()
            if sibling is not None:
                ingredients = sibling.text
        return [name_span.text, id_match.group(1), ingredients]

    def get_number_of_results(self, category):
        """Return the total product count for *category*, or False on failure."""
        url = f"{self.base_url}shop/{category}/all"
        print(url)
        response = self._fetch(url)
        if response is None:
            return False
        soup = BeautifulSoup(response.text, "html.parser")
        label = soup.find("div", {"class": "ProductListingWrapper__resultslabel"})
        match = re.search(r'\d+', label.text) if label is not None else None
        if match is None:
            return False
        return int(match.group())

    def get_product_urls(self, category, total_results):
        """Walk every listing page of *category*, scraping and saving each product.

        Returns False when a page fetch fails outright; stops early when a
        page yields no product links (past the last real page).
        """
        total_pages = total_results // self.PRODUCTS_PER_PAGE + 1
        for page in range(1, total_pages + 1):
            url = f"{self.base_url}shop/{category}/all?page={page}"
            print(url)
            response = self._fetch(url)
            if response is None:
                return False
            if response.status_code != 200:
                continue
            soup = BeautifulSoup(response.text, "html.parser")
            products_ul = soup.find("ul", {"class": "ProductListingResults__productList"})
            if products_ul is None:
                # Listing container missing: nothing more to scrape here.
                break
            urls = []
            for link in products_ul.find_all('a', href=True):
                href = link['href']
                if not href.startswith('https://www.ulta.com/p/'):
                    continue
                print(href)
                product = self.product_detail(href)
                print(product)
                if product:
                    save_product({
                        'title': product[0],
                        'product_id': product[1],
                        'ingredients': product[2],
                        'url': href,
                        'store_name': 'Ulta'
                    })
                urls.append(href)
            # An empty page means we've run past the last listing page.
            # (The original also had a no-op `page += 1` inside this for
            # loop, removed here.)
            if not urls:
                break

    def save_product_to_csv(self, product):
        """Append *product* (a [title, product_id, ingredients] row) to the CSV.

        Returns False when the product_id is already present; None after a
        successful write.  NOTE: filename 'wallmart.csv' kept as-is for
        compatibility (likely a typo for 'walmart.csv').
        """
        with open('wallmart.csv', 'a+', encoding='utf-8', newline='') as file:
            # 'a+' opens positioned at EOF; rewind to scan existing rows.
            file.seek(0)
            existing_ids = [row[1] for row in csv.reader(file) if len(row) > 1]
            if product[1] in existing_ids:
                return False
            csv.writer(file).writerow(product)

    def run(self):
        """Scrape every configured category in order."""
        for category in self.categories:
            total_results = self.get_number_of_results(category)
            if not total_results:
                # Count fetch failed; the original passed False onward and
                # crashed on `False // 96` — skip the category instead.
                print(f'Skipping {category}: could not determine result count')
                continue
            self.get_product_urls(category, total_results)
            print(f'Finished {category}')
|