# LeadGenPro / lead_gen/scraper.py
# Author: MaSTer-suFYan
# feat: LeadGen Pro v2.0 — full system with bug fixes
# Commit: beec01d
#!/usr/bin/env python3
"""
scraper.py — Lead Generation Scraper System — Main CLI entry point.
Usage examples
--------------
# Demo run (no network, instant results):
python scraper.py --demo --location="Mumbai" --limit=30
# Scrape web development leads from JustDial:
python scraper.py --service=web --location="Mumbai" --source=justdial --limit=50
# Scrape all services from Google Maps:
python scraper.py --service=all --location="Bangalore" --source=googlemaps --limit=100
# Scrape specific service, skip website analysis (faster):
python scraper.py --service=ai --location="Delhi" --no-analyse --limit=40
# Only export PDF (no CSV/JSON):
python scraper.py --demo --formats=pdf
Full option reference
---------------------
--service web | app | ai | all (default: all)
--location City name (default: Mumbai)
--source googlemaps | justdial | all (default: all)
--limit Max leads per service (default: 50)
--formats pdf,csv,json (comma-separated) (default: pdf,csv,json)
--no-analyse Skip website analysis (speeds up scraping)
--demo Use built-in demo data (no network)
--output-dir Custom output directory
--verbose Enable DEBUG logging
"""
import argparse
import sys
import os
import time
# ── Path setup (allow running from any directory) ──────────────────────────
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import config
from utils.logger import get_logger
from utils.helpers import timestamp_for_filename, now_str
logger = get_logger("scraper.main")
# ──────────────────────────────────────────────────────────────────────────────
# CLI argument parser
# ──────────────────────────────────────────────────────────────────────────────
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="scraper.py",
description="πŸš€ LeadGen Pro β€” Intelligent Business Lead Generation System",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
parser.add_argument(
"--service", default="all",
choices=["web", "app", "ai", "all"],
help="Service category to target (default: all)",
)
parser.add_argument(
"--location", default=config.DEFAULT_LOCATION,
help="City / location to search (default: Mumbai)",
)
parser.add_argument(
"--source", default="all",
choices=["googlemaps", "justdial", "all"],
help="Data source(s) to scrape (default: all)",
)
parser.add_argument(
"--limit", type=int, default=config.DEFAULT_LIMIT,
help="Max leads per service category (default: 50)",
)
parser.add_argument(
"--formats", default="pdf,csv,json",
help="Comma-separated output formats: pdf,csv,json (default: all)",
)
parser.add_argument(
"--no-analyse", action="store_true",
help="Skip per-website analysis (faster but less intelligence)",
)
parser.add_argument(
"--demo", action="store_true",
help="Use built-in demo data β€” no network access required",
)
parser.add_argument(
"--output-dir", default=None,
help="Override output directory (default: output/)",
)
parser.add_argument(
"--verbose", action="store_true",
help="Enable DEBUG-level logging",
)
return parser
# ──────────────────────────────────────────────────────────────────────────────
# Pipeline orchestration
# ──────────────────────────────────────────────────────────────────────────────
def run_pipeline(args: argparse.Namespace) -> None:
"""End-to-end lead generation pipeline."""
# ── 0. Logging verbosity ──────────────────────────────────────────────
if args.verbose:
config.LOG_LEVEL = "DEBUG"
# ── 1. Resolve services to scrape ─────────────────────────────────────
services = (
["web", "app", "ai"] if args.service == "all"
else [args.service]
)
formats = [f.strip().lower() for f in args.formats.split(",")]
_banner(args, services)
start_time = time.monotonic()
# ── 2. Collect leads ──────────────────────────────────────────────────
all_leads = _collect_leads(args, services)
if not all_leads:
logger.warning("No leads collected β€” exiting.")
print("\n⚠️ No leads were collected. Use --demo to run with sample data.\n")
return
# ── 3. Process ─────────────────────────────────────────────────────────
all_leads = _process(all_leads, args, services)
# ── 4. Export ──────────────────────────────────────────────────────────
output_paths = _export(all_leads, args, services, formats)
# ── 5. Summary ─────────────────────────────────────────────────────────
elapsed = time.monotonic() - start_time
_print_summary(all_leads, output_paths, elapsed)
# ──────────────────────────────────────────────────────────────────────────────
# Step 2: Collect
# ──────────────────────────────────────────────────────────────────────────────
def _collect_leads(args, services):
from models import Lead
from typing import List
leads: List[Lead] = []
if args.demo:
logger.info("Running in DEMO mode β€” using built-in dataset.")
print(" πŸ“¦ Loading demo data...")
from scraper.demo_scraper import DemoScraper
demo = DemoScraper()
leads = demo.scrape(
location=args.location,
limit=args.limit * len(services),
services=services,
)
return leads
# ── Live scraping ─────────────────────────────────────────────────────
from config import SEARCH_KEYWORDS
if args.source in ("googlemaps", "all"):
try:
from scraper.google_maps import GoogleMapsScraper
gm = GoogleMapsScraper()
for svc in services:
keywords = SEARCH_KEYWORDS.get(svc, ["businesses"])
per_kw = max(1, args.limit // len(keywords))
for kw in keywords[:3]: # Limit keyword iterations
logger.info(f"[Google Maps] keyword={kw!r} svc={svc}")
print(f" πŸ—Ί Google Maps: {kw} in {args.location}...")
new = gm.scrape(kw, args.location, limit=per_kw)
for lead in new:
lead.service_category = svc
leads.extend(new)
except Exception as exc:
logger.error(f"Google Maps scraper failed: {exc}")
if args.source in ("justdial", "all"):
try:
from scraper.justdial import JustDialScraper
jd = JustDialScraper()
for svc in services:
keywords = SEARCH_KEYWORDS.get(svc, ["businesses"])
per_kw = max(1, args.limit // len(keywords))
for kw in keywords[:3]:
logger.info(f"[JustDial] keyword={kw!r} svc={svc}")
print(f" πŸ“‹ JustDial: {kw} in {args.location}...")
new = jd.scrape(kw, args.location, limit=per_kw)
for lead in new:
lead.service_category = svc
leads.extend(new)
except Exception as exc:
logger.error(f"JustDial scraper failed: {exc}")
return leads
# ──────────────────────────────────────────────────────────────────────────────
# Step 3: Process
# ──────────────────────────────────────────────────────────────────────────────
def _process(all_leads, args, services):
from processors.deduplicator import Deduplicator
from processors.categorizer import Categorizer
from processors.scorer import LeadScorer
print(f"\n πŸ”§ Processing {len(all_leads)} raw leads...")
# Deduplication
dedup = Deduplicator()
all_leads = dedup.deduplicate(all_leads)
# Website analysis (optional)
if not args.no_analyse and not args.demo:
from scraper.website_analyzer import WebsiteAnalyzer
analyser = WebsiteAnalyzer()
total = len(all_leads)
print(f" πŸ” Analysing {total} websites (use --no-analyse to skip)...")
for i, lead in enumerate(all_leads, 1):
if lead.website:
analyser.analyse(lead)
if i % 10 == 0:
print(f" {i}/{total} analysed...", end="\r")
print()
# Categorization
cat = Categorizer()
all_leads = cat.categorize_all(all_leads)
# Scoring
scorer = LeadScorer()
all_leads = scorer.score_all(all_leads)
return all_leads
# ──────────────────────────────────────────────────────────────────────────────
# Step 4: Export
# ──────────────────────────────────────────────────────────────────────────────
def _export(all_leads, args, services, formats):
from processors.categorizer import Categorizer
from exporters.pdf_exporter import PDFExporter
from exporters.csv_exporter import CSVExporter
from exporters.json_exporter import JSONExporter
cat = Categorizer()
leads_by_service = cat.split_by_service(all_leads)
out_dir = args.output_dir or config.OUTPUT_DIR
reports_dir = os.path.join(out_dir, "reports")
data_dir = os.path.join(out_dir, "data")
ts = timestamp_for_filename()
loc_slug = args.location.replace(" ", "_")
print(f"\n πŸ“„ Exporting results...")
paths = {}
if "pdf" in formats:
try:
pdf = PDFExporter()
p = pdf.export(
leads_by_service,
location = args.location,
out_dir = reports_dir,
filename = f"lead_report_{loc_slug}_{ts}.pdf",
)
paths["pdf"] = p
print(f" βœ… PDF β†’ {p}")
except Exception as exc:
logger.error(f"PDF export failed: {exc}")
if "csv" in formats:
try:
csv_exp = CSVExporter()
p = csv_exp.export(
leads_by_service,
location = args.location,
out_dir = data_dir,
filename = f"leads_{loc_slug}_{ts}.csv",
)
paths["csv"] = p
print(f" βœ… CSV β†’ {p}")
except Exception as exc:
logger.error(f"CSV export failed: {exc}")
if "json" in formats:
try:
json_exp = JSONExporter()
p = json_exp.export(
leads_by_service,
location = args.location,
out_dir = data_dir,
filename = f"leads_{loc_slug}_{ts}.json",
)
paths["json"] = p
print(f" βœ… JSON β†’ {p}")
except Exception as exc:
logger.error(f"JSON export failed: {exc}")
return paths
# ──────────────────────────────────────────────────────────────────────────────
# Pretty I/O helpers
# ──────────────────────────────────────────────────────────────────────────────
def _banner(args, services):
print("\n" + "═" * 60)
print(" πŸš€ LeadGen Pro β€” Business Lead Generation System")
print("═" * 60)
print(f" Location : {args.location}")
print(f" Services : {', '.join(s.upper() for s in services)}")
print(f" Source : {'Demo Data' if args.demo else args.source.title()}")
print(f" Limit : {args.limit} per service")
print(f" Mode : {'DEMO (offline)' if args.demo else 'LIVE'}")
print("─" * 60 + "\n")
def _print_summary(all_leads, output_paths, elapsed):
from collections import Counter
scores = Counter(l.score for l in all_leads)
cats = Counter(l.service_category for l in all_leads)
print("\n" + "═" * 60)
print(" πŸ“Š RESULTS SUMMARY")
print("═" * 60)
print(f" Total Leads : {len(all_leads)}")
print(f" β”œβ”€ Web Dev : {cats.get('web', 0)}")
print(f" β”œβ”€ App Dev : {cats.get('app', 0)}")
print(f" └─ AI Auto : {cats.get('ai', 0)}")
print()
print(f" Priority Breakdown:")
print(f" β”œβ”€ πŸ”΄ HIGH : {scores.get('HIGH', 0)}")
print(f" β”œβ”€ 🟠 MEDIUM : {scores.get('MEDIUM', 0)}")
print(f" └─ 🟒 LOW : {scores.get('LOW', 0)}")
print()
print(f" ⏱ Time : {elapsed:.1f}s")
print()
if output_paths:
print(" πŸ“ Output Files:")
for fmt, path in output_paths.items():
print(f" β”œβ”€ {fmt.upper():5s} β†’ {path}")
print("═" * 60 + "\n")
# ──────────────────────────────────────────────────────────────────────────────
# Entry point
# ──────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
parser = build_parser()
args = parser.parse_args()
try:
run_pipeline(args)
except KeyboardInterrupt:
print("\n\n⚠️ Interrupted by user.\n")
sys.exit(0)
except Exception as exc:
logger.exception(f"Unhandled error: {exc}")
print(f"\n❌ Fatal error: {exc}\n")
sys.exit(1)