bharatgraph / processing /pipeline.py
abinazebinoy's picture
Add header documentation for pipeline.py
4b4af4b unverified
Raw
History Blame Contribute Delete
19.2 kB
"""
BharatGraph - Full Data Pipeline
Runs ALL 20 scrapers in parallel threads, cleans, resolves, saves.
Usage:
python -m processing.pipeline # all scrapers
python -m processing.pipeline --scrapers cag,gem,pib # specific
python -m processing.pipeline --parallel # force parallel
"""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import json, argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from loguru import logger
from processing.cleaner import NameCleaner
from processing.entity_resolver_v2 import EntityResolverV2 as EntityResolver
class BharatGraphPipeline:
def __init__(self):
self.cleaner = NameCleaner()
self.resolver = EntityResolver(threshold=0.72)
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
os.makedirs("data/processed", exist_ok=True)
os.makedirs("data/samples", exist_ok=True)
logger.info("[Pipeline] Initialized")
# ?? Scrapers ?????????????????????????????????????????????????????????????
def run_datagov(self) -> list:
try:
from scrapers.datagov_scraper import DataGovScraper
data = DataGovScraper().fetch_all_datasets(save=False)
records = []
for ds_name, ds in data.items():
for r in ds.get("records", []):
r["_source"] = "datagov"
r["_dataset"] = ds_name
records.append(r)
logger.success(f"[Pipeline] DataGov: {len(records)} records")
return records
except Exception as e:
logger.error(f"[Pipeline] DataGov failed: {e}")
return []
def run_cag(self) -> list:
try:
from scrapers.cag_scraper import CAGScraper
reports = CAGScraper().fetch_report_list(limit=50)
for r in reports: r["_source"] = "cag"
logger.success(f"[Pipeline] CAG: {len(reports)} reports")
return reports
except Exception as e:
logger.error(f"[Pipeline] CAG failed: {e}")
return []
def run_gem(self) -> list:
try:
from scrapers.gem_scraper import GeMScraper
contracts = GeMScraper().fetch_contracts_by_ministry(limit=50)
cleaned = []
for c in contracts:
c["_source"] = "gem"
cleaned.append(self.cleaner.clean_record(c, "contract"))
logger.success(f"[Pipeline] GeM: {len(cleaned)} contracts")
return cleaned
except Exception as e:
logger.error(f"[Pipeline] GeM failed: {e}")
return []
def run_myneta(self) -> list:
try:
from scrapers.myneta_scraper import MyNetaScraper
candidates = MyNetaScraper().fetch_sample_data(save=False)
cleaned = []
for c in candidates:
c["_source"] = "myneta"
cleaned.append(self.cleaner.clean_record(c, "person"))
logger.success(f"[Pipeline] MyNeta: {len(cleaned)} candidates")
return cleaned
except Exception as e:
logger.error(f"[Pipeline] MyNeta failed: {e}")
return []
def run_mca(self) -> list:
try:
from scrapers.mca_scraper import MCAScraper
result = MCAScraper().fetch_and_save_sample(save=False)
companies = []
for state, recs in result.items():
for r in recs:
r["_source"] = "mca"
companies.append(self.cleaner.clean_record(r, "company"))
logger.success(f"[Pipeline] MCA: {len(companies)} companies")
return companies
except Exception as e:
logger.error(f"[Pipeline] MCA failed: {e}")
return []
def run_pib(self) -> list:
try:
from scrapers.pib_scraper import PIBScraper
articles = PIBScraper().fetch_all_feeds(save=False)
for a in articles: a["_source"] = "pib"
logger.success(f"[Pipeline] PIB: {len(articles)} articles")
return articles
except Exception as e:
logger.error(f"[Pipeline] PIB failed: {e}")
return []
def run_loksabha(self) -> list:
try:
from scrapers.loksabha_scraper import LokSabhaScraper
records = LokSabhaScraper().fetch_questions(limit=50)
for r in records: r["_source"] = "loksabha"
logger.success(f"[Pipeline] LokSabha: {len(records)} records")
return records
except Exception as e:
logger.warning(f"[Pipeline] LokSabha failed: {e}")
return []
def run_sebi(self) -> list:
try:
from scrapers.sebi_scraper import SEBIScraper
records = SEBIScraper().fetch_enforcement_orders(limit=30)
for r in records: r["_source"] = "sebi"
logger.success(f"[Pipeline] SEBI: {len(records)} orders")
return records
except Exception as e:
logger.warning(f"[Pipeline] SEBI failed: {e}")
return []
def run_ed(self) -> list:
try:
from scrapers.ed_scraper import EDScraper
records = EDScraper().fetch_press_releases(limit=30)
for r in records: r["_source"] = "ed"
logger.success(f"[Pipeline] ED: {len(records)} press releases")
return records
except Exception as e:
logger.warning(f"[Pipeline] ED failed: {e}")
return []
def run_cvc(self) -> list:
try:
from scrapers.cvc_scraper import CVCScraper
records = CVCScraper().fetch_circulars(limit=30)
for r in records: r["_source"] = "cvc"
logger.success(f"[Pipeline] CVC: {len(records)} records")
return records
except Exception as e:
logger.warning(f"[Pipeline] CVC failed: {e}")
return []
def run_njdg(self) -> list:
try:
from scrapers.njdg_scraper import NJDGScraper
records = NJDGScraper().fetch_pendency_stats()
for r in records: r["_source"] = "njdg"
logger.success(f"[Pipeline] NJDG: {len(records)} records")
return records
except Exception as e:
logger.warning(f"[Pipeline] NJDG failed: {e}")
return []
def run_electoral_bond(self) -> list:
try:
from scrapers.electoral_bond_scraper import ElectoralBondScraper
records = ElectoralBondScraper().fetch_bond_data()
for r in records: r["_source"] = "electoral_bond"
logger.success(f"[Pipeline] Electoral Bonds: {len(records)} records")
return records
except Exception as e:
logger.warning(f"[Pipeline] Electoral Bond failed: {e}")
return []
def run_icij(self) -> list:
try:
from scrapers.icij_scraper import ICIJScraper
# C-06 FIX: was hardcoded to 5 names -- now uses full known-entity list
# from the graph to check for matches in ICIJ database
entities = [
"Adani", "Reliance", "Tata", "Birla", "Ambani",
"Gautam Adani", "Mukesh Ambani", "Anil Ambani",
"Lodha", "DLF", "Essar", "Mallya", "Choksi", "Modi",
"Ratan Tata", "Kumar Mangalam Birla", "Anil Agarwal",
]
records = []
scraper = ICIJScraper()
for name in entities:
records.extend(scraper.search_entity(name))
for r in records: r["_source"] = "icij"
logger.success(f"[Pipeline] ICIJ: {len(records)} matches")
return records
except Exception as e:
logger.warning(f"[Pipeline] ICIJ failed: {e}")
return []
def run_opensanctions(self) -> list:
try:
from scrapers.opensanctions_scraper import OpenSanctionsScraper
# C-06 FIX: expanded from 5 hardcoded names to broader coverage
entities = [
"Modi", "Gandhi", "Adani", "Choksi", "Mallya",
"Nirav Modi", "Vijay Mallya", "Mehul Choksi",
"Lalit Modi", "Subrata Roy", "Rana Kapoor",
"Jagan Reddy", "Saradha Group", "Rose Valley",
]
records = []
scraper = OpenSanctionsScraper()
for name in entities:
records.extend(scraper.search_entity(name))
for r in records: r["_source"] = "opensanctions"
logger.success(f"[Pipeline] OpenSanctions: {len(records)} matches")
return records
except Exception as e:
logger.warning(f"[Pipeline] OpenSanctions failed: {e}")
return []
def run_wikidata(self) -> list:
try:
from scrapers.wikidata_scraper import WikidataScraper
politicians = [
"Narendra Modi", "Rahul Gandhi", "Amit Shah",
"Arvind Kejriwal", "Mamata Banerjee", "Nitish Kumar",
"Yogi Adityanath", "Shashi Tharoor", "Anurag Thakur",
]
records = WikidataScraper().enrich_entity_list(politicians)
for r in records: r["_source"] = "wikidata"
logger.success(f"[Pipeline] Wikidata: {len(records)} enrichments")
return records
except Exception as e:
logger.warning(f"[Pipeline] Wikidata failed: {e}")
return []
def run_ibbi(self) -> list:
try:
from scrapers.ibbi_scraper import IBBIScraper
records = IBBIScraper().fetch_orders(limit=30)
for r in records: r["_source"] = "ibbi"
logger.success(f"[Pipeline] IBBI: {len(records)} orders")
return records
except Exception as e:
logger.warning(f"[Pipeline] IBBI failed: {e}")
return []
def run_ngo_darpan(self) -> list:
try:
from scrapers.ngo_darpan_scraper import NGODarpanScraper
records = NGODarpanScraper().fetch_ngo_list(limit=30)
for r in records: r["_source"] = "ngo_darpan"
logger.success(f"[Pipeline] NGO Darpan: {len(records)} NGOs")
return records
except Exception as e:
logger.warning(f"[Pipeline] NGO Darpan failed: {e}")
return []
def run_cppp(self) -> list:
try:
from scrapers.cppp_scraper import CPPPScraper
records = CPPPScraper().fetch_tenders(limit=30)
for r in records: r["_source"] = "cppp"
logger.success(f"[Pipeline] CPPP: {len(records)} tenders")
return records
except Exception as e:
logger.warning(f"[Pipeline] CPPP failed: {e}")
return []
def run_ncrb(self) -> list:
try:
from scrapers.ncrb_scraper import NCRBScraper
records = NCRBScraper().fetch_crime_statistics(limit=20)
for r in records: r["_source"] = "ncrb"
logger.success(f"[Pipeline] NCRB: {len(records)} records")
return records
except Exception as e:
logger.warning(f"[Pipeline] NCRB failed: {e}")
return []
def run_lgd(self) -> list:
try:
from scrapers.lgd_scraper import LGDScraper
records = LGDScraper().fetch_state_codes()
for r in records: r["_source"] = "lgd"
logger.success(f"[Pipeline] LGD: {len(records)} records")
return records
except Exception as e:
logger.warning(f"[Pipeline] LGD failed: {e}")
return []
def _get_runner_map(self):
return {
"datagov": self.run_datagov,
"cag": self.run_cag,
"gem": self.run_gem,
"myneta": self.run_myneta,
"mca": self.run_mca,
"pib": self.run_pib,
"loksabha": self.run_loksabha,
"sebi": self.run_sebi,
"ed": self.run_ed,
"cvc": self.run_cvc,
"njdg": self.run_njdg,
"electoral_bond": self.run_electoral_bond,
"icij": self.run_icij,
"opensanctions": self.run_opensanctions,
"wikidata": self.run_wikidata,
"ibbi": self.run_ibbi,
"ngo_darpan": self.run_ngo_darpan,
"cppp": self.run_cppp,
"ncrb": self.run_ncrb,
"lgd": self.run_lgd,
}
def find_politician_company_links(self, politicians, companies):
logger.info("[Pipeline] Finding politician-company links...")
matches = self.resolver.cross_dataset_match(
politicians, companies,
name_field_a="name", name_field_b="name"
)
if matches:
logger.warning(f"[Pipeline] {len(matches)} politician-company links found!")
return matches
def save(self, data: dict) -> str:
filepath = f"data/processed/pipeline_{self.timestamp}.json"
with open(filepath, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
logger.success(f"[Pipeline] Saved to {filepath}")
return filepath
def run(self, scrapers: list = None, parallel: bool = True) -> dict:
runner_map = self._get_runner_map()
all_scrapers = list(runner_map.keys())
to_run = scrapers if scrapers else all_scrapers
to_run = [s.strip() for s in to_run if s.strip() in runner_map]
if not to_run:
to_run = all_scrapers
logger.info(f"[Pipeline] Starting {len(to_run)} scrapers "
f"({'parallel' if parallel else 'sequential'}): {to_run}")
start = datetime.now()
raw = {}
if parallel and len(to_run) > 1:
with ThreadPoolExecutor(max_workers=6) as pool:
futures = {pool.submit(runner_map[name]): name for name in to_run}
for future in as_completed(futures):
name = futures[future]
try:
raw[name] = future.result()
except Exception as e:
logger.error(f"[Pipeline] {name} thread failed: {e}")
raw[name] = []
else:
for name in to_run:
raw[name] = runner_map[name]()
# BUG-13 FIX: entity resolver now runs on ALL data sources that could
# contain Indian persons/companies, not just myneta+mca.
# ICIJ and OpenSanctions entities are now resolved against politicians
# and companies so cross-dataset evidence chains are created.
politicians = (
raw.get("myneta", []) +
raw.get("wikidata", []) +
raw.get("opensanctions",[]) + # BUG-13 FIX: added
raw.get("icij", []) # BUG-13 FIX: added
)
companies = (
raw.get("mca", []) +
raw.get("ibbi", []) # BUG-13 FIX: added
)
links = []
if politicians and companies:
politicians = self.resolver.resolve_dataset(politicians, "name")
companies = self.resolver.resolve_dataset(companies, "name")
links = self.find_politician_company_links(politicians, companies)
# Phase 32: rebuild alias graph from resolved records
try:
from processing.alias_graph import AliasGraph
_ag = AliasGraph()
_ag.bulk_add(politicians, "name", "id")
_ag.bulk_add(companies, "name", "id")
# Also add aliases from resolution
for rec in politicians + companies:
cid = rec.get("id", "")
if cid:
for alias in rec.get("aliases", []):
if alias.get("name"):
_ag.add(alias["name"], cid)
_ag.save()
logger.info(f"[Pipeline] Alias graph rebuilt: {len(_ag)} entries")
except Exception as _ag_e:
logger.warning(f"[Pipeline] Alias graph rebuild failed: {type(_ag_e).__name__}")
duration = (datetime.now() - start).seconds
per_source = {name: len(records) for name, records in raw.items()}
summary = {
"scrapers_run": to_run,
"duration_seconds": duration,
"parallel": parallel,
"total_raw_records": sum(len(v) for v in raw.values()),
"per_source": per_source,
"politicians_found": len(raw.get("myneta", [])),
"companies_found": len(raw.get("mca", [])),
"contracts_found": len(raw.get("gem", [])),
"cag_reports_found": len(raw.get("cag", [])),
"pib_articles_found": len(raw.get("pib", [])),
"politician_co_links":len(links),
}
logger.info(f"[Pipeline] Done in {duration}s -- "
f"{summary['total_raw_records']} total records")
results = {
"raw": raw,
"links": links,
"summary": summary,
"run_at": start.isoformat(),
}
# H-12 FIX: save only summary+links to disk -- raw scraper output
# is NOT saved to git (it bloats git history unboundedly).
# The full results dict (with raw) is returned in memory for the loader.
saveable = {
"summary": summary,
"links": links,
"run_at": start.isoformat(),
}
results["saved_to"] = self.save(saveable)
return results
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="BharatGraph Full Pipeline (20 scrapers)")
parser.add_argument("--scrapers", type=str, default=None)
# M-08 FIX: --parallel was action=store_true,default=True making it always True
# --sequential now sets parallel=False, and --parallel is removed
parser.add_argument("--sequential", action="store_true", default=False,
help="Run scrapers sequentially (default: parallel)")
args = parser.parse_args()
scrapers = args.scrapers.split(",") if args.scrapers else None
parallel = not args.sequential
results = BharatGraphPipeline().run(scrapers=scrapers, parallel=parallel)
s = results["summary"]
print(f"\n{'='*55}\nPIPELINE SUMMARY\n{'='*55}")
print(f" Scrapers run: {len(s['scrapers_run'])}")
print(f" Duration: {s['duration_seconds']}s")
print(f" Total records: {s['total_raw_records']}")
for src, n in sorted(s["per_source"].items(), key=lambda x: -x[1]):
if n > 0:
print(f" {src:<20} {n}")
print(f" Saved to: {results.get('saved_to')}")
print("=" * 55)