Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| app.py -- Flask API server for LeadGen Pro web dashboard (v2). | |
| Endpoints: | |
| POST /api/scrape -> start scrape job, return job_id | |
| GET /api/status/<id> -> SSE streaming progress | |
| GET /api/leads -> paginated + filtered leads from SQLite | |
| POST /api/leads/export -> trigger PDF/CSV/JSON export | |
| GET /api/stats -> dashboard statistics | |
| PATCH /api/leads/<id>/status -> update single lead status | |
| PATCH /api/leads/bulk-status -> update multiple leads' status | |
| PATCH /api/leads/<id>/notes -> update lead notes | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import uuid | |
| import threading | |
| import re | |
| from datetime import datetime | |
| from functools import wraps | |
| from flask import Flask, jsonify, request, send_file, send_from_directory, Response | |
| from flask_cors import CORS | |
| from utils.logger import get_logger | |
| logger = get_logger(__name__) | |
| # -- Path setup ---------------------------------------------------------------- | |
| BASE_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| sys.path.insert(0, BASE_DIR) | |
| import config | |
| from models import Lead | |
| from utils.database import ( | |
| init_db, insert_leads, get_all_leads, get_lead_by_id, | |
| update_lead_status, bulk_update_status, update_lead_notes, | |
| get_stats as db_get_stats, get_top_leads, delete_all_leads, | |
| ) | |
| from utils.helpers import timestamp_for_filename, lead_fingerprint | |
| from scraper.demo_scraper import DemoScraper | |
| from processors.deduplicator import Deduplicator | |
| from processors.categorizer import Categorizer | |
| from processors.scorer import LeadScorer | |
| from processors.validator import LeadValidator | |
| from exporters.pdf_exporter import PDFExporter | |
| from exporters.csv_exporter import CSVExporter | |
| from exporters.json_exporter import JSONExporter | |
# -- Flask app -----------------------------------------------------------------
# Serve the SPA from ./static at the web root; CORS is enabled for all origins
# so the dashboard frontend can be hosted separately during development.
app = Flask(__name__, static_folder="static", static_url_path="")
CORS(app)
# -- Initialize database -------------------------------------------------------
# Create the SQLite schema at import time so every route can assume it exists.
init_db()
# -- Job tracking (in-memory) --------------------------------------------------
# NOTE(review): job state lives in-process only -- it is lost on restart and is
# not shared across workers; confirm single-process deployment.
_jobs = {}  # job_id -> { status, progress, lead_count, ... }
_jobs_lock = threading.Lock()
| # ============================================================================== | |
| # Input sanitization helpers | |
| # ============================================================================== | |
def sanitize_string(s, max_len=200):
    """Strip characters commonly used in injection payloads and cap length.

    Non-string input (including None) yields an empty string.
    """
    if isinstance(s, str):
        cleaned = re.sub(r'[<>"\';]', '', s)
        return cleaned[:max_len]
    return ""
def safe_int(val, default=1, min_val=1, max_val=500):
    """Coerce *val* to an int clamped to [min_val, max_val].

    Returns *default* when *val* cannot be converted at all.
    """
    try:
        parsed = int(val)
    except (TypeError, ValueError):
        return default
    return min(max(parsed, min_val), max_val)
def rate_limit(max_per_minute=30):
    """Per-IP sliding-window rate limiter decorator.

    Keeps, per client IP, the timestamps of calls made in the last 60
    seconds; once the window holds ``max_per_minute`` entries, further
    requests receive a 429 JSON response.

    NOTE(review): the timestamp store is in-memory and unsynchronized --
    under threaded serving a race may allow a few extra calls; confirm this
    is acceptable for a best-effort limiter.
    """
    _store = {}  # ip -> list of call timestamps within the last 60s

    def decorator(f):
        # functools.wraps (imported at module top) preserves __name__ so
        # Flask does not see duplicate endpoints, and also carries over
        # __doc__, __module__ and __wrapped__ -- the manual __name__ /
        # __qualname__ copying it replaces lost those.
        @wraps(f)
        def wrapper(*args, **kwargs):
            key = request.remote_addr or "unknown"
            now = time.time()
            # Drop timestamps that have aged out of the 60-second window.
            calls = [t for t in _store.get(key, []) if now - t < 60]
            if len(calls) >= max_per_minute:
                return jsonify({"error": "Rate limit exceeded"}), 429
            calls.append(now)
            _store[key] = calls
            return f(*args, **kwargs)

        return wrapper

    return decorator
| # ============================================================================== | |
| # Routes | |
| # ============================================================================== | |
def index():
    """Serve the dashboard single-page app from the static folder."""
    return send_from_directory("static", "index.html")
| # -- Scrape job ---------------------------------------------------------------- | |
def api_scrape():
    """Start a scrape job in a background thread.

    Returns ``{"job_id": ..., "status": "started"}``; progress is streamed
    via the SSE status endpoint. Responds 409 if another job is already
    running, auto-expiring jobs stuck in the running state for > 10 minutes.
    """
    # Check if any job is already running (auto-expire stale jobs > 10 min).
    with _jobs_lock:
        stale_ids = []
        for jid, job in _jobs.items():
            if not job.get("running"):
                continue
            started = job.get("started_at", "")
            if started:
                try:
                    elapsed = (datetime.now() - datetime.fromisoformat(started)).total_seconds()
                    if elapsed > 600:  # 10 minutes
                        stale_ids.append(jid)
                        continue
                except (ValueError, TypeError):
                    pass
            return jsonify({"error": "A job is already running", "job_id": jid}), 409
        # Mark stale jobs finished so they no longer block new submissions.
        for jid in stale_ids:
            _jobs[jid]["running"] = False
            _jobs[jid]["error"] = "Auto-cancelled: exceeded 10 minute timeout"
            _jobs[jid]["progress"] = "Timed out"
            _jobs[jid]["finished_at"] = datetime.now().isoformat()
            logger.warning(f"Auto-cancelled stale job {jid}")

    data = request.get_json(force=True) or {}
    job_id = str(uuid.uuid4())[:8]
    service_val = sanitize_string(data.get("service", "all"))
    # Accept a real boolean, otherwise treat any string other than "false"
    # as True (demo mode defaults to on).
    raw_demo = data.get("demo")
    if isinstance(raw_demo, bool):
        demo_val = raw_demo
    else:
        demo_val = str(data.get("demo", "true")).lower() != "false"
    # Validate the category against the configured keyword map (demo mode
    # accepts anything). Uses the module-level `config` import -- the old
    # local re-import of SEARCH_KEYWORDS was redundant.
    if not demo_val and service_val != "all" and service_val not in config.SEARCH_KEYWORDS:
        logger.debug(f"Rejected unknown category: {service_val!r}")
        return jsonify({"error": f"Unknown category: {service_val}. Valid: {', '.join(sorted(config.SEARCH_KEYWORDS.keys()))}"}), 400
    logger.debug(f"Category received: {service_val!r}")

    raw_analyse = data.get("analyse")
    if isinstance(raw_analyse, bool):
        analyse_val = raw_analyse
    else:
        analyse_val = str(data.get("analyse", "false")).lower() == "true"

    job = {
        "id": job_id,
        "running": True,
        "progress": "Starting pipeline...",
        "lead_count": 0,
        "started_at": datetime.now().isoformat(),
        "finished_at": None,
        "error": None,
        "config": {
            "service": service_val,
            "location": sanitize_string(data.get("location", "Mumbai")),
            "source": sanitize_string(data.get("source", "all")),
            "limit": safe_int(data.get("limit", 50), default=50, max_val=500),
            "demo": demo_val,
            "analyse": analyse_val,
        },
    }
    with _jobs_lock:
        _jobs[job_id] = job
    thread = threading.Thread(target=_run_pipeline, args=(job_id,), daemon=True)
    thread.start()
    return jsonify({"job_id": job_id, "status": "started"})
def api_status_sse(job_id):
    """SSE stream of job progress: one JSON snapshot every 0.8s until done."""

    def generate():
        while True:
            # Take a consistent snapshot under the lock, then yield outside it.
            with _jobs_lock:
                job = _jobs.get(job_id)
                snapshot = None if job is None else {
                    "running": job["running"],
                    "progress": job["progress"],
                    "lead_count": job["lead_count"],
                    "error": job["error"],
                }
            if snapshot is None:
                yield f"data: {json.dumps({'error': 'Job not found'})}\n\n"
                return
            yield f"data: {json.dumps(snapshot)}\n\n"
            if not snapshot["running"]:
                return
            time.sleep(0.8)

    return Response(
        generate(),
        mimetype="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            # Disable proxy buffering so events reach the browser immediately.
            "X-Accel-Buffering": "no",
        },
    )
def api_status_simple():
    """Simple JSON status (backward compat + polling fallback)."""
    with _jobs_lock:
        running_job = next(
            (j for j in _jobs.values() if j.get("running")), None
        )
    stats = db_get_stats()
    return jsonify({
        "running": running_job is not None,
        "progress": running_job.get("progress", "") if running_job else "",
        "job_id": running_job["id"] if running_job else None,
        "total_leads": stats["total"],
    })
| # -- Leads CRUD ---------------------------------------------------------------- | |
def api_leads():
    """Return one page of leads, filtered and sorted per query parameters."""
    args = request.args
    page = safe_int(args.get("page", 1), default=1, min_val=1, max_val=10000)
    per_page = safe_int(
        args.get("per_page", config.DEFAULT_PER_PAGE),
        default=config.DEFAULT_PER_PAGE,
        min_val=1,
        max_val=config.MAX_PER_PAGE,
    )
    leads, total = get_all_leads(
        page=page,
        per_page=per_page,
        score_filter=sanitize_string(args.get("score", "")),
        service_filter=sanitize_string(args.get("service", "")),
        status_filter=sanitize_string(args.get("status", "")),
        search=sanitize_string(args.get("search", ""), max_len=100),
        sort_by=sanitize_string(args.get("sort_by", "score_points")),
        sort_dir=sanitize_string(args.get("sort_dir", "DESC")),
    )
    # Ceiling division; always report at least one page, even when empty.
    pages = max(1, -(-total // per_page))
    return jsonify({
        "leads": [_lead_to_dict(row) for row in leads],
        "total": total,
        "page": page,
        "per_page": per_page,
        "total_pages": pages,
    })
def api_update_status(lead_id):
    """Update a single lead's status."""
    payload = request.get_json(force=True) or {}
    status = sanitize_string(payload.get("status", ""))
    if not status:
        return jsonify({"error": "Missing 'status'"}), 400
    if update_lead_status(lead_id, status):
        return jsonify({"success": True, "id": lead_id, "status": status})
    return jsonify({"error": "Invalid status or lead not found"}), 400
def api_bulk_status():
    """Update the status of multiple leads in one call."""
    payload = request.get_json(force=True) or {}
    ids = payload.get("ids", [])
    status = sanitize_string(payload.get("status", ""))
    if not isinstance(ids, list) or not ids:
        return jsonify({"error": "Missing 'ids' array"}), 400
    if not status:
        return jsonify({"error": "Missing 'status'"}), 400
    # Keep only numeric ids and cap the batch size at 100.
    safe_ids = [int(v) for v in ids if isinstance(v, (int, float))][:100]
    count = bulk_update_status(safe_ids, status)
    return jsonify({"success": True, "updated": count, "status": status})
def api_update_notes(lead_id):
    """Replace the user notes on a lead."""
    payload = request.get_json(force=True) or {}
    notes = sanitize_string(payload.get("notes", ""), max_len=2000)
    if update_lead_notes(lead_id, notes):
        return jsonify({"success": True})
    return jsonify({"error": "Lead not found"}), 404
| # -- Stats --------------------------------------------------------------------- | |
def api_stats():
    """Dashboard statistics plus the five highest-scoring leads."""
    stats = db_get_stats()
    stats["top_leads"] = [_lead_to_dict(row) for row in get_top_leads(5)]
    return jsonify(stats)
| # -- Pipeline management ------------------------------------------------------- | |
def api_cancel_job():
    """Force-cancel any running job (fixes 'pipeline already running' stuck state)."""
    cancelled = []
    with _jobs_lock:
        for jid, job in _jobs.items():
            if not job.get("running"):
                continue
            job["running"] = False
            job["error"] = "Manually cancelled by user"
            job["progress"] = "Cancelled"
            job["finished_at"] = datetime.now().isoformat()
            cancelled.append(jid)
    if not cancelled:
        return jsonify({"success": True, "message": "No running jobs to cancel"})
    logger.info(f"Cancelled jobs: {cancelled}")
    return jsonify({"success": True, "cancelled": cancelled})
def api_clear_leads():
    """Delete ALL leads from the database; requires an explicit confirm flag."""
    payload = request.get_json(force=True) or {}
    if not payload.get("confirm", False):
        return jsonify({"error": "Send {\"confirm\": true} to confirm deletion"}), 400
    deleted = delete_all_leads()
    logger.info(f"Cleared all leads: {deleted} deleted")
    return jsonify({"success": True, "deleted": deleted})
| # -- Exports ------------------------------------------------------------------- | |
def api_export(fmt=None):
    """Export all leads as PDF, CSV, or JSON and send the file.

    Format comes from the URL parameter or, when absent, from the JSON body
    (default "csv"). Returns 400 for an unknown format or when there is
    nothing to export; 500 (with the message) if the exporter fails.
    """
    if fmt is None:
        data = request.get_json(force=True) or {}
        fmt = sanitize_string(data.get("format", "csv"))
    # Fetch everything in one oversized page (export ignores pagination).
    leads, total = get_all_leads(page=1, per_page=10000)
    if not leads:
        return jsonify({"error": "No leads to export"}), 400
    # Exporters expect leads grouped by service category.
    leads_by_service = {}
    for lead in leads:
        leads_by_service.setdefault(lead.service_category or "other", []).append(lead)
    ts = datetime.now().strftime("%Y%m%d")
    # Derive a location label for the filename from the first lead's address.
    # NOTE(review): this takes the first comma-separated address part, which
    # is often a street rather than a city -- confirm intended.
    location = "India"
    addr = leads[0].address or ""
    for part in addr.split(","):
        stripped = part.strip()
        if stripped and len(stripped) > 2:
            location = stripped.replace(" ", "_")
            break
    safe_loc = "".join(c for c in location if c.isalnum() or c == "_")[:30]
    MIMETYPES = {
        "pdf": "application/pdf",
        "csv": "text/csv",
        "json": "application/json",
    }
    # Dispatch table replaces the three near-identical if/elif branches.
    exporters = {
        "pdf": (PDFExporter, f"lead_report_{safe_loc}_{ts}.pdf", config.REPORTS_DIR),
        "csv": (CSVExporter, f"leads_{safe_loc}_{ts}.csv", config.DATA_DIR),
        "json": (JSONExporter, f"leads_{safe_loc}_{ts}.json", config.DATA_DIR),
    }
    if fmt not in exporters:
        return jsonify({"error": f"Unknown format: {fmt}"}), 400
    exporter_cls, fname, out_dir = exporters[fmt]
    try:
        path = exporter_cls().export(
            leads_by_service, location=location,
            out_dir=out_dir,
            filename=fname,
        )
        return send_file(
            path,
            mimetype=MIMETYPES.get(fmt, "application/octet-stream"),
            as_attachment=True,
            download_name=fname,
        )
    except Exception as exc:
        # Log the full traceback server-side; the old code silently swallowed
        # it and only echoed the message to the client.
        logger.exception(f"Export failed (format={fmt})")
        return jsonify({"error": str(exc)}), 500
def api_exports():
    """List up to 50 export files from the reports and data directories."""
    files = []
    for dir_path in (config.REPORTS_DIR, config.DATA_DIR):
        if not os.path.isdir(dir_path):
            continue
        for fname in sorted(os.listdir(dir_path), reverse=True):
            fpath = os.path.join(dir_path, fname)
            if not os.path.isfile(fpath):
                continue
            info = os.stat(fpath)
            files.append({
                "name": fname,
                "size": info.st_size,
                "modified": datetime.fromtimestamp(info.st_mtime).isoformat(),
                "type": fname.rsplit(".", 1)[-1].upper(),
            })
    return jsonify({"files": files[:50]})
def api_download(filename):
    """Download one export file by name; basename() blocks path traversal."""
    safe_name = os.path.basename(filename)
    ext = os.path.splitext(safe_name)[1].lower()
    mime = {
        ".pdf": "application/pdf",
        ".csv": "text/csv",
        ".json": "application/json",
    }.get(ext, "application/octet-stream")
    for dir_path in (config.REPORTS_DIR, config.DATA_DIR):
        candidate = os.path.join(dir_path, safe_name)
        if os.path.isfile(candidate):
            return send_file(
                candidate,
                mimetype=mime,
                as_attachment=True,
                download_name=safe_name,
            )
    return jsonify({"error": "File not found"}), 404
| # ============================================================================== | |
| # Pipeline runner (background thread) | |
| # ============================================================================== | |
def _purge_demo_leads():
    """Remove all demo-sourced leads from the DB so they don't pollute live results."""
    import sqlite3 as _sql
    conn = _sql.connect(config.DB_PATH, timeout=10)
    try:
        deleted = conn.execute(
            "DELETE FROM leads WHERE lead_source_scraper = 'demo' OR source = 'Demo Data'"
        ).rowcount
        conn.commit()
    finally:
        # Always release the connection -- the old code leaked it whenever
        # the DELETE raised (e.g. database locked).
        conn.close()
    if deleted:
        logger.info(f"Purged {deleted} demo leads from database")
# Specs for the live scrapers, one row per source:
# (source key, display label, module path, class name, scraper tag,
#  keyword cap, pass category kwarg to scrape(), report per-keyword count)
_SCRAPER_SPECS = (
    ("googlemaps", "Google Maps", "scraper.google_maps", "GoogleMapsScraper", "google_maps", 3, True, True),
    ("justdial", "JustDial", "scraper.justdial", "JustDialScraper", "justdial", 3, False, True),
    ("indiamart", "IndiaMART", "scraper.indiamart", "IndiaMARTScraper", "indiamart", 2, False, False),
    ("sulekha", "Sulekha", "scraper.sulekha", "SulekhaScraper", "sulekha", 2, False, False),
)


def _collect_live_leads(source, services, location, limit, _update):
    """Run every selected live scraper and return the combined lead list.

    Replaces four near-identical copy-pasted scraper sections with one
    table-driven loop over ``_SCRAPER_SPECS``. Errors from one scraper are
    logged and reported but do not abort the others.
    """
    import importlib
    leads = []
    for key, label, mod_path, cls_name, tag, kw_cap, pass_cat, report in _SCRAPER_SPECS:
        if source not in (key, "all"):
            continue
        try:
            scraper_cls = getattr(importlib.import_module(mod_path), cls_name)
            scraper = scraper_cls()
            for svc in services:
                keywords = config.SEARCH_KEYWORDS.get(svc)
                if not keywords:
                    logger.debug(f"Skip {label}: no keywords for {svc!r}")
                    continue
                logger.debug(f"{label} | category={svc!r} keywords={keywords}")
                per_kw = max(1, limit // len(keywords))
                for kw in keywords[:kw_cap]:
                    _update(f"{label}: {kw} in {location}...")
                    if pass_cat:
                        new = scraper.scrape(kw, location, limit=per_kw, category=svc)
                    else:
                        new = scraper.scrape(kw, location, limit=per_kw)
                    # Tag every lead with its category and provenance.
                    for lead in new:
                        lead.service_category = svc
                        lead.source = label
                        lead.lead_source_scraper = tag
                    leads.extend(new)
                    if report:
                        _update(f"{label}: found {len(new)} for '{kw}'", len(leads))
        except Exception as e:
            logger.error(f"{label} scraper error: {e}")
            _update(f"{label}: error - {e}")
    return leads


def _run_pipeline(job_id: str):
    """Execute the full lead generation pipeline in a background thread.

    Stages: collect (demo or live scrapers) -> deduplicate -> validate ->
    optional email finding / website analysis -> categorize -> score ->
    persist to SQLite. Progress and errors are published to the shared
    ``_jobs`` entry for ``job_id``.
    """

    def _update(progress, lead_count=None):
        # Publish a progress message (and optionally a count) under the lock.
        with _jobs_lock:
            _jobs[job_id]["progress"] = progress
            if lead_count is not None:
                _jobs[job_id]["lead_count"] = lead_count

    def _finish(progress, count=None, error=None):
        # Mark the job finished with a final progress line / error / count.
        with _jobs_lock:
            _jobs[job_id]["running"] = False
            _jobs[job_id]["progress"] = progress
            if count is not None:
                _jobs[job_id]["lead_count"] = count
            if error is not None:
                _jobs[job_id]["error"] = error
            _jobs[job_id]["finished_at"] = datetime.now().isoformat()

    try:
        with _jobs_lock:
            cfg = _jobs[job_id]["config"]
        service = cfg["service"]
        location = cfg["location"]
        source = cfg["source"]
        limit = cfg["limit"]
        demo = cfg["demo"]
        analyse = cfg["analyse"]

        # BUG FIX: SEARCH_KEYWORDS was referenced here before its only local
        # import (which sat further down, on the non-demo branch), raising
        # NameError whenever service == "all". Use the module-level `config`
        # import instead.
        if service == "all":
            services = list(config.SEARCH_KEYWORDS.keys())
        else:
            services = [service]

        # -- Collect --
        _update("Collecting leads...")
        logger.debug(f"Demo flag: {demo}")
        if demo:
            scraper = DemoScraper()
            leads = scraper.scrape(
                location=location,
                limit=limit * len(services),
                services=services,
            )
        else:
            # Purge old demo leads so they don't pollute live results.
            _purge_demo_leads()
            _update("Cleared demo data. Starting live scraping...")
            leads = _collect_live_leads(source, services, location, limit, _update)

        logger.debug(f"Leads collected: {len(leads)}")
        logger.debug(f"First lead source: {leads[0].source if leads else 'none'}")
        if not leads:
            err_msg = (
                "No leads found."
                if demo
                else "No leads found. Live scrapers returned 0 results. Enable Demo Mode to test with sample data."
            )
            _finish(err_msg, error=err_msg)
            return

        # -- Process --
        _update(f"Deduplicating {len(leads)} leads...")
        leads = Deduplicator(check_db=True).deduplicate(leads)
        _update(f"Validating {len(leads)} leads...", len(leads))
        leads, rejected = LeadValidator().validate_all(leads)
        _update(f"Validated: {len(leads)} valid, {len(rejected)} rejected", len(leads))

        # Email finding (live mode only; best-effort by design -- a missing
        # email is not fatal to the pipeline).
        if not demo:
            _update("Finding emails...")
            try:
                from scraper.email_finder import EmailFinder
                EmailFinder().find_emails_bulk(leads)
            except Exception:
                pass

        # Website analysis (opt-in, live mode only).
        if analyse and not demo:
            _update("Analysing websites...")
            from scraper.website_analyzer import WebsiteAnalyzer
            analyser = WebsiteAnalyzer()
            for i, lead in enumerate(leads):
                if lead.website:
                    _update(f"Analysing {i+1}/{len(leads)}: {lead.business_name}...")
                    analyser.analyse(lead)

        _update("Categorizing...")
        leads = Categorizer().categorize_all(leads)
        _update("Scoring...")
        leads = LeadScorer().score_all(leads)

        # Ensure every lead has a fingerprint before insert (dedup key).
        for lead in leads:
            if not lead.fingerprint:
                lead.fingerprint = lead_fingerprint(
                    lead.business_name, lead.phone, lead.email
                )

        # -- Persist to SQLite --
        _update(f"Saving {len(leads)} leads to database...")
        inserted, skipped = insert_leads(leads)

        # -- Done --
        _finish("Complete!", count=inserted + skipped)
    except Exception as exc:
        _finish(f"Error: {exc}", error=str(exc))
| # ============================================================================== | |
| # Helpers | |
| # ============================================================================== | |
# Attributes copied verbatim from a Lead onto the JSON payload, in the order
# the dashboard expects them ("id" is special-cased to lead.db_id).
_LEAD_ATTRS = (
    "business_name", "service_category", "service_label", "phone", "email",
    "address", "website", "contact_person", "source", "lead_source_scraper",
    "notes", "user_notes", "score", "score_points", "industry",
    "product_category", "status", "opportunity_pitch", "has_https",
    "is_mobile_friendly", "is_outdated", "has_chatbot", "has_online_booking",
    "social_facebook", "social_instagram", "has_contact_form",
    "ssl_expiry_days", "page_speed_score", "load_time_sec", "site_status",
    "gmb_exists", "review_count", "whatsapp_link", "scraped_at", "updated_at",
)


def _lead_to_dict(lead: Lead) -> dict:
    """Serialize a Lead into the flat dict shape the frontend consumes."""
    payload = {"id": lead.db_id}
    for attr in _LEAD_ATTRS:
        payload[attr] = getattr(lead, attr)
    return payload
| # ============================================================================== | |
| # Entry point | |
| # ============================================================================== | |
if __name__ == "__main__":
    banner = "=" * 60
    print(f"\n{banner}")
    print(" LeadGen Pro v2 -- Web Dashboard")
    print(banner)
    print(" Open in your browser: http://localhost:5000")
    print(f"{banner}\n")
    # threaded=True lets the SSE stream and API calls run concurrently.
    app.run(host="0.0.0.0", port=5000, debug=False, threaded=True)