# Residue from the hosting site's file viewer: Spaces status "Sleeping";
# file size 7,992 bytes; id cfa4580 (line-number gutter 1-197 removed).
import os
import asyncio
from fastapi import FastAPI, HTTPException
from playwright.async_api import async_playwright
from pydantic import BaseModel
from typing import List, Optional
import uvicorn
# FastAPI application instance; the route decorators in this file register handlers on it.
app = FastAPI(title="Pinterest Scraper API")
# Cache for search results (optional optimization).
# NOTE(review): nothing in the visible part of this file reads or writes this
# dict - confirm whether it is still needed.
search_cache = {}
# Intended cache lifetime, presumably in seconds (300 = 5 minutes); not
# referenced by the visible code - TODO confirm.
CACHE_DURATION = 300 # 5 minutes
class ScrapeRequest(BaseModel):
    # Request body accepted by POST /scrape.

    # Search term to look up on Pinterest.
    keyword: str
    # Number of images wanted (defaults to 10).
    count: int = 10
    # Optional aspect-ratio filter.
    # Options: "9:16", "16:9", "1:1", "4:5", "3:4", "21:9", "any".
    # Declared Optional so that an explicit JSON null validates, not only an
    # omitted field (a bare `str = None` rejects null on Pydantic v2).
    aspect_ratio: Optional[str] = None
class ScrapeResponse(BaseModel):
    # Response body returned by POST /scrape.

    success: bool  # False when the scraper reported an error
    message: str  # Human-readable summary or error text
    images: List[dict]  # Each image has url, width, height, aspect_ratio
    keyword: str  # The keyword the request asked for
# Supported aspect-ratio labels mapped to width / height. Built once at import
# time instead of on every call.
_ASPECT_RATIOS = {
    "9:16": 9 / 16,  # Vertical (Shorts/Reels)
    "16:9": 16 / 9,  # Horizontal (Landscape)
    "1:1": 1 / 1,  # Square
    "4:5": 4 / 5,  # Portrait (Instagram)
    "3:4": 3 / 4,  # Portrait (Standard)
    "21:9": 21 / 9,  # Ultrawide
}


def check_aspect_ratio(
    width: int,
    height: int,
    target_ratio: Optional[str],
    *,
    tolerance: float = 0.15,
) -> bool:
    """Check if an image matches a target aspect ratio within a tolerance.

    Args:
        width: Image width.
        height: Image height.
        target_ratio: One of the labels in ``_ASPECT_RATIOS``, or ``"any"``,
            ``None`` or an empty string to disable filtering.
        tolerance: Allowed relative deviation from the target ratio
            (0.15 means within 15% of the target value).

    Returns:
        True when no filtering is requested, when the label is not recognised
        (unknown labels are treated as "no filter"), or when ``width / height``
        lies within the tolerance of the target. False when a recognised label
        is requested but the dimensions are not both positive.
    """
    if not target_ratio or target_ratio == "any":
        return True

    target = _ASPECT_RATIOS.get(target_ratio)
    if target is None:
        # Unknown label: stay lenient rather than rejecting every image.
        return True

    # A non-positive dimension cannot match any ratio; bail out before the
    # division so that height == 0 no longer raises ZeroDivisionError.
    if width <= 0 or height <= 0:
        return False

    return abs(width / height - target) <= tolerance * target
async def scrape_pinterest_api(keyword: str, count: int, aspect_ratio: Optional[str] = None):
    """Scrape Pinterest search results for image URLs.

    Launches headless Chromium through Playwright, opens the Pinterest search
    page for ``keyword`` and scrolls (at most 8 times) while collecting pin
    images, until ``count`` images that pass the aspect-ratio filter have been
    gathered or no new content appears.

    Args:
        keyword: Search term; it is percent-encoded before being placed in
            the query string.
        count: Maximum number of images to return.
        aspect_ratio: Optional ratio label handed to ``check_aspect_ratio``;
            ``None`` or ``"any"`` disables filtering. Images whose dimensions
            are unknown (0) are not filtered out.

    Returns:
        A ``(images, error)`` tuple. On success ``error`` is ``None`` and
        ``images`` is a list of dicts with the keys ``url``, ``width``,
        ``height`` and ``aspect_ratio``. On any failure the result is
        ``([], str(exception))``.
    """
    # Local import so this function does not depend on edits to the import block.
    from urllib.parse import quote

    print(f"Starting scrape for '{keyword}', count={count}, ratio={aspect_ratio}")
    images = []  # List of dict with url, width, height

    # Runs in the page: one record per pin that has an <img> served from pinimg.com.
    extract_js = """
    () => {
        const pins = document.querySelectorAll("div[data-test-id='pin']");
        return Array.from(pins).map(pin => {
            const img = pin.querySelector('img');
            if (!img || !img.src) return null;
            // Get actual rendered dimensions from parent container
            const rect = pin.getBoundingClientRect();
            return {
                src: img.src,
                // Use container aspect ratio if image not loaded
                width: img.naturalWidth || Math.round(rect.width),
                height: img.naturalHeight || Math.round(rect.height),
                container_width: Math.round(rect.width),
                container_height: Math.round(rect.height)
            };
        }).filter(item => item && item.src.includes('pinimg.com'));
    }
    """

    try:
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            try:
                context = await browser.new_context(
                    user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                )
                page = await context.new_page()

                # Percent-encode the whole keyword: replacing only spaces let
                # characters such as '&', '#', '+' or non-ASCII text corrupt
                # the query string.
                encoded_keyword = quote(keyword, safe="")
                search_url = f"https://www.pinterest.com/search/pins/?q={encoded_keyword}"

                try:
                    await page.goto(search_url, timeout=60000)
                    await page.wait_for_selector("div[data-test-id='pin']", timeout=15000)
                except Exception as e:
                    # The browser is closed by the enclosing ``finally``.
                    return [], str(e)

                seen_urls = set()
                last_height = await page.evaluate("document.body.scrollHeight")
                no_new_content_count = 0
                scroll_attempts = 0
                max_scrolls = 8  # Limit scroll attempts

                while len(images) < count and scroll_attempts < max_scrolls:
                    # Wait for images to load and get dimensions properly
                    await page.wait_for_timeout(500)  # Let lazy images load
                    img_data = await page.evaluate(extract_js)

                    for img_info in img_data:
                        if len(images) >= count:
                            break
                        src = img_info.get("src", "")
                        if not src:
                            continue

                        # Convert to high-res URL
                        high_res_url = src.replace("236x", "736x").replace("474x", "736x")
                        if high_res_url in seen_urls:
                            continue
                        seen_urls.add(high_res_url)

                        # Use natural dimensions if available, else container dimensions
                        width = img_info.get("width", 0) or img_info.get("container_width", 0)
                        height = img_info.get("height", 0) or img_info.get("container_height", 0)

                        # Check aspect ratio if specified; images with unknown
                        # dimensions are let through.
                        passes_ratio = True
                        if aspect_ratio and aspect_ratio != "any" and width > 0 and height > 0:
                            passes_ratio = check_aspect_ratio(width, height, aspect_ratio)
                            print(f"Checking ratio: {width}x{height} = {width/height:.2f} for {aspect_ratio} -> {passes_ratio}")

                        if passes_ratio:
                            images.append({
                                "url": high_res_url,
                                "width": width,
                                "height": height,
                                "aspect_ratio": f"{width}:{height}" if width > 0 else "unknown"
                            })

                    # Enough images: skip the extra scroll.
                    if len(images) >= count:
                        break

                    scroll_attempts += 1
                    # Scroll down - reduced wait time
                    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    await page.wait_for_timeout(800)  # Reduced from 2000ms

                    new_height = await page.evaluate("document.body.scrollHeight")
                    if new_height == last_height:
                        # Page did not grow; give up after repeated stalls.
                        no_new_content_count += 1
                        if no_new_content_count > 3:
                            break
                    else:
                        no_new_content_count = 0
                        last_height = new_height
            finally:
                # Always release the browser, including when scraping raises midway.
                await browser.close()
    except Exception as e:
        print(f"Playwright error: {e}")
        return [], str(e)

    return images, None
@app.post("/scrape", response_model=ScrapeResponse)
async def scrape(request: ScrapeRequest):
    """Search Pinterest for the requested keyword and return image data.

    Args:
        request: Request body with ``keyword``, ``count`` (1-20) and an
            optional ``aspect_ratio`` filter.

    Returns:
        A ``ScrapeResponse``. ``success`` is False and ``images`` is empty when
        the scraper reports an error; otherwise ``images`` holds the results.

    Raises:
        HTTPException: 400 when the keyword is empty or whitespace-only, or
            when ``count`` is outside 1-20.
    """
    # Strip first so a whitespace-only keyword is rejected instead of
    # launching a browser session for an empty search.
    keyword = request.keyword.strip()
    if not keyword:
        raise HTTPException(status_code=400, detail="Keyword is required")

    if not 1 <= request.count <= 20:
        raise HTTPException(status_code=400, detail="Count must be between 1 and 20")

    images, error = await scrape_pinterest_api(keyword, request.count, request.aspect_ratio)

    if error:
        return ScrapeResponse(
            success=False,
            message=f"Error: {error}",
            images=[],
            keyword=request.keyword
        )

    return ScrapeResponse(
        success=True,
        message=f"Found {len(images)} images",
        images=images,
        keyword=request.keyword
    )
@app.get("/health")
async def health():
    # Health probe: always answers with the same fixed status payload.
    payload = {"status": "healthy", "service": "pinterest-scraper-api"}
    return payload
if __name__ == "__main__":
    # Start the server only when this file is executed directly:
    # listen on all interfaces, port 7860.
    uvicorn.run(app, host="0.0.0.0", port=7860)