# lead-hunter-ai / app.py
# Uploaded by agenticworkflowsspace via huggingface_hub (commit fcf1601, verified)
import asyncio
import json
import os
import csv
import concurrent.futures
from typing import List
from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
# FastAPI application instance; all API routes and static mounts attach here.
app = FastAPI(title="Lead Hunter AI API")
# Open CORS so the browser frontend can call the API regardless of the
# proxying origin used by the hosting platform.
# NOTE(review): browsers reject allow_origins=["*"] combined with
# allow_credentials=True for credentialed requests — confirm credentials
# are actually needed here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class ScrapeRequest(BaseModel):
    """Request body for POST /api/scrape."""
    niche: str       # business category to search, e.g. "dentists"
    location: str    # geographic area to search in, e.g. "Austin, TX"
    limit: int = 10  # maximum number of leads to scrape
# Global progress state
# Shared mutable snapshot: written by run_workflow, read by GET /api/status.
# Keys: status ("idle"/"scraping"/"enriching"/"saving"/"complete"/"error"),
# message (human-readable), progress (0-100), and filename once complete.
progress_data = {"status": "idle", "message": "Awaiting transmission...", "progress": 0}
# Thread pool for running sync Playwright code outside the event loop
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
def _run_scrape(query: str, limit: int) -> list:
"""Runs the sync Playwright scraper in a thread (safe for asyncio)."""
try:
from tools.google_maps_scraper import scrape_google_maps
return scrape_google_maps(query, limit)
except Exception as e:
print(f"[!] Scraper Error: {e}")
return []
def _run_enrich(lead: dict) -> dict:
"""Runs the sync httpx enricher in a thread."""
try:
from tools.contact_enricher import enrich_lead
return enrich_lead(lead)
except Exception as e:
print(f"[!] Enricher Error: {e}")
return lead
async def run_workflow(niche: str, location: str, limit: int):
    """Scrape, enrich, and persist leads, reporting progress via `progress_data`.

    Runs as a FastAPI background task. Three stages, each updating the
    module-level `progress_data` dict polled by GET /api/status:
      1. Scrape Google Maps for "{niche} in {location}" (up to `limit` hits).
      2. Enrich each raw lead with contact details.
      3. Save results to .tmp/last_results.json plus a niche/location CSV.
    All blocking work is pushed to `_executor` so the event loop stays free.
    """
    global progress_data
    # get_running_loop() is the supported way to obtain the loop from inside
    # a coroutine; get_event_loop() is deprecated for this use since 3.10.
    loop = asyncio.get_running_loop()
    try:
        # ── Stage 1: Scrape ──────────────────────────────────────────────
        query = f"{niche} in {location}"
        progress_data = {"status": "scraping", "message": f"Searching for {niche} in {location}...", "progress": 10}
        raw_leads = await loop.run_in_executor(_executor, _run_scrape, query, limit)
        if not raw_leads:
            progress_data = {"status": "error", "message": "No businesses found. Try a different niche or location.", "progress": 100}
            return
        # ── Stage 2: Enrich ──────────────────────────────────────────────
        progress_data = {"status": "enriching", "message": f"Found {len(raw_leads)} leads. Enriching contacts...", "progress": 40}
        enriched_leads = []
        total = len(raw_leads)
        for i, lead in enumerate(raw_leads):
            # Progress spans 40-90% across the enrichment loop.
            progress_data["message"] = f"Enriching {i+1}/{total}: {lead.get('name', '...')}"
            progress_data["progress"] = 40 + int((i / total) * 50)
            enriched = await loop.run_in_executor(_executor, _run_enrich, lead)
            enriched_leads.append(enriched)
        # ── Stage 3: Save ────────────────────────────────────────────────
        progress_data = {"status": "saving", "message": "Saving results...", "progress": 95}
        os.makedirs(".tmp", exist_ok=True)
        with open(".tmp/last_results.json", "w", encoding="utf-8") as f:
            json.dump(enriched_leads, f, indent=2)
        # Also save a named CSV alongside the JSON snapshot.
        filename = f"{niche.lower().replace(' ','_')}_{location.lower().replace(' ','_')}.csv"
        if enriched_leads:
            # Fix: use the union of keys across ALL leads (order-preserving).
            # Enrichment may add fields to some leads only, and DictWriter
            # raises ValueError on keys missing from fieldnames; restval=""
            # fills leads that lack a given field.
            keys = list(dict.fromkeys(k for lead in enriched_leads for k in lead))
            with open(filename, "w", newline="", encoding="utf-8") as f:
                w = csv.DictWriter(f, fieldnames=keys, restval="")
                w.writeheader()
                w.writerows(enriched_leads)
        progress_data = {
            "status": "complete",
            # Fix: interpolate the actual CSV filename instead of a literal
            # placeholder in the completion message.
            "message": f"Done! {len(enriched_leads)} leads saved to {filename}",
            "progress": 100,
            "filename": filename,
        }
    except Exception as e:
        # Boundary handler: surface any failure through the status endpoint
        # instead of letting the background task die silently.
        print(f"[!] Workflow error: {e}")
        progress_data = {"status": "error", "message": f"Error: {str(e)}", "progress": 100}
@app.post("/api/scrape")
async def start_scrape(request: ScrapeRequest, background_tasks: BackgroundTasks):
    """Kick off the scrape→enrich workflow unless one is already in flight."""
    global progress_data
    # Reject concurrent runs: the workflow mutates shared global state and
    # the thread pool is sized for a single pipeline.
    busy_states = ("scraping", "enriching", "saving")
    if progress_data.get("status") in busy_states:
        return {"error": "A scrape is already running"}
    background_tasks.add_task(run_workflow, request.niche, request.location, request.limit)
    return {"message": "Workflow started"}
@app.get("/api/status")
async def get_status():
    """Return the current workflow progress snapshot (polled by the frontend)."""
    return progress_data
@app.get("/api/results")
async def get_results():
    """Return the last saved lead list, or [] if none exists or it is unreadable."""
    # EAFP: just attempt the read. This removes the exists()/open() race of
    # the previous check-then-read; a missing file stays a silent [] (as
    # before), while other failures are logged.
    try:
        with open(".tmp/last_results.json", "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return []
    except Exception as e:
        print(f"[!] Error reading results: {e}")
        return []
# Serve Static Files
# If a built frontend bundle is present, serve it from this same app so one
# port exposes both the API and the SPA.
frontend_path = "frontend/dist"
if os.path.exists(frontend_path):
    # Mount assets folder
    if os.path.exists(os.path.join(frontend_path, "assets")):
        app.mount("/assets", StaticFiles(directory=os.path.join(frontend_path, "assets")), name="assets")
    # Serve index.html for all other routes (SPA fallback)
    @app.get("/{full_path:path}")
    async def serve_frontend(full_path: str):
        """Catch-all route: return the SPA shell for any non-API path."""
        # Exclude /api calls
        # NOTE(review): returning None yields a 200 response with a null body
        # for unmatched /api/* paths — a 404 would be more correct; confirm
        # the frontend tolerates the current behavior before changing it.
        if full_path.startswith("api"):
            return None
        return FileResponse(os.path.join(frontend_path, "index.html"))
# Entry point for local / container runs (HF Spaces runs this directly).
if __name__ == "__main__":
    import uvicorn
    # Use 7860 as priority for HF Spaces
    port = int(os.environ.get("PORT", 7860))
    # Bind to 0.0.0.0 for containerization
    uvicorn.run(app, host="0.0.0.0", port=port)