| from fastapi import FastAPI |
| from playwright.async_api import async_playwright, TimeoutError |
| import re |
|
|
# FastAPI application instance; the @app.get("/search") decorator in this file registers its route on it.
| app = FastAPI() |
|
|
# Compiled once at import time instead of on every scraped page.
_EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
_PHONE_RE = re.compile(r"\+?\d[\d\s\-/]{7,}\d")

# Runs in the page: returns the raw href of the anchor enclosing the element,
# or null when the element is not inside an anchor.
_ANCHOR_HREF_JS = (
    "e => { const a = e.closest('a'); "
    "return a ? a.getAttribute('href') : null; }"
)


def _first_unique(matches, limit):
    """Return up to *limit* distinct items of *matches* in first-seen order."""
    # dict.fromkeys de-duplicates while keeping order, so the same page always
    # yields the same picks (a set would make the choice arbitrary).
    return list(dict.fromkeys(matches))[:limit]


async def scrape_google(query: str, *, max_results: int = 5, max_contacts: int = 2):
    """Search Google for *query* and scan the top result pages for contacts.

    Opens a headless Chromium browser, loads the Google results page, pairs
    each ``<h3>`` heading with the href of its enclosing anchor, then visits
    the first ``max_results`` of those links and regex-scans each page's HTML
    for e-mail addresses and phone-like digit sequences.

    Args:
        query: Search terms. ``+`` characters are passed through unescaped
            (callers may pre-join words with ``+``); every other reserved
            character is percent-encoded so it cannot break the URL.
        max_results: Maximum number of result links to visit.
        max_contacts: Maximum number of e-mails and of phone numbers kept
            per visited page.

    Returns:
        A list of dicts with keys ``"title"``, ``"link"``, ``"emails"`` and
        ``"phones"``. A result page that fails to load is still listed, with
        empty ``"emails"`` and ``"phones"``.

    Raises:
        playwright.async_api.Error: If the browser cannot be launched or the
            Google results page itself cannot be loaded.
    """
    # Function-scope imports keep this block self-contained with respect to
    # the file's existing import lines.
    from urllib.parse import quote
    from playwright.async_api import Error as PlaywrightError

    encoded_query = quote(query, safe="+")
    url = f"https://www.google.com/search?q={encoded_query}"

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        try:
            context = await browser.new_context()
            page = await context.new_page()

            await page.goto(url, wait_until="domcontentloaded", timeout=60000)
            try:
                await page.wait_for_selector("div#search", timeout=10000)
            except TimeoutError:
                # Best effort: if the results container never shows up we
                # still scan whatever headings the page does contain.
                pass

            links = []
            for heading in await page.query_selector_all("h3"):
                if len(links) >= max_results:
                    break
                try:
                    href = await heading.evaluate(_ANCHOR_HREF_JS)
                    title = await heading.inner_text()
                except PlaywrightError:
                    # Element went stale or evaluation failed; skip it.
                    continue
                if not href:
                    # Heading is not wrapped in a link: nothing to visit.
                    continue
                links.append({"title": title, "link": href})

            results = []
            for item in links:
                try:
                    await page.goto(
                        item["link"],
                        wait_until="domcontentloaded",
                        timeout=30000,
                    )
                    html = await page.content()
                except PlaywrightError:
                    # One slow or unreachable site must not fail the whole
                    # search; report the result without contact details.
                    html = ""
                results.append({
                    **item,
                    "emails": _first_unique(_EMAIL_RE.findall(html), max_contacts),
                    "phones": _first_unique(_PHONE_RE.findall(html), max_contacts),
                })
            return results
        finally:
            # Close the browser on error paths too, not only on success.
            await browser.close()
|
|
@app.get("/search")
async def search(query: str):
    """Serve GET /search: scrape results for *query* and return them.

    Each space in *query* is turned into ``+`` before the text is handed to
    ``scrape_google``. The response carries the query exactly as received
    under ``"query"`` and the scraper's return value under ``"results"``.
    """
    plus_joined = "+".join(query.split(" "))
    scraped = await scrape_google(plus_joined)
    return dict(query=query, results=scraped)
|
|