import asyncio
import concurrent.futures
import csv
import json
import os
from typing import List

from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
|
|
app = FastAPI(title="Lead Hunter AI API")


# CORS is wide open: any origin, method, and header is accepted.
# NOTE(review): fine for a local/demo deployment, but lock down
# allow_origins before exposing this service publicly.  Also note that
# allow_credentials=True combined with a "*" origin is typically ignored
# by CORS middleware — confirm the intended credential policy.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
class ScrapeRequest(BaseModel):
    """Request payload for POST /api/scrape."""

    niche: str      # business category; combined into the query "<niche> in <location>"
    location: str   # geographic area for the search
    limit: int = 10  # maximum number of leads to scrape
|
|
| |
| progress_data = {"status": "idle", "message": "Awaiting transmission...", "progress": 0} |
|
|
| |
| _executor = concurrent.futures.ThreadPoolExecutor(max_workers=2) |
|
|
|
|
| def _run_scrape(query: str, limit: int) -> list: |
| """Runs the sync Playwright scraper in a thread (safe for asyncio).""" |
| try: |
| from tools.google_maps_scraper import scrape_google_maps |
| return scrape_google_maps(query, limit) |
| except Exception as e: |
| print(f"[!] Scraper Error: {e}") |
| return [] |
|
|
|
|
| def _run_enrich(lead: dict) -> dict: |
| """Runs the sync httpx enricher in a thread.""" |
| try: |
| from tools.contact_enricher import enrich_lead |
| return enrich_lead(lead) |
| except Exception as e: |
| print(f"[!] Enricher Error: {e}") |
| return lead |
|
|
|
|
async def run_workflow(niche: str, location: str, limit: int):
    """Scrape leads for *niche* in *location*, enrich them, and save results.

    Runs as a FastAPI background task.  Progress is published through the
    module-level ``progress_data`` dict (polled by GET /api/status).
    Results are written to ``.tmp/last_results.json`` and to a CSV file
    in the current working directory; the CSV filename is reported in the
    final ``progress_data`` payload under ``"filename"``.
    """
    global progress_data
    # get_running_loop() is the non-deprecated form inside a coroutine
    # (get_event_loop() is deprecated here since Python 3.10).
    loop = asyncio.get_running_loop()

    try:
        query = f"{niche} in {location}"
        progress_data = {"status": "scraping", "message": f"Searching for {niche} in {location}...", "progress": 10}

        # The Playwright scraper is blocking; run it in the shared pool.
        raw_leads = await loop.run_in_executor(_executor, _run_scrape, query, limit)

        if not raw_leads:
            progress_data = {"status": "error", "message": "No businesses found. Try a different niche or location.", "progress": 100}
            return

        progress_data = {"status": "enriching", "message": f"Found {len(raw_leads)} leads. Enriching contacts...", "progress": 40}

        enriched_leads = []
        total = len(raw_leads)
        for i, lead in enumerate(raw_leads):
            # Mutate in place so /api/status sees per-lead progress (40-90%).
            progress_data["message"] = f"Enriching {i+1}/{total}: {lead.get('name', '...')}"
            progress_data["progress"] = 40 + int((i / total) * 50)

            enriched = await loop.run_in_executor(_executor, _run_enrich, lead)
            enriched_leads.append(enriched)

        progress_data = {"status": "saving", "message": "Saving results...", "progress": 95}

        os.makedirs(".tmp", exist_ok=True)
        with open(".tmp/last_results.json", "w", encoding="utf-8") as f:
            json.dump(enriched_leads, f, indent=2, ensure_ascii=False)

        filename = f"{niche.lower().replace(' ','_')}_{location.lower().replace(' ','_')}.csv"
        if enriched_leads:
            # Use the union of keys across ALL leads: enrichment may add
            # fields missing from the first lead, and DictWriter raises
            # ValueError on rows containing unknown fieldnames.
            fieldnames = []
            for lead in enriched_leads:
                for key in lead:
                    if key not in fieldnames:
                        fieldnames.append(key)
            with open(filename, "w", newline="", encoding="utf-8") as f:
                w = csv.DictWriter(f, fieldnames=fieldnames, restval="")
                w.writeheader()
                w.writerows(enriched_leads)

        progress_data = {
            "status": "complete",
            # Bug fix: the message previously said 'saved to (unknown)'
            # instead of interpolating the actual output filename.
            "message": f"Done! {len(enriched_leads)} leads saved to {filename}",
            "progress": 100,
            "filename": filename,
        }

    except Exception as e:
        print(f"[!] Workflow error: {e}")
        progress_data = {"status": "error", "message": f"Error: {str(e)}", "progress": 100}
|
|
|
|
| @app.post("/api/scrape") |
| async def start_scrape(request: ScrapeRequest, background_tasks: BackgroundTasks): |
| global progress_data |
| if progress_data.get("status") in ("scraping", "enriching", "saving"): |
| return {"error": "A scrape is already running"} |
| background_tasks.add_task(run_workflow, request.niche, request.location, request.limit) |
| return {"message": "Workflow started"} |
|
|
|
|
| @app.get("/api/status") |
| async def get_status(): |
| return progress_data |
|
|
|
|
| @app.get("/api/results") |
| async def get_results(): |
| try: |
| if os.path.exists(".tmp/last_results.json"): |
| with open(".tmp/last_results.json", "r", encoding="utf-8") as f: |
| return json.load(f) |
| return [] |
| except Exception as e: |
| print(f"[!] Error reading results: {e}") |
| return [] |
|
|
| |
| frontend_path = "frontend/dist" |
| if os.path.exists(frontend_path): |
| |
| if os.path.exists(os.path.join(frontend_path, "assets")): |
| app.mount("/assets", StaticFiles(directory=os.path.join(frontend_path, "assets")), name="assets") |
|
|
| |
| @app.get("/{full_path:path}") |
| async def serve_frontend(full_path: str): |
| |
| if full_path.startswith("api"): |
| return None |
| return FileResponse(os.path.join(frontend_path, "index.html")) |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| |
| port = int(os.environ.get("PORT", 7860)) |
| |
| uvicorn.run(app, host="0.0.0.0", port=port) |
|
|