# shop-agent / site_scraper.py
# (Hugging Face Space page residue: "eshan13's picture / Add application file / a8bc862" —
#  commented out so the module is valid Python.)
import json
from os import name, wait
from warnings import filters
import playwright
import asyncio
import time
import re
import random
from typing import List, Optional, Dict, Any, Tuple
from playwright.async_api import async_playwright
from pydantic_models import ProductReview, UserFilter, Product, ProductClass
import logging
import sys
# Send logs to stderr so any stdout-based consumers of this script stay clean.
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
# Module-level logger shared by every scraper coroutine below.
logger = logging.getLogger(__name__)
async def get_product_page(pro_url: str, page: Any) -> Product | None:
    """Scrape a single Flipkart product page into a Product model.

    Args:
        pro_url: Absolute URL of the product page.
        page: A Playwright async Page (typed Any to avoid a hard dependency
            in the signature).

    Returns:
        A populated Product, or None when scraping fails.
    """
    try:
        await page.goto(pro_url)
        # Left column: product image gallery; take the first image's src.
        pro_visuals = page.locator('div.DOjaWF.gdgoEp.col-5-12.MfqIAz')
        img_ele = pro_visuals.locator('img').first
        img = await img_ele.get_attribute('src') if await img_ele.count() else None
        # Right column: name / price / rating summary.
        pro_intro = page.locator('div.C7fEHH')
        name_ele = pro_intro.locator('h1._6EBuvT')
        name = await name_ele.inner_text()
        price_ele = pro_intro.locator('div.Nx9bqj.CxhGGd')
        # A Locator object is always truthy, so the original `if price_ele`
        # never took the None branch; check the match count instead.
        price = await price_ele.inner_text() if await price_ele.count() else None
        rat_ele = pro_intro.locator('div.XQDdHH')  # span.Y1HWO0
        if await rat_ele.count():
            rat = await rat_ele.inner_text()
            num_rat, num_rev = '-1', '-1'
            num_ele = pro_intro.locator('span.Wphh3N')
            if await num_ele.count():
                # Text looks like "1,234 Ratings & 56 Reviews"; guard against
                # fewer than two numeric groups instead of unpacking blindly.
                nums = re.findall(r'[\d,]+', await num_ele.inner_text())
                if len(nums) >= 2:
                    num_rat, num_rev = nums[0], nums[1]
        else:
            rat = '0.0'
            num_rat, num_rev = '-1', '-1'
        prod_rev = ProductReview.model_validate({'ratings':rat , 'num_ratings': num_rat, 'num_reviews': num_rev})
        high = []  # per-page highlight bullets are not scraped here
        product = Product.model_validate({'name': name, 'price': price, 'url': pro_url, 'image': img, 'review': prod_rev, 'details': high})
        return product
    except Exception as e:
        logger.error(f"Error while fetching {pro_url} deets: {e}")
        return None
async def get_products(base_url: str , page: Any) -> List[Product] | None:
    """Scrape every product tile on the current search-results page.

    Args:
        base_url: Site root, prefixed onto each tile's relative href.
        page: A Playwright async Page already showing search results.

    Returns:
        Product models for tiles that have name/price/link/image (others are
        skipped), or None after a top-level failure.
    """
    try:
        tiles = await page.query_selector_all("div.cPHDOP")
        logger.info(f'Tiles: {len(tiles)} | {tiles}')
        products = []
        for idx,tile in enumerate(tiles):
            try:
                link_ele = await tile.query_selector("a.CGtC98")
                link = await link_ele.get_attribute("href") if link_ele else None
                if link:
                    link = base_url + link
                # NOTE: logger.info('link:', link) passed an extra arg with no
                # %s placeholder, which makes logging raise internal formatting
                # errors; use lazy %-style args instead.
                logger.info('link: %s', link)
                # Product name
                name_ele = await tile.query_selector("div.KzDlHZ")
                name = await name_ele.inner_text() if name_ele else None
                logger.info('name: %s', name)
                # Price
                price_ele = await tile.query_selector("div.Nx9bqj._4b5DiR")
                price = await price_ele.inner_text() if price_ele else None
                logger.info('price: %s', price)
                # Image
                img_ele = await tile.query_selector("img.DByuf4")
                image = await img_ele.get_attribute("src") if img_ele else None
                logger.info('image: %s', image)
                # Ratings — keep string defaults so ProductReview validation
                # sees the same types as get_product_page ('0.0', not 0.0).
                rat_ele = await tile.query_selector('div.XQDdHH')
                rat = await rat_ele.inner_text() if rat_ele else '0.0'
                num_rat, num_rev = '-1', '-1'
                num_ele = await tile.query_selector('span.Wphh3N')
                if num_ele:
                    # "1,234 Ratings & 56 Reviews" → two numeric groups; guard
                    # against malformed text instead of unpacking blindly.
                    nums = re.findall(r'[\d,]+', await num_ele.inner_text())
                    if len(nums) >= 2:
                        num_rat, num_rev = nums[0], nums[1]
                logger.info('ratings: %s %s %s', rat, num_rat, num_rev)
                # Highlights (short spec bullets on the tile).
                li_eles = await tile.query_selector_all('li.J\\+igdf')
                high = [await li.inner_text() for li in li_eles]
                logger.info('high: %s', high)
                if name and price and link and image:
                    prod_rev = ProductReview.model_validate(
                        {'ratings':rat , 'num_ratings': num_rat, 'num_reviews': num_rev}
                    )
                    product = Product.model_validate(
                        {'name': name, 'price': price, 'url': link, 'image': image, 'review': prod_rev, 'details': high}
                    )
                    products.append(product)
            except Exception as e:
                logger.error(f'Error processing tile {idx}: {e}')
        return products
    except Exception as e:
        logger.error(f"Error during browser setup or navigation: {e}")
        await page.screenshot(path="error_screenshot.png")
        await page.pause()
        return None
async def get_filters(page: Any, base_url: str, search_query:str) -> List[UserFilter] | None:
    """Search the site and scrape the sidebar filter sections into UserFilters.

    Args:
        page: A Playwright async Page.
        base_url: Site root to navigate to before searching.
        search_query: Text typed into the search box.

    Returns:
        A list of multiselect UserFilter models (the first two sidebar
        sections are skipped — presumably they are not real filters; confirm
        against the live page), or [] on a top-level failure.
    """
    try:
        await page.goto(base_url)
        await page.fill("input[name='q']", search_query)
        await page.press("input[name='q']", "Enter")
        await page.wait_for_selector('section._2OLUF3')
        filters = page.locator('section._2OLUF3')
        fcount = await filters.count()
        logger.info(f'Filters: {fcount}')
        site_filters = []
        for i in range(2, fcount):  # skip the first two non-filter sections
            try:
                section = filters.nth(i)
                fname_ele = section.locator('div.fxf7w6.rgHxCQ')
                # Locators are always truthy; only count() detects absence
                # (the original `if not fname` branch could never fire).
                if await fname_ele.count() < 1:
                    continue
                fname = (await fname_ele.inner_text()).strip()
                logger.info(f'Processing filter {fname} ...')
                # Expand the section if it is currently collapsed.
                is_exp = section.locator('div.SDsN9S')
                if await is_exp.count() < 1:
                    logger.info(f'click click click ...')
                    header_toggle = section.locator('svg.ukzDZP')
                    await header_toggle.click()
                    await page.wait_for_timeout(500)
                # Collect each selectable option's label text.
                sel = section.locator('div.ewzVkT._3DvUAf')
                opt = [(await sel.nth(e).inner_text()).strip() for e in range(await sel.count())]
                site_filter = UserFilter.model_validate({'name': fname, 'type': 'multiselect' , 'selection': opt})
                logger.info(f'Processed ! \n{site_filter}')
                site_filters.append(site_filter)
            except Exception as e:
                logger.error(f'Oops !!: {e}')
        return site_filters
        # Range-style filters (div._0vP2OD) are not handled yet.
    except Exception as e:
        logger.error(f"Error during browser setup or navigation: {e}")
        await page.screenshot(path="error_screenshot.png")
        await page.pause()
        return []
async def get_filtered_products(page: Any, base_url: str, search_query: str, user_filters: List[UserFilter] | None, top_k:int = 10) -> List[Product] | None:
    """Search, apply the requested sidebar filters, then scrape top results.

    Args:
        page: A Playwright async Page.
        base_url: Site root (also used to absolutize product links).
        search_query: Text typed into the search box.
        user_filters: Filters to apply; names not present in the sidebar are
            skipped. May be None/empty to apply nothing.
        top_k: Maximum number of product pages to scrape in detail.

    Returns:
        Up to top_k Product models (failed product pages are dropped), or
        None on a top-level failure.
    """
    try:
        await page.goto(base_url)
        await page.fill("input[name='q']", search_query)
        await page.press("input[name='q']", "Enter")
        await page.wait_for_selector("div.DOjaWF.gdgoEp")
        if user_filters:
            filters = page.locator('section._2OLUF3')
            fcount = await filters.count()
            user_fnames = [ f.name for f in user_filters]
            user_fn2vals = {f.name: f.selection for f in user_filters}
            for i in range(2, fcount):  # first two sections are not filters
                try:
                    section = filters.nth(i)
                    fname_ele = section.locator('div.fxf7w6.rgHxCQ')
                    # Locators are always truthy; only count() detects absence.
                    if await fname_ele.count() < 1:
                        continue
                    fname = (await fname_ele.inner_text()).strip()
                    logger.info(f'Applying {fname} filter')
                    if fname not in user_fnames:
                        continue
                    logger.info(f'Applying filter {fname} ...')
                    # Expand the section when collapsed so options are clickable.
                    is_exp = section.locator('div.SDsN9S')
                    if await is_exp.count() < 1:
                        logger.info(f'click click click ...')
                        header_toggle = section.locator('svg.ukzDZP')
                        await header_toggle.click()
                    vals = user_fn2vals.get(fname)
                    if vals:
                        for wanted in vals:
                            try:
                                # Restrict the search to this filter's option elements.
                                locator = section.locator('div.ewzVkT._3DvUAf', has_text=wanted)
                                # Small wait — the locator will throw quickly if not found.
                                await locator.first.wait_for(state='attached', timeout=1000)
                                await locator.first.scroll_into_view_if_needed()
                                await locator.first.click(timeout=1000)
                                logger.info(f"Selected {wanted} in {fname}")
                            except Exception as e:
                                logger.error(f"couldn't select {wanted} in {fname}: {e}")
                except Exception as e:
                    logger.error(f'Oops !!: {e}')
        pro_links = await get_pro_links(base_url, page)
        products = []
        for pro_link in pro_links[:top_k]:
            product = await get_product_page(pro_link, page)
            # get_product_page returns None on failure; don't let Nones leak
            # into the declared List[Product] result.
            if product is not None:
                products.append(product)
        return products
    except Exception as e:
        logger.error(f'Oopsie ! {e}')
        return None
async def get_pro_links(base_url: str, page:Any) -> List[str]:
    """Collect deduplicated product-page URLs from the current results page.

    Scrolls to the bottom first so lazily-loaded tiles render, keeps only
    long product hrefs (pagination / search links are filtered out), prefixes
    base_url, and drops the first collected link.
    """
    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
    anchors = await page.locator("div.DOjaWF.gdgoEp div.cPHDOP a").all()
    logger.info(f'{len(anchors)} product links fetched !')
    collected: List[str] = []
    for anchor in anchors:
        try:
            href = await anchor.get_attribute("href")
            # Real product hrefs are long paths; skip anything else.
            if not href or 'page=' in href or 'search?' in href or len(href) <= 150:
                continue
            absolute = base_url + href
            if absolute not in collected:
                collected.append(absolute)
        except Exception as e:
            logger.error(f'Oopsie in get_pro_links: {e}')
    logger.info(f'links: {len(collected)}')
    return collected[1:]
async def playwright_enter() -> Tuple:
    """Manually enter the async_playwright context and open a single page.

    Returns:
        (context_man, playwright, browser, context, page) — keep context_man
        so playwright_exit() can tear the stack down later.
    """
    context_man = async_playwright()
    playwright = await context_man.__aenter__()
    browser = await playwright.chromium.launch(headless=True) # pass False for a visible browser
    context = await browser.new_context()
    page = await context.new_page()
    page.set_default_timeout(15000)  # 15 seconds for all waits (original comment said 6s)
    page.set_default_navigation_timeout(15000)
    return context_man, playwright, browser, context, page
async def playwright_exit(context_man) -> None:
    """Exit the async_playwright context manager opened by playwright_enter()."""
    await context_man.__aexit__(None, None, None)
async def main():
    """Demo entry point: search Flipkart, apply sample filters, log results.

    Removed two dead `pro_url` assignments (never read; the second was a
    whitespace-corrupted triple-quoted URL) and the commented-out scratch
    calls around the live code path.
    """
    base_url = 'https://www.flipkart.com'
    search_query = 'Real Madrid 16/17 blue jersey'
    async with async_playwright() as p:
        # headless=False so the demo run is visible while debugging.
        browser = await p.chromium.launch(headless=False)
        context = await browser.new_context()
        page = await context.new_page()
        # Sample phone-storage/SIM filters; names that don't appear in the
        # sidebar for this query are simply skipped by get_filtered_products.
        user_filters = [UserFilter(name='INTERNAL STORAGE', type='multiselect', selection=['256 GB & Above', '128 - 255.9 GB', '64 - 127.9 GB', '32 - 63.9 GB', '16 - 31.9 GB', '8 - 15.9 GB', '4 - 7.9 GB', 'Less than 1 GB', '256 GB Above'], range=None), UserFilter(name='SIM TYPE', type='multiselect', selection=['Dual Sim', 'Dual Sim(Nano + eSIM)', 'Single Sim'], range=None)]
        res = await get_filtered_products(page, base_url, search_query, user_filters, top_k=5)
        logger.info(res)
# TSHH6F3Y3XZ7WBN3 | _1sdMkc LFEi7Z
# Script entry point: run the async demo flow.
if __name__ == "__main__":
    asyncio.run(main())