Spaces:
Sleeping
Sleeping
| import json | |
| from os import name, wait | |
| from warnings import filters | |
| import playwright | |
| import asyncio | |
| import time | |
| import re | |
| import random | |
| from typing import List, Optional, Dict, Any, Tuple | |
| from playwright.async_api import async_playwright | |
| from pydantic_models import ProductReview, UserFilter, Product, ProductClass | |
| import logging | |
| import sys | |
| logging.basicConfig(stream=sys.stderr, level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
async def get_product_page(pro_url: str, page: Any) -> Product | None:
    """Scrape a single Flipkart product detail page into a Product model.

    Args:
        pro_url: Absolute URL of the product detail page.
        page: Playwright async Page used for navigation and scraping.

    Returns:
        A validated Product, or None when scraping fails for any reason
        (the error is logged; nothing is raised to the caller).
    """
    try:
        await page.goto(pro_url)
        # Product image lives in the left-hand visuals column.
        pro_visuals = page.locator('div.DOjaWF.gdgoEp.col-5-12.MfqIAz')
        img_ele = pro_visuals.locator('img').first
        img = await img_ele.get_attribute('src') if await img_ele.count() else None
        pro_intro = page.locator('div.C7fEHH')
        name_ele = pro_intro.locator('h1._6EBuvT')
        name = await name_ele.inner_text()
        price_ele = pro_intro.locator('div.Nx9bqj.CxhGGd')
        # BUG FIX: a Playwright Locator is always truthy, so the original
        # `if price_ele` never guarded anything; count() is the real check.
        price = await price_ele.inner_text() if await price_ele.count() else None
        rat_ele = pro_intro.locator('div.XQDdHH')  # span.Y1HWO0
        rat = '0.0'
        num_rat, num_rev = '-1', '-1'
        if await rat_ele.count():
            rat = await rat_ele.inner_text()
            num_ele = pro_intro.locator('span.Wphh3N')
            if await num_ele.count():
                # Text looks like "1,234 Ratings & 56 Reviews"; tolerate pages
                # that expose only one number (the original 2-tuple unpack
                # raised ValueError in that case).
                nums = re.findall(r'[\d,]+', await num_ele.inner_text())
                if len(nums) >= 2:
                    num_rat, num_rev = nums[0], nums[1]
                elif len(nums) == 1:
                    num_rat = nums[0]
        prod_rev = ProductReview.model_validate(
            {'ratings': rat, 'num_ratings': num_rat, 'num_reviews': num_rev}
        )
        product = Product.model_validate(
            {'name': name, 'price': price, 'url': pro_url, 'image': img,
             'review': prod_rev, 'details': []}
        )
        return product
    except Exception as e:
        logger.error(f"Error while fetching {pro_url} deets: {e}")
        return None
async def get_products(base_url: str, page: Any) -> List[Product] | None:
    """Scrape every product tile on the current search-results page.

    Args:
        base_url: Site root, prepended to each tile's relative href.
        page: Playwright async Page already displaying a results listing.

    Returns:
        A list of validated Product models (possibly empty), or None when
        the listing itself cannot be read (errors are logged, never raised).
    """
    try:
        tiles = await page.query_selector_all("div.cPHDOP")
        # BUG FIX: the original used print-style calls such as
        # logger.info('link:', link); logging treats extra args as %-format
        # arguments and, with no placeholder in the message, raises an
        # internal formatting error. Lazy %-placeholders are used throughout.
        logger.info('Tiles: %d | %s', len(tiles), tiles)
        products = []
        for idx, tile in enumerate(tiles):
            try:
                link_ele = await tile.query_selector("a.CGtC98")
                link = await link_ele.get_attribute("href") if link_ele else None
                if link:
                    link = base_url + link
                logger.info('link: %s', link)
                # Product name
                name_ele = await tile.query_selector("div.KzDlHZ")
                name = await name_ele.inner_text() if name_ele else None
                logger.info('name: %s', name)
                # Price
                price_ele = await tile.query_selector("div.Nx9bqj._4b5DiR")
                price = await price_ele.inner_text() if price_ele else None
                logger.info('price: %s', price)
                # Image
                img_ele = await tile.query_selector("img.DByuf4")
                image = await img_ele.get_attribute("src") if img_ele else None
                logger.info('image: %s', image)
                # Ratings — default to the string '0.0' for consistency with
                # get_product_page (the original mixed float 0.0 and str here).
                rat_ele = await tile.query_selector('div.XQDdHH')
                rat = await rat_ele.inner_text() if rat_ele else '0.0'
                num_rat, num_rev = '-1', '-1'
                num_ele = await tile.query_selector('span.Wphh3N')
                if num_ele:
                    # "1,234 Ratings & 56 Reviews"; tolerate fewer than two
                    # numbers (the original 2-tuple unpack raised ValueError).
                    nums = re.findall(r'[\d,]+', await num_ele.inner_text())
                    if len(nums) >= 2:
                        num_rat, num_rev = nums[0], nums[1]
                    elif len(nums) == 1:
                        num_rat = nums[0]
                logger.info('ratings: %s %s %s', rat, num_rat, num_rev)
                # Highlights (bullet list under each tile)
                li_ele = await tile.query_selector_all('li.J\\+igdf')
                high = [await li.inner_text() for li in li_ele if li]
                logger.info('high: %s', high)
                if name and price and link and image:
                    prod_rev = ProductReview.model_validate(
                        {'ratings': rat, 'num_ratings': num_rat, 'num_reviews': num_rev}
                    )
                    product = Product.model_validate(
                        {'name': name, 'price': price, 'url': link, 'image': image,
                         'review': prod_rev, 'details': high}
                    )
                    products.append(product)
            except Exception as e:
                logger.error(f'Error processing tile {idx}: {e}')
        return products
    except Exception as e:
        logger.error(f"Error during browser setup or navigation: {e}")
        await page.screenshot(path="error_screenshot.png")
        await page.pause()
        return None
async def get_filters(page: Any, base_url: str, search_query: str) -> List[UserFilter] | None:
    """Search the site and harvest the sidebar filter (facet) sections.

    Args:
        page: Playwright async Page.
        base_url: Site root to navigate to before searching.
        search_query: Text typed into the search box.

    Returns:
        A UserFilter per multiselect facet discovered; [] on a fatal error.
    """
    try:
        await page.goto(base_url)
        await page.fill("input[name='q']", search_query)
        await page.press("input[name='q']", "Enter")
        await page.wait_for_selector('section._2OLUF3')
        sections = page.locator('section._2OLUF3')
        fcount = await sections.count()
        logger.info(f'Filters: {fcount}')
        site_filters = []
        # The first two sections are not real facets (sidebar header /
        # applied-filters box) — assumption carried over; TODO confirm.
        for i in range(2, fcount):
            try:
                # Renamed from `filter` so the builtin is not shadowed.
                section = sections.nth(i)
                fname_loc = section.locator('div.fxf7w6.rgHxCQ')
                # BUG FIX: a Locator is always truthy, so the original
                # `if not fname` could never skip; count() is the real check.
                if await fname_loc.count() < 1:
                    continue
                fname = (await fname_loc.inner_text()).strip()
                logger.info(f'Processing filter {fname} ...')
                # Expand the section when collapsed (expanded body absent).
                is_exp = section.locator('div.SDsN9S')
                if await is_exp.count() < 1:
                    logger.info('click click click ...')
                    await section.locator('svg.ukzDZP').click()
                    await page.wait_for_timeout(500)
                sel = section.locator('div.ewzVkT._3DvUAf')
                opt = [(await sel.nth(e).inner_text()).strip()
                       for e in range(await sel.count())]
                # BUG FIX: `if sel:` was always true (Locator truthiness), so
                # range-only facets were recorded as empty multiselects; only
                # emit a filter when it actually has selectable options.
                if not opt:
                    continue
                site_filter = UserFilter.model_validate(
                    {'name': fname, 'type': 'multiselect', 'selection': opt}
                )
                logger.info(f'Processed ! \n{site_filter}')
                site_filters.append(site_filter)
            except Exception as e:
                logger.error(f'Oops !!: {e}')
        return site_filters
    except Exception as e:
        logger.error(f"Error during browser setup or navigation: {e}")
        await page.screenshot(path="error_screenshot.png")
        await page.pause()
        return []
async def get_filtered_products(page: Any, base_url: str, search_query: str,
                                user_filters: List[UserFilter] | None,
                                top_k: int = 10) -> List[Product] | None:
    """Search, apply the requested sidebar filters, and scrape top results.

    Args:
        page: Playwright async Page.
        base_url: Site root to navigate to before searching.
        search_query: Text typed into the search box.
        user_filters: Facets to apply; names must match sidebar sections.
        top_k: Maximum number of product pages to scrape in detail.

    Returns:
        Up to top_k Product models, or None on a fatal navigation error.
    """
    try:
        await page.goto(base_url)
        await page.fill("input[name='q']", search_query)
        await page.press("input[name='q']", "Enter")
        await page.wait_for_selector("div.DOjaWF.gdgoEp")
        if user_filters:
            sections = page.locator('section._2OLUF3')
            fcount = await sections.count()
            user_fnames = [f.name for f in user_filters]
            user_fn2vals = {f.name: f.selection for f in user_filters}
            # First two sections are headers, not facets — TODO confirm.
            for i in range(2, fcount):
                try:
                    section = sections.nth(i)  # renamed: don't shadow builtin `filter`
                    fname_loc = section.locator('div.fxf7w6.rgHxCQ')
                    # BUG FIX: Locators are always truthy; count() is the
                    # real existence check (`if not fname` never fired).
                    if await fname_loc.count() < 1:
                        continue
                    fname = (await fname_loc.inner_text()).strip()
                    logger.info(f'Applying {fname} filter')
                    if fname not in user_fnames:
                        continue
                    logger.info(f'Applying filter {fname} ...')
                    # Expand the section when collapsed.
                    is_exp = section.locator('div.SDsN9S')
                    if await is_exp.count() < 1:
                        logger.info('click click click ...')
                        await section.locator('svg.ukzDZP').click()
                    vals = user_fn2vals.get(fname)
                    if vals:
                        for wanted in vals:
                            try:
                                # Restrict the search to this facet's options.
                                locator = section.locator('div.ewzVkT._3DvUAf', has_text=wanted)
                                # Short wait — the locator throws quickly if absent.
                                await locator.first.wait_for(state='attached', timeout=1000)
                                await locator.first.scroll_into_view_if_needed()
                                await locator.first.click(timeout=1000)
                                logger.info(f"Selected {wanted} in {fname}")
                            except Exception as e:
                                logger.error(f"couldn't select {wanted} in {fname}: {e}")
                except Exception as e:
                    logger.error(f'Oops !!: {e}')
        pro_links = await get_pro_links(base_url, page)
        products = []
        for pro_link in pro_links[:top_k]:
            product = await get_product_page(pro_link, page)
            # BUG FIX: get_product_page returns None on failure; the original
            # appended it anyway, poisoning the List[Product] return value.
            if product is not None:
                products.append(product)
        return products
    except Exception as e:
        logger.error(f'Oopsie ! {e}')
        return None
async def get_pro_links(base_url: str, page: Any) -> List[str]:
    """Collect de-duplicated absolute product-page links from the results grid.

    Scrolls to the bottom first so lazily loaded tiles are present, then keeps
    only hrefs that look like product pages (long, and neither pagination nor
    search URLs). The first collected link is dropped before returning.
    """
    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
    anchors = await page.locator("div.DOjaWF.gdgoEp div.cPHDOP a").all()
    logger.info(f'{len(anchors)} product links fetched !')
    collected: List[str] = []
    for anchor in anchors:
        try:
            href = await anchor.get_attribute("href")
            if not href:
                continue
            # Heuristic: product hrefs are very long and carry no paging/search params.
            if 'page=' in href or 'search?' in href or len(href) <= 150:
                continue
            absolute = base_url + href
            if absolute not in collected:
                collected.append(absolute)
        except Exception as e:
            logger.error(f'Oopsie in get_pro_links: {e}')
    logger.info(f'links: {len(collected)}')
    return collected[1:]
async def playwright_enter(headless: bool = True) -> Tuple:
    """Start Playwright manually (outside ``async with``) and open one page.

    Args:
        headless: Launch Chromium without a visible window. Defaults to True,
            matching the original hard-coded behavior.

    Returns:
        (context_man, playwright, browser, context, page) — keep context_man
        and pass it to playwright_exit() to tear everything down.
    """
    context_man = async_playwright()
    playwright = await context_man.__aenter__()
    browser = await playwright.chromium.launch(headless=headless)
    context = await browser.new_context()
    page = await context.new_page()
    # 15 seconds for all waits/navigations (the old comment wrongly said 6s).
    page.set_default_timeout(15000)
    page.set_default_navigation_timeout(15000)
    return context_man, playwright, browser, context, page
async def playwright_exit(context_man) -> None:
    """Tear down the Playwright stack opened by playwright_enter().

    Args:
        context_man: The async_playwright() context manager returned as the
            first element of playwright_enter()'s tuple.
    """
    await context_man.__aexit__(None, None, None)
async def main():
    """Manual driver: search Flipkart, apply sample filters, log results.

    Intended for interactive debugging — launches a visible (headed) browser.
    """
    base_url = 'https://www.flipkart.com'
    search_query = 'Real Madrid 16/17 blue jersey'
    # NOTE: the old dead `pro_url` locals were removed; the second one was a
    # triple-quoted string with embedded newlines — an invalid URL had it
    # ever been used.
    # Example filters matching Flipkart's storage / SIM facets.
    user_filters = [
        UserFilter(
            name='INTERNAL STORAGE', type='multiselect',
            selection=['256 GB & Above', '128 - 255.9 GB', '64 - 127.9 GB',
                       '32 - 63.9 GB', '16 - 31.9 GB', '8 - 15.9 GB',
                       '4 - 7.9 GB', 'Less than 1 GB', '256 GB Above'],
            range=None,
        ),
        UserFilter(
            name='SIM TYPE', type='multiselect',
            selection=['Dual Sim', 'Dual Sim(Nano + eSIM)', 'Single Sim'],
            range=None,
        ),
    ]
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=False)
        context = await browser.new_context()
        page = await context.new_page()
        res = await get_filtered_products(page, base_url, search_query, user_filters, top_k=5)
        logger.info(res)


if __name__ == "__main__":
    asyncio.run(main())