import asyncio
import json
import os
import csv
import concurrent.futures
from typing import List
from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
app = FastAPI(title="Lead Hunter AI API")

# Allow any origin so the bundled SPA (or a local dev server) can call the API.
# NOTE(review): browsers reject credentialed requests when allow_origins=["*"]
# is combined with allow_credentials=True — confirm credentials are needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class ScrapeRequest(BaseModel):
    """Request payload for POST /api/scrape."""

    niche: str       # business category to search, e.g. "dentists"
    location: str    # city/region appended to the search query
    limit: int = 10  # maximum number of leads to scrape
# Global progress state, polled by GET /api/status and mutated by run_workflow.
# NOTE(review): a module-level dict only works with a single worker process —
# multiple uvicorn workers would each hold an independent copy.
progress_data = {"status": "idle", "message": "Awaiting transmission...", "progress": 0}
# Thread pool for running sync Playwright code outside the event loop
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
def _run_scrape(query: str, limit: int) -> list:
"""Runs the sync Playwright scraper in a thread (safe for asyncio)."""
try:
from tools.google_maps_scraper import scrape_google_maps
return scrape_google_maps(query, limit)
except Exception as e:
print(f"[!] Scraper Error: {e}")
return []
def _run_enrich(lead: dict) -> dict:
"""Runs the sync httpx enricher in a thread."""
try:
from tools.contact_enricher import enrich_lead
return enrich_lead(lead)
except Exception as e:
print(f"[!] Enricher Error: {e}")
return lead
async def run_workflow(niche: str, location: str, limit: int):
    """Background task: scrape -> enrich -> persist leads.

    Advances the module-level ``progress_data`` dict at each stage so that
    GET /api/status can report live progress to the frontend.

    Args:
        niche: Business category to search for (e.g. "dentists").
        location: City/region to search in.
        limit: Maximum number of leads to scrape.
    """
    global progress_data
    # get_running_loop() is the correct call inside a coroutine;
    # get_event_loop() is deprecated in this context since Python 3.10.
    loop = asyncio.get_running_loop()
    try:
        # ── Stage 1: Scrape ─────────────────────────────────────────────
        query = f"{niche} in {location}"
        progress_data = {"status": "scraping", "message": f"Searching for {niche} in {location}...", "progress": 10}
        raw_leads = await loop.run_in_executor(_executor, _run_scrape, query, limit)
        if not raw_leads:
            progress_data = {"status": "error", "message": "No businesses found. Try a different niche or location.", "progress": 100}
            return
        # ── Stage 2: Enrich ─────────────────────────────────────────────
        # One lead at a time so the progress message stays granular.
        progress_data = {"status": "enriching", "message": f"Found {len(raw_leads)} leads. Enriching contacts...", "progress": 40}
        enriched_leads = []
        for i, lead in enumerate(raw_leads):
            progress_data["message"] = f"Enriching {i+1}/{len(raw_leads)}: {lead.get('name', '...')}"
            progress_data["progress"] = 40 + int((i / len(raw_leads)) * 50)
            enriched = await loop.run_in_executor(_executor, _run_enrich, lead)
            enriched_leads.append(enriched)
        # ── Stage 3: Save ───────────────────────────────────────────────
        progress_data = {"status": "saving", "message": "Saving results...", "progress": 95}
        os.makedirs(".tmp", exist_ok=True)
        with open(".tmp/last_results.json", "w", encoding="utf-8") as f:
            json.dump(enriched_leads, f, indent=2)
        # Also save a named CSV for download.
        filename = f"{niche.lower().replace(' ','_')}_{location.lower().replace(' ','_')}.csv"
        if enriched_leads:
            # Build the insertion-ordered union of all row keys: using only the
            # first lead's keys makes DictWriter raise ValueError whenever a
            # later lead carries an extra enriched field.
            fieldnames: dict = {}
            for row in enriched_leads:
                fieldnames.update(dict.fromkeys(row))
            with open(filename, "w", newline="", encoding="utf-8") as f:
                w = csv.DictWriter(f, fieldnames=list(fieldnames))
                w.writeheader()
                w.writerows(enriched_leads)
        progress_data = {
            "status": "complete",
            # Report the actual CSV name (was the broken placeholder "(unknown)").
            "message": f"Done! {len(enriched_leads)} leads saved to {filename}",
            "progress": 100,
            "filename": filename,
        }
    except Exception as e:
        print(f"[!] Workflow error: {e}")
        progress_data = {"status": "error", "message": f"Error: {str(e)}", "progress": 100}
@app.post("/api/scrape")
async def start_scrape(request: ScrapeRequest, background_tasks: BackgroundTasks):
    """Kick off the scrape/enrich workflow as a FastAPI background task.

    Rejects the request if a workflow is already in an active stage.
    """
    global progress_data
    active_stages = ("scraping", "enriching", "saving")
    if progress_data.get("status") in active_stages:
        return {"error": "A scrape is already running"}
    background_tasks.add_task(run_workflow, request.niche, request.location, request.limit)
    return {"message": "Workflow started"}
@app.get("/api/status")
async def get_status():
    """Return the current workflow progress state (polled by the frontend)."""
    return progress_data
@app.get("/api/results")
async def get_results():
    """Return the leads from the most recent scrape, or [] when none exist."""
    results_path = ".tmp/last_results.json"
    try:
        if not os.path.exists(results_path):
            return []
        with open(results_path, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception as exc:
        print(f"[!] Error reading results: {exc}")
        return []
# Serve Static Files
# Mount the built SPA bundle (frontend/dist) when it exists, so the API and
# the frontend ship from one process.
frontend_path = "frontend/dist"
if os.path.exists(frontend_path):
    # Mount assets folder
    if os.path.exists(os.path.join(frontend_path, "assets")):
        app.mount("/assets", StaticFiles(directory=os.path.join(frontend_path, "assets")), name="assets")

    # Serve index.html for all other routes (SPA fallback)
    @app.get("/{full_path:path}")
    async def serve_frontend(full_path: str):
        # Exclude /api calls
        # NOTE(review): returning None produces a 200 response with a null
        # body for unmatched "api*" paths, and the bare prefix also matches
        # non-API paths like "apiary" — consider startswith("api/") plus an
        # HTTPException(404); confirm intended behavior.
        if full_path.startswith("api"):
            return None
        return FileResponse(os.path.join(frontend_path, "index.html"))
if __name__ == "__main__":
    import uvicorn

    # Use 7860 as priority for HF Spaces; the PORT env var overrides it.
    port = int(os.environ.get("PORT", 7860))
    # Bind to 0.0.0.0 for containerization
    uvicorn.run(app, host="0.0.0.0", port=port)