Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, HTTPException | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import StreamingResponse, FileResponse | |
| from pydantic import BaseModel | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import pandas as pd | |
| import io | |
| import re | |
# Application object and global wiring: CORS middleware plus the static
# front-end assets baked into the container at /code/static.
app = FastAPI(title="Universal Web Scraper API")

# CORS is fully open (any origin / method / header) — fine for a public demo.
# NOTE(review): browsers ignore allow_credentials=True when combined with
# allow_origins=["*"] per the CORS spec — confirm credentials are needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve the bundled front-end (index.html, JS, CSS) under /static.
app.mount("/static", StaticFiles(directory="/code/static"), name="static")
@app.get("/")
async def read_root():
    """Serve the single-page front-end at the site root.

    NOTE(review): the source as received had no route decorator, so this
    handler was never registered with the app; ``@app.get("/")`` matches
    the evident intent (returning ``static/index.html`` for the root URL)
    — confirm the original path.
    """
    return FileResponse("/code/static/index.html")
class ScrapeRequest(BaseModel):
    """Request body for the scraping endpoint."""

    # Fully-qualified URL of the page to fetch and scrape.
    url: str
    # Extraction strategy: "table", "links" or "content"
    # (dispatched in scrape_to_excel).
    mode: str = "table"
def scrape_table(soup: BeautifulSoup):
    """Extract the largest HTML table on the page into a DataFrame.

    The table with the most ``<tr>`` rows is assumed to be the main data
    table.  Column names come from the first row (``<th>`` or ``<td>``
    cells); if none are found, synthetic ``col_N`` names are generated.
    Data rows are padded or truncated to the header width.

    Raises:
        HTTPException: 400 when the page has no table, or the table is empty.
    """
    candidates = soup.find_all("table")
    if not candidates:
        raise HTTPException(status_code=400, detail="No table found on page")

    # Largest row count wins — usually the page's primary data table.
    biggest = max(candidates, key=lambda tbl: len(tbl.find_all("tr")))

    first_row = biggest.find("tr")
    column_names = (
        [cell.get_text(strip=True) for cell in first_row.find_all(["th", "td"])]
        if first_row
        else []
    )
    if not column_names:
        probe_row = biggest.find("tr")
        if not probe_row:
            raise HTTPException(status_code=400, detail="Empty table")
        width = len(probe_row.find_all("td"))
        column_names = [f"col_{idx + 1}" for idx in range(width)]

    records = []
    for row_tag in biggest.find_all("tr")[1:]:
        cells = row_tag.find_all("td")
        if not cells:
            continue  # header-only or spacer rows carry no <td> cells
        values = [cell.get_text(strip=True) for cell in cells]
        if len(values) < len(column_names):
            # Ragged row: pad with empty strings to the header width.
            values.extend([""] * (len(column_names) - len(values)))
        else:
            # Slicing is a no-op when the widths already match.
            values = values[: len(column_names)]
        records.append(values)

    return pd.DataFrame(records, columns=column_names)
def scrape_links(soup: BeautifulSoup):
    """Collect every anchor's text and href into a two-column DataFrame.

    Anchors without an ``href`` attribute (or with an empty one) are
    skipped.

    Raises:
        HTTPException: 400 when the page contains no usable links.
    """
    collected = [
        {"text": anchor.get_text(strip=True), "href": anchor.get("href", "")}
        for anchor in soup.find_all("a")
        if anchor.get("href", "")
    ]
    if not collected:
        raise HTTPException(status_code=400, detail="No links found")
    return pd.DataFrame(collected)
def scrape_all_content(soup: BeautifulSoup):
    """Extract meaningful product/article data into a DataFrame.

    Strategy:
      1. Strip boilerplate (scripts, styles, nav, header, footer, aside,
         iframes) so only content elements remain.
      2. Look for repeated "product card" containers (class names matching
         product/item/card/listing/article).  If more than five exist,
         pull Title / Price / Description / Link from each (first 100).
      3. Otherwise fall back to the main content area and pair each
         heading with its next sibling paragraph/div/list.

    Returns:
        DataFrame with columns Title, Price, Description, Link.

    Raises:
        HTTPException: 400 when nothing meaningful was extracted.
    """
    data = []

    # Remove unwanted elements (navigation, scripts, styles, ads).
    # NOTE: decompose() mutates the soup the caller passed in.
    for tag in soup(["script", "style", "nav", "header", "footer", "aside", "iframe"]):
        tag.decompose()

    # Try to find product/article containers first (common e-commerce patterns).
    product_containers = soup.find_all(
        attrs={
            "class": re.compile(r"product|item|card|listing|article", re.I)
        }
    )

    # Require several matches before trusting the container heuristic —
    # a page with one or two hits is probably not a listing page.
    if product_containers and len(product_containers) > 5:
        for container in product_containers[:100]:  # Limit to first 100 items
            # Title/name: a heading or link whose class looks title-like.
            title_elem = container.find(["h1", "h2", "h3", "h4", "a"],
                attrs={"class": re.compile(r"title|name|heading", re.I)})
            title = title_elem.get_text(strip=True) if title_elem else ""
            # Price: any element with a price-like class.
            price_elem = container.find(attrs={"class": re.compile(r"price|cost|amount", re.I)})
            price = price_elem.get_text(strip=True) if price_elem else ""
            # Description, capped at 200 chars.
            desc_elem = container.find(["p", "div"],
                attrs={"class": re.compile(r"desc|detail|summary", re.I)})
            description = desc_elem.get_text(strip=True)[:200] if desc_elem else ""
            # First hyperlink inside the card.
            link_elem = container.find("a", href=True)
            link = link_elem["href"] if link_elem else ""
            if title or price:  # Only add if we have meaningful data
                data.append({
                    "Title": title[:200],
                    "Price": price[:50],
                    "Description": description,
                    "Link": link[:300]
                })
    # Fallback: no product containers found — extract main content instead.
    else:
        # Prefer an element whose id looks like the main content area.
        main_content = soup.find(["main", "article", "div"],
            attrs={"id": re.compile(r"main|content|primary", re.I)}) or soup
        # Pair each heading with its immediately following text block.
        for heading in main_content.find_all(["h1", "h2", "h3"]):
            heading_text = heading.get_text(strip=True)
            if len(heading_text) > 5:  # Skip very short headings
                content = ""
                next_elem = heading.find_next_sibling(["p", "div", "ul"])
                if next_elem:
                    content = next_elem.get_text(strip=True)[:300]
                data.append({
                    "Title": heading_text[:200],
                    "Price": "",
                    "Description": content,
                    "Link": ""
                })

    if not data:
        raise HTTPException(status_code=400, detail="No meaningful content found on page. Try 'Tables' or 'Links' mode instead.")

    # Remove exact duplicates.  FIX: the previous subset=["Title"] dedupe
    # collapsed distinct rows sharing a title — in particular every
    # price-only row (empty Title) was merged into one.  Dedupe on the
    # full row instead so only true duplicates are dropped.
    df = pd.DataFrame(data)
    df = df.drop_duplicates(keep="first")
    return df
@app.post("/scrape")
def scrape_to_excel(req: ScrapeRequest):
    """Fetch ``req.url``, scrape it per ``req.mode`` and stream back an
    .xlsx file.

    NOTE(review): the source as received had no route decorator, so this
    handler was unreachable; ``@app.post("/scrape")`` restores an endpoint
    — confirm the original path against the front-end's fetch call.

    Raises:
        HTTPException: 400 for fetch failures, non-200 responses,
            unsupported modes, or scraper-specific failures.
    """
    # Plain browser-ish UA: many sites reject the default requests UA.
    request_headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
    try:
        resp = requests.get(req.url, headers=request_headers, timeout=15)
    # FIX: narrowed from bare `except Exception` — only network/HTTP
    # errors should map to "Could not fetch URL"; programming errors
    # now surface instead of being masked as a 400.
    except requests.RequestException:
        raise HTTPException(status_code=400, detail="Could not fetch URL")
    if resp.status_code != 200:
        raise HTTPException(status_code=400, detail=f"Bad status code: {resp.status_code}")

    soup = BeautifulSoup(resp.text, "html.parser")

    # Dispatch on the requested extraction mode.
    if req.mode == "table":
        df = scrape_table(soup)
    elif req.mode == "links":
        df = scrape_links(soup)
    elif req.mode == "content":
        df = scrape_all_content(soup)
    else:
        raise HTTPException(status_code=400, detail="Unsupported mode")

    # Serialize the DataFrame to an in-memory .xlsx workbook.
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine="openpyxl") as writer:
        df.to_excel(writer, index=False, sheet_name="data")
    output.seek(0)

    response_headers = {"Content-Disposition": 'attachment; filename="scraped_data.xlsx"'}
    return StreamingResponse(
        output,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers=response_headers,
    )