Spaces:
Running
Running
| """ | |
| BharatGraph - Full Data Pipeline | |
| Runs ALL 20 scrapers in parallel threads, cleans, resolves, saves. | |
| Usage: | |
| python -m processing.pipeline # all scrapers | |
| python -m processing.pipeline --scrapers cag,gem,pib # specific | |
| python -m processing.pipeline --sequential # run scrapers one at a time (default: parallel) | |
| """ | |
| import sys, os | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| import json, argparse | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from datetime import datetime | |
| from loguru import logger | |
| from processing.cleaner import NameCleaner | |
| from processing.entity_resolver_v2 import EntityResolverV2 as EntityResolver | |
class BharatGraphPipeline:
    """Run the BharatGraph scrapers, resolve entities and write a run summary.

    Each ``run_<source>`` method wraps one scraper: it imports the scraper
    lazily, tags every returned record with ``_source`` and returns a list.
    These methods never raise; on any exception they log and return ``[]`` so
    that one broken source cannot abort the whole pipeline.

    ``run`` executes the selected scrapers (in a thread pool by default),
    resolves person/company records through the entity resolver, rebuilds the
    alias graph and saves the summary plus links as JSON.
    """

    # Upper bound on concurrently running scraper threads.
    MAX_WORKERS = 6

    # Fixed seed names searched in the ICIJ database. This list is hard-coded;
    # it is not derived from the graph at runtime.
    ICIJ_ENTITIES = (
        "Adani", "Reliance", "Tata", "Birla", "Ambani",
        "Gautam Adani", "Mukesh Ambani", "Anil Ambani",
        "Lodha", "DLF", "Essar", "Mallya", "Choksi", "Modi",
        "Ratan Tata", "Kumar Mangalam Birla", "Anil Agarwal",
    )

    # Fixed seed names searched in OpenSanctions (also hard-coded).
    OPENSANCTIONS_ENTITIES = (
        "Modi", "Gandhi", "Adani", "Choksi", "Mallya",
        "Nirav Modi", "Vijay Mallya", "Mehul Choksi",
        "Lalit Modi", "Subrata Roy", "Rana Kapoor",
        "Jagan Reddy", "Saradha Group", "Rose Valley",
    )

    # Politician names passed to the Wikidata enrichment call.
    WIKIDATA_POLITICIANS = (
        "Narendra Modi", "Rahul Gandhi", "Amit Shah",
        "Arvind Kejriwal", "Mamata Banerjee", "Nitish Kumar",
        "Yogi Adityanath", "Shashi Tharoor", "Anurag Thakur",
    )

    def __init__(self):
        """Create the cleaner and resolver, stamp the run, ensure output dirs exist."""
        self.cleaner = NameCleaner()
        self.resolver = EntityResolver(threshold=0.72)
        # Used in the output filename, so it is fixed once per pipeline instance.
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        os.makedirs("data/processed", exist_ok=True)
        os.makedirs("data/samples", exist_ok=True)
        logger.info("[Pipeline] Initialized")

    # -- Scrapers -------------------------------------------------------------

    def _collect(self, label, fetch, *, source, noun="records",
                 clean_as=None, critical=False, failure_label=None) -> list:
        """Run one scraper callable, tag its records and never raise.

        Args:
            label: Human-readable source name used in the success log line.
            fetch: Zero-argument callable returning an iterable of dict records.
                It performs the (lazy) scraper import itself so that a missing
                or broken scraper module is handled like any other failure.
            source: Value written to each record's ``_source`` key.
            noun: Unit name used in the success log line.
            clean_as: When given, each record is passed through
                ``self.cleaner.clean_record(record, clean_as)`` after tagging.
            critical: Log failures with ``logger.error`` instead of
                ``logger.warning``.
            failure_label: Source name for the failure log line when it differs
                from ``label``.

        Returns:
            The tagged (and optionally cleaned) records, or ``[]`` on failure.
        """
        try:
            records = fetch()
            for record in records:
                record["_source"] = source
            if clean_as is not None:
                records = [
                    self.cleaner.clean_record(record, clean_as)
                    for record in records
                ]
            logger.success(f"[Pipeline] {label}: {len(records)} {noun}")
            return records
        except Exception as exc:
            # Deliberate best-effort boundary: a failing source yields no
            # records instead of aborting the run.
            report = logger.error if critical else logger.warning
            report(f"[Pipeline] {failure_label or label} failed: {exc}")
            return []

    def run_datagov(self) -> list:
        """Return data.gov records flattened across datasets, tagged with ``_dataset``."""
        def fetch():
            from scrapers.datagov_scraper import DataGovScraper
            datasets = DataGovScraper().fetch_all_datasets(save=False)
            flat = []
            for dataset_name, dataset in datasets.items():
                for record in dataset.get("records", []):
                    record["_source"] = "datagov"
                    record["_dataset"] = dataset_name
                    flat.append(record)
            return flat
        return self._collect("DataGov", fetch, source="datagov", critical=True)

    def run_cag(self) -> list:
        """Return CAG report entries (requested with ``limit=50``)."""
        def fetch():
            from scrapers.cag_scraper import CAGScraper
            return CAGScraper().fetch_report_list(limit=50)
        return self._collect("CAG", fetch, source="cag", noun="reports",
                             critical=True)

    def run_gem(self) -> list:
        """Return GeM contracts (``limit=50``) cleaned as ``"contract"`` records."""
        def fetch():
            from scrapers.gem_scraper import GeMScraper
            return GeMScraper().fetch_contracts_by_ministry(limit=50)
        return self._collect("GeM", fetch, source="gem", noun="contracts",
                             clean_as="contract", critical=True)

    def run_myneta(self) -> list:
        """Return MyNeta candidates cleaned as ``"person"`` records."""
        def fetch():
            from scrapers.myneta_scraper import MyNetaScraper
            return MyNetaScraper().fetch_sample_data(save=False)
        return self._collect("MyNeta", fetch, source="myneta",
                             noun="candidates", clean_as="person",
                             critical=True)

    def run_mca(self) -> list:
        """Return MCA companies flattened across groups, cleaned as ``"company"``."""
        def fetch():
            from scrapers.mca_scraper import MCAScraper
            grouped = MCAScraper().fetch_and_save_sample(save=False)
            # The scraper returns a mapping of group key -> record list; only
            # the records are needed here.
            return [record for group in grouped.values() for record in group]
        return self._collect("MCA", fetch, source="mca", noun="companies",
                             clean_as="company", critical=True)

    def run_pib(self) -> list:
        """Return PIB feed articles."""
        def fetch():
            from scrapers.pib_scraper import PIBScraper
            return PIBScraper().fetch_all_feeds(save=False)
        return self._collect("PIB", fetch, source="pib", noun="articles",
                             critical=True)

    def run_loksabha(self) -> list:
        """Return Lok Sabha question records (``limit=50``)."""
        def fetch():
            from scrapers.loksabha_scraper import LokSabhaScraper
            return LokSabhaScraper().fetch_questions(limit=50)
        return self._collect("LokSabha", fetch, source="loksabha")

    def run_sebi(self) -> list:
        """Return SEBI enforcement orders (``limit=30``)."""
        def fetch():
            from scrapers.sebi_scraper import SEBIScraper
            return SEBIScraper().fetch_enforcement_orders(limit=30)
        return self._collect("SEBI", fetch, source="sebi", noun="orders")

    def run_ed(self) -> list:
        """Return ED press releases (``limit=30``)."""
        def fetch():
            from scrapers.ed_scraper import EDScraper
            return EDScraper().fetch_press_releases(limit=30)
        return self._collect("ED", fetch, source="ed", noun="press releases")

    def run_cvc(self) -> list:
        """Return CVC circulars (``limit=30``)."""
        def fetch():
            from scrapers.cvc_scraper import CVCScraper
            return CVCScraper().fetch_circulars(limit=30)
        return self._collect("CVC", fetch, source="cvc")

    def run_njdg(self) -> list:
        """Return NJDG pendency statistics."""
        def fetch():
            from scrapers.njdg_scraper import NJDGScraper
            return NJDGScraper().fetch_pendency_stats()
        return self._collect("NJDG", fetch, source="njdg")

    def run_electoral_bond(self) -> list:
        """Return electoral bond records."""
        def fetch():
            from scrapers.electoral_bond_scraper import ElectoralBondScraper
            return ElectoralBondScraper().fetch_bond_data()
        # Success and failure log lines historically use different labels.
        return self._collect("Electoral Bonds", fetch, source="electoral_bond",
                             failure_label="Electoral Bond")

    def run_icij(self) -> list:
        """Return ICIJ search matches for every name in ``ICIJ_ENTITIES``."""
        def fetch():
            from scrapers.icij_scraper import ICIJScraper
            scraper = ICIJScraper()
            matches = []
            for name in self.ICIJ_ENTITIES:
                matches.extend(scraper.search_entity(name))
            return matches
        return self._collect("ICIJ", fetch, source="icij", noun="matches")

    def run_opensanctions(self) -> list:
        """Return OpenSanctions matches for every name in ``OPENSANCTIONS_ENTITIES``."""
        def fetch():
            from scrapers.opensanctions_scraper import OpenSanctionsScraper
            scraper = OpenSanctionsScraper()
            matches = []
            for name in self.OPENSANCTIONS_ENTITIES:
                matches.extend(scraper.search_entity(name))
            return matches
        return self._collect("OpenSanctions", fetch, source="opensanctions",
                             noun="matches")

    def run_wikidata(self) -> list:
        """Return Wikidata enrichments for ``WIKIDATA_POLITICIANS``."""
        def fetch():
            from scrapers.wikidata_scraper import WikidataScraper
            # Pass a fresh list, as the scraper was always handed a list.
            return WikidataScraper().enrich_entity_list(
                list(self.WIKIDATA_POLITICIANS)
            )
        return self._collect("Wikidata", fetch, source="wikidata",
                             noun="enrichments")

    def run_ibbi(self) -> list:
        """Return IBBI orders (``limit=30``)."""
        def fetch():
            from scrapers.ibbi_scraper import IBBIScraper
            return IBBIScraper().fetch_orders(limit=30)
        return self._collect("IBBI", fetch, source="ibbi", noun="orders")

    def run_ngo_darpan(self) -> list:
        """Return NGO Darpan entries (``limit=30``)."""
        def fetch():
            from scrapers.ngo_darpan_scraper import NGODarpanScraper
            return NGODarpanScraper().fetch_ngo_list(limit=30)
        return self._collect("NGO Darpan", fetch, source="ngo_darpan",
                             noun="NGOs")

    def run_cppp(self) -> list:
        """Return CPPP tenders (``limit=30``)."""
        def fetch():
            from scrapers.cppp_scraper import CPPPScraper
            return CPPPScraper().fetch_tenders(limit=30)
        return self._collect("CPPP", fetch, source="cppp", noun="tenders")

    def run_ncrb(self) -> list:
        """Return NCRB crime statistics (``limit=20``)."""
        def fetch():
            from scrapers.ncrb_scraper import NCRBScraper
            return NCRBScraper().fetch_crime_statistics(limit=20)
        return self._collect("NCRB", fetch, source="ncrb")

    def run_lgd(self) -> list:
        """Return LGD state code records."""
        def fetch():
            from scrapers.lgd_scraper import LGDScraper
            return LGDScraper().fetch_state_codes()
        return self._collect("LGD", fetch, source="lgd")

    def _get_runner_map(self):
        """Map each scraper name accepted by ``run`` to its bound runner method."""
        return {
            "datagov": self.run_datagov,
            "cag": self.run_cag,
            "gem": self.run_gem,
            "myneta": self.run_myneta,
            "mca": self.run_mca,
            "pib": self.run_pib,
            "loksabha": self.run_loksabha,
            "sebi": self.run_sebi,
            "ed": self.run_ed,
            "cvc": self.run_cvc,
            "njdg": self.run_njdg,
            "electoral_bond": self.run_electoral_bond,
            "icij": self.run_icij,
            "opensanctions": self.run_opensanctions,
            "wikidata": self.run_wikidata,
            "ibbi": self.run_ibbi,
            "ngo_darpan": self.run_ngo_darpan,
            "cppp": self.run_cppp,
            "ncrb": self.run_ncrb,
            "lgd": self.run_lgd,
        }

    @staticmethod
    def _select_scrapers(requested, available):
        """Split requested scraper names into known and unknown ones.

        Names are stripped of surrounding whitespace, blank names are ignored
        and duplicates are dropped while keeping first-seen order.

        Args:
            requested: Iterable of names, or a falsy value meaning "all".
            available: Iterable of valid scraper names (a dict works too).

        Returns:
            ``(selected, unknown)``. When ``requested`` is falsy, ``selected``
            is every available name. ``selected`` may be empty when names were
            requested but none is valid.
        """
        known = list(available)
        if not requested:
            return known, []
        known_names = set(known)
        selected, unknown = [], []
        for entry in requested:
            name = entry.strip()
            if not name:
                continue
            bucket = selected if name in known_names else unknown
            if name not in bucket:
                bucket.append(name)
        return selected, unknown

    def _run_parallel(self, names, runner_map) -> dict:
        """Run the named scrapers in a thread pool; return results in ``names`` order."""
        finished = {}
        with ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as pool:
            futures = {pool.submit(runner_map[name]): name for name in names}
            for future in as_completed(futures):
                name = futures[future]
                try:
                    finished[name] = future.result()
                except Exception as e:
                    logger.error(f"[Pipeline] {name} thread failed: {e}")
                    finished[name] = []
        # Re-key in request order so output does not depend on which thread
        # happened to finish first.
        return {name: finished[name] for name in names}

    def _rebuild_alias_graph(self, politicians, companies) -> None:
        """Rebuild and save the alias graph from resolved records (best effort)."""
        try:
            from processing.alias_graph import AliasGraph
            alias_graph = AliasGraph()
            alias_graph.bulk_add(politicians, "name", "id")
            alias_graph.bulk_add(companies, "name", "id")
            # Also register the aliases attached to each record by resolution.
            for record in politicians + companies:
                canonical_id = record.get("id", "")
                if not canonical_id:
                    continue
                for alias in record.get("aliases", []):
                    if alias.get("name"):
                        alias_graph.add(alias["name"], canonical_id)
            alias_graph.save()
            logger.info(f"[Pipeline] Alias graph rebuilt: {len(alias_graph)} entries")
        except Exception as exc:
            logger.warning(f"[Pipeline] Alias graph rebuild failed: {type(exc).__name__}")

    def find_politician_company_links(self, politicians, companies):
        """Return resolver matches between politician and company records by ``name``."""
        logger.info("[Pipeline] Finding politician-company links...")
        matches = self.resolver.cross_dataset_match(
            politicians, companies,
            name_field_a="name", name_field_b="name"
        )
        if matches:
            logger.warning(f"[Pipeline] {len(matches)} politician-company links found!")
        return matches

    def save(self, data: dict) -> str:
        """Write ``data`` as UTF-8 JSON under ``data/processed`` and return the path."""
        filepath = f"data/processed/pipeline_{self.timestamp}.json"
        # Normally created in __init__; ensured again so save() stands alone.
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        logger.success(f"[Pipeline] Saved to {filepath}")
        return filepath

    def run(self, scrapers: list = None, parallel: bool = True) -> dict:
        """Run the selected scrapers, resolve entities and save a summary.

        Args:
            scrapers: Scraper names to run (keys of ``_get_runner_map``).
                ``None`` or an empty list runs every scraper. Unknown names
                are ignored with a warning; if no valid name remains, every
                scraper is run.
            parallel: Run scrapers in a thread pool when more than one is
                selected; otherwise run them one after another.

        Returns:
            A dict with ``raw`` (records per source), ``links``, ``summary``,
            ``run_at`` (ISO start time) and ``saved_to`` (path of the JSON
            file holding summary, links and run_at only).
        """
        runner_map = self._get_runner_map()
        to_run, unknown = self._select_scrapers(scrapers, runner_map)
        if unknown:
            logger.warning(f"[Pipeline] Ignoring unknown scrapers: {unknown}")
        if not to_run:
            logger.warning("[Pipeline] No valid scrapers requested -- running all")
            to_run = list(runner_map)

        logger.info(f"[Pipeline] Starting {len(to_run)} scrapers "
                    f"({'parallel' if parallel else 'sequential'}): {to_run}")
        start = datetime.now()

        if parallel and len(to_run) > 1:
            raw = self._run_parallel(to_run, runner_map)
        else:
            raw = {name: runner_map[name]() for name in to_run}

        # Entity resolution covers every source that can contain persons or
        # companies, so ICIJ / OpenSanctions hits are matched against
        # politicians and companies as well.
        politicians = (
            raw.get("myneta", []) +
            raw.get("wikidata", []) +
            raw.get("opensanctions", []) +
            raw.get("icij", [])
        )
        companies = (
            raw.get("mca", []) +
            raw.get("ibbi", [])
        )

        links = []
        if politicians and companies:
            politicians = self.resolver.resolve_dataset(politicians, "name")
            companies = self.resolver.resolve_dataset(companies, "name")
            links = self.find_politician_company_links(politicians, companies)
            self._rebuild_alias_graph(politicians, companies)

        # total_seconds() covers the whole elapsed time; timedelta.seconds is
        # only the seconds component and drops whole days.
        duration = int((datetime.now() - start).total_seconds())
        per_source = {name: len(records) for name, records in raw.items()}
        summary = {
            "scrapers_run": to_run,
            "duration_seconds": duration,
            "parallel": parallel,
            "total_raw_records": sum(per_source.values()),
            "per_source": per_source,
            "politicians_found": len(raw.get("myneta", [])),
            "companies_found": len(raw.get("mca", [])),
            "contracts_found": len(raw.get("gem", [])),
            "cag_reports_found": len(raw.get("cag", [])),
            "pib_articles_found": len(raw.get("pib", [])),
            "politician_co_links": len(links),
        }
        logger.info(f"[Pipeline] Done in {duration}s -- "
                    f"{summary['total_raw_records']} total records")

        results = {
            "raw": raw,
            "links": links,
            "summary": summary,
            "run_at": start.isoformat(),
        }
        # Only summary + links go to disk; raw scraper output would bloat the
        # repository history. The full dict (with raw) is returned in memory
        # for the loader.
        saveable = {
            "summary": summary,
            "links": links,
            "run_at": start.isoformat(),
        }
        results["saved_to"] = self.save(saveable)
        return results
if __name__ == "__main__":
    # Command-line entry point: parse flags, run the pipeline, print a report.
    cli = argparse.ArgumentParser(description="BharatGraph Full Pipeline (20 scrapers)")
    cli.add_argument("--scrapers", type=str, default=None)
    # Parallel execution is the default, so the only switch offered is the
    # opt-out: --sequential.
    cli.add_argument(
        "--sequential",
        action="store_true",
        default=False,
        help="Run scrapers sequentially (default: parallel)",
    )
    options = cli.parse_args()

    # A comma-separated --scrapers value becomes a list; no value means "all".
    requested = options.scrapers.split(",") if options.scrapers else None
    outcome = BharatGraphPipeline().run(
        scrapers=requested,
        parallel=not options.sequential,
    )

    report = outcome["summary"]
    rule = "=" * 55
    print(f"\n{rule}\nPIPELINE SUMMARY\n{rule}")
    print(f" Scrapers run: {len(report['scrapers_run'])}")
    print(f" Duration: {report['duration_seconds']}s")
    print(f" Total records: {report['total_raw_records']}")
    # Largest sources first; sources that returned nothing are not listed.
    ranked = sorted(
        report["per_source"].items(),
        key=lambda pair: pair[1],
        reverse=True,
    )
    for source_name, count in ranked:
        if count > 0:
            print(f" {source_name:<20} {count}")
    print(f" Saved to: {outcome.get('saved_to')}")
    print(rule)