Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
scraper.py — Lead Generation Scraper System — Main CLI entry point.
| Usage examples | |
| -------------- | |
| # Demo run (no network, instant results): | |
| python scraper.py --demo --location="Mumbai" --limit=30 | |
| # Scrape web development leads from JustDial: | |
| python scraper.py --service=web --location="Mumbai" --source=justdial --limit=50 | |
| # Scrape all services from Google Maps: | |
| python scraper.py --service=all --location="Bangalore" --source=googlemaps --limit=100 | |
| # Scrape specific service, skip website analysis (faster): | |
| python scraper.py --service=ai --location="Delhi" --no-analyse --limit=40 | |
| # Only export PDF (no CSV/JSON): | |
| python scraper.py --demo --formats=pdf | |
| Full option reference | |
| --------------------- | |
| --service web | app | ai | all (default: all) | |
| --location City name (default: Mumbai) | |
| --source googlemaps | justdial | all (default: all) | |
| --limit Max leads per service (default: 50) | |
| --formats pdf,csv,json (comma-separated) (default: pdf,csv,json) | |
| --no-analyse Skip website analysis (speeds up scraping) | |
| --demo Use built-in demo data (no network) | |
| --output-dir Custom output directory | |
| --verbose Enable DEBUG logging | |
| """ | |
# Standard-library imports.
import argparse
import sys
import os
import time

# ── Path setup (allow running from any directory) ────────────────────────────
# Prepend this file's directory so the project-local `config` and `utils`
# packages resolve no matter what the current working directory is.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

# Project-local imports (must come after the sys.path tweak above).
import config
from utils.logger import get_logger
# NOTE(review): `now_str` appears unused in this module — confirm before removing.
from utils.helpers import timestamp_for_filename, now_str

# Module-level logger for this CLI entry point.
logger = get_logger("scraper.main")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CLI argument parser | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser.

    The module docstring (usage examples + full option reference) is reused
    verbatim as the epilog, so ``--help`` shows the complete reference.
    """
    parser = argparse.ArgumentParser(
        prog="scraper.py",
        description="π LeadGen Pro β Intelligent Business Lead Generation System",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    # One (flag, options) row per CLI option keeps the definitions scannable.
    option_rows = (
        ("--service", {"default": "all",
                       "choices": ["web", "app", "ai", "all"],
                       "help": "Service category to target (default: all)"}),
        ("--location", {"default": config.DEFAULT_LOCATION,
                        "help": "City / location to search (default: Mumbai)"}),
        ("--source", {"default": "all",
                      "choices": ["googlemaps", "justdial", "all"],
                      "help": "Data source(s) to scrape (default: all)"}),
        ("--limit", {"type": int, "default": config.DEFAULT_LIMIT,
                     "help": "Max leads per service category (default: 50)"}),
        ("--formats", {"default": "pdf,csv,json",
                       "help": "Comma-separated output formats: pdf,csv,json (default: all)"}),
        ("--no-analyse", {"action": "store_true",
                          "help": "Skip per-website analysis (faster but less intelligence)"}),
        ("--demo", {"action": "store_true",
                    "help": "Use built-in demo data β no network access required"}),
        ("--output-dir", {"default": None,
                          "help": "Override output directory (default: output/)"}),
        ("--verbose", {"action": "store_true",
                       "help": "Enable DEBUG-level logging"}),
    )
    for flag, options in option_rows:
        parser.add_argument(flag, **options)

    return parser
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Pipeline orchestration | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def run_pipeline(args: argparse.Namespace) -> None:
    """End-to-end lead generation pipeline: collect → process → export → summary."""
    # 0. Logging verbosity.
    if args.verbose:
        config.LOG_LEVEL = "DEBUG"

    # 1. Resolve service categories and requested export formats.
    services = ["web", "app", "ai"] if args.service == "all" else [args.service]
    formats = [fmt.strip().lower() for fmt in args.formats.split(",")]

    _banner(args, services)
    started = time.monotonic()

    # 2. Collect raw leads (demo dataset or live sources).
    collected = _collect_leads(args, services)
    if not collected:
        logger.warning("No leads collected β exiting.")
        print("\nβ οΈ No leads were collected. Use --demo to run with sample data.\n")
        return

    # 3. Deduplicate / analyse / categorize / score.
    processed = _process(collected, args, services)

    # 4. Write out all requested formats.
    exported = _export(processed, args, services, formats)

    # 5. Human-readable summary with timing.
    _print_summary(processed, exported, time.monotonic() - started)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 2: Collect | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _collect_leads(args, services):
    """Gather raw leads for *services* from demo data or live sources.

    In ``--demo`` mode the built-in dataset is returned directly; otherwise
    each enabled source (Google Maps / JustDial) is scraped per service and
    keyword. A failing scraper is logged and skipped, never fatal.

    Returns a (possibly empty) list of Lead objects.
    """
    from models import Lead
    from typing import List

    leads: List[Lead] = []

    if args.demo:
        logger.info("Running in DEMO mode β using built-in dataset.")
        print(" π¦ Loading demo data...")
        from scraper.demo_scraper import DemoScraper
        demo = DemoScraper()
        # One shared pool sized for all services; DemoScraper splits by service.
        return demo.scrape(
            location=args.location,
            limit=args.limit * len(services),
            services=services,
        )

    # ── Live scraping ─────────────────────────────────────────────────────
    from config import SEARCH_KEYWORDS

    def scrape_source(label, icon, make_scraper):
        # Shared per-source loop (was duplicated for each source). The scraper
        # is constructed inside the try so import/init failures are caught too.
        try:
            scraper = make_scraper()
            for svc in services:
                keywords = SEARCH_KEYWORDS.get(svc, ["businesses"])
                per_kw = max(1, args.limit // len(keywords))
                for kw in keywords[:3]:  # Limit keyword iterations
                    logger.info(f"[{label}] keyword={kw!r} svc={svc}")
                    print(f" {icon} {label}: {kw} in {args.location}...")
                    new = scraper.scrape(kw, args.location, limit=per_kw)
                    for lead in new:
                        lead.service_category = svc
                    leads.extend(new)
        except Exception as exc:
            logger.error(f"{label} scraper failed: {exc}")

    if args.source in ("googlemaps", "all"):
        def _make_gm():
            from scraper.google_maps import GoogleMapsScraper
            return GoogleMapsScraper()
        scrape_source("Google Maps", "πΊ", _make_gm)

    if args.source in ("justdial", "all"):
        def _make_jd():
            from scraper.justdial import JustDialScraper
            return JustDialScraper()
        scrape_source("JustDial", "π", _make_jd)

    return leads
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 3: Process | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _process(all_leads, args, services):
    """Deduplicate, optionally analyse websites, then categorize and score leads."""
    from processors.deduplicator import Deduplicator
    from processors.categorizer import Categorizer
    from processors.scorer import LeadScorer

    print(f"\n π§ Processing {len(all_leads)} raw leads...")

    # Drop duplicate businesses first so later analysis work isn't wasted.
    all_leads = Deduplicator().deduplicate(all_leads)

    # Per-website analysis runs only for live data and when not disabled.
    if not (args.no_analyse or args.demo):
        from scraper.website_analyzer import WebsiteAnalyzer
        analyser = WebsiteAnalyzer()
        total = len(all_leads)
        print(f" π Analysing {total} websites (use --no-analyse to skip)...")
        for idx, lead in enumerate(all_leads, 1):
            if lead.website:
                analyser.analyse(lead)
            if idx % 10 == 0:
                # Carriage return keeps the progress counter on one line.
                print(f" {idx}/{total} analysed...", end="\r")
        print()

    # Assign service categories, then priority scores.
    all_leads = Categorizer().categorize_all(all_leads)
    return LeadScorer().score_all(all_leads)
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Step 4: Export | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _export(all_leads, args, services, formats):
    """Write each requested output format; return ``{format: path}`` for successes.

    Exports are driven by a per-format spec table (was three copy-pasted
    try/except blocks). A failing exporter is logged and skipped so one bad
    format does not abort the others.
    """
    from processors.categorizer import Categorizer
    from exporters.pdf_exporter import PDFExporter
    from exporters.csv_exporter import CSVExporter
    from exporters.json_exporter import JSONExporter

    leads_by_service = Categorizer().split_by_service(all_leads)

    out_dir = args.output_dir or config.OUTPUT_DIR
    reports_dir = os.path.join(out_dir, "reports")
    data_dir = os.path.join(out_dir, "data")
    ts = timestamp_for_filename()
    loc_slug = args.location.replace(" ", "_")

    print("\n π Exporting results...")

    # One spec per format: (exporter class, destination dir, filename).
    # Insertion order preserves the original pdf → csv → json export order.
    export_specs = {
        "pdf": (PDFExporter, reports_dir, f"lead_report_{loc_slug}_{ts}.pdf"),
        "csv": (CSVExporter, data_dir, f"leads_{loc_slug}_{ts}.csv"),
        "json": (JSONExporter, data_dir, f"leads_{loc_slug}_{ts}.json"),
    }

    paths = {}
    for fmt, (exporter_cls, dest_dir, filename) in export_specs.items():
        if fmt not in formats:
            continue
        try:
            paths[fmt] = exporter_cls().export(
                leads_by_service,
                location=args.location,
                out_dir=dest_dir,
                filename=filename,
            )
            print(f" β {fmt.upper()} β {paths[fmt]}")
        except Exception as exc:
            logger.error(f"{fmt.upper()} export failed: {exc}")
    return paths
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Pretty I/O helpers | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _banner(args, services):
    """Print the run-configuration banner before scraping starts."""
    rule = "β" * 60
    mode = "DEMO (offline)" if args.demo else "LIVE"
    source = "Demo Data" if args.demo else args.source.title()
    # Assemble once and emit with a single print call.
    lines = [
        "",
        rule,
        " π LeadGen Pro β Business Lead Generation System",
        rule,
        f" Location : {args.location}",
        f" Services : {', '.join(s.upper() for s in services)}",
        f" Source : {source}",
        f" Limit : {args.limit} per service",
        f" Mode : {mode}",
        rule + "\n",
    ]
    print("\n".join(lines))
def _print_summary(all_leads, output_paths, elapsed):
    """Print the final results summary: counts, priorities, timing, output files."""
    from collections import Counter

    rule = "β" * 60
    priority = Counter(lead.score for lead in all_leads)
    by_category = Counter(lead.service_category for lead in all_leads)

    print("\n" + rule)
    print(" π RESULTS SUMMARY")
    print(rule)

    print(f" Total Leads : {len(all_leads)}")
    for label, key in (
        (" ββ Web Dev : ", "web"),
        (" ββ App Dev : ", "app"),
        (" ββ AI Auto : ", "ai"),
    ):
        print(f"{label}{by_category.get(key, 0)}")
    print()

    print(" Priority Breakdown:")
    for label, key in (
        (" ββ π΄ HIGH : ", "HIGH"),
        (" ββ π MEDIUM : ", "MEDIUM"),
        (" ββ π’ LOW : ", "LOW"),
    ):
        print(f"{label}{priority.get(key, 0)}")
    print()

    print(f" β± Time : {elapsed:.1f}s")
    print()

    if output_paths:
        print(" π Output Files:")
        for fmt, path in output_paths.items():
            print(f" ββ {fmt.upper():5s} β {path}")
    print(rule + "\n")
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Entry point | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
if __name__ == "__main__":
    cli_args = build_parser().parse_args()
    try:
        run_pipeline(cli_args)
    except KeyboardInterrupt:
        # Graceful Ctrl-C: treated as a clean exit, not an error.
        print("\n\nβ οΈ Interrupted by user.\n")
        sys.exit(0)
    except Exception as exc:
        # Top-level boundary: log the full traceback, show a concise message.
        logger.exception(f"Unhandled error: {exc}")
        print(f"\nβ Fatal error: {exc}\n")
        sys.exit(1)