import asyncio
import concurrent.futures
import csv
import json
import os
from typing import List

from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

app = FastAPI(title="Lead Hunter AI API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class ScrapeRequest(BaseModel):
    """Request payload for POST /api/scrape."""
    niche: str
    location: str
    limit: int = 10


# Global progress state, shared between the background workflow and /api/status.
progress_data = {"status": "idle", "message": "Awaiting transmission...", "progress": 0}

# Thread pool for running sync Playwright/httpx code outside the event loop.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)


def _run_scrape(query: str, limit: int) -> list:
    """Run the sync Playwright scraper in a worker thread (safe for asyncio).

    Returns an empty list on any scraper failure so the workflow reports
    "no results" instead of crashing.
    """
    try:
        from tools.google_maps_scraper import scrape_google_maps
        return scrape_google_maps(query, limit)
    except Exception as e:
        print(f"[!] Scraper Error: {e}")
        return []


def _run_enrich(lead: dict) -> dict:
    """Run the sync httpx enricher in a worker thread.

    On failure the original, un-enriched lead is returned unchanged.
    """
    try:
        from tools.contact_enricher import enrich_lead
        return enrich_lead(lead)
    except Exception as e:
        print(f"[!] Enricher Error: {e}")
        return lead


async def run_workflow(niche: str, location: str, limit: int):
    """Scrape → enrich → save pipeline; mutates ``progress_data`` as it runs."""
    global progress_data
    # get_running_loop() rather than the (deprecated-in-coroutines)
    # get_event_loop(): this coroutine always executes inside a running loop.
    loop = asyncio.get_running_loop()
    try:
        # ── Stage 1: Scrape ──────────────────────────────────────────────
        query = f"{niche} in {location}"
        progress_data = {
            "status": "scraping",
            "message": f"Searching for {niche} in {location}...",
            "progress": 10,
        }
        raw_leads = await loop.run_in_executor(_executor, _run_scrape, query, limit)
        if not raw_leads:
            progress_data = {
                "status": "error",
                "message": "No businesses found. Try a different niche or location.",
                "progress": 100,
            }
            return

        # ── Stage 2: Enrich ──────────────────────────────────────────────
        progress_data = {
            "status": "enriching",
            "message": f"Found {len(raw_leads)} leads. Enriching contacts...",
            "progress": 40,
        }
        enriched_leads = []
        for i, lead in enumerate(raw_leads):
            progress_data["message"] = f"Enriching {i+1}/{len(raw_leads)}: {lead.get('name', '...')}"
            # Enrichment occupies the 40–90% band of the progress bar.
            progress_data["progress"] = 40 + int((i / len(raw_leads)) * 50)
            enriched = await loop.run_in_executor(_executor, _run_enrich, lead)
            enriched_leads.append(enriched)

        # ── Stage 3: Save ────────────────────────────────────────────────
        progress_data = {"status": "saving", "message": "Saving results...", "progress": 95}
        os.makedirs(".tmp", exist_ok=True)
        with open(".tmp/last_results.json", "w", encoding="utf-8") as f:
            json.dump(enriched_leads, f, indent=2)

        # Also save a named CSV.
        filename = f"{niche.lower().replace(' ','_')}_{location.lower().replace(' ','_')}.csv"
        if enriched_leads:
            # Ordered union of keys across ALL leads: enrichment may add
            # fields that the first row lacks, and DictWriter.writerows
            # raises ValueError on keys missing from `fieldnames`.
            keys: list = []
            for lead in enriched_leads:
                for k in lead:
                    if k not in keys:
                        keys.append(k)
            with open(filename, "w", newline="", encoding="utf-8") as f:
                w = csv.DictWriter(f, fieldnames=keys)
                w.writeheader()
                w.writerows(enriched_leads)

        progress_data = {
            "status": "complete",
            # Interpolate the real filename (previous message had a broken
            # placeholder instead of the CSV path).
            "message": f"Done! {len(enriched_leads)} leads saved to {filename}",
            "progress": 100,
            "filename": filename,
        }
    except Exception as e:
        print(f"[!] Workflow error: {e}")
        progress_data = {"status": "error", "message": f"Error: {str(e)}", "progress": 100}


@app.post("/api/scrape")
async def start_scrape(request: ScrapeRequest, background_tasks: BackgroundTasks):
    """Kick off the workflow in the background; only one run at a time."""
    global progress_data
    if progress_data.get("status") in ("scraping", "enriching", "saving"):
        return {"error": "A scrape is already running"}
    background_tasks.add_task(run_workflow, request.niche, request.location, request.limit)
    return {"message": "Workflow started"}


@app.get("/api/status")
async def get_status():
    """Current workflow progress for the frontend poller."""
    return progress_data


@app.get("/api/results")
async def get_results():
    """Return the last saved result set, or [] when absent or unreadable."""
    try:
        if os.path.exists(".tmp/last_results.json"):
            with open(".tmp/last_results.json", "r", encoding="utf-8") as f:
                return json.load(f)
        return []
    except Exception as e:
        print(f"[!] Error reading results: {e}")
        return []


# Serve static files for the built frontend (SPA).
frontend_path = "frontend/dist"
if os.path.exists(frontend_path):
    # Mount the hashed-assets folder when present.
    if os.path.exists(os.path.join(frontend_path, "assets")):
        app.mount(
            "/assets",
            StaticFiles(directory=os.path.join(frontend_path, "assets")),
            name="assets",
        )

    # SPA fallback: every non-API route gets index.html.
    @app.get("/{full_path:path}")
    async def serve_frontend(full_path: str):
        # Unmatched /api/* paths are a genuine 404, not the SPA shell
        # (returning None here would produce a 200 with a null body).
        if full_path.startswith("api"):
            raise HTTPException(status_code=404, detail="Not found")
        return FileResponse(os.path.join(frontend_path, "index.html"))


if __name__ == "__main__":
    import uvicorn

    # HF Spaces expects port 7860 by default; bind 0.0.0.0 for containers.
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)