import asyncio
import concurrent.futures
import json
import os
import re
import time
import zipfile
from typing import Any, Dict, List, Optional, Set, Tuple
from urllib.parse import parse_qs, urljoin, urlparse

import gradio as gr
import pandas as pd
import requests
from bs4 import BeautifulSoup
| |
|
| | |
| | |
| | |
# Size of the semaphore that bounds concurrent page scrapes.
MAX_CONCURRENCY = 4

# Default extra wait (milliseconds) after a page has loaded, before its HTML
# is captured.
PLAYWRIGHT_WAIT_MS = 1500

# Default number of retries after the first failed page load.
FETCH_RETRIES = 2

# Search defaults. NOTE(review): not referenced by the visible code, which
# takes these values from the UI sliders instead.
SEARCH_PAGES = 2
RESULTS_PER_QUERY = 10

# Browser-like User-Agent sent by both Playwright pages and requests sessions.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36"
)
| |
|
| | |
| | |
| | |
def openai_extract_json(html: str, url: str, fields: List[str], api_key: Optional[str]) -> Optional[List[Dict[str, Any]]]:
    """Ask an OpenAI chat model to turn raw page HTML into a list of JSON records.

    Args:
        html: Raw page HTML; only the first 180000 characters are sent.
        url: Page URL, passed to the model as context.
        fields: Field names the model should try to fill. When empty, a
            default hint of "title, price, image, rating, url" is used.
        api_key: OpenAI API key. When falsy, no request is made.

    Returns:
        A list of dict records, or ``None`` when no API key was given, the
        request failed, or the reply was not a JSON object/array. A single
        JSON object is wrapped in a one-element list. Array elements that are
        not objects are dropped so downstream code can rely on dict rows.
    """
    if not api_key:
        return None
    try:
        # Imported lazily so the app still starts when openai is not installed.
        from openai import OpenAI

        client = OpenAI(api_key=api_key)
        field_hint = ", ".join(fields) if fields else "title, price, image, rating, url"
        system = (
            "You are a robust web extractor. Given raw HTML and the page URL, "
            "return an array of JSON objects with fields you can infer (and the requested fields if present). "
            "Always output strictly valid JSON with double-quoted keys/strings. Include absolute image URLs if possible."
        )
        user = (
            f"URL: {url}\n\n"
            f"Required fields to attempt: [{field_hint}]\n\n"
            "Return JSON array only. Do not include any commentary.\n\n"
            f"HTML:\n{html[:180000]}"
        )
        resp = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "system", "content": system}, {"role": "user", "content": user}],
            temperature=0,
        )
        # The message content may be None; treat that as an empty reply.
        content = (resp.choices[0].message.content or "").strip()
        # Strip a leading ``` / ```json fence and a trailing ``` fence.
        content = re.sub(r"^```(?:json)?|```$", "", content).strip()
        data = json.loads(content)
        if isinstance(data, dict):
            data = [data]
        if isinstance(data, list):
            return [row for row in data if isinstance(row, dict)]
        return None
    except Exception as e:
        # Best-effort: any failure (import, network, bad JSON) means "no LLM rows".
        print("OpenAI extraction failed:", e)
        return None
| |
|
| | |
| | |
| | |
| | async def _fetch_dom_once(url: str, wait_ms: int) -> str: |
| | from playwright.async_api import async_playwright |
| | async with async_playwright() as p: |
| | browser = await p.chromium.launch(headless=True) |
| | page = await browser.new_page(user_agent=USER_AGENT) |
| | await page.goto(url, wait_until="domcontentloaded", timeout=30000) |
| | try: |
| | await page.wait_for_load_state("networkidle", timeout=8000) |
| | except Exception: |
| | pass |
| | if wait_ms > 0: |
| | await asyncio.sleep(wait_ms / 1000) |
| | html = await page.content() |
| | await browser.close() |
| | return html |
| |
|
async def fetch_dom(url: str, wait_ms: int = PLAYWRIGHT_WAIT_MS, retries: int = FETCH_RETRIES) -> str:
    """Fetch the HTML of ``url``, retrying failed loads.

    Args:
        url: Page to load.
        wait_ms: Post-load wait in milliseconds, forwarded to each attempt.
        retries: Number of retries after the first attempt. Values below 0
            are treated as 0, so at least one attempt is always made.

    Returns:
        The page HTML from the first successful attempt.

    Raises:
        The exception from the last failed attempt when every attempt fails.
    """
    attempts = max(1, retries + 1)
    last_err: Optional[BaseException] = None
    for attempt in range(1, attempts + 1):
        try:
            return await _fetch_dom_once(url, wait_ms)
        except Exception as e:
            last_err = e
            # Linear backoff between attempts; no point sleeping after the last one.
            if attempt < attempts:
                await asyncio.sleep(0.6 * attempt)
    raise last_err
| |
|
| | |
| | |
| | |
# Currency marker (rupee sign, "Rs"/"Rs.", "INR", "$", euro sign, pound sign)
# followed by an amount with optional thousands separators and decimals.
# The non-ASCII symbols are written as escapes so the pattern survives any
# re-encoding of this file.
_PRICE_RE = re.compile(r"(?:\u20b9|Rs\.?|INR|\$|\u20ac|\u00a3)\s?\d[\d,]*(?:\.\d+)?")

# <img> attributes checked, in order, for the image location.
_IMG_SRC_ATTRS = ("src", "data-src", "data-original")

# Selectors tried, in order, to find a card's title.
_TITLE_SELECTORS = ("h1", "h2", "h3", ".title", ".product-title", "._4rR01T", ".s1Q9rs", "a[title]")

# Card containers searched when the caller gives no selector.
_DEFAULT_CARD_SELECTOR = (
    "div.product, li.product, div.card, article, div.product-item, "
    "div.s-result-item, div._1AtVbE, div._4ddWXP, div.MuiCard-root, "
    "section, li.grid-item"
)


def _image_url(img, base_url: str) -> Optional[str]:
    """Return the absolute URL of an <img> tag, or None when it has none."""
    for attr in _IMG_SRC_ATTRS:
        value = img.get(attr)
        if not isinstance(value, str):
            continue
        value = value.strip()
        # Inline data: URIs cannot be downloaded; skip them so a lazy-load
        # attribute such as data-src can supply the real location.
        if value and not value.lower().startswith("data:"):
            return urljoin(base_url, value)
    return None


def extract_images_and_items(html: str, base_url: str, card_selector: Optional[str] = None) -> Tuple[List[Dict[str, Any]], List[str]]:
    """Heuristically pull item cards and image URLs out of a page.

    Args:
        html: Page HTML.
        base_url: URL used to resolve relative links and image sources.
        card_selector: Optional CSS selector for item cards. When omitted, a
            built-in selector list is used, falling back to the parents of
            images nested inside links.

    Returns:
        ``(items, images)``. ``items`` is a list of dicts with keys
        ``title``, ``price``, ``url`` and ``image``; a card is kept only if
        at least one of title, price or image was found. ``images`` is the
        de-duplicated list of absolute image URLs in document order.
    """
    soup = BeautifulSoup(html, "html.parser")

    # All page images, de-duplicated while keeping first-seen order.
    found = (_image_url(img, base_url) for img in soup.find_all("img"))
    unique_images = list(dict.fromkeys(u for u in found if u))

    if card_selector:
        candidates = soup.select(card_selector)
    else:
        candidates = soup.select(_DEFAULT_CARD_SELECTOR)
        if not candidates:
            candidates = [img.parent for img in soup.select("a img") if img.parent]

    items: List[Dict[str, Any]] = []
    for card in candidates:
        try:
            title = None
            for sel in _TITLE_SELECTORS:
                node = card.select_one(sel)
                text = node.get_text(strip=True) if node else ""
                if text:
                    title = text
                    break

            img = card.find("img")
            # Fall back to the image's alt text when no heading-like node exists.
            if not title and img and img.get("alt"):
                title = img.get("alt").strip()

            match = _PRICE_RE.search(card.get_text(" ", strip=True))
            price = match.group(0) if match else None

            link = card.find("a")
            href = urljoin(base_url, link.get("href")) if link and link.get("href") else base_url

            img_src = _image_url(img, base_url) if img else None

            if any([title, price, img_src]):
                items.append({"title": title, "price": price, "url": href, "image": img_src})
        except Exception:
            # Best-effort: one malformed card must not abort the whole page.
            continue

    return items, unique_images
| |
|
| | |
| | |
| | |
def download_images(image_urls: List[str], out_dir: str) -> List[str]:
    """Download images into ``out_dir`` and return the saved file paths.

    The file name is the basename of the URL path (``img_<n>.jpg`` when the
    path has none), with ``.jpg`` appended when it has no extension. The
    first URL to use a name keeps it; later URLs with the same name get a
    ``_<k>`` suffix instead of overwriting the earlier file.

    Args:
        image_urls: Absolute image URLs.
        out_dir: Target directory; created if missing.

    Returns:
        Paths of the files written, in download order. URLs that fail or do
        not return HTTP 200 with a non-empty body are skipped.
    """
    os.makedirs(out_dir, exist_ok=True)
    saved: List[str] = []
    used_names = set()
    with requests.Session() as s:
        s.headers.update({"User-Agent": USER_AGENT})
        for u in image_urls:
            try:
                name = os.path.basename(urlparse(u).path) or f"img_{len(saved)+1}.jpg"
                if not os.path.splitext(name)[1]:
                    name += ".jpg"
                # Pick a name no earlier download in this call has used.
                stem, ext = os.path.splitext(name)
                suffix = 1
                while name in used_names:
                    name = f"{stem}_{suffix}{ext}"
                    suffix += 1
                path = os.path.join(out_dir, name)
                r = s.get(u, timeout=20)
                if r.status_code == 200 and r.content:
                    with open(path, "wb") as f:
                        f.write(r.content)
                    saved.append(path)
                    # Reserve the name only once a file actually exists under it.
                    used_names.add(name)
            except Exception as e:
                # Best-effort: one bad URL must not stop the batch.
                print("Image download failed:", u, e)
    return saved
| |
|
def caption_images(paths: List[str]) -> Dict[str, str]:
    """Generate a caption for each image file with the BLIP base model.

    Args:
        paths: Image file paths.

    Returns:
        Mapping of path to caption. A path whose image could not be
        processed maps to ``"(caption failed: <error>)"``. Returns an empty
        dict when ``paths`` is empty or when the captioning stack
        (transformers, PIL, torch, model weights) cannot be loaded.
    """
    if not paths:
        # Nothing to caption: skip loading the model entirely.
        return {}
    try:
        # Heavy optional dependencies are imported lazily.
        from transformers import BlipProcessor, BlipForConditionalGeneration
        from PIL import Image
        import torch

        device = "cuda" if torch.cuda.is_available() else "cpu"
        processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
        model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-base").to(device)

        captions: Dict[str, str] = {}
        for p in paths:
            try:
                # The context manager releases the file handle once the
                # converted copy exists.
                with Image.open(p) as raw:
                    im = raw.convert("RGB")
                inputs = processor(im, return_tensors="pt").to(device)
                out = model.generate(**inputs, max_new_tokens=40)
                captions[p] = processor.decode(out[0], skip_special_tokens=True)
            except Exception as e:
                captions[p] = f"(caption failed: {e})"
        return captions
    except Exception as e:
        print("Captioning unavailable:", e)
        return {}
| |
|
| | |
| | |
| | |
def zip_paths(paths: List[str], zip_path: str) -> str:
    """Write the given files into a deflate-compressed ZIP archive.

    Entries are stored flat under their base names. A path listed more than
    once is archived once, and distinct files sharing a base name get a
    ``_<k>`` suffix so no entry shadows another. Paths that are not existing
    files are skipped.

    Args:
        paths: Files to archive.
        zip_path: Destination archive path (overwritten if present).

    Returns:
        ``zip_path``.
    """
    seen_files = set()
    used_names = set()
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for p in paths:
            if not os.path.isfile(p):
                continue
            real = os.path.abspath(p)
            if real in seen_files:
                continue
            seen_files.add(real)

            arcname = os.path.basename(p)
            stem, ext = os.path.splitext(arcname)
            suffix = 1
            while arcname in used_names:
                arcname = f"{stem}_{suffix}{ext}"
                suffix += 1
            used_names.add(arcname)
            zf.write(p, arcname=arcname)
    return zip_path
| |
|
| | |
| | |
| | |
# Site filters appended to queries when the search is biased towards ad
# archives and design showcases.
ADS_PRESETS = [
    "site:adsoftheworld.com",
    "site:theinspiration.com",
    "site:ads-of-the-world.s3",
    "site:behance.net ad campaign",
    "site:dribbble.com case study ad",
]

# Site filters for news sources; only the first two are used when building
# queries.
NEWS_SIGNAL = [
    "site:news.ycombinator.com", "site:techcrunch.com", "site:theverge.com",
    "site:adage.com", "site:campaignlive.com"
]


def build_queries_from_prompt(prompt: str, include_ads_sources: bool) -> List[str]:
    """Expand a free-text prompt into at most 12 de-duplicated search queries.

    The prompt is sanitized (characters other than word characters,
    whitespace and ``: + - / .`` become spaces; whitespace is collapsed) and
    then combined into queries in priority order:

    1. the sanitized prompt and its four suffixed variants,
    2. when ``include_ads_sources`` is true, the sanitized prompt combined
       with each entry of ``ADS_PRESETS``,
    3. each variant combined with the first two ``NEWS_SIGNAL`` filters.

    Ad-source queries are placed ahead of the news queries so that they
    survive the 12-query cap; appended last, they would always be cut off.

    Args:
        prompt: User prompt.
        include_ads_sources: Whether to add the ``ADS_PRESETS`` queries.

    Returns:
        Up to 12 unique queries, or an empty list when nothing searchable is
        left after sanitizing.
    """
    # \w is Unicode-aware for str patterns, so non-English prompts keep
    # their letters.
    base = re.sub(r"[^\w\s:+\-/.]", " ", prompt).strip()
    base = re.sub(r"\s+", " ", base)
    if not base:
        return []

    core_variants = [
        base,
        f'{base} best examples',
        f'{base} recent campaigns',
        f'{base} case study',
        f'{base} images',
    ]

    queries = list(core_variants)
    if include_ads_sources:
        queries.extend(f"{base} {siteq}" for siteq in ADS_PRESETS)
    queries.extend(f"{v} {ns}" for v in core_variants for ns in NEWS_SIGNAL[:2])

    # dict.fromkeys de-duplicates while keeping first-seen order.
    return list(dict.fromkeys(queries))[:12]
| |
|
def _unwrap_ddg_redirect(href: str) -> str:
    """Return the target URL behind a DuckDuckGo ``/l/?uddg=...`` redirect link."""
    parsed = urlparse(href)
    if parsed.netloc.lower().endswith("duckduckgo.com") and parsed.path.startswith("/l/"):
        # parse_qs percent-decodes the value, giving the real destination.
        target = parse_qs(parsed.query).get("uddg")
        if target and target[0]:
            return target[0]
    if href.startswith("//"):
        # Protocol-relative link: make it absolute so it can be fetched later.
        return "https:" + href
    return href


def ddg_search(query: str, pages: int = 1) -> List[Tuple[str, str]]:
    """Return ``(title, url)`` pairs from DuckDuckGo HTML results, across pages.

    Result links that are DuckDuckGo redirects are unwrapped to their target
    URL, so callers receive the destination rather than a duckduckgo.com
    address.

    Args:
        query: Search query.
        pages: Number of result pages to request.

    Returns:
        Results in page order. Paging stops at the first response with an
        error status; results gathered so far are still returned.

    Raises:
        requests.RequestException: On connection errors or timeouts.
    """
    results: List[Tuple[str, str]] = []
    with requests.Session() as session:
        session.headers.update({"User-Agent": USER_AGENT})

        for page in range(int(pages)):
            params = {"q": query}
            if page > 0:
                # Result offset for follow-up pages.
                params["s"] = str(page * 50)
            r = session.get("https://duckduckgo.com/html/", params=params, timeout=20)
            if not r.ok:
                break
            soup = BeautifulSoup(r.text, "html.parser")
            for res in soup.select(".result"):
                a = res.select_one(".result__a")
                if not a:
                    continue
                href = a.get("href")
                if not href:
                    continue
                results.append((a.get_text(strip=True), _unwrap_ddg_redirect(href)))
    return results
| |
|
def pick_best_links(all_results: List[Tuple[str, str]], want: int = 10) -> List[str]:
    """Pick up to ``want`` result URLs, preferring a spread of domains.

    Ranking is simple and order-based:

    - exact duplicate URLs and duckduckgo.com links are dropped;
    - while fewer than ``want // 2`` links are picked, a URL whose domain
      (ignoring a leading ``www.``) was already picked is set aside;
    - if the scan ends short of ``want``, set-aside URLs are appended in
      their original order, so diversity is a preference rather than a
      reason to return fewer links than are available.

    Args:
        all_results: ``(title, url)`` pairs in priority order.
        want: Maximum number of URLs to return.

    Returns:
        The selected URLs; empty when ``want`` is not positive.
    """
    if want <= 0:
        return []

    picked: List[str] = []
    deferred: List[str] = []
    seen_urls: Set[str] = set()
    seen_domains: Set[str] = set()

    for _, url in all_results:
        u = url.strip()
        if not u or u in seen_urls:
            continue
        dom = urlparse(u).netloc.lower()
        if dom.startswith("www."):
            dom = dom[4:]

        # Search-engine internal links are never useful scrape targets.
        if dom == "duckduckgo.com" or dom.endswith(".duckduckgo.com"):
            continue
        if dom in seen_domains and len(picked) < want // 2:
            deferred.append(u)
            continue

        seen_urls.add(u)
        seen_domains.add(dom)
        picked.append(u)
        if len(picked) >= want:
            break

    # Backfill from the set-aside same-domain links when still short.
    for u in deferred:
        if len(picked) >= want:
            break
        if u in seen_urls:
            continue
        seen_urls.add(u)
        picked.append(u)
    return picked
| |
|
def search_links_from_prompt(prompt: str, include_ads_sources: bool, per_query: int, pages: int) -> List[str]:
    """Search the web for a prompt and return the best candidate URLs.

    Args:
        prompt: User prompt to expand into search queries.
        include_ads_sources: Forwarded to the query builder.
        per_query: Number of top results kept per query.
        pages: Result pages requested per query.

    Returns:
        URLs chosen by ``pick_best_links`` with a target count of
        ``max(5, per_query * 2)``. Queries whose search raises are logged to
        stdout and skipped.
    """
    # UI sliders may deliver floats; slicing and range() need ints.
    per_query = max(1, int(per_query))
    pages = max(1, int(pages))

    all_results: List[Tuple[str, str]] = []
    for q in build_queries_from_prompt(prompt, include_ads_sources):
        try:
            all_results.extend(ddg_search(q, pages=pages)[:per_query])
        except Exception as e:
            # Best-effort: a failed query should not lose the others' results.
            print("Search failed for query:", q, e)
            continue

    return pick_best_links(all_results, want=max(5, per_query * 2))
| |
|
| | |
| | |
| | |
async def scrape_one(url: str, fields: List[str], use_llm: bool, api_key: Optional[str],
                     card_selector: Optional[str], log: List[str], sem: asyncio.Semaphore) -> Dict[str, Any]:
    """Fetch one URL and extract heuristic items, images and optional LLM rows.

    Args:
        url: Page to scrape.
        fields: Field names forwarded to the LLM extractor.
        use_llm: Whether to run the LLM extractor on the page HTML.
        api_key: OpenAI API key forwarded to the LLM extractor.
        card_selector: Optional CSS selector for item cards.
        log: Shared list that receives ``[ERROR]``/``[WARN]`` messages.
        sem: Semaphore bounding how many pages are processed at once.

    Returns:
        A dict with keys ``url``, ``html``, ``items``, ``images`` and
        ``llm_rows``. When the page cannot be loaded, ``html`` is empty and
        the lists are empty.
    """
    async with sem:
        try:
            html = await fetch_dom(url)
        except Exception as e:
            log.append(f"[ERROR] Failed to load: {url} -> {e}")
            return {"url": url, "html": "", "items": [], "images": [], "llm_rows": []}

        items, images = [], []
        try:
            items, images = extract_images_and_items(html, url, card_selector)
        except Exception as e:
            log.append(f"[WARN] Parse issue on: {url} -> {e}")

        llm_rows = []
        if use_llm:
            try:
                # The extractor makes a blocking HTTP call; run it in the
                # default executor so other pages keep loading meanwhile.
                loop = asyncio.get_running_loop()
                llm_rows = await loop.run_in_executor(
                    None, openai_extract_json, html, url, fields, api_key
                ) or []
            except Exception as e:
                log.append(f"[WARN] LLM extraction failed: {url} -> {e}")

        return {"url": url, "html": html, "items": items, "images": images, "llm_rows": llm_rows}
| |
|
def to_dataframe(rows: List[Dict[str, Any]]) -> pd.DataFrame:
    """Build a DataFrame from heterogeneous dict rows.

    Columns are the union of all row keys. ``title``, ``name``, ``price``,
    ``rating``, ``image`` and ``url`` come first (in that order, when
    present); the rest follow in first-seen order, so the layout is the
    same on every run. Keys missing from a row are filled with ``None``.
    Rows that are not dicts are ignored.

    Args:
        rows: Records to tabulate.

    Returns:
        The DataFrame, or an empty DataFrame when there are no dict rows.
    """
    dict_rows = [r for r in (rows or []) if isinstance(r, dict)]
    if not dict_rows:
        return pd.DataFrame()

    # dict.fromkeys keeps insertion order, unlike a set.
    columns = list(dict.fromkeys(k for r in dict_rows for k in r))
    df = pd.DataFrame([{k: r.get(k) for k in columns} for r in dict_rows], columns=columns)

    preferred = [k for k in ["title", "name", "price", "rating", "image", "url"] if k in df.columns]
    others = [c for c in df.columns if c not in preferred]
    return df[preferred + others]
| |
|
| | |
| | |
| | |
def _run_coroutine_blocking(make_coro):
    """Run the coroutine returned by ``make_coro()`` to completion and return its result."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread: the normal case.
        return asyncio.run(make_coro())
    # A loop is already running in this thread and cannot be re-entered, so
    # run the coroutine on a fresh loop in a worker thread instead.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(make_coro())).result()


def run_scrape(input_mode: str,
               prompt_or_urls: str,
               fields_text: str,
               card_selector: str,
               include_ads_sources: bool,
               per_query_results: int,
               search_pages: int,
               use_llm: bool,
               api_key: str,
               download_imgs: bool,
               do_caption: bool):
    """Run the full scrape pipeline for the UI.

    Steps: resolve target URLs (search from a prompt, or use the pasted
    URLs), scrape them concurrently, build a table from the LLM rows (when
    enabled and non-empty) or the heuristic rows, save CSV/JSON, and
    optionally download, caption and zip the images.

    Args:
        input_mode: ``"Prompt"`` to search from a prompt; any other value
            treats ``prompt_or_urls`` as one URL per line.
        prompt_or_urls: The prompt, or newline-separated URLs.
        fields_text: Comma-separated field names for the LLM extractor.
        card_selector: Optional CSS selector for item cards.
        include_ads_sources: Whether search is biased towards ad sources.
        per_query_results: Top results kept per search query.
        search_pages: Result pages requested per search query.
        use_llm: Whether to run LLM extraction.
        api_key: OpenAI API key (used only when ``use_llm`` is true).
        download_imgs: Whether to download the images that were found.
        do_caption: Whether to caption downloaded images.

    Returns:
        A 7-tuple ``(dataframe, gallery_items, json_path, csv_path,
        zip_path, status_text, log_text)``. Each path is ``None`` when the
        corresponding file was not produced.
    """
    start = time.time()
    log: List[str] = []

    prompt_or_urls = prompt_or_urls or ""
    if input_mode == "Prompt":
        if not prompt_or_urls.strip():
            return pd.DataFrame(), [], None, None, None, "Enter a prompt.", "No prompt given."
        log.append(f"[INFO] Building queries from prompt: {prompt_or_urls!r}")
        urls = search_links_from_prompt(
            prompt_or_urls.strip(),
            include_ads_sources=include_ads_sources,
            per_query=per_query_results,
            pages=max(1, search_pages)
        )
        if not urls:
            return pd.DataFrame(), [], None, None, None, "No links found.", "\n".join(log)
        log.append(f"[INFO] Selected {len(urls)} links from search.")
    else:
        urls = [u.strip() for u in prompt_or_urls.splitlines() if u.strip()]
        if not urls:
            return pd.DataFrame(), [], None, None, None, "Enter at least one URL.", "No URLs supplied."
        log.append(f"[INFO] Using {len(urls)} direct URL(s).")

    # Blank entries (e.g. from "a,,b") are dropped.
    fields = [f.strip() for f in (fields_text or "").split(",") if f.strip()]

    out_dir = os.path.abspath("scrape_output")
    os.makedirs(out_dir, exist_ok=True)

    async def gather_all():
        # Created inside the running loop so the semaphore always belongs to
        # the loop that awaits it.
        sem = asyncio.Semaphore(MAX_CONCURRENCY)
        tasks = [
            scrape_one(u, fields, use_llm, api_key if use_llm else None, card_selector or None, log, sem)
            for u in urls
        ]
        return await asyncio.gather(*tasks)

    try:
        scraped = _run_coroutine_blocking(gather_all)
    except Exception as e:
        log.append(f"[FATAL] Async run failed: {e}")
        return pd.DataFrame(), [], None, None, None, "Run failed.", "\n".join(log)

    heuristic_rows: List[Dict[str, Any]] = []
    llm_rows: List[Dict[str, Any]] = []
    all_images: List[str] = []

    for s in scraped:
        if not isinstance(s, dict):
            continue
        heuristic_rows.extend(s.get("items", []))
        llm_rows.extend(s.get("llm_rows", []))
        all_images.extend(s.get("images", []))

    # The same image is often referenced by several pages; fetch it once.
    all_images = list(dict.fromkeys(all_images))

    # LLM rows win when requested and available; otherwise use heuristics.
    rows = llm_rows if use_llm and llm_rows else heuristic_rows
    df = to_dataframe(rows)

    ts = int(time.time())
    json_path = os.path.join(out_dir, f"scrape_{ts}.json")
    csv_path = os.path.join(out_dir, f"scrape_{ts}.csv")
    try:
        df.to_csv(csv_path, index=False)
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(rows, f, ensure_ascii=False, indent=2)
    except Exception as e:
        log.append(f"[WARN] Failed to save CSV/JSON: {e}")
        json_path = None
        csv_path = None

    gallery_paths, zip_path = [], None
    if download_imgs and all_images:
        try:
            img_dir = os.path.join(out_dir, f"images_{ts}")
            saved = download_images(all_images, img_dir)
            gallery_paths = saved[:120]
            if do_caption and saved:
                try:
                    captions_map = caption_images(saved)
                    if not df.empty:
                        img_col = None
                        for c in df.columns:
                            if c.lower() in ("image", "image_url", "img", "imageurl"):
                                img_col = c
                                break
                        if img_col:
                            def _map_caption(u):
                                if not u:
                                    return ""
                                fname = os.path.basename(urlparse(str(u)).path)
                                # Mirror the downloader, which appends .jpg
                                # to names without an extension.
                                if fname and not os.path.splitext(fname)[1]:
                                    fname += ".jpg"
                                return captions_map.get(os.path.join(img_dir, fname), "")
                            df["caption"] = df[img_col].map(_map_caption)
                            # Rewrite the exports only if the first save succeeded.
                            if csv_path and json_path:
                                df.to_csv(csv_path, index=False)
                                with open(json_path, "w", encoding="utf-8") as f:
                                    json.dump(json.loads(df.to_json(orient="records")), f, ensure_ascii=False, indent=2)
                except Exception as e:
                    log.append(f"[WARN] Captioning failed: {e}")

            zip_path = os.path.join(out_dir, f"images_{ts}.zip")
            try:
                zip_paths(saved, zip_path)
            except Exception as e:
                log.append(f"[WARN] ZIP failed: {e}")
                zip_path = None
        except Exception as e:
            log.append(f"[WARN] Image pipeline failed: {e}")

    elapsed = round(time.time() - start, 2)
    gallery_data = [(p, os.path.basename(p)) for p in gallery_paths]
    # \u2022 is the bullet character, written as an escape to stay encoding-proof.
    status = " \u2022 ".join([
        f"Scraped {len(urls)} URL(s)",
        f"Rows: {len(df)}",
        f"Images found: {len(all_images)}",
        f"Time: {elapsed}s",
    ])
    return (
        df,
        gallery_data,
        json_path if json_path and os.path.isfile(json_path) else None,
        csv_path if csv_path and os.path.isfile(csv_path) else None,
        zip_path if zip_path and os.path.isfile(zip_path) else None,
        status,
        "\n".join(log) if log else "OK",
    )
| |
|
| | |
| | |
| | |
# Gradio UI: collects the scrape options, runs run_scrape on click and shows
# the table, image gallery, download files, status line and logs.
# NOTE(review): the title dash/arrows, the heading emoji and the button emoji
# were mis-encoded in the source and are reconstructed here as the most
# plausible originals - confirm against the intended copy.
with gr.Blocks(title="AI Scraper — Prompt → Best Links → Text+Images", css=".gradio-container {max-width: 1200px !important}") as demo:
    gr.Markdown("""
    # 🕷️ AI-Powered Prompt Scraper (2025)
    - Give a **prompt** (e.g., "Gen Z pink organic skincare ad campaign in India 2024")
      → we search smartly, pick strong links (optionally ad archives), and scrape **text + images**
    - Or switch to **Direct URLs** mode and paste URLs.
    - Optional **LLM semantic parsing** to structured JSON.
    """)

    with gr.Row():
        input_mode = gr.Radio(choices=["Prompt", "Direct URLs"], value="Prompt", label="Input Mode")

    with gr.Row():
        prompt_or_urls = gr.Textbox(
            label="Prompt (or URLs if in Direct mode)",
            placeholder="e.g., gen z pink skincare ad campaign india 2024"
        )

    with gr.Row():
        fields = gr.Textbox(label="Fields to extract (comma-separated)", placeholder="title, price, image, rating, url")
        card_selector = gr.Textbox(label="Optional CSS selector for item cards", placeholder="div.product, article, .card")

    with gr.Row():
        include_ads_sources = gr.Checkbox(label="Bias search towards ad archives/sources", value=True)
        per_query_results = gr.Slider(1, 15, value=6, step=1, label="Top results to keep per query")
        search_pages = gr.Slider(1, 3, value=2, step=1, label="Search pages per query (DDG)")

    with gr.Row():
        use_llm = gr.Checkbox(label="Use OpenAI for semantic extraction", value=False)
        api_key = gr.Textbox(label="OpenAI API Key (if using LLM)", type="password")

    with gr.Row():
        download_imgs = gr.Checkbox(label="Download images", value=True)
        do_caption = gr.Checkbox(label="Caption images (slow)", value=False)

    run_btn = gr.Button("🚀 Run Scraper", variant="primary")

    with gr.Row():
        table = gr.Dataframe(label="Extracted Data (preview)", interactive=False)
        gallery = gr.Gallery(label="Scraped Images (subset)", show_label=True, height=420, allow_preview=True)

    with gr.Row():
        json_file = gr.File(label="Download JSON")
        csv_file = gr.File(label="Download CSV")
        zip_file = gr.File(label="Download Images ZIP")

    status = gr.Markdown("Ready.")
    logs = gr.Textbox(label="Run Logs", lines=10)

    # Input order must match run_scrape's parameter order; output order must
    # match its 7-tuple return value.
    run_btn.click(
        fn=run_scrape,
        inputs=[
            input_mode, prompt_or_urls, fields, card_selector,
            include_ads_sources, per_query_results, search_pages,
            use_llm, api_key, download_imgs, do_caption
        ],
        outputs=[table, gallery, json_file, csv_file, zip_file, status, logs]
    )


if __name__ == "__main__":
    demo.launch()