# LeadGenPro / lead_gen/app.py
# Commit 26a54a9 (MaSTer-suFYan): Fix pipeline stuck bug and add lead clearing controls
#!/usr/bin/env python3
"""
app.py -- Flask API server for LeadGen Pro web dashboard (v2).
Endpoints:
POST /api/scrape -> start scrape job, return job_id
GET /api/status/<id> -> SSE streaming progress
GET /api/leads -> paginated + filtered leads from SQLite
POST /api/leads/export -> trigger PDF/CSV/JSON export
GET /api/stats -> dashboard statistics
PATCH /api/leads/<id>/status -> update single lead status
PATCH /api/leads/bulk-status -> update multiple leads' status
PATCH /api/leads/<id>/notes -> update lead notes
"""
import os
import sys
import json
import time
import uuid
import threading
import re
from datetime import datetime
from functools import wraps
from flask import Flask, jsonify, request, send_file, send_from_directory, Response
from flask_cors import CORS
from utils.logger import get_logger
logger = get_logger(__name__)
# -- Path setup ----------------------------------------------------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
import config
from models import Lead
from utils.database import (
init_db, insert_leads, get_all_leads, get_lead_by_id,
update_lead_status, bulk_update_status, update_lead_notes,
get_stats as db_get_stats, get_top_leads, delete_all_leads,
)
from utils.helpers import timestamp_for_filename, lead_fingerprint
from scraper.demo_scraper import DemoScraper
from processors.deduplicator import Deduplicator
from processors.categorizer import Categorizer
from processors.scorer import LeadScorer
from processors.validator import LeadValidator
from exporters.pdf_exporter import PDFExporter
from exporters.csv_exporter import CSVExporter
from exporters.json_exporter import JSONExporter
# -- Flask app -----------------------------------------------------------------
app = Flask(__name__, static_folder="static", static_url_path="")
CORS(app)  # allow cross-origin requests from the dashboard front-end
# -- Initialize database -------------------------------------------------------
init_db()  # create the SQLite schema at startup if it does not exist yet
# -- Job tracking (in-memory) --------------------------------------------------
# NOTE: job state lives only in this process; a server restart forgets all jobs.
_jobs = {}  # job_id -> { status, progress, lead_count, ... }
_jobs_lock = threading.Lock()  # guards every read/write of _jobs
# ==============================================================================
# Input sanitization helpers
# ==============================================================================
def sanitize_string(s, max_len=200):
    """Strip potentially dangerous characters (<, >, ", ', ;) and cap the length."""
    if not isinstance(s, str):
        return ""
    cleaned = re.sub(r'[<>"\';]', '', s)
    return cleaned[:max_len]
def safe_int(val, default=1, min_val=1, max_val=500):
    """Coerce *val* to an int clamped into [min_val, max_val], else *default*."""
    try:
        n = int(val)
    except (TypeError, ValueError):
        return default
    if n < min_val:
        return min_val
    if n > max_val:
        return max_val
    return n
def rate_limit(max_per_minute=30):
    """Decorator factory: simple sliding-window per-IP rate limiter.

    Tracks recent request timestamps per client IP in process memory and
    rejects a request with HTTP 429 once *max_per_minute* calls have been
    seen from that IP within the last 60 seconds.

    NOTE(review): state is per-process, so this only works for a single
    instance — fine for this dashboard, not for multi-worker deployments.
    """
    _store = {}  # ip -> list of recent request timestamps
    _store_lock = threading.Lock()

    def decorator(f):
        # functools.wraps preserves __name__ (avoids duplicate Flask
        # endpoints) plus __doc__/__module__, unlike the manual copy it replaces.
        @wraps(f)
        def wrapper(*args, **kwargs):
            key = request.remote_addr or "unknown"
            now = time.time()
            # The app runs threaded=True, so guard the read-modify-write
            # of the shared timestamp store.
            with _store_lock:
                calls = [t for t in _store.get(key, []) if now - t < 60]
                if len(calls) >= max_per_minute:
                    return jsonify({"error": "Rate limit exceeded"}), 429
                calls.append(now)
                _store[key] = calls
            return f(*args, **kwargs)
        return wrapper
    return decorator
# ==============================================================================
# Routes
# ==============================================================================
@app.route("/")
def index():
    """Serve the dashboard single-page app."""
    page = send_from_directory("static", "index.html")
    return page
# -- Scrape job ----------------------------------------------------------------
@app.route("/api/scrape", methods=["POST"])
@rate_limit(max_per_minute=5)
def api_scrape():
    """Start a scrape job in a background thread.

    Returns 409 if another job is still running, except that jobs older
    than 10 minutes are treated as stale and auto-cancelled so the
    dashboard can never get permanently stuck. On success returns
    ``{"job_id": ..., "status": "started"}``.
    """
    # Check if any job is already running (auto-expire stale jobs > 10 min).
    with _jobs_lock:
        stale_ids = []
        for jid, job in _jobs.items():
            if job.get("running"):
                started = job.get("started_at", "")
                if started:
                    try:
                        elapsed = (datetime.now() - datetime.fromisoformat(started)).total_seconds()
                        if elapsed > 600:  # 10 minutes -> stale
                            stale_ids.append(jid)
                            continue
                    except (ValueError, TypeError):
                        pass  # unparseable timestamp: treat the job as live
                return jsonify({"error": "A job is already running", "job_id": jid}), 409
        # Mark stale jobs finished so the new job may start.
        for jid in stale_ids:
            _jobs[jid]["running"] = False
            _jobs[jid]["error"] = "Auto-cancelled: exceeded 10 minute timeout"
            _jobs[jid]["progress"] = "Timed out"
            _jobs[jid]["finished_at"] = datetime.now().isoformat()
            logger.warning(f"Auto-cancelled stale job {jid}")

    data = request.get_json(force=True) or {}
    job_id = str(uuid.uuid4())[:8]
    service_val = sanitize_string(data.get("service", "all"))
    # "demo" accepts a real bool or a string; any string except "false" is truthy.
    demo_val = data.get("demo") if isinstance(data.get("demo"), bool) else str(data.get("demo", "true")).lower() != "false"

    # Validate category against SEARCH_KEYWORDS (skip for demo mode).
    # Use config.SEARCH_KEYWORDS directly instead of a redundant local import.
    if not demo_val and service_val != "all" and service_val not in config.SEARCH_KEYWORDS:
        logger.debug(f"Rejected unknown category: {service_val!r}")
        return jsonify({"error": f"Unknown category: {service_val}. Valid: {', '.join(sorted(config.SEARCH_KEYWORDS.keys()))}"}), 400
    logger.debug(f"Category received: {service_val!r}")

    job = {
        "id": job_id,
        "running": True,
        "progress": "Starting pipeline...",
        "lead_count": 0,
        "started_at": datetime.now().isoformat(),
        "finished_at": None,
        "error": None,
        "config": {
            "service": service_val,
            "location": sanitize_string(data.get("location", "Mumbai")),
            "source": sanitize_string(data.get("source", "all")),
            "limit": safe_int(data.get("limit", 50), default=50, max_val=500),
            "demo": demo_val,
            "analyse": data.get("analyse") if isinstance(data.get("analyse"), bool) else str(data.get("analyse", "false")).lower() == "true",
        },
    }
    with _jobs_lock:
        _jobs[job_id] = job
    # daemon=True: a hung scraper thread must never block interpreter shutdown.
    thread = threading.Thread(
        target=_run_pipeline, args=(job_id,), daemon=True
    )
    thread.start()
    return jsonify({"job_id": job_id, "status": "started"})
@app.route("/api/status/<job_id>")
def api_status_sse(job_id):
    """SSE stream for real-time job progress.

    Emits one JSON event roughly every 0.8s until the job finishes or the
    job id is unknown.
    """
    def generate():
        while True:
            # Snapshot job state under the lock, but never yield while
            # holding it: a generator suspends at each yield, and yielding
            # inside the ``with`` would keep _jobs_lock held until the SSE
            # client consumed the event, stalling every other request.
            with _jobs_lock:
                job = _jobs.get(job_id)
                if job is None:
                    payload = {"error": "Job not found"}
                    still_running = False
                else:
                    payload = {
                        "running": job["running"],
                        "progress": job["progress"],
                        "lead_count": job["lead_count"],
                        "error": job["error"],
                    }
                    still_running = job["running"]
            yield f"data: {json.dumps(payload)}\n\n"
            if not still_running:
                break
            time.sleep(0.8)
    return Response(
        generate(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            # Disable nginx proxy buffering so events flush immediately.
            "X-Accel-Buffering": "no",
        },
    )
@app.route("/api/status")
def api_status_simple():
    """Simple JSON status (backward compat + polling fallback)."""
    with _jobs_lock:
        # First running job wins, if any.
        active = next((j for j in _jobs.values() if j.get("running")), None)
        stats = db_get_stats()
        return jsonify({
            "running": active is not None,
            "progress": active.get("progress", "") if active else "",
            "job_id": active["id"] if active else None,
            "total_leads": stats["total"],
        })
# -- Leads CRUD ----------------------------------------------------------------
@app.route("/api/leads")
def api_leads():
    """Return one page of leads, honouring filter/search/sort query params."""
    args = request.args
    page = safe_int(args.get("page", 1), default=1, min_val=1, max_val=10000)
    per_page = safe_int(
        args.get("per_page", config.DEFAULT_PER_PAGE),
        default=config.DEFAULT_PER_PAGE,
        min_val=1,
        max_val=config.MAX_PER_PAGE,
    )
    leads, total = get_all_leads(
        page=page,
        per_page=per_page,
        score_filter=sanitize_string(args.get("score", "")),
        service_filter=sanitize_string(args.get("service", "")),
        status_filter=sanitize_string(args.get("status", "")),
        search=sanitize_string(args.get("search", ""), max_len=100),
        sort_by=sanitize_string(args.get("sort_by", "score_points")),
        sort_dir=sanitize_string(args.get("sort_dir", "DESC")),
    )
    # Ceiling division; always at least one page, even with zero leads.
    pages = max(1, -(-total // per_page))
    return jsonify({
        "leads": [_lead_to_dict(row) for row in leads],
        "total": total,
        "page": page,
        "per_page": per_page,
        "total_pages": pages,
    })
@app.route("/api/leads/<int:lead_id>/status", methods=["PATCH"])
@rate_limit(max_per_minute=60)
def api_update_status(lead_id):
    """Update a single lead's status."""
    body = request.get_json(force=True) or {}
    new_status = sanitize_string(body.get("status", ""))
    if not new_status:
        return jsonify({"error": "Missing 'status'"}), 400
    if not update_lead_status(lead_id, new_status):
        return jsonify({"error": "Invalid status or lead not found"}), 400
    return jsonify({"success": True, "id": lead_id, "status": new_status})
@app.route("/api/leads/bulk-status", methods=["PATCH"])
@rate_limit(max_per_minute=20)
def api_bulk_status():
    """Update status for multiple leads at once."""
    body = request.get_json(force=True) or {}
    raw_ids = body.get("ids", [])
    new_status = sanitize_string(body.get("status", ""))
    if not isinstance(raw_ids, list) or not raw_ids:
        return jsonify({"error": "Missing 'ids' array"}), 400
    if not new_status:
        return jsonify({"error": "Missing 'status'"}), 400
    # Keep only numeric ids, coerce to int, and cap the batch at 100.
    safe_ids = [int(v) for v in raw_ids if isinstance(v, (int, float))][:100]
    updated = bulk_update_status(safe_ids, new_status)
    return jsonify({"success": True, "updated": updated, "status": new_status})
@app.route("/api/leads/<int:lead_id>/notes", methods=["PATCH"])
@rate_limit(max_per_minute=60)
def api_update_notes(lead_id):
    """Update user notes for a lead."""
    body = request.get_json(force=True) or {}
    cleaned = sanitize_string(body.get("notes", ""), max_len=2000)
    if update_lead_notes(lead_id, cleaned):
        return jsonify({"success": True})
    return jsonify({"error": "Lead not found"}), 404
# -- Stats ---------------------------------------------------------------------
@app.route("/api/stats")
def api_stats():
    """Dashboard statistics from SQLite, plus the five best-scoring leads."""
    payload = db_get_stats()
    payload["top_leads"] = [_lead_to_dict(row) for row in get_top_leads(5)]
    return jsonify(payload)
# -- Pipeline management -------------------------------------------------------
@app.route("/api/jobs/cancel", methods=["POST"])
@rate_limit(max_per_minute=10)
def api_cancel_job():
    """Force-cancel any running job (fixes 'pipeline already running' stuck state)."""
    cancelled = []
    with _jobs_lock:
        for jid, job in _jobs.items():
            if not job.get("running"):
                continue
            job["running"] = False
            job["error"] = "Manually cancelled by user"
            job["progress"] = "Cancelled"
            job["finished_at"] = datetime.now().isoformat()
            cancelled.append(jid)
    if not cancelled:
        return jsonify({"success": True, "message": "No running jobs to cancel"})
    logger.info(f"Cancelled jobs: {cancelled}")
    return jsonify({"success": True, "cancelled": cancelled})
@app.route("/api/leads/clear", methods=["POST"])
@rate_limit(max_per_minute=5)
def api_clear_leads():
    """Delete ALL leads from the database (requires explicit confirmation)."""
    body = request.get_json(force=True) or {}
    if not body.get("confirm", False):
        return jsonify({"error": "Send {\"confirm\": true} to confirm deletion"}), 400
    deleted = delete_all_leads()
    logger.info(f"Cleared all leads: {deleted} deleted")
    return jsonify({"success": True, "deleted": deleted})
# -- Exports -------------------------------------------------------------------
@app.route("/api/leads/export", methods=["POST"])
@app.route("/api/export/<fmt>", methods=["POST"])
@rate_limit(max_per_minute=10)
def api_export(fmt=None):
    """Export all leads as PDF, CSV, or JSON.

    The format comes from the URL (``/api/export/<fmt>``) or, for the legacy
    route, from the ``format`` field of the JSON body (default "csv").
    Responds with the generated file as an attachment, 400 on bad input,
    500 if the exporter raises.
    """
    if fmt is None:
        data = request.get_json(force=True) or {}
        fmt = sanitize_string(data.get("format", "csv"))

    # All leads, effectively unpaginated.
    leads, _total = get_all_leads(page=1, per_page=10000)
    if not leads:
        return jsonify({"error": "No leads to export"}), 400

    # Exporters expect leads grouped by service category.
    leads_by_service = {}
    for lead in leads:
        leads_by_service.setdefault(lead.service_category or "other", []).append(lead)

    ts = datetime.now().strftime("%Y%m%d")

    # Derive a location for the filename from the first lead's address,
    # falling back to "India". NOTE(review): this grabs the first address
    # component longer than 2 chars, which may be a street rather than a
    # city — confirm intent.
    location = "India"
    addr = leads[0].address or ""
    for part in addr.split(","):
        stripped = part.strip()
        if stripped and len(stripped) > 2:
            location = stripped.replace(" ", "_")
            break
    safe_loc = "".join(c for c in location if c.isalnum() or c == "_")[:30]

    MIMETYPES = {
        "pdf": "application/pdf",
        "csv": "text/csv",
        "json": "application/json",
    }
    # fmt -> (exporter class, output filename, output directory)
    EXPORTERS = {
        "pdf": (PDFExporter, f"lead_report_{safe_loc}_{ts}.pdf", config.REPORTS_DIR),
        "csv": (CSVExporter, f"leads_{safe_loc}_{ts}.csv", config.DATA_DIR),
        "json": (JSONExporter, f"leads_{safe_loc}_{ts}.json", config.DATA_DIR),
    }
    if fmt not in EXPORTERS:
        return jsonify({"error": f"Unknown format: {fmt}"}), 400
    exporter_cls, fname, out_dir = EXPORTERS[fmt]
    try:
        path = exporter_cls().export(
            leads_by_service, location=location,
            out_dir=out_dir,
            filename=fname,
        )
        return send_file(
            path,
            mimetype=MIMETYPES.get(fmt, "application/octet-stream"),
            as_attachment=True,
            download_name=fname,
        )
    except Exception as exc:
        # Keep the client-facing message, but record the full traceback.
        logger.exception("Export failed")
        return jsonify({"error": str(exc)}), 500
@app.route("/api/exports")
def api_exports():
    """List available export files (reverse-sorted by name, max 50)."""
    found = []
    for directory in (config.REPORTS_DIR, config.DATA_DIR):
        if not os.path.isdir(directory):
            continue
        for name in sorted(os.listdir(directory), reverse=True):
            full = os.path.join(directory, name)
            if not os.path.isfile(full):
                continue
            info = os.stat(full)
            found.append({
                "name": name,
                "size": info.st_size,
                "modified": datetime.fromtimestamp(info.st_mtime).isoformat(),
                "type": name.rsplit(".", 1)[-1].upper(),
            })
    return jsonify({"files": found[:50]})
@app.route("/api/download/<path:filename>")
def api_download(filename):
    """Download a specific export file by name."""
    # basename() strips any path components, defeating directory traversal.
    safe_name = os.path.basename(filename)
    MIMETYPES = {
        ".pdf": "application/pdf",
        ".csv": "text/csv",
        ".json": "application/json",
    }
    ext = os.path.splitext(safe_name)[1].lower()
    mime = MIMETYPES.get(ext, "application/octet-stream")
    for directory in (config.REPORTS_DIR, config.DATA_DIR):
        candidate = os.path.join(directory, safe_name)
        if os.path.isfile(candidate):
            return send_file(
                candidate,
                mimetype=mime,
                as_attachment=True,
                download_name=safe_name,
            )
    return jsonify({"error": "File not found"}), 404
# ==============================================================================
# Pipeline runner (background thread)
# ==============================================================================
def _purge_demo_leads():
    """Remove all demo-sourced leads from the DB so they don't pollute live results.

    Logs the number of deleted rows when non-zero. Raises whatever sqlite3
    raises on failure, but always closes the connection.
    """
    import sqlite3 as _sql
    conn = _sql.connect(config.DB_PATH, timeout=10)
    try:
        # ``with conn`` commits on success and rolls back on error.
        with conn:
            deleted = conn.execute(
                "DELETE FROM leads WHERE lead_source_scraper = 'demo' OR source = 'Demo Data'"
            ).rowcount
    finally:
        conn.close()  # original leaked the handle if execute() raised
    if deleted:
        logger.info(f"Purged {deleted} demo leads from database")
def _run_live_scrapers(source, services, location, limit, leads, update):
    """Run each enabled live scraper, appending results to *leads* in place.

    *update* is the job-progress callback. A scraper failure is logged and
    reported via *update* but never aborts the pipeline; remaining sources
    still run.
    """
    import importlib
    # (source key, label, module, class, scraper tag, kw cap,
    #  report per-keyword count, pass category= kwarg to scrape())
    specs = [
        ("googlemaps", "Google Maps", "scraper.google_maps", "GoogleMapsScraper",
         "google_maps", 3, True, True),
        ("justdial", "JustDial", "scraper.justdial", "JustDialScraper",
         "justdial", 3, True, False),
        ("indiamart", "IndiaMART", "scraper.indiamart", "IndiaMARTScraper",
         "indiamart", 2, False, False),
        ("sulekha", "Sulekha", "scraper.sulekha", "SulekhaScraper",
         "sulekha", 2, False, False),
    ]
    for key, label, mod_path, cls_name, tag, kw_cap, report, pass_cat in specs:
        if source not in (key, "all"):
            continue
        try:
            scraper_cls = getattr(importlib.import_module(mod_path), cls_name)
            scraper = scraper_cls()
            for svc in services:
                keywords = config.SEARCH_KEYWORDS.get(svc)
                if not keywords:
                    logger.debug(f"Skip {label}: no keywords for {svc!r}")
                    continue
                logger.debug(f"{label} | category={svc!r} keywords={keywords}")
                per_kw = max(1, limit // len(keywords))
                for kw in keywords[:kw_cap]:
                    update(f"{label}: {kw} in {location}...")
                    kwargs = {"limit": per_kw}
                    if pass_cat:
                        kwargs["category"] = svc
                    new = scraper.scrape(kw, location, **kwargs)
                    for lead in new:
                        lead.service_category = svc
                        lead.source = label
                        lead.lead_source_scraper = tag
                    leads.extend(new)
                    if report:
                        update(f"{label}: found {len(new)} for '{kw}'", len(leads))
        except Exception as e:
            logger.error(f"{label} scraper error: {e}")
            update(f"{label}: error - {e}")
def _run_pipeline(job_id: str):
    """Execute the full lead generation pipeline in a background thread.

    Reads its configuration from ``_jobs[job_id]["config"]``, streams
    progress back into the job record, and persists the final leads to
    SQLite. Any uncaught error marks the job finished with the error message.
    """
    def _update(progress, lead_count=None):
        # Thread-safe progress update consumed by the SSE/status endpoints.
        with _jobs_lock:
            _jobs[job_id]["progress"] = progress
            if lead_count is not None:
                _jobs[job_id]["lead_count"] = lead_count
    try:
        with _jobs_lock:
            cfg = _jobs[job_id]["config"]
        service = cfg["service"]
        location = cfg["location"]
        source = cfg["source"]
        limit = cfg["limit"]
        demo = cfg["demo"]
        analyse = cfg["analyse"]
        # "all" expands to every configured category.
        # BUGFIX: the original read the bare name SEARCH_KEYWORDS here, but a
        # later function-local "from config import SEARCH_KEYWORDS" made the
        # name local to this function, so service == "all" raised
        # UnboundLocalError before the import line ever executed. Referencing
        # it through the config module avoids the local-name shadowing.
        if service == "all":
            services = list(config.SEARCH_KEYWORDS.keys())
        else:
            services = [service]
        # -- Collect --
        _update("Collecting leads...")
        leads = []
        logger.debug(f"demo flag: {demo}")
        if demo:
            scraper = DemoScraper()
            leads = scraper.scrape(
                location=location,
                limit=limit * len(services),
                services=services,
            )
        else:
            # Purge old demo leads so they don't pollute live results.
            _purge_demo_leads()
            _update("Cleared demo data. Starting live scraping...")
            _run_live_scrapers(source, services, location, limit, leads, _update)
        logger.debug(f"leads collected: {len(leads)}")
        logger.debug(f"first lead source: {leads[0].source if leads else 'none'}")
        if not leads:
            err_msg = "No leads found." if demo else "No leads found. Live scrapers returned 0 results. Enable Demo Mode to test with sample data."
            with _jobs_lock:
                _jobs[job_id]["running"] = False
                _jobs[job_id]["progress"] = err_msg
                _jobs[job_id]["error"] = err_msg
                _jobs[job_id]["finished_at"] = datetime.now().isoformat()
            return
        # -- Process --
        _update(f"Deduplicating {len(leads)} leads...")
        dedup = Deduplicator(check_db=True)
        leads = dedup.deduplicate(leads)
        _update(f"Validating {len(leads)} leads...", len(leads))
        validator = LeadValidator()
        leads, rejected = validator.validate_all(leads)
        _update(f"Validated: {len(leads)} valid, {len(rejected)} rejected", len(leads))
        # Email finding (live mode only; best effort).
        if not demo:
            _update("Finding emails...")
            try:
                from scraper.email_finder import EmailFinder
                finder = EmailFinder()
                finder.find_emails_bulk(leads)
            except Exception as e:
                # Best-effort step: keep going, but don't swallow silently.
                logger.warning(f"Email finding skipped: {e}")
        # Optional website analysis (live mode only).
        if analyse and not demo:
            _update("Analysing websites...")
            from scraper.website_analyzer import WebsiteAnalyzer
            analyser = WebsiteAnalyzer()
            for i, lead in enumerate(leads):
                if lead.website:
                    _update(f"Analysing {i+1}/{len(leads)}: {lead.business_name}...")
                    analyser.analyse(lead)
        _update("Categorizing...")
        cat = Categorizer()
        leads = cat.categorize_all(leads)
        _update("Scoring...")
        scorer = LeadScorer()
        leads = scorer.score_all(leads)
        # Ensure all leads have fingerprints before persisting.
        for lead in leads:
            if not lead.fingerprint:
                lead.fingerprint = lead_fingerprint(
                    lead.business_name, lead.phone, lead.email
                )
        # -- Persist to SQLite --
        _update(f"Saving {len(leads)} leads to database...")
        inserted, skipped = insert_leads(leads)
        # -- Done --
        with _jobs_lock:
            _jobs[job_id]["running"] = False
            _jobs[job_id]["progress"] = "Complete!"
            _jobs[job_id]["lead_count"] = inserted + skipped
            _jobs[job_id]["finished_at"] = datetime.now().isoformat()
    except Exception as exc:
        logger.exception(f"Pipeline failed for job {job_id}")
        with _jobs_lock:
            _jobs[job_id]["running"] = False
            _jobs[job_id]["progress"] = f"Error: {exc}"
            _jobs[job_id]["error"] = str(exc)
            _jobs[job_id]["finished_at"] = datetime.now().isoformat()
# ==============================================================================
# Helpers
# ==============================================================================
def _lead_to_dict(lead: Lead) -> dict:
    """Serialize a Lead object into the JSON shape the dashboard expects."""
    # Attributes copied through under their own name, in the response order
    # the dashboard relies on; "id" is special-cased first because it maps
    # to the DB column attribute ``db_id``.
    passthrough = (
        "business_name", "service_category", "service_label", "phone",
        "email", "address", "website", "contact_person", "source",
        "lead_source_scraper", "notes", "user_notes", "score",
        "score_points", "industry", "product_category", "status",
        "opportunity_pitch", "has_https", "is_mobile_friendly",
        "is_outdated", "has_chatbot", "has_online_booking",
        "social_facebook", "social_instagram", "has_contact_form",
        "ssl_expiry_days", "page_speed_score", "load_time_sec",
        "site_status", "gmb_exists", "review_count", "whatsapp_link",
        "scraped_at", "updated_at",
    )
    out = {"id": lead.db_id}
    for attr in passthrough:
        out[attr] = getattr(lead, attr)
    return out
# ==============================================================================
# Entry point
# ==============================================================================
if __name__ == "__main__":
    banner = "=" * 60
    print(f"\n{banner}")
    print(" LeadGen Pro v2 -- Web Dashboard")
    print(banner)
    print(" Open in your browser: http://localhost:5000")
    print(f"{banner}\n")
    # threaded=True so the SSE stream and API calls can be served concurrently.
    app.run(host="0.0.0.0", port=5000, debug=False, threaded=True)