File size: 5,711 Bytes
fcf1601
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import asyncio
import json
import os
import csv
import concurrent.futures
from typing import List
from fastapi import FastAPI, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel

app = FastAPI(title="Lead Hunter AI API")

# Wide-open CORS: the API may be called cross-origin (e.g. a dev frontend
# on another port). With allow_origins=["*"] browsers will ignore
# allow_credentials=True, so cookies are effectively not supported here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class ScrapeRequest(BaseModel):
    """Request payload for POST /api/scrape."""

    niche: str      # business category; combined with location into the Maps query
    location: str   # geographic area; combined with niche into "{niche} in {location}"
    limit: int = 10  # maximum number of leads to scrape

# Global progress state: written by run_workflow (both rebound wholesale at
# stage transitions and mutated per-lead during enrichment) and served
# verbatim by GET /api/status for frontend polling.
progress_data = {"status": "idle", "message": "Awaiting transmission...", "progress": 0}

# Thread pool for running sync Playwright code outside the event loop;
# max_workers=2 caps concurrent blocking scrape/enrich jobs at two.
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)


def _run_scrape(query: str, limit: int) -> list:
    """Runs the sync Playwright scraper in a thread (safe for asyncio)."""
    try:
        from tools.google_maps_scraper import scrape_google_maps
        return scrape_google_maps(query, limit)
    except Exception as e:
        print(f"[!] Scraper Error: {e}")
        return []


def _run_enrich(lead: dict) -> dict:
    """Runs the sync httpx enricher in a thread."""
    try:
        from tools.contact_enricher import enrich_lead
        return enrich_lead(lead)
    except Exception as e:
        print(f"[!] Enricher Error: {e}")
        return lead


async def run_workflow(niche: str, location: str, limit: int):
    """Scrape Google Maps leads, enrich them, and persist JSON + CSV results.

    Progress is reported through the module-level ``progress_data`` dict,
    which GET /api/status serves to the frontend. Runs the blocking
    scraper/enricher in ``_executor`` so the event loop stays responsive.

    Args:
        niche: Business category to search for.
        location: Geographic area to search in.
        limit: Maximum number of leads to scrape.
    """
    global progress_data
    # get_running_loop() is the correct API inside a coroutine;
    # get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()

    try:
        # ── Stage 1: Scrape ──────────────────────────────────────────────
        query = f"{niche} in {location}"
        progress_data = {"status": "scraping", "message": f"Searching for {niche} in {location}...", "progress": 10}

        raw_leads = await loop.run_in_executor(_executor, _run_scrape, query, limit)

        if not raw_leads:
            progress_data = {"status": "error", "message": "No businesses found. Try a different niche or location.", "progress": 100}
            return

        # ── Stage 2: Enrich ──────────────────────────────────────────────
        progress_data = {"status": "enriching", "message": f"Found {len(raw_leads)} leads. Enriching contacts...", "progress": 40}

        enriched_leads = []
        for i, lead in enumerate(raw_leads):
            # Progress maps the enrich loop onto the 40-90% band.
            progress_data["message"] = f"Enriching {i+1}/{len(raw_leads)}: {lead.get('name', '...')}"
            progress_data["progress"] = 40 + int((i / len(raw_leads)) * 50)

            enriched = await loop.run_in_executor(_executor, _run_enrich, lead)
            enriched_leads.append(enriched)

        # ── Stage 3: Save ────────────────────────────────────────────────
        progress_data = {"status": "saving", "message": "Saving results...", "progress": 95}

        os.makedirs(".tmp", exist_ok=True)
        with open(".tmp/last_results.json", "w", encoding="utf-8") as f:
            json.dump(enriched_leads, f, indent=2)

        # Also save a named CSV
        filename = f"{niche.lower().replace(' ','_')}_{location.lower().replace(' ','_')}.csv"
        if enriched_leads:
            # Union of keys across ALL leads, preserving first-seen order.
            # Using only the first lead's keys would make DictWriter raise
            # ValueError if the enricher added fields to later leads only.
            fieldnames: list = []
            for lead in enriched_leads:
                for key in lead:
                    if key not in fieldnames:
                        fieldnames.append(key)
            with open(filename, "w", newline="", encoding="utf-8") as f:
                w = csv.DictWriter(f, fieldnames=fieldnames)
                w.writeheader()
                w.writerows(enriched_leads)

        progress_data = {
            "status": "complete",
            # Report the actual output filename (previously a literal
            # "(unknown)" placeholder was shown to the user).
            "message": f"Done! {len(enriched_leads)} leads saved to {filename}",
            "progress": 100,
            "filename": filename,
        }

    except Exception as e:
        print(f"[!] Workflow error: {e}")
        progress_data = {"status": "error", "message": f"Error: {str(e)}", "progress": 100}


@app.post("/api/scrape")
async def start_scrape(request: ScrapeRequest, background_tasks: BackgroundTasks):
    """Kick off the scrape workflow in the background.

    Rejects the request (with a JSON error body, HTTP 200) when a
    workflow is already in a busy state, so only one run is active.
    """
    global progress_data
    busy_states = ("scraping", "enriching", "saving")
    if progress_data.get("status") in busy_states:
        return {"error": "A scrape is already running"}
    background_tasks.add_task(run_workflow, request.niche, request.location, request.limit)
    return {"message": "Workflow started"}


@app.get("/api/status")
async def get_status():
    """Return the current workflow progress snapshot (polled by the frontend)."""
    return progress_data


@app.get("/api/results")
async def get_results():
    """Return the most recently saved enriched leads, or [] if none exist.

    Any read/parse failure is logged and reported as an empty list so the
    frontend always receives valid JSON.
    """
    results_path = ".tmp/last_results.json"
    try:
        with open(results_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # No workflow has completed yet — same as the old exists() check.
        return []
    except Exception as e:
        print(f"[!] Error reading results: {e}")
        return []

# Serve Static Files
# If a built frontend bundle exists, serve it from this same FastAPI app
# (single-process deployment, e.g. a Hugging Face Space container).
frontend_path = "frontend/dist"
if os.path.exists(frontend_path):
    # Mount assets folder (hashed JS/CSS emitted by the bundler)
    if os.path.exists(os.path.join(frontend_path, "assets")):
        app.mount("/assets", StaticFiles(directory=os.path.join(frontend_path, "assets")), name="assets")

    # Serve index.html for all other routes (SPA fallback)
    @app.get("/{full_path:path}")
    async def serve_frontend(full_path: str):
        # Exclude /api calls
        # NOTE(review): returning None here produces an empty 200 response
        # for unmatched "api..." paths rather than a 404 — confirm intended.
        if full_path.startswith("api"):
            return None
        return FileResponse(os.path.join(frontend_path, "index.html"))

if __name__ == "__main__":
    import uvicorn

    # Hugging Face Spaces expects 7860; a PORT env var overrides it.
    listen_port = int(os.environ.get("PORT", 7860))
    # Bind all interfaces so the server is reachable from outside the container.
    uvicorn.run(app, host="0.0.0.0", port=listen_port)