# NOTE(review): the lines below are Hugging Face Spaces page chrome captured
# during extraction, not program source — commented out so the file parses.
# Spaces:
# Sleeping
# Sleeping
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from playwright.async_api import async_playwright
import asyncio
import base64
import logging
from typing import List, Optional

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Application instance; route handlers below attach to this app.
app = FastAPI(title="Playwright Web Scraper", description="A simple web scraper using Playwright")
class LinkInfo(BaseModel):
    """A hyperlink extracted from the scraped page."""

    # Visible anchor text (truncated to 200 chars by the extraction script).
    text: str
    # Absolute URL; only links starting with "http" are collected upstream.
    href: str
class ContactInfo(BaseModel):
    """Contact channels found on a scraped page."""

    # De-duplicated email addresses matched in the page text (max 10 upstream).
    emails: List[str] = []
    # De-duplicated phone numbers matched in the page text (max 5 upstream).
    phones: List[str] = []
    # Links to social profiles (facebook/twitter/linkedin/etc., max 10).
    social_media: List[str] = []
    # Form action URLs discovered on the page (max 5).
    contact_forms: List[str] = []
class BusinessInfo(BaseModel):
    """Company-level information heuristically extracted from a page."""

    # Best-effort name: og:site_name meta, application-name meta, first <h1>,
    # or the leading segment of the document title.
    company_name: Optional[str] = None
    # First US-style street address matched in the page text, if any.
    address: Optional[str] = None
    # Page meta description, reused as the business description.
    description: Optional[str] = None
    # Industry terms (e.g. "consulting", "finance") found in the page text.
    industry_keywords: List[str] = []
class LeadData(BaseModel):
    """Aggregated lead-generation result for a single scraped page."""

    contact_info: ContactInfo
    business_info: BusinessInfo
    # Heuristic quality score: weighted sum of available signals, capped at 100.
    lead_score: int = 0
    # Technology keywords detected in the raw HTML (e.g. "wordpress", "react").
    technologies: List[str] = []
class ScrapeResponse(BaseModel):
    """Response payload for the scrape endpoint; fields are filled per request flags."""

    # Cleaned <body> inner text (only when get_body=true; can be large).
    body_content: Optional[str] = None
    # Base64-encoded full-page PNG screenshot (only when screenshot=true).
    screenshot: Optional[str] = None
    # Filtered page links (only when get_links=true).
    links: Optional[List[LinkInfo]] = None
    # Document title (always populated on success).
    page_title: Optional[str] = None
    # Content of <meta name="description">, if present (always attempted).
    meta_description: Optional[str] = None
    # Lead-generation extraction result (only when lead_generation=true).
    lead_data: Optional[LeadData] = None
# NOTE(review): no @app.get decorator was present in the extracted copy, so this
# landing route was never registered; restored here — confirm the intended path.
@app.get("/")
async def root():
    """Return a human-readable description of the API and its features.

    NOTE(review): the "π…" prefixes in the strings below look like mojibake'd
    emoji from a lossy extraction — kept byte-for-byte; confirm against the
    original source before changing them.
    """
    return {
        "message": "π Lead Generation Web Scraper API",
        "tagline": "Turn any website into qualified leads",
        "endpoints": {
            "/scrape": "Extract leads, contacts, and business data from any website",
            "/docs": "API documentation"
        },
        "example": "/scrape?url=https://example.com&lead_generation=true&screenshot=true",
        "lead_generation_features": [
            "π§ Extract email addresses and contact forms",
            "π Find phone numbers and contact info",
            "π’ Identify company names and addresses",
            "π Discover social media profiles",
            "β‘ Detect technologies and tools used",
            "π Calculate lead quality scores",
            "π― Industry keyword extraction"
        ],
        "basic_features": [
            "π Clean body text extraction",
            "π Smart link filtering",
            "πΈ Full page screenshots",
            "π Page metadata extraction"
        ],
        "use_cases": [
            "B2B lead generation",
            "Sales prospecting",
            "Market research",
            "Competitor analysis",
            "Contact discovery"
        ]
    }
# NOTE(review): no @app.get decorator was present in the extracted copy, so this
# endpoint was never registered; restored here — confirm the intended path.
@app.get("/scrape")
async def scrape_page(
    url: str = Query(..., description="URL to scrape"),
    lead_generation: bool = Query(True, description="Extract lead generation data (emails, phones, business info)"),
    screenshot: bool = Query(True, description="Take a full page screenshot"),
    get_links: bool = Query(True, description="Extract all links from the page"),
    get_body: bool = Query(False, description="Extract body tag content (can be large)")
):
    """Scrape *url* with headless Chromium and return extracted page data.

    Always collects the page title and meta description; the query flags
    additionally enable clean body text, a base64 full-page screenshot,
    filtered links, and heuristic lead-generation extraction.

    Raises:
        HTTPException: 500 with "Browser launch error" when Chromium cannot
            start, or "Scraping error" when navigation/extraction fails.
    """
    logger.info(f"Starting scrape for URL: {url}")
    try:
        async with async_playwright() as p:
            logger.info("Launching browser...")
            # Flags chosen for containerized environments: no sandbox, no GPU,
            # and avoid /dev/shm, which is often too small in containers.
            browser = await p.chromium.launch(
                headless=True,
                args=[
                    '--no-sandbox',
                    '--disable-setuid-sandbox',
                    '--disable-dev-shm-usage',
                    '--disable-accelerated-2d-canvas',
                    '--no-first-run',
                    '--no-zygote',
                    '--disable-gpu'
                ]
            )
            try:
                page = await browser.new_page()
                logger.info(f"Navigating to {url}...")
                # "networkidle" waits until the page stops issuing requests.
                await page.goto(url, wait_until="networkidle")
                response = ScrapeResponse()

                # Always get page title and meta description
                logger.info("Getting page metadata...")
                response.page_title = await page.title()
                meta_desc = await page.evaluate("""
                    () => {
                        const meta = document.querySelector('meta[name="description"]');
                        return meta ? meta.getAttribute('content') : null;
                    }
                """)
                response.meta_description = meta_desc

                # Get body content (clean text)
                if get_body:
                    logger.info("Extracting body content...")
                    body_content = await page.evaluate("""
                        () => {
                            const body = document.querySelector('body');
                            if (!body) return null;
                            // Remove script and style elements
                            const scripts = body.querySelectorAll('script, style, noscript');
                            scripts.forEach(el => el.remove());
                            // Get clean text content
                            return body.innerText.trim();
                        }
                    """)
                    response.body_content = body_content

                # Get screenshot (full page), returned as base64 text
                if screenshot:
                    logger.info("Taking full page screenshot...")
                    screenshot_bytes = await page.screenshot(full_page=True)
                    response.screenshot = base64.b64encode(screenshot_bytes).decode('utf-8')

                # Get links, keeping only http(s) links with visible text
                if get_links:
                    logger.info("Extracting links...")
                    links = await page.evaluate("""
                        () => {
                            return Array.from(document.querySelectorAll('a[href]')).map(a => {
                                const text = a.innerText.trim();
                                const href = a.href;
                                // Only include links with meaningful text and valid URLs
                                if (text && href && href.startsWith('http')) {
                                    return {
                                        text: text.substring(0, 200), // Limit text length
                                        href: href
                                    }
                                }
                                return null;
                            }).filter(link => link !== null);
                        }
                    """)
                    response.links = [LinkInfo(**link) for link in links]

                # Lead generation extraction (emails, phones, socials, company info)
                if lead_generation:
                    logger.info("Extracting lead generation data...")
                    lead_data_raw = await page.evaluate("""
                        () => {
                            const result = {
                                emails: [],
                                phones: [],
                                social_media: [],
                                contact_forms: [],
                                company_name: null,
                                address: null,
                                technologies: [],
                                industry_keywords: []
                            };
                            // Extract emails
                            const emailRegex = /[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}/g;
                            const pageText = document.body.innerText;
                            const emails = pageText.match(emailRegex) || [];
                            result.emails = [...new Set(emails)].slice(0, 10); // Unique emails, max 10
                            // Extract phone numbers
                            const phoneRegex = /(\\+?1?[-.\\s]?)?\\(?([0-9]{3})\\)?[-.\\s]?([0-9]{3})[-.\\s]?([0-9]{4})/g;
                            const phones = pageText.match(phoneRegex) || [];
                            result.phones = [...new Set(phones)].slice(0, 5); // Unique phones, max 5
                            // Extract social media links
                            const socialLinks = Array.from(document.querySelectorAll('a[href]')).map(a => a.href)
                                .filter(href => /facebook|twitter|linkedin|instagram|youtube|tiktok/i.test(href));
                            result.social_media = [...new Set(socialLinks)].slice(0, 10);
                            // Find contact forms
                            const forms = Array.from(document.querySelectorAll('form')).map(form => {
                                const action = form.action || window.location.href;
                                return action;
                            });
                            result.contact_forms = [...new Set(forms)].slice(0, 5);
                            // Extract company name (try multiple methods)
                            result.company_name =
                                document.querySelector('meta[property="og:site_name"]')?.content ||
                                document.querySelector('meta[name="application-name"]')?.content ||
                                document.querySelector('h1')?.innerText?.trim() ||
                                document.title?.split('|')[0]?.split('-')[0]?.trim();
                            // Extract address
                            const addressRegex = /\\d+\\s+[A-Za-z\\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Lane|Ln|Drive|Dr|Court|Ct|Place|Pl)\\s*,?\\s*[A-Za-z\\s]+,?\\s*[A-Z]{2}\\s*\\d{5}/g;
                            const addresses = pageText.match(addressRegex) || [];
                            result.address = addresses[0] || null;
                            // Detect technologies
                            const techKeywords = ['wordpress', 'shopify', 'react', 'angular', 'vue', 'bootstrap', 'jquery', 'google analytics', 'facebook pixel'];
                            const htmlContent = document.documentElement.outerHTML.toLowerCase();
                            result.technologies = techKeywords.filter(tech => htmlContent.includes(tech));
                            // Industry keywords
                            const industryKeywords = ['consulting', 'marketing', 'software', 'healthcare', 'finance', 'real estate', 'education', 'retail', 'manufacturing', 'legal', 'restaurant', 'fitness', 'beauty', 'automotive'];
                            const lowerPageText = pageText.toLowerCase();
                            result.industry_keywords = industryKeywords.filter(keyword => lowerPageText.includes(keyword));
                            return result;
                        }
                    """)

                    # Weighted signal sum; direct contact channels score highest.
                    lead_score = 0
                    if lead_data_raw['emails']:
                        lead_score += 30
                    if lead_data_raw['phones']:
                        lead_score += 25
                    if lead_data_raw['contact_forms']:
                        lead_score += 20
                    if lead_data_raw['social_media']:
                        lead_score += 15
                    if lead_data_raw['company_name']:
                        lead_score += 10
                    if lead_data_raw['address']:
                        lead_score += 15
                    if lead_data_raw['technologies']:
                        lead_score += 10
                    if lead_data_raw['industry_keywords']:
                        lead_score += 5

                    contact_info = ContactInfo(
                        emails=lead_data_raw['emails'],
                        phones=lead_data_raw['phones'],
                        social_media=lead_data_raw['social_media'],
                        contact_forms=lead_data_raw['contact_forms']
                    )
                    business_info = BusinessInfo(
                        company_name=lead_data_raw['company_name'],
                        address=lead_data_raw['address'],
                        description=response.meta_description,
                        industry_keywords=lead_data_raw['industry_keywords']
                    )
                    response.lead_data = LeadData(
                        contact_info=contact_info,
                        business_info=business_info,
                        lead_score=min(lead_score, 100),  # Cap at 100
                        technologies=lead_data_raw['technologies']
                    )

                logger.info("Scraping completed successfully")
                return response
            except Exception as e:
                logger.error(f"Error during scraping: {str(e)}")
                raise HTTPException(status_code=500, detail=f"Scraping error: {str(e)}")
            finally:
                # Close the browser on success and failure alike; the original
                # duplicated browser.close() on two separate paths.
                await browser.close()
    except HTTPException:
        # Bug fix: the HTTPException raised above was previously re-caught by
        # the generic handler below and mislabeled as a browser launch error.
        raise
    except Exception as e:
        logger.error(f"Error launching browser: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Browser launch error: {str(e)}")