bharatgraph / graph /loader.py
abinazebinoy's picture
fix(B-05 step3, NEW-1): add load_affidavits() to GraphLoader; modern fulltext in setup_schema()
6b02e4a
Raw
History Blame Contribute Delete
48.7 kB
"""
BharatGraph - Neo4j Graph Loader
Loads cleaned + resolved data from Phase 2 pipeline into Neo4j.
"""
import os, sys, json, hashlib, argparse
from datetime import datetime
from loguru import logger
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from graph.schema import SETUP_QUERIES, NODE_SCHEMAS
# BUG-19 FIX: upgraded from MD5 (16-char) to SHA-256 (20-char) for lower collision
# risk across large datasets.
def make_id(*parts) -> str:
"""Create a stable SHA-256 derived ID from multiple string parts."""
combined = "|".join(str(p).lower().strip() for p in parts)
return hashlib.sha256(combined.encode()).hexdigest()[:32]
class GraphLoader:
"""
Loads BharatGraph data into Neo4j.
BUG-21 FIX: __init__ now accepts an optional pre-built driver so admin.py
can pass the FastAPI dependency driver instead of opening a second connection.
"""
def __init__(self, dry_run: bool = False, driver=None):
self.dry_run = dry_run
self.driver = driver # BUG-21 FIX: accept injected driver
self.stats = {
"nodes_created": 0,
"nodes_merged": 0,
"rels_created": 0,
"errors": 0,
}
if dry_run:
logger.info("[Loader] DRY RUN mode -- no data written to Neo4j")
elif driver is None:
self._connect()
def _connect(self):
"""Connect to Neo4j AuraDB using credentials from .env"""
try:
from neo4j import GraphDatabase
from dotenv import load_dotenv
load_dotenv()
uri = os.getenv("NEO4J_URI", "")
user = os.getenv("NEO4J_USER", "neo4j")
pwd = os.getenv("NEO4J_PASSWORD", "")
if not pwd:
logger.error("[Loader] NEO4J_PASSWORD not set in .env")
raise ValueError("NEO4J_PASSWORD missing")
self.driver = GraphDatabase.driver(uri, auth=(user, pwd))
self.driver.verify_connectivity()
logger.success(f"[Loader] Connected to Neo4j: {uri[:40]}...")
except ImportError:
logger.error("[Loader] neo4j package not installed.")
raise
except Exception as e:
logger.error(f"[Loader] Connection failed: {e}")
raise
def setup_schema(self):
"""Create constraints and indexes defined in schema.py"""
if self.dry_run:
logger.info(f"[Loader] DRY RUN: would run {len(SETUP_QUERIES)} setup queries")
return
with self.driver.session() as session:
# BUG-3 FIX: fulltext index now covers all 20 node types (was 8).
try:
# NEW-1 FIX: modern CREATE FULLTEXT INDEX (Neo4j 4.3+ / AuraDB)
# CALL db.index.fulltext.createNodeIndex removed in Neo4j 5.x
session.run(
"CREATE FULLTEXT INDEX globalSearch IF NOT EXISTS "
"FOR (n:Politician|Company|Contract|AuditReport|Scheme|Ministry|"
" Party|PressRelease|NGO|ElectoralBond|InsolvencyOrder|Tender|"
" RegulatoryOrder|EnforcementAction|ParliamentQuestion|"
" VigilanceCircular|ICIJEntity|SanctionedEntity|"
" CourtCase|LocalBody|Affidavit) "
"ON EACH [n.name, n.title, n.aliases, n.item_desc, n.product, "
" n.buyer_org, n.cin, n.ministry, n.summary, "
" n.seller_name, n.ngo_name, n.purchaser_name, "
" n.redeemed_by, n.company_name, n.accused, "
" n.entity_name, n.subject, n.jurisdiction]"
)
logger.info("[Loader] Full-text index created/verified (21 types, modern syntax)")
except Exception as e:
logger.debug(f"[Loader] Full-text index note: {e}")
for query in SETUP_QUERIES:
try:
session.run(query)
except Exception as e:
logger.debug(f"[Loader] Schema query skipped (may exist): {e}")
logger.success("[Loader] Schema constraints and indexes ready")
def _run(self, query: str, params: dict = None, **kwargs):
"""
Execute a single Cypher query.
BUG-17 FIX: now accepts both calling styles:
- old style: self._run(query, {"key": val}) <- positional dict
- new style: self._run(query, key=val, ...) <- keyword args
Both are merged so existing callers and the 8 new loaders both work.
"""
if self.dry_run:
logger.debug(f"[Loader] DRY RUN query: {query[:80]}...")
return None
# Merge positional dict with any keyword args
all_params = {**(params or {}), **kwargs}
try:
with self.driver.session() as session:
return session.run(query, all_params)
except Exception as e:
logger.error(f"[Loader] Query failed: {e}\nQuery: {query[:100]}")
self.stats["errors"] += 1
return None
# ?? Node loaders ??????????????????????????????????????
def load_politicians(self, records: list) -> int:
count = 0
for r in records:
name = r.get("name", "").strip()
if not name:
continue
node_id = make_id(name, r.get("state", ""), r.get("source_election", ""))
query = """
MERGE (p:Politician {id: $id})
SET p.name = $name,
p.name_raw = $name_raw,
p.party = $party,
p.state = $state,
p.election = $election,
p.total_assets = $total_assets,
p.liabilities = $liabilities,
p.criminal_cases = $criminal_cases,
p.education = $education,
p.source = $source,
p.scraped_at = $scraped_at
"""
params = {
"id": node_id,
"name": name,
"name_raw": r.get("name_raw", name),
"party": r.get("party", ""),
"state": r.get("state", ""),
"election": r.get("source_election", ""),
"total_assets": str(r.get("total_assets", "")),
"liabilities": str(r.get("liabilities", "")),
"criminal_cases":str(r.get("criminal_case_count", "0")),
"education": r.get("education", ""),
"source": r.get("_source", "myneta"),
"scraped_at": r.get("scraped_at", datetime.now().isoformat()),
}
self._run(query, params)
count += 1
party = r.get("party", "").strip()
if party:
self._create_party_and_link(node_id, party)
logger.success(f"[Loader] Politicians loaded: {count}")
self.stats["nodes_created"] += count
return count
def _create_party_and_link(self, politician_id: str, party_name: str):
party_id = make_id(party_name)
query = """
MERGE (party:Party {id: $party_id})
SET party.name = $party_name
WITH party
MATCH (p:Politician {id: $pol_id})
MERGE (p)-[:MEMBER_OF]->(party)
"""
self._run(query, {
"party_id": party_id,
"party_name": party_name,
"pol_id": politician_id,
})
def load_companies(self, records: list) -> int:
count = 0
for r in records:
name = r.get("name", "").strip()
if not name:
continue
node_id = r.get("cin", make_id(name, r.get("state", "")))
query = """
MERGE (c:Company {id: $id})
SET c.name = $name,
c.name_raw = $name_raw,
c.cin = $cin,
c.status = $status,
c.state = $state,
c.registration_date = $reg_date,
c.company_class = $company_class,
c.source = $source,
c.scraped_at = $scraped_at
"""
params = {
"id": node_id,
"name": name,
"name_raw": r.get("name_raw", name),
"cin": r.get("cin", ""),
"status": r.get("status", ""),
"state": r.get("state", ""),
"reg_date": r.get("registration_date", ""),
"company_class":r.get("company_class", ""),
"source": r.get("_source", "mca"),
"scraped_at": r.get("scraped_at", datetime.now().isoformat()),
}
self._run(query, params)
count += 1
logger.success(f"[Loader] Companies loaded: {count}")
self.stats["nodes_created"] += count
return count
def load_contracts(self, records: list) -> int:
count = 0
for r in records:
order_id = r.get("order_id", "").strip()
seller = r.get("seller_name", r.get("seller_name_raw","")).strip()
if not order_id:
continue
node_id = make_id(order_id)
query = """
MERGE (c:Contract {id: $id})
SET c.order_id = $order_id,
c.seller_name = $seller,
c.buyer_org = $buyer,
c.product = $product,
c.amount_crore = $amount,
c.order_date = $date,
c.state = $state,
c.source = $source,
c.scraped_at = $scraped_at
"""
params = {
"id": node_id,
"order_id": order_id,
"seller": seller,
"buyer": r.get("buyer_org", ""),
"product": r.get("product", ""),
"amount": float(r.get("amount_crore") or (r.get("amount_inr",0)/1e7) or 0),
"date": r.get("order_date", ""),
"state": r.get("state", ""),
"source": r.get("_source", "gem"),
"scraped_at": r.get("scraped_at", datetime.now().isoformat()),
}
self._run(query, params)
count += 1
if seller:
self._link_company_to_contract(seller, node_id, float(r.get("amount_crore", 0) or 0))
buyer = r.get("buyer_org", "").strip()
if buyer:
self._link_contract_to_ministry(node_id, buyer)
logger.success(f"[Loader] Contracts loaded: {count}")
self.stats["nodes_created"] += count
return count
def _link_company_to_contract(self, seller_name, contract_id, amount):
company_id = make_id(seller_name)
self._run("""
MERGE (co:Company {id: $co_id})
SET co.name = $seller_name
WITH co
MATCH (c:Contract {id: $contract_id})
MERGE (co)-[r:WON_CONTRACT]->(c)
SET r.amount_crore = $amount
""", {"co_id": company_id, "seller_name": seller_name,
"contract_id": contract_id, "amount": amount})
self.stats["rels_created"] += 1
def _link_contract_to_ministry(self, contract_id, ministry_name):
ministry_id = make_id(ministry_name)
self._run("""
MERGE (m:Ministry {id: $m_id})
SET m.name = $m_name
WITH m
MATCH (c:Contract {id: $contract_id})
MERGE (c)-[:AWARDED_BY]->(m)
""", {"m_id": ministry_id, "m_name": ministry_name, "contract_id": contract_id})
self.stats["rels_created"] += 1
def load_audit_reports(self, records: list) -> int:
count = 0
for r in records:
title = r.get("title", "").strip()
if not title:
continue
node_id = make_id(title, r.get("url", ""))
params = {
"id": node_id,
"title": title,
"url": r.get("url", ""),
"year": str(r.get("year", "")),
"state": r.get("state", ""),
"scheme": r.get("scheme", ""),
"amount": float(r.get("amount_crore") or (r.get("amount_inr",0)/1e7) or 0),
"irr_type": r.get("irregularity_type", ""),
"finding": r.get("finding", ""),
"keywords": str(r.get("alert_keywords", [])),
"source": r.get("_source", "cag"),
"scraped_at": r.get("scraped_at", datetime.now().isoformat()),
}
self._run("""
MERGE (a:AuditReport {id: $id})
SET a.title=$title, a.url=$url, a.year=$year, a.state=$state,
a.scheme=$scheme, a.amount_crore=$amount,
a.irregularity_type=$irr_type, a.finding=$finding,
a.alert_keywords=$keywords, a.source=$source, a.scraped_at=$scraped_at
""", params)
count += 1
scheme = r.get("scheme", "").strip()
if scheme:
self._link_audit_to_scheme(node_id, scheme, float(r.get("amount_crore", 0) or 0))
logger.success(f"[Loader] Audit reports loaded: {count}")
self.stats["nodes_created"] += count
return count
def _link_audit_to_scheme(self, audit_id, scheme_name, amount):
scheme_id = make_id(scheme_name)
self._run("""
MERGE (s:Scheme {id: $s_id})
SET s.name = $s_name
WITH s
MATCH (a:AuditReport {id: $audit_id})
MERGE (a)-[r:FLAGS]->(s)
SET r.amount_crore = $amount
""", {"s_id": scheme_id, "s_name": scheme_name, "audit_id": audit_id, "amount": amount})
self.stats["rels_created"] += 1
def load_press_releases(self, records: list) -> int:
count = 0
for r in records:
title = r.get("title", "").strip()
if not title:
continue
node_id = make_id(title, r.get("link", ""))
self._run("""
MERGE (pr:PressRelease {id: $id})
SET pr.title=$title, pr.link=$link, pr.published=$published,
pr.alert_keywords=$keywords, pr.source=$source, pr.scraped_at=$scraped_at
""", {
"id": node_id, "title": title, "link": r.get("link", ""),
"published": r.get("published", ""),
"keywords": str(r.get("alert_keywords", [])),
"source": r.get("_source", "pib"),
"scraped_at":r.get("scraped_at", datetime.now().isoformat()),
})
count += 1
logger.success(f"[Loader] Press releases loaded: {count}")
self.stats["nodes_created"] += count
return count
def load_politician_company_links(self, matches: list) -> int:
count = 0
for match in matches:
pol_name = match.get("name_a", match.get("politician_name", "")).strip()
co_name = match.get("name_b", match.get("company_name", "")).strip()
score = match.get("score", 0.0)
if not pol_name or not co_name:
continue
link_query = """
OPTIONAL MATCH (existing_p:Politician)
WHERE toLower(existing_p.name) = toLower($pol_name)
WITH coalesce(existing_p.id, $pol_id_fallback) AS pol_id
MERGE (p:Politician {id: pol_id})
SET p.name = coalesce(p.name, $pol_name)
WITH p
OPTIONAL MATCH (existing_c:Company)
WHERE toLower(existing_c.name) = toLower($co_name)
WITH p, coalesce(existing_c.id, $co_id_fallback) AS co_id
MERGE (c:Company {id: co_id})
SET c.name = coalesce(c.name, $co_name)
MERGE (p)-[r:DIRECTOR_OF]->(c)
SET r.confidence = $score,
r.source = 'entity_resolution',
r.detected_at = $now
"""
try:
self._run(link_query, {
"pol_name": pol_name,
"pol_id_fallback": make_id(pol_name),
"co_name": co_name,
"co_id_fallback": make_id(co_name),
"score": score,
"now": datetime.now().isoformat(),
})
count += 1
self.stats["rels_created"] += 1
except Exception as e:
logger.warning(f"[Loader] DIRECTOR_OF link {pol_name}->{co_name} failed: {e}")
logger.info(f"[Loader] DIRECTOR_OF links created/updated: {count}")
return count
def load_affidavits(self, records: list) -> int:
"""B-05 FIX: was missing -- affidavit data scraped but never written to graph.
Investigators query (p:Politician)-[:FILED_AFFIDAVIT]->(a:Affidavit)
but zero Affidavit nodes existed. All affidavit queries returned empty.
"""
count = 0
for r in records:
pol_id = (r.get("politician_id") or "").strip()
year = r.get("year")
if not pol_id or not year:
continue
aff_id = make_id(pol_id, str(year))
result = self._run(
"""
MERGE (a:Affidavit {id: })
SET a.politician_id = ,
a.year = ,
a.total_assets_crore = ,
a.movable_assets_crore = ,
a.liabilities_crore = ,
a.criminal_cases = ,
a.source = ,
a.scraped_at =
WITH a
MATCH (p:Politician {id: })
MERGE (p)-[:FILED_AFFIDAVIT]->(a)
""",
{
"id": aff_id,
"politician_id": pol_id,
"year": int(year),
"total_assets_crore": float(r.get("total_assets_crore") or 0),
"movable_assets_crore": float(r.get("movable_assets_crore") or 0),
"liabilities_crore": float(r.get("liabilities_crore") or 0),
"criminal_cases": int(r.get("criminal_cases") or 0),
"source": r.get("source", "myneta"),
"scraped_at": r.get(
"scraped_at",
__import__("datetime").datetime.now().isoformat()
),
}
)
if result is not None:
count += 1
self.stats["nodes_created"] += 1
logger.info(f"[Loader] Affidavits loaded: {count}")
return count
def load_from_pipeline_output(self, filepath: str) -> dict:
"""Load everything from a pipeline JSON output file."""
logger.info(f"[Loader] Loading from: {filepath}")
with open(filepath, encoding="utf-8") as f:
data = json.load(f)
raw = data.get("raw", {})
links = data.get("links", [])
self.setup_schema()
results = {}
if raw.get("myneta"): results["politicians"] = self.load_politicians(raw["myneta"])
if raw.get("mca"): results["companies"] = self.load_companies(raw["mca"])
if raw.get("gem"): results["contracts"] = self.load_contracts(raw["gem"])
if raw.get("cag"): results["audit_reports"] = self.load_audit_reports(raw["cag"])
if raw.get("pib"): results["press_releases"] = self.load_press_releases(raw["pib"])
if links: results["director_of_links"] = self.load_politician_company_links(links)
if raw.get("sebi"): results["regulatory_orders"] = self.load_regulatory_orders(raw["sebi"])
if raw.get("ed"): results["enforcement_actions"] = self.load_enforcement_actions(raw["ed"])
if raw.get("electoral_bond"):results["electoral_bonds"] = self.load_electoral_bonds(raw["electoral_bond"])
if raw.get("ibbi"): results["insolvency_orders"] = self.load_insolvency_orders(raw["ibbi"])
if raw.get("ngo_darpan"): results["ngos"] = self.load_ngos(raw["ngo_darpan"])
if raw.get("cppp"): results["tenders"] = self.load_tenders(raw["cppp"])
if raw.get("loksabha"): results["parliament_questions"]= self.load_parliament_questions(raw["loksabha"])
if raw.get("cvc"): results["vigilance_circulars"] = self.load_vigilance_circulars(raw["cvc"])
# BUG-2 FIX: 7 previously missing loaders -- these datasets were scraped but
# silently dropped because load_from_pipeline_output never called them.
if raw.get("icij"): results["icij_entities"] = self.load_icij_entities(raw["icij"])
if raw.get("opensanctions"):results["sanctioned_entities"] = self.load_sanctioned_entities(raw["opensanctions"])
if raw.get("njdg"): results["court_cases"] = self.load_court_cases(raw["njdg"])
if raw.get("lgd"): results["local_bodies"] = self.load_local_bodies(raw["lgd"])
if raw.get("ncrb"): results["crime_reports"] = self.load_crime_reports(raw["ncrb"])
if raw.get("wikidata"): results["wikidata_enrichments"]= self.load_wikidata_enrichments(raw["wikidata"])
if raw.get("datagov"): results["datagov_documents"] = self.load_datagov_documents(raw["datagov"])
logger.success(f"[Loader] Load complete. Stats: {self.stats}")
return {**results, "stats": self.stats}
# ?? Phase 28 loaders (8 datasets) ????????????????????????????????????????
def load_regulatory_orders(self, records: list) -> int:
count = 0
for r in records:
title = (r.get("title") or "").strip()
if not title: continue
node_id = make_id(title)
try:
self._run("""
MERGE (n:RegulatoryOrder {id:$id})
SET n.title=$title, n.url=$url, n.order_type=$order_type,
n.entity_name=$entity_name, n.violation=$violation,
n.source=$source, n.dataset='sebi', n.scraped_at=$scraped_at
""", {
"id": node_id, "title": title, "url": r.get("url",""),
"order_type": r.get("order_type",""),
"entity_name": r.get("entity_name", r.get("accused","")),
"violation": r.get("violation",""),
"source": r.get("source","SEBI"),
"scraped_at": r.get("scraped_at",""),
})
count += 1
except Exception as e:
logger.warning(f"[Loader] RegulatoryOrder failed: {e}")
self.stats["regulatory_orders"] = count
logger.success(f"[Loader] Loaded {count} SEBI regulatory orders")
return count
def load_enforcement_actions(self, records: list) -> int:
count = 0
for r in records:
title = (r.get("title") or "").strip()
if not title: continue
node_id = make_id(title)
try:
self._run("""
MERGE (n:EnforcementAction {id:$id})
SET n.title=$title, n.url=$url, n.date=$date,
n.amount_crore=$amount_crore, n.case_type=$case_type,
n.accused=$accused, n.source=$source,
n.dataset='ed', n.scraped_at=$scraped_at
""", {
"id": node_id, "title": title, "url": r.get("url",""),
"date": r.get("date",""),
"amount_crore": float(r.get("amount_crore",0) or 0),
"case_type": r.get("case_type",""),
"accused": r.get("accused",""),
"source": r.get("source","ED"),
"scraped_at": r.get("scraped_at",""),
})
count += 1
accused = r.get("accused","").strip()
if accused:
self._run("""
MATCH (p:Politician)
WHERE toLower(p.name) CONTAINS toLower($accused)
MATCH (n:EnforcementAction {id:$id})
MERGE (p)-[:SUBJECT_OF]->(n)
""", {"accused": accused, "id": node_id})
except Exception as e:
logger.warning(f"[Loader] EnforcementAction failed: {e}")
self.stats["enforcement_actions"] = count
logger.success(f"[Loader] Loaded {count} ED enforcement actions")
return count
def load_electoral_bonds(self, records: list) -> int:
count = 0
for r in records:
purchaser = (r.get("purchaser_name") or "").strip()
if not purchaser: continue
node_id = make_id(f"bond_{r.get('bond_number','')}_{purchaser}")
try:
self._run("""
MERGE (n:ElectoralBond {id:$id})
SET n.bond_number=$bond_no, n.purchaser_name=$purchaser,
n.denomination_crore=$denom, n.purchase_date=$p_date,
n.redemption_date=$r_date, n.redeemed_by=$redeemer,
n.source=$source, n.dataset='electoral_bond',
n.scraped_at=$scraped_at
""", {
"id": node_id, "bond_no": r.get("bond_number",""),
"purchaser": purchaser,
"denom": float(r.get("denomination_crore",0) or 0),
"p_date": r.get("purchase_date",""),
"r_date": r.get("redemption_date",""),
"redeemer": r.get("redeemed_by",""),
"source": r.get("source","ECI"),
"scraped_at":r.get("scraped_at",""),
})
count += 1
redeemer = r.get("redeemed_by","").strip()
if purchaser and redeemer:
self._run("""
MERGE (c:Company {id:$co_id})
ON CREATE SET c.name=$purchaser, c.source='electoral_bond'
MERGE (p:Party {id:$pt_id})
ON CREATE SET p.name=$redeemer, p.source='electoral_bond'
MATCH (b:ElectoralBond {id:$b_id})
MERGE (c)-[:DONATED_VIA]->(b)-[:REDEEMED_BY]->(p)
""", {
"co_id": make_id(purchaser), "purchaser": purchaser,
"pt_id": make_id(redeemer), "redeemer": redeemer,
"b_id": node_id,
})
except Exception as e:
logger.warning(f"[Loader] ElectoralBond failed: {e}")
self.stats["electoral_bonds"] = count
logger.success(f"[Loader] Loaded {count} electoral bonds")
return count
def load_insolvency_orders(self, records: list) -> int:
count = 0
for r in records:
company = (r.get("company_name") or "").strip()
if not company: continue
node_id = make_id(f"ibbi_{company}")
try:
self._run("""
MERGE (n:InsolvencyOrder {id:$id})
SET n.company_name=$company, n.cin=$cin,
n.process_type=$process_type, n.admitted_date=$admitted_date,
n.status=$status, n.admitted_claims=$claims,
n.source=$source, n.dataset='ibbi', n.scraped_at=$scraped_at
""", {
"id": node_id, "company": company, "cin": r.get("cin",""),
"process_type": r.get("process_type",""),
"admitted_date": r.get("admitted_date",""),
"status": r.get("status",""),
"claims": float(r.get("admitted_claims",0) or 0),
"source": r.get("source","IBBI"),
"scraped_at": r.get("scraped_at",""),
})
count += 1
self._run("""
MATCH (c:Company)
WHERE toLower(c.name) CONTAINS toLower($company)
MATCH (n:InsolvencyOrder {id:$id})
MERGE (c)-[:HAS_INSOLVENCY]->(n)
""", {"company": company, "id": node_id})
except Exception as e:
logger.warning(f"[Loader] InsolvencyOrder failed: {e}")
self.stats["insolvency_orders"] = count
logger.success(f"[Loader] Loaded {count} IBBI insolvency orders")
return count
def load_ngos(self, records: list) -> int:
count = 0
for r in records:
name = (r.get("ngo_name") or "").strip()
if not name: continue
node_id = make_id(f"ngo_{r.get('darpan_id',name)}")
try:
self._run("""
MERGE (n:NGO {id:$id})
SET n.ngo_name=$name, n.darpan_id=$darpan_id,
n.state=$state, n.district=$district,
n.registration_type=$reg_type, n.year_of_reg=$year,
n.key_issues=$issues, n.csr_receipts=$csr,
n.govt_grants=$grants, n.source=$source,
n.dataset='ngo_darpan', n.scraped_at=$scraped_at
""", {
"id": node_id, "name": name,
"darpan_id": r.get("darpan_id",""),
"state": r.get("state",""),
"district": r.get("district",""),
"reg_type": r.get("registration_type",""),
"year": str(r.get("year_of_reg","")),
"issues": str(r.get("key_issues",[])),
"csr": float(r.get("csr_receipts",0) or 0),
"grants": float(r.get("govt_grants",0) or 0),
"source": r.get("source","NGO Darpan"),
"scraped_at":r.get("scraped_at",""),
})
count += 1
except Exception as e:
logger.warning(f"[Loader] NGO failed: {e}")
self.stats["ngos"] = count
logger.success(f"[Loader] Loaded {count} NGOs")
return count
def load_tenders(self, records: list) -> int:
count = 0
for r in records:
title = (r.get("title") or "").strip()
if not title: continue
node_id = make_id(f"tender_{r.get('tender_id',title)}")
try:
self._run("""
MERGE (n:Tender {id:$id})
SET n.tender_id=$tender_id, n.title=$title,
n.ministry=$ministry, n.department=$dept,
n.estimated_crore=$value, n.tender_type=$t_type,
n.bid_end_date=$bid_end, n.status=$status,
n.awarded_to=$awarded_to, n.awarded_value=$awarded_val,
n.single_bid=$single_bid, n.source=$source,
n.dataset='cppp', n.scraped_at=$scraped_at
""", {
"id": node_id, "tender_id": r.get("tender_id",""),
"title": title,
"ministry": r.get("ministry",""),
"dept": r.get("department",""),
"value": float(r.get("estimated_crore",0) or 0),
"t_type": r.get("tender_type",""),
"bid_end": r.get("bid_submission_end",""),
"status": r.get("status",""),
"awarded_to": r.get("awarded_to",""),
"awarded_val":float(r.get("awarded_value",0) or 0),
"single_bid": bool(r.get("single_bid",False)),
"source": r.get("source","CPPP"),
"scraped_at": r.get("scraped_at",""),
})
count += 1
ministry = r.get("ministry","").strip()
if ministry:
self._run("""
MERGE (m:Ministry {id:$m_id})
ON CREATE SET m.name=$ministry, m.source='cppp'
MATCH (t:Tender {id:$t_id})
MERGE (m)-[:ISSUED_TENDER]->(t)
""", {"m_id": make_id(ministry), "ministry": ministry, "t_id": node_id})
if r.get("single_bid"):
self._run("MATCH (t:Tender {id:$id}) SET t.risk_flag='SINGLE_BID'",
{"id": node_id})
except Exception as e:
logger.warning(f"[Loader] Tender failed: {e}")
self.stats["tenders"] = count
logger.success(f"[Loader] Loaded {count} CPPP tenders")
return count
def load_parliament_questions(self, records: list) -> int:
count = 0
for r in records:
subject = (r.get("subject") or "").strip()
if not subject: continue
node_id = make_id(f"pq_{r.get('question_number','')}_{r.get('member_name','')}")
try:
self._run("""
MERGE (n:ParliamentQuestion {id:$id})
SET n.subject=$subject, n.question_number=$qno,
n.member_name=$member, n.ministry=$ministry,
n.session=$session, n.source=$source,
n.dataset='loksabha', n.scraped_at=$scraped_at
""", {
"id": node_id, "subject": subject,
"qno": r.get("question_number",""),
"member": r.get("member_name",""),
"ministry": r.get("ministry",""),
"session": r.get("session",""),
"source": r.get("source","Lok Sabha"),
"scraped_at":r.get("scraped_at",""),
})
count += 1
member = r.get("member_name","").strip()
if member:
self._run("""
MATCH (p:Politician)
WHERE toLower(p.name) CONTAINS toLower($member)
MATCH (q:ParliamentQuestion {id:$id})
MERGE (p)-[:ASKED_QUESTION]->(q)
""", {"member": member, "id": node_id})
except Exception as e:
logger.warning(f"[Loader] ParliamentQuestion failed: {e}")
self.stats["parliament_questions"] = count
logger.success(f"[Loader] Loaded {count} Lok Sabha questions")
return count
def load_vigilance_circulars(self, records: list) -> int:
count = 0
for r in records:
title = (r.get("title") or "").strip()
if not title: continue
node_id = make_id(f"cvc_{r.get('circular_number',title)}")
try:
self._run("""
MERGE (n:VigilanceCircular {id:$id})
SET n.title=$title, n.circular_number=$cno,
n.date=$date, n.ministry=$ministry,
n.subject=$subject, n.source=$source,
n.dataset='cvc', n.scraped_at=$scraped_at
""", {
"id": node_id, "title": title,
"cno": r.get("circular_number",""),
"date": r.get("date",""),
"ministry": r.get("ministry",""),
"subject": r.get("subject",""),
"source": r.get("source","CVC"),
"scraped_at":r.get("scraped_at",""),
})
count += 1
except Exception as e:
logger.warning(f"[Loader] VigilanceCircular failed: {e}")
self.stats["vigilance_circulars"] = count
logger.success(f"[Loader] Loaded {count} CVC circulars")
return count
# ?? BUG-2 FIX: 7 NEW loaders -- were scraped but never loaded ?????????????
def load_icij_entities(self, records: list) -> int:
"""ICIJ Offshore Leaks entities -> ICIJEntity nodes."""
count = 0
for r in records:
name = (r.get("name") or r.get("entity_name") or "").strip()
if not name: continue
node_id = make_id(f"icij_{r.get('node_id', name)}")
try:
self._run("""
MERGE (n:ICIJEntity {id:$id})
SET n.name=$name, n.entity_type=$entity_type,
n.jurisdiction=$jurisdiction, n.linked_to=$linked_to,
n.dataset_name=$dataset_name, n.source='ICIJ',
n.dataset='icij', n.scraped_at=$scraped_at
""", {
"id": node_id, "name": name,
"entity_type": r.get("entity_type", r.get("type","")),
"jurisdiction": r.get("jurisdiction",""),
"linked_to": str(r.get("linked_to",[])),
"dataset_name": r.get("sourceID", r.get("dataset_name","")),
"scraped_at": r.get("scraped_at",""),
})
count += 1
# Cross-link to Indian politicians/companies by name similarity
self._run("""
MATCH (p:Politician)
WHERE toLower(p.name) CONTAINS toLower($name)
MATCH (n:ICIJEntity {id:$id})
MERGE (p)-[:ASSOCIATED_WITH]->(n)
""", {"name": name, "id": node_id})
except Exception as e:
logger.warning(f"[Loader] ICIJEntity failed: {e}")
self.stats["icij_entities"] = count
logger.success(f"[Loader] Loaded {count} ICIJ entities")
return count
def load_sanctioned_entities(self, records: list) -> int:
"""OpenSanctions -> SanctionedEntity nodes."""
count = 0
for r in records:
name = (r.get("name") or r.get("caption") or "").strip()
if not name: continue
node_id = make_id(f"sanction_{r.get('id', name)}")
try:
self._run("""
MERGE (n:SanctionedEntity {id:$id})
SET n.name=$name, n.schema_type=$schema_type,
n.topics=$topics, n.countries=$countries,
n.sanctions_list=$sanctions_list,
n.source='OpenSanctions', n.dataset='opensanctions',
n.scraped_at=$scraped_at
""", {
"id": node_id, "name": name,
"schema_type": r.get("schema",""),
"topics": str(r.get("topics",[])),
"countries": str(r.get("countries",[])),
"sanctions_list":r.get("datasets",""),
"scraped_at": r.get("scraped_at",""),
})
count += 1
self._run("""
MATCH (p:Politician)
WHERE toLower(p.name) CONTAINS toLower($name)
MATCH (n:SanctionedEntity {id:$id})
MERGE (p)-[:ASSOCIATED_WITH]->(n)
""", {"name": name, "id": node_id})
except Exception as e:
logger.warning(f"[Loader] SanctionedEntity failed: {e}")
self.stats["sanctioned_entities"] = count
logger.success(f"[Loader] Loaded {count} sanctioned entities")
return count
def load_court_cases(self, records: list) -> int:
"""NJDG court pendency stats -> CourtCase nodes."""
count = 0
for r in records:
court = (r.get("court_name") or r.get("state") or "").strip()
if not court: continue
node_id = make_id(f"njdg_{court}_{r.get('year','')}")
try:
self._run("""
MERGE (n:CourtCase {id:$id})
SET n.court_name=$court, n.state=$state,
n.pending_cases=$pending, n.disposed_cases=$disposed,
n.year=$year, n.source='NJDG',
n.dataset='njdg', n.scraped_at=$scraped_at
""", {
"id": node_id, "court": court,
"state": r.get("state",""),
"pending": int(r.get("pending_cases",0) or 0),
"disposed": int(r.get("disposed_cases",0) or 0),
"year": str(r.get("year","")),
"scraped_at":r.get("scraped_at",""),
})
count += 1
except Exception as e:
logger.warning(f"[Loader] CourtCase failed: {e}")
self.stats["court_cases"] = count
logger.success(f"[Loader] Loaded {count} court case records")
return count
def load_local_bodies(self, records: list) -> int:
"""LGD (Local Government Directory) -> LocalBody nodes."""
count = 0
for r in records:
name = (r.get("name") or r.get("state_name") or "").strip()
if not name: continue
node_id = make_id(f"lgd_{r.get('lgd_code',name)}")
try:
self._run("""
MERGE (n:LocalBody {id:$id})
SET n.name=$name, n.lgd_code=$lgd_code,
n.entity_type=$entity_type, n.state=$state,
n.district=$district, n.source='LGD',
n.dataset='lgd', n.scraped_at=$scraped_at
""", {
"id": node_id, "name": name,
"lgd_code": str(r.get("lgd_code","")),
"entity_type":r.get("entity_type", r.get("type","")),
"state": r.get("state",""),
"district": r.get("district",""),
"scraped_at": r.get("scraped_at",""),
})
count += 1
except Exception as e:
logger.warning(f"[Loader] LocalBody failed: {e}")
self.stats["local_bodies"] = count
logger.success(f"[Loader] Loaded {count} local bodies")
return count
def load_crime_reports(self, records: list) -> int:
"""NCRB crime statistics -- stored as metadata nodes for context."""
count = 0
for r in records:
state = (r.get("state") or "").strip()
year = str(r.get("year","")).strip()
if not state or not year: continue
node_id = make_id(f"ncrb_{state}_{year}")
try:
self._run("""
MERGE (n:CrimeReport {id:$id})
SET n.state=$state, n.year=$year,
n.crime_head=$crime_head,
n.cases_registered=$cases,
n.source='NCRB', n.dataset='ncrb',
n.scraped_at=$scraped_at
""", {
"id": node_id, "state": state, "year": year,
"crime_head": r.get("crime_head",""),
"cases": int(r.get("cases_registered",0) or 0),
"scraped_at": r.get("scraped_at",""),
})
count += 1
except Exception as e:
logger.warning(f"[Loader] CrimeReport failed: {e}")
self.stats["crime_reports"] = count
logger.success(f"[Loader] Loaded {count} NCRB crime reports")
return count
def load_wikidata_enrichments(self, records: list) -> int:
"""
Wikidata -- enriches EXISTING Politician nodes; does NOT create new ones.
Uses MATCH not MERGE to avoid phantom nodes.
"""
count = 0
for r in records:
name = (r.get("name") or "").strip()
if not name: continue
try:
result = self._run("""
MATCH (p:Politician)
WHERE toLower(p.name) CONTAINS toLower($name) OR toLower($name) CONTAINS toLower(p.name)
OR any(a IN coalesce(p.aliases,[]) WHERE toLower(a) = toLower($name))
SET p.wikidata_id = coalesce($wikidata_id, p.wikidata_id),
p.birth_year = coalesce($birth_year, p.birth_year),
p.gender = coalesce($gender, p.gender),
p.positions = coalesce($positions, p.positions),
p.wikidata_enriched = true
RETURN count(p) AS updated
""", {
"name": name,
"wikidata_id":r.get("wikidata_id",""),
"birth_year": str(r.get("birth_year","")),
"gender": r.get("gender",""),
"positions": str(r.get("positions",[])),
})
if result:
row = result.single()
if row and row["updated"] > 0:
count += 1
except Exception as e:
logger.warning(f"[Loader] Wikidata enrichment failed for {name}: {e}")
self.stats["wikidata_enrichments"] = count
logger.success(f"[Loader] Wikidata enriched {count} existing Politician nodes")
return count
def load_datagov_documents(self, records: list) -> int:
"""data.gov.in datasets -- generic document nodes."""
count = 0
for r in records:
title = (r.get("title") or r.get("resource_title") or "").strip()
if not title: continue
node_id = make_id(f"datagov_{r.get('resource_id',title)}")
try:
self._run("""
MERGE (n:DataGovDocument {id:$id})
SET n.title=$title, n.dataset_name=$dataset_name,
n.resource_id=$resource_id,
n.ministry=$ministry,
n.source='data.gov.in', n.dataset='datagov',
n.scraped_at=$scraped_at
""", {
"id": node_id, "title": title,
"dataset_name":r.get("_dataset",""),
"resource_id": r.get("resource_id",""),
"ministry": r.get("ministry",""),
"scraped_at": r.get("scraped_at",""),
})
count += 1
except Exception as e:
logger.warning(f"[Loader] DataGovDocument failed: {e}")
self.stats["datagov_documents"] = count
logger.success(f"[Loader] Loaded {count} DataGov documents")
return count
def close(self):
if self.driver:
self.driver.close()
logger.info("[Loader] Neo4j connection closed")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="BharatGraph Graph Loader")
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--file", type=str, default=None)
args = parser.parse_args()
print("=" * 55)
print("BharatGraph - Graph Loader")
print("=" * 55)
loader = GraphLoader(dry_run=args.dry_run)
if args.file:
results = loader.load_from_pipeline_output(args.file)
else:
import glob
files = sorted(glob.glob("data/processed/pipeline_*.json"))
if files:
results = loader.load_from_pipeline_output(files[-1])
else:
print("\nNo pipeline output found. Run: python -m processing.pipeline")
results = {}
loader.close()
s = loader.stats
print(f"\n{'='*55}\nLOAD SUMMARY\n{'='*55}")
print(f" Nodes created: {s['nodes_created']}")
print(f" Nodes merged: {s['nodes_merged']}")
print(f" Relationships: {s['rels_created']}")
print(f" Errors: {s['errors']}")
if args.dry_run:
print(" (DRY RUN -- nothing was written to Neo4j)")
print("=" * 55)