agenticworkflowsspace committed on
Commit
fcf1601
·
verified ·
1 Parent(s): 931a8b1

Upload app.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. app.py +154 -0
app.py ADDED
@@ -0,0 +1,154 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import asyncio
import concurrent.futures
import csv
import json
import os
from typing import List

from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
12
+
13
# FastAPI application instance; serves the JSON API and (optionally) the built SPA.
app = FastAPI(title="Lead Hunter AI API")

# Allow cross-origin requests from any host.
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# maximally permissive — fine for a demo Space, but tighten for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
22
+
23
class ScrapeRequest(BaseModel):
    """Request payload for POST /api/scrape."""

    niche: str  # business category to search for (used verbatim in the query)
    location: str  # geographic area (used verbatim in the query)
    limit: int = 10  # maximum number of leads to scrape
27
+
28
# Global progress state
# Shared mutable dict: replaced/mutated by run_workflow, read by GET /api/status.
progress_data = {"status": "idle", "message": "Awaiting transmission...", "progress": 0}

# Thread pool for running sync Playwright code outside the event loop
# (the sync scraper/enricher calls would otherwise block asyncio).
_executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
33
+
34
+
35
+ def _run_scrape(query: str, limit: int) -> list:
36
+ """Runs the sync Playwright scraper in a thread (safe for asyncio)."""
37
+ try:
38
+ from tools.google_maps_scraper import scrape_google_maps
39
+ return scrape_google_maps(query, limit)
40
+ except Exception as e:
41
+ print(f"[!] Scraper Error: {e}")
42
+ return []
43
+
44
+
45
+ def _run_enrich(lead: dict) -> dict:
46
+ """Runs the sync httpx enricher in a thread."""
47
+ try:
48
+ from tools.contact_enricher import enrich_lead
49
+ return enrich_lead(lead)
50
+ except Exception as e:
51
+ print(f"[!] Enricher Error: {e}")
52
+ return lead
53
+
54
+
55
async def run_workflow(niche: str, location: str, limit: int):
    """Background task: scrape -> enrich -> save, publishing progress via progress_data.

    Blocking scraper/enricher calls are dispatched to `_executor` so the
    event loop stays responsive while /api/status is being polled.

    Args:
        niche: business category, interpolated into the search query.
        location: geographic area, interpolated into the search query.
        limit: maximum number of leads to scrape.
    """
    global progress_data
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() is deprecated in this context since Python 3.10.
    loop = asyncio.get_running_loop()

    try:
        # ── Stage 1: Scrape ──────────────────────────────────────────────
        query = f"{niche} in {location}"
        progress_data = {"status": "scraping", "message": f"Searching for {niche} in {location}...", "progress": 10}

        raw_leads = await loop.run_in_executor(_executor, _run_scrape, query, limit)

        if not raw_leads:
            progress_data = {"status": "error", "message": "No businesses found. Try a different niche or location.", "progress": 100}
            return

        # ── Stage 2: Enrich ──────────────────────────────────────────────
        total = len(raw_leads)
        progress_data = {"status": "enriching", "message": f"Found {total} leads. Enriching contacts...", "progress": 40}

        enriched_leads = []
        for i, lead in enumerate(raw_leads):
            progress_data["message"] = f"Enriching {i+1}/{total}: {lead.get('name', '...')}"
            # Enrichment occupies the 40–90% band of the progress bar.
            progress_data["progress"] = 40 + int((i / total) * 50)

            enriched = await loop.run_in_executor(_executor, _run_enrich, lead)
            enriched_leads.append(enriched)

        # ── Stage 3: Save ────────────────────────────────────────────────
        progress_data = {"status": "saving", "message": "Saving results...", "progress": 95}

        os.makedirs(".tmp", exist_ok=True)
        with open(".tmp/last_results.json", "w", encoding="utf-8") as f:
            json.dump(enriched_leads, f, indent=2)

        # Also save a named CSV
        filename = f"{niche.lower().replace(' ', '_')}_{location.lower().replace(' ', '_')}.csv"
        if enriched_leads:
            keys = list(enriched_leads[0].keys())
            with open(filename, "w", newline="", encoding="utf-8") as f:
                w = csv.DictWriter(f, fieldnames=keys)
                w.writeheader()
                w.writerows(enriched_leads)

        progress_data = {
            "status": "complete",
            # BUGFIX: the message previously said "saved to (unknown)";
            # interpolate the actual CSV filename.
            "message": f"Done! {len(enriched_leads)} leads saved to {filename}",
            "progress": 100,
            "filename": filename,
        }

    except Exception as e:
        # Top-level boundary for the background task: surface the failure
        # to the UI instead of dying silently.
        print(f"[!] Workflow error: {e}")
        progress_data = {"status": "error", "message": f"Error: {str(e)}", "progress": 100}
107
+
108
+
109
@app.post("/api/scrape")
async def start_scrape(request: ScrapeRequest, background_tasks: BackgroundTasks):
    """Start the scrape workflow in the background; refuse if one is already active."""
    global progress_data
    busy_states = ("scraping", "enriching", "saving")
    if progress_data.get("status") in busy_states:
        return {"error": "A scrape is already running"}
    background_tasks.add_task(run_workflow, request.niche, request.location, request.limit)
    return {"message": "Workflow started"}
116
+
117
+
118
@app.get("/api/status")
async def get_status():
    """Return the shared progress_data dict so the frontend can poll job state."""
    return progress_data
121
+
122
+
123
@app.get("/api/results")
async def get_results():
    """Return the most recently saved lead list, or [] if no scrape has finished.

    Reads the JSON snapshot written by run_workflow. Uses EAFP (open and
    catch FileNotFoundError) rather than an os.path.exists() pre-check,
    which avoids the check-then-open race window.
    """
    try:
        with open(".tmp/last_results.json", "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # No results yet — an empty set, not an error.
        return []
    except Exception as e:
        # Corrupt/partial JSON etc.: log and degrade to empty instead of a 500.
        print(f"[!] Error reading results: {e}")
        return []
133
+
134
# Serve Static Files: mount the built SPA only if the bundle exists in the image.
frontend_path = "frontend/dist"
if os.path.exists(frontend_path):
    # Mount the hashed asset bundle produced by the frontend build.
    if os.path.exists(os.path.join(frontend_path, "assets")):
        app.mount("/assets", StaticFiles(directory=os.path.join(frontend_path, "assets")), name="assets")

    # SPA fallback: serve index.html for all non-API paths so client-side
    # routing survives hard refreshes and deep links.
    @app.get("/{full_path:path}")
    async def serve_frontend(full_path: str):
        # BUGFIX: returning None for unmatched /api paths produced HTTP 200
        # with a null body; a proper 404 is correct. (Registered /api routes
        # are matched before this catch-all, so this only hits unknown ones.)
        if full_path.startswith("api"):
            raise HTTPException(status_code=404, detail="Not found")
        return FileResponse(os.path.join(frontend_path, "index.html"))
148
+
149
if __name__ == "__main__":
    import uvicorn

    # HF Spaces expects the server on port 7860 unless PORT overrides it;
    # bind 0.0.0.0 so the containerized app is reachable from outside.
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))