from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, FileResponse
from pydantic import BaseModel
import requests
from bs4 import BeautifulSoup
import pandas as pd
import io
import re
app = FastAPI(title="Universal Web Scraper API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.mount("/static", StaticFiles(directory="/code/static"), name="static")
@app.get("/")
async def read_root():
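    """Serve the frontend index.html from the static directory."""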
    return FileResponse("/code/static/index.html")
class ScrapeRequest(BaseModel):
    url: str
    mode: str = "table"
def scrape_table(soup: BeautifulSoup):
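    """Extract the largest <table> on the page into a pandas DataFrame.

    Header names come from the first row; if that row has no usable cells,
    generic col_N names are generated. Each data row is padded or truncated
    to match the header width.
    """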
    tables = soup.find_all("table")
    if not tables:
        raise HTTPException(status_code=400, detail="No table found on page")
    table = max(tables, key=lambda t: len(t.find_all("tr")))
    headers = []
    header_row = table.find("tr")
    if header_row:
        for th in header_row.find_all(["th", "td"]):
            headers.append(th.get_text(strip=True))
    if not headers:
        first_data_row = table.find("tr")
        if not first_data_row:
            raise HTTPException(status_code=400, detail="Empty table")
        cols = len(first_data_row.find_all("td"))
        headers = [f"col_{i+1}" for i in range(cols)]
    rows = []
    for tr in table.find_all("tr")[1:]:
        cells = tr.find_all("td")
        if not cells:
            continue
        row = [c.get_text(strip=True) for c in cells]
        if len(row) < len(headers):
            row += [""] * (len(headers) - len(row))
        elif len(row) > len(headers):
            row = row[:len(headers)]
        rows.append(row)
    df = pd.DataFrame(rows, columns=headers)
    return df
def scrape_links(soup: BeautifulSoup):
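    """Collect every anchor that has an href into a DataFrame of text/href pairs."""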
    links = []
    for a in soup.find_all("a"):
        text = a.get_text(strip=True)
        href = a.get("href", "")
        if not href:
            continue
        links.append({"text": text, "href": href})
    if not links:
        raise HTTPException(status_code=400, detail="No links found")
    df = pd.DataFrame(links)
    return df
def scrape_all_content(soup: BeautifulSoup):
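    """Heuristically extract title/price/description/link records.

    Product-like containers are tried first; when none are found, the function
    falls back to heading/paragraph pairs from the main content area.
    """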
    # IMPROVED: Extract only meaningful product/content data
    data = []
    # Remove unwanted elements (navigation, scripts, styles, ads)
    for tag in soup(["script", "style", "nav", "header", "footer", "aside", "iframe"]):
        tag.decompose()
    # Try to find product/article containers first (common e-commerce patterns)
    product_containers = soup.find_all(
        attrs={"class": re.compile(r"product|item|card|listing|article", re.I)}
    )
    # If we find product containers, extract from them
    if product_containers and len(product_containers) > 5:
        for container in product_containers[:100]:  # Limit to first 100 items
            # Extract title/name
            title_elem = container.find(
                ["h1", "h2", "h3", "h4", "a"],
                attrs={"class": re.compile(r"title|name|heading", re.I)},
            )
            title = title_elem.get_text(strip=True) if title_elem else ""
            # Extract price
            price_elem = container.find(attrs={"class": re.compile(r"price|cost|amount", re.I)})
            price = price_elem.get_text(strip=True) if price_elem else ""
            # Extract description
            desc_elem = container.find(
                ["p", "div"],
                attrs={"class": re.compile(r"desc|detail|summary", re.I)},
            )
            description = desc_elem.get_text(strip=True)[:200] if desc_elem else ""
            # Extract link
            link_elem = container.find("a", href=True)
            link = link_elem["href"] if link_elem else ""
            if title or price:  # Only add if we have meaningful data
                data.append({
                    "Title": title[:200],
                    "Price": price[:50],
                    "Description": description,
                    "Link": link[:300],
                })
    # Fallback: if no product containers were found, extract main content
    else:
        # Look for the main content area
        main_content = soup.find(
            ["main", "article", "div"],
            attrs={"id": re.compile(r"main|content|primary", re.I)},
        ) or soup
        # Extract headings and associated content
        for heading in main_content.find_all(["h1", "h2", "h3"]):
            heading_text = heading.get_text(strip=True)
            if len(heading_text) > 5:  # Skip very short headings
                # Get the next sibling paragraph, div, or list
                content = ""
                next_elem = heading.find_next_sibling(["p", "div", "ul"])
                if next_elem:
                    content = next_elem.get_text(strip=True)[:300]
                data.append({
                    "Title": heading_text[:200],
                    "Price": "",
                    "Description": content,
                    "Link": "",
                })
    if not data:
        raise HTTPException(
            status_code=400,
            detail="No meaningful content found on page. Try 'Tables' or 'Links' mode instead.",
        )
    # Drop rows that share the same title, keeping the first occurrence
    df = pd.DataFrame(data)
    df = df.drop_duplicates(subset=["Title"], keep="first")
    return df
@app.post("/scrape")
def scrape_to_excel(req: ScrapeRequest):
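    """Fetch the URL, scrape it in the requested mode, and stream back an .xlsx file."""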
    try:
        headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
        resp = requests.get(req.url, headers=headers, timeout=15)
    except Exception:
        raise HTTPException(status_code=400, detail="Could not fetch URL")
    if resp.status_code != 200:
        raise HTTPException(status_code=400, detail=f"Bad status code: {resp.status_code}")
    soup = BeautifulSoup(resp.text, "html.parser")
    if req.mode == "table":
        df = scrape_table(soup)
    elif req.mode == "links":
        df = scrape_links(soup)
    elif req.mode == "content":
        df = scrape_all_content(soup)
    else:
        raise HTTPException(status_code=400, detail="Unsupported mode")
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine="openpyxl") as writer:
        df.to_excel(writer, index=False, sheet_name="data")
    output.seek(0)
    headers = {"Content-Disposition": 'attachment; filename="scraped_data.xlsx"'}
    return StreamingResponse(
        output,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers=headers,
    )
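# Rough usage sketch (not part of the app): the module name "main", the port,
# and the example URL below are assumptions; adjust them for your deployment.
#
#   uvicorn main:app --host 0.0.0.0 --port 7860
#
#   curl -X POST http://localhost:7860/scrape \
#        -H "Content-Type: application/json" \
#        -d '{"url": "https://example.com/products", "mode": "content"}' \
#        -o scraped_data.xlsx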