Spaces:
Running
Running
fix(B-05 step3, NEW-1): add load_affidavits() to GraphLoader; modern fulltext in setup_schema()
6b02e4a | """ | |
| BharatGraph - Neo4j Graph Loader | |
| Loads cleaned + resolved data from Phase 2 pipeline into Neo4j. | |
| """ | |
| import os, sys, json, hashlib, argparse | |
| from datetime import datetime | |
| from loguru import logger | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from graph.schema import SETUP_QUERIES, NODE_SCHEMAS | |
# BUG-19 FIX: upgraded from MD5 (16-char) to SHA-256 (first 32 hex chars) for lower
# collision risk across large datasets.
def make_id(*parts) -> str:
    """Derive a deterministic 32-hex-character ID from any number of parts.

    Each part is converted with ``str()``, lower-cased and stripped of
    surrounding whitespace. The normalised parts are joined with ``|`` and
    hashed with SHA-256; only the first 32 hex digits are returned.
    """
    normalised = [str(part).lower().strip() for part in parts]
    digest = hashlib.sha256("|".join(normalised).encode())
    return digest.hexdigest()[:32]
| class GraphLoader: | |
| """ | |
| Loads BharatGraph data into Neo4j. | |
| BUG-21 FIX: __init__ now accepts an optional pre-built driver so admin.py | |
| can pass the FastAPI dependency driver instead of opening a second connection. | |
| """ | |
def __init__(self, dry_run: bool = False, driver=None):
    """Set up loader state and, when needed, open a Neo4j connection.

    Args:
        dry_run: when true, nothing is written and no connection is opened.
        driver: optional pre-built driver to reuse (BUG-21 FIX) so a caller
            can share its own connection instead of opening a second one.
    """
    self.dry_run = dry_run
    self.driver = driver  # BUG-21 FIX: accept injected driver
    # Running counters, all starting at zero.
    self.stats = dict.fromkeys(
        ("nodes_created", "nodes_merged", "rels_created", "errors"), 0
    )
    if dry_run:
        logger.info("[Loader] DRY RUN mode -- no data written to Neo4j")
        return
    if driver is None:
        # No injected driver: build one from environment credentials.
        self._connect()
def _connect(self):
    """Connect to Neo4j using credentials from the environment / .env file.

    Reads NEO4J_URI, NEO4J_USER (default ``"neo4j"``) and NEO4J_PASSWORD
    after calling ``load_dotenv()``, builds a driver, verifies connectivity
    and only then stores the driver on ``self.driver``.

    Raises:
        ImportError: if ``neo4j`` or ``dotenv`` cannot be imported.
        ValueError: if NEO4J_URI or NEO4J_PASSWORD is unset or empty.
        Exception: any driver/connectivity error is logged and re-raised.
    """
    try:
        from neo4j import GraphDatabase
        from dotenv import load_dotenv
        load_dotenv()
        uri = os.getenv("NEO4J_URI", "")
        user = os.getenv("NEO4J_USER", "neo4j")
        pwd = os.getenv("NEO4J_PASSWORD", "")
        # Fail early with a clear message instead of handing an empty URI
        # to the driver and surfacing an opaque scheme error.
        if not uri:
            logger.error("[Loader] NEO4J_URI not set in .env")
            raise ValueError("NEO4J_URI missing")
        if not pwd:
            logger.error("[Loader] NEO4J_PASSWORD not set in .env")
            raise ValueError("NEO4J_PASSWORD missing")
        driver = GraphDatabase.driver(uri, auth=(user, pwd))
        try:
            driver.verify_connectivity()
        except Exception:
            # Do not leak the driver's resources or keep a dead driver
            # on the instance when the server is unreachable.
            driver.close()
            raise
        self.driver = driver
        logger.success(f"[Loader] Connected to Neo4j: {uri[:40]}...")
    except ImportError:
        logger.error("[Loader] neo4j package not installed.")
        raise
    except Exception as e:
        logger.error(f"[Loader] Connection failed: {e}")
        raise
def setup_schema(self):
    """Create the global full-text index plus the constraints/indexes in SETUP_QUERIES.

    In dry-run mode only the number of setup queries is logged. Otherwise
    one session is opened, the ``globalSearch`` full-text index is created
    if missing, and every statement in ``SETUP_QUERIES`` is run. Failures
    are logged, never raised, so one bad statement does not block the rest.
    """
    if self.dry_run:
        logger.info(f"[Loader] DRY RUN: would run {len(SETUP_QUERIES)} setup queries")
        return
    with self.driver.session() as session:
        # BUG-3 FIX: fulltext index covers all 21 node labels listed below (was 8).
        try:
            # NEW-1 FIX: modern CREATE FULLTEXT INDEX (Neo4j 4.3+ / AuraDB)
            # CALL db.index.fulltext.createNodeIndex removed in Neo4j 5.x
            session.run(
                "CREATE FULLTEXT INDEX globalSearch IF NOT EXISTS "
                "FOR (n:Politician|Company|Contract|AuditReport|Scheme|Ministry|"
                " Party|PressRelease|NGO|ElectoralBond|InsolvencyOrder|Tender|"
                " RegulatoryOrder|EnforcementAction|ParliamentQuestion|"
                " VigilanceCircular|ICIJEntity|SanctionedEntity|"
                " CourtCase|LocalBody|Affidavit) "
                "ON EACH [n.name, n.title, n.aliases, n.item_desc, n.product, "
                " n.buyer_org, n.cin, n.ministry, n.summary, "
                " n.seller_name, n.ngo_name, n.purchaser_name, "
                " n.redeemed_by, n.company_name, n.accused, "
                " n.entity_name, n.subject, n.jurisdiction]"
            )
            logger.info("[Loader] Full-text index created/verified (21 types, modern syntax)")
        except Exception as e:
            # IF NOT EXISTS already makes an existing index a no-op, so an
            # exception here is a real failure (syntax, permissions,
            # connectivity). Log it where operators will see it.
            logger.warning(f"[Loader] Full-text index note: {e}")
        for query in SETUP_QUERIES:
            try:
                session.run(query)
            except Exception as e:
                logger.debug(f"[Loader] Schema query skipped (may exist): {e}")
    logger.success("[Loader] Schema constraints and indexes ready")
def _run(self, query: str, params: dict = None, **kwargs):
    """Run one Cypher statement in its own session.

    BUG-17 FIX: parameters may be given as a positional dict, as keyword
    arguments, or both; keyword arguments win on key clashes.

    Returns:
        Whatever ``session.run`` returned, or ``None`` in dry-run mode or
        when the statement raised (the error is logged and
        ``stats["errors"]`` is incremented).

    NOTE(review): the result object is returned after the session context
    has exited; whether its records are still readable depends on the
    driver version -- confirm before relying on it.
    """
    if self.dry_run:
        logger.debug(f"[Loader] DRY RUN query: {query[:80]}...")
        return None
    # Start from the positional dict, then layer keyword args on top.
    bound = {**(params or {})}
    bound.update(kwargs)
    try:
        with self.driver.session() as session:
            return session.run(query, bound)
    except Exception as e:
        logger.error(f"[Loader] Query failed: {e}\nQuery: {query[:100]}")
        self.stats["errors"] += 1
    return None
# -- Node loaders ------------------------------------------------------------
def load_politicians(self, records: list) -> int:
    """MERGE one Politician node per record and link it to its Party.

    Records without a usable ``name`` are skipped. The node id is derived
    from name, state and source election via ``make_id``.

    Args:
        records: list of dicts (keys read: name, name_raw, party, state,
            source_election, total_assets, liabilities,
            criminal_case_count, education, _source, scraped_at).

    Returns:
        Number of records submitted; also added to ``stats["nodes_created"]``.
    """
    # Loop-invariant query text, built once.
    query = """
        MERGE (p:Politician {id: $id})
        SET p.name = $name,
            p.name_raw = $name_raw,
            p.party = $party,
            p.state = $state,
            p.election = $election,
            p.total_assets = $total_assets,
            p.liabilities = $liabilities,
            p.criminal_cases = $criminal_cases,
            p.education = $education,
            p.source = $source,
            p.scraped_at = $scraped_at
    """
    count = 0
    for r in records:
        # `or ""` covers keys that are present but null; a bare
        # r.get("name", "") would hand None to .strip() and crash the load.
        name = (r.get("name") or "").strip()
        if not name:
            continue
        node_id = make_id(name, r.get("state", ""), r.get("source_election", ""))
        params = {
            "id": node_id,
            "name": name,
            "name_raw": r.get("name_raw", name),
            "party": r.get("party", ""),
            "state": r.get("state", ""),
            "election": r.get("source_election", ""),
            "total_assets": str(r.get("total_assets", "")),
            "liabilities": str(r.get("liabilities", "")),
            "criminal_cases": str(r.get("criminal_case_count", "0")),
            "education": r.get("education", ""),
            "source": r.get("_source", "myneta"),
            "scraped_at": r.get("scraped_at", datetime.now().isoformat()),
        }
        self._run(query, params)
        count += 1
        party = (r.get("party") or "").strip()
        if party:
            self._create_party_and_link(node_id, party)
    logger.success(f"[Loader] Politicians loaded: {count}")
    self.stats["nodes_created"] += count
    return count
def _create_party_and_link(self, politician_id: str, party_name: str):
    """MERGE a Party node and a MEMBER_OF edge from the given politician.

    The party id is ``make_id(party_name)``. The edge is only created when
    a Politician with ``politician_id`` already exists (it is MATCHed).
    """
    cypher = """
        MERGE (party:Party {id: $party_id})
        SET party.name = $party_name
        WITH party
        MATCH (p:Politician {id: $pol_id})
        MERGE (p)-[:MEMBER_OF]->(party)
    """
    bindings = dict(
        party_id=make_id(party_name),
        party_name=party_name,
        pol_id=politician_id,
    )
    self._run(cypher, bindings)
def load_companies(self, records: list) -> int:
    """MERGE one Company node per record.

    The node id is the record's CIN when it has a non-empty one, otherwise
    a hash of name and state. Records without a usable ``name`` are skipped.

    Args:
        records: list of dicts (keys read: name, name_raw, cin, status,
            state, registration_date, company_class, _source, scraped_at).

    Returns:
        Number of records submitted; also added to ``stats["nodes_created"]``.
    """
    query = """
        MERGE (c:Company {id: $id})
        SET c.name = $name,
            c.name_raw = $name_raw,
            c.cin = $cin,
            c.status = $status,
            c.state = $state,
            c.registration_date = $reg_date,
            c.company_class = $company_class,
            c.source = $source,
            c.scraped_at = $scraped_at
    """
    count = 0
    for r in records:
        name = (r.get("name") or "").strip()
        if not name:
            continue
        # r.get("cin", fallback) only falls back when the key is absent.
        # A record carrying cin="" or cin=None would get id ""/None, and
        # every such company would be merged into the same node.
        node_id = r.get("cin") or make_id(name, r.get("state", ""))
        params = {
            "id": node_id,
            "name": name,
            "name_raw": r.get("name_raw", name),
            "cin": r.get("cin", ""),
            "status": r.get("status", ""),
            "state": r.get("state", ""),
            "reg_date": r.get("registration_date", ""),
            "company_class": r.get("company_class", ""),
            "source": r.get("_source", "mca"),
            "scraped_at": r.get("scraped_at", datetime.now().isoformat()),
        }
        self._run(query, params)
        count += 1
    logger.success(f"[Loader] Companies loaded: {count}")
    self.stats["nodes_created"] += count
    return count
def load_contracts(self, records: list) -> int:
    """MERGE one Contract node per record and link it to seller and buyer.

    Records without an ``order_id`` are skipped. The amount in crore is
    taken from ``amount_crore`` or derived from ``amount_inr`` (/ 1e7). The
    same value goes on the Contract node and the WON_CONTRACT relationship.

    Args:
        records: list of dicts (keys read: order_id, seller_name,
            seller_name_raw, buyer_org, product, amount_crore, amount_inr,
            order_date, state, _source, scraped_at).

    Returns:
        Number of records submitted; also added to ``stats["nodes_created"]``.
    """
    query = """
        MERGE (c:Contract {id: $id})
        SET c.order_id = $order_id,
            c.seller_name = $seller,
            c.buyer_org = $buyer,
            c.product = $product,
            c.amount_crore = $amount,
            c.order_date = $date,
            c.state = $state,
            c.source = $source,
            c.scraped_at = $scraped_at
    """
    count = 0
    for r in records:
        # str() tolerates numeric order ids; `or ""` tolerates nulls.
        order_id = str(r.get("order_id") or "").strip()
        # Fall back to the raw seller name when the cleaned one is empty
        # or null, not only when the key is absent.
        seller = (r.get("seller_name") or r.get("seller_name_raw") or "").strip()
        if not order_id:
            continue
        node_id = make_id(order_id)
        # Compute the amount once so node and relationship agree. A null
        # amount_inr no longer raises TypeError on the division.
        amount = float(
            r.get("amount_crore") or (float(r.get("amount_inr") or 0) / 1e7) or 0
        )
        params = {
            "id": node_id,
            "order_id": order_id,
            "seller": seller,
            "buyer": r.get("buyer_org", ""),
            "product": r.get("product", ""),
            "amount": amount,
            "date": r.get("order_date", ""),
            "state": r.get("state", ""),
            "source": r.get("_source", "gem"),
            "scraped_at": r.get("scraped_at", datetime.now().isoformat()),
        }
        self._run(query, params)
        count += 1
        if seller:
            # Previously only amount_crore was passed here, so contracts
            # that carried amount_inr got a relationship amount of 0.
            self._link_company_to_contract(seller, node_id, amount)
        buyer = (r.get("buyer_org") or "").strip()
        if buyer:
            self._link_contract_to_ministry(node_id, buyer)
    logger.success(f"[Loader] Contracts loaded: {count}")
    self.stats["nodes_created"] += count
    return count
def _link_company_to_contract(self, seller_name, contract_id, amount):
    """MERGE the seller Company and a WON_CONTRACT edge to the contract.

    The company id is ``make_id(seller_name)``; the edge stores
    ``amount_crore``. ``stats["rels_created"]`` is incremented after the
    query is submitted.
    """
    cypher = """
        MERGE (co:Company {id: $co_id})
        SET co.name = $seller_name
        WITH co
        MATCH (c:Contract {id: $contract_id})
        MERGE (co)-[r:WON_CONTRACT]->(c)
        SET r.amount_crore = $amount
    """
    bindings = dict(
        co_id=make_id(seller_name),
        seller_name=seller_name,
        contract_id=contract_id,
        amount=amount,
    )
    self._run(cypher, bindings)
    self.stats["rels_created"] += 1
def _link_contract_to_ministry(self, contract_id, ministry_name):
    """MERGE the buying Ministry and an AWARDED_BY edge from the contract.

    The ministry id is ``make_id(ministry_name)``.
    ``stats["rels_created"]`` is incremented after the query is submitted.
    """
    cypher = """
        MERGE (m:Ministry {id: $m_id})
        SET m.name = $m_name
        WITH m
        MATCH (c:Contract {id: $contract_id})
        MERGE (c)-[:AWARDED_BY]->(m)
    """
    bindings = dict(
        m_id=make_id(ministry_name),
        m_name=ministry_name,
        contract_id=contract_id,
    )
    self._run(cypher, bindings)
    self.stats["rels_created"] += 1
def load_audit_reports(self, records: list) -> int:
    """MERGE one AuditReport node per record and flag its Scheme.

    Records without a ``title`` are skipped. The amount in crore is taken
    from ``amount_crore`` or derived from ``amount_inr`` (/ 1e7). The same
    value goes on the node and the FLAGS relationship.

    Args:
        records: list of dicts (keys read: title, url, year, state, scheme,
            amount_crore, amount_inr, irregularity_type, finding,
            alert_keywords, _source, scraped_at).

    Returns:
        Number of records submitted; also added to ``stats["nodes_created"]``.
    """
    query = """
        MERGE (a:AuditReport {id: $id})
        SET a.title=$title, a.url=$url, a.year=$year, a.state=$state,
            a.scheme=$scheme, a.amount_crore=$amount,
            a.irregularity_type=$irr_type, a.finding=$finding,
            a.alert_keywords=$keywords, a.source=$source, a.scraped_at=$scraped_at
    """
    count = 0
    for r in records:
        title = (r.get("title") or "").strip()
        if not title:
            continue
        node_id = make_id(title, r.get("url", ""))
        # Compute once so node and FLAGS relationship agree. A null
        # amount_inr no longer raises TypeError on the division.
        amount = float(
            r.get("amount_crore") or (float(r.get("amount_inr") or 0) / 1e7) or 0
        )
        params = {
            "id": node_id,
            "title": title,
            "url": r.get("url", ""),
            "year": str(r.get("year", "")),
            "state": r.get("state", ""),
            "scheme": r.get("scheme", ""),
            "amount": amount,
            "irr_type": r.get("irregularity_type", ""),
            "finding": r.get("finding", ""),
            "keywords": str(r.get("alert_keywords", [])),
            "source": r.get("_source", "cag"),
            "scraped_at": r.get("scraped_at", datetime.now().isoformat()),
        }
        self._run(query, params)
        count += 1
        scheme = (r.get("scheme") or "").strip()
        if scheme:
            # Previously only amount_crore was passed here, so reports
            # that carried amount_inr flagged the scheme with amount 0.
            self._link_audit_to_scheme(node_id, scheme, amount)
    logger.success(f"[Loader] Audit reports loaded: {count}")
    self.stats["nodes_created"] += count
    return count
def _link_audit_to_scheme(self, audit_id, scheme_name, amount):
    """MERGE the Scheme and a FLAGS edge from the audit report to it.

    The scheme id is ``make_id(scheme_name)``; the edge stores
    ``amount_crore``. ``stats["rels_created"]`` is incremented after the
    query is submitted.
    """
    cypher = """
        MERGE (s:Scheme {id: $s_id})
        SET s.name = $s_name
        WITH s
        MATCH (a:AuditReport {id: $audit_id})
        MERGE (a)-[r:FLAGS]->(s)
        SET r.amount_crore = $amount
    """
    bindings = dict(
        s_id=make_id(scheme_name),
        s_name=scheme_name,
        audit_id=audit_id,
        amount=amount,
    )
    self._run(cypher, bindings)
    self.stats["rels_created"] += 1
def load_press_releases(self, records: list) -> int:
    """MERGE one PressRelease node per record.

    Records without a ``title`` are skipped; the node id is derived from
    title and link.

    Args:
        records: list of dicts (keys read: title, link, published,
            alert_keywords, _source, scraped_at).

    Returns:
        Number of records submitted; also added to ``stats["nodes_created"]``.
    """
    query = """
        MERGE (pr:PressRelease {id: $id})
        SET pr.title=$title, pr.link=$link, pr.published=$published,
            pr.alert_keywords=$keywords, pr.source=$source, pr.scraped_at=$scraped_at
    """
    count = 0
    for r in records:
        # `or ""` covers a title key that is present but null, which
        # would otherwise raise AttributeError on .strip().
        title = (r.get("title") or "").strip()
        if not title:
            continue
        node_id = make_id(title, r.get("link", ""))
        self._run(query, {
            "id": node_id,
            "title": title,
            "link": r.get("link", ""),
            "published": r.get("published", ""),
            "keywords": str(r.get("alert_keywords", [])),
            "source": r.get("_source", "pib"),
            "scraped_at": r.get("scraped_at", datetime.now().isoformat()),
        })
        count += 1
    logger.success(f"[Loader] Press releases loaded: {count}")
    self.stats["nodes_created"] += count
    return count
def load_politician_company_links(self, matches: list) -> int:
    """Create or update DIRECTOR_OF edges from entity-resolution matches.

    Politician and Company are looked up case-insensitively by name. When
    no node exists, one is MERGEd with an id hashed from the name. The
    edge stores the match score, a fixed source tag and a timestamp.

    Args:
        matches: list of dicts with ``name_a``/``politician_name``,
            ``name_b``/``company_name`` and an optional ``score``.

    Returns:
        Number of links whose query did not record an error.
    """
    link_query = """
        OPTIONAL MATCH (existing_p:Politician)
        WHERE toLower(existing_p.name) = toLower($pol_name)
        WITH coalesce(existing_p.id, $pol_id_fallback) AS pol_id
        MERGE (p:Politician {id: pol_id})
        SET p.name = coalesce(p.name, $pol_name)
        WITH p
        OPTIONAL MATCH (existing_c:Company)
        WHERE toLower(existing_c.name) = toLower($co_name)
        WITH p, coalesce(existing_c.id, $co_id_fallback) AS co_id
        MERGE (c:Company {id: co_id})
        SET c.name = coalesce(c.name, $co_name)
        MERGE (p)-[r:DIRECTOR_OF]->(c)
        SET r.confidence = $score,
            r.source = 'entity_resolution',
            r.detected_at = $now
    """
    count = 0
    for match in matches:
        # `or` chaining also falls back when the preferred key is present
        # but empty/null, and keeps .strip() from ever seeing None.
        pol_name = (match.get("name_a") or match.get("politician_name") or "").strip()
        co_name = (match.get("name_b") or match.get("company_name") or "").strip()
        score = match.get("score", 0.0)
        if not pol_name or not co_name:
            continue
        # _run never raises: it logs the error, bumps stats["errors"] and
        # returns None. A try/except around it is dead code, so failure is
        # detected by watching the error counter instead.
        errors_before = self.stats["errors"]
        self._run(link_query, {
            "pol_name": pol_name,
            "pol_id_fallback": make_id(pol_name),
            "co_name": co_name,
            "co_id_fallback": make_id(co_name),
            "score": score,
            "now": datetime.now().isoformat(),
        })
        if self.stats["errors"] != errors_before:
            logger.warning(
                f"[Loader] DIRECTOR_OF link {pol_name}->{co_name} failed "
                "(see query error above)"
            )
            continue
        count += 1
        self.stats["rels_created"] += 1
    logger.info(f"[Loader] DIRECTOR_OF links created/updated: {count}")
    return count
def load_affidavits(self, records: list) -> int:
    """MERGE one Affidavit node per record and link it to its Politician.

    B-05 FIX: affidavit data was scraped but never written to the graph,
    so (p:Politician)-[:FILED_AFFIDAVIT]->(a:Affidavit) queries returned
    nothing.

    The Cypher below must carry its ``$``-prefixed parameters. With them
    stripped (``{id: }``, ``SET a.year = ,``) the statement is invalid and
    every call fails.

    Records lacking ``politician_id`` or ``year``, or carrying non-numeric
    values in numeric fields, are skipped. The FILED_AFFIDAVIT edge is only
    created when the Politician already exists (it is MATCHed).

    Args:
        records: list of dicts (keys read: politician_id, year,
            total_assets_crore, movable_assets_crore, liabilities_crore,
            criminal_cases, source, scraped_at).

    Returns:
        Number of records for which ``_run`` returned a result; each also
        increments ``stats["nodes_created"]``.
    """
    query = """
        MERGE (a:Affidavit {id: $id})
        SET a.politician_id = $politician_id,
            a.year = $year,
            a.total_assets_crore = $total_assets_crore,
            a.movable_assets_crore = $movable_assets_crore,
            a.liabilities_crore = $liabilities_crore,
            a.criminal_cases = $criminal_cases,
            a.source = $source,
            a.scraped_at = $scraped_at
        WITH a
        MATCH (p:Politician {id: $politician_id})
        MERGE (p)-[:FILED_AFFIDAVIT]->(a)
    """
    count = 0
    for r in records:
        pol_id = (r.get("politician_id") or "").strip()
        year = r.get("year")
        if not pol_id or not year:
            continue
        aff_id = make_id(pol_id, str(year))
        try:
            params = {
                "id": aff_id,
                "politician_id": pol_id,
                "year": int(year),
                "total_assets_crore": float(r.get("total_assets_crore") or 0),
                "movable_assets_crore": float(r.get("movable_assets_crore") or 0),
                "liabilities_crore": float(r.get("liabilities_crore") or 0),
                "criminal_cases": int(r.get("criminal_cases") or 0),
                "source": r.get("source", "myneta"),
                "scraped_at": r.get("scraped_at", datetime.now().isoformat()),
            }
        except (TypeError, ValueError) as e:
            # One malformed record must not abort the whole batch.
            logger.warning(f"[Loader] Affidavit skipped for {pol_id} ({year}): {e}")
            self.stats["errors"] += 1
            continue
        result = self._run(query, params)
        # _run returns None in dry-run mode and on failure.
        if result is not None:
            count += 1
            self.stats["nodes_created"] += 1
    logger.info(f"[Loader] Affidavits loaded: {count}")
    return count
def load_from_pipeline_output(self, filepath: str) -> dict:
    """Load every dataset found in a pipeline JSON output file.

    The file is expected to hold a ``raw`` mapping (dataset key -> list of
    records) and an optional ``links`` list. The schema is set up first,
    then each non-empty dataset is passed to its loader in a fixed order.
    DIRECTOR_OF links are loaded after the five core datasets.

    NOTE(review): ``load_affidavits`` is not invoked from here; confirm
    which ``raw`` key carries affidavit records and wire it in.

    Returns:
        Per-dataset counts plus the cumulative ``stats`` dict.
    """
    logger.info(f"[Loader] Loading from: {filepath}")
    with open(filepath, encoding="utf-8") as handle:
        payload = json.load(handle)
    raw = payload.get("raw", {})
    links = payload.get("links", [])
    self.setup_schema()

    # (raw key, result key, loader method name). Method names are resolved
    # lazily so a loader is only looked up when its dataset is present.
    core_datasets = (
        ("myneta", "politicians", "load_politicians"),
        ("mca", "companies", "load_companies"),
        ("gem", "contracts", "load_contracts"),
        ("cag", "audit_reports", "load_audit_reports"),
        ("pib", "press_releases", "load_press_releases"),
    )
    extended_datasets = (
        ("sebi", "regulatory_orders", "load_regulatory_orders"),
        ("ed", "enforcement_actions", "load_enforcement_actions"),
        ("electoral_bond", "electoral_bonds", "load_electoral_bonds"),
        ("ibbi", "insolvency_orders", "load_insolvency_orders"),
        ("ngo_darpan", "ngos", "load_ngos"),
        ("cppp", "tenders", "load_tenders"),
        ("loksabha", "parliament_questions", "load_parliament_questions"),
        ("cvc", "vigilance_circulars", "load_vigilance_circulars"),
        # BUG-2 FIX: 7 previously missing loaders -- these datasets were
        # scraped but silently dropped because they were never called.
        ("icij", "icij_entities", "load_icij_entities"),
        ("opensanctions", "sanctioned_entities", "load_sanctioned_entities"),
        ("njdg", "court_cases", "load_court_cases"),
        ("lgd", "local_bodies", "load_local_bodies"),
        ("ncrb", "crime_reports", "load_crime_reports"),
        ("wikidata", "wikidata_enrichments", "load_wikidata_enrichments"),
        ("datagov", "datagov_documents", "load_datagov_documents"),
    )

    results = {}

    def dispatch(table):
        for raw_key, result_key, loader_name in table:
            if raw.get(raw_key):
                results[result_key] = getattr(self, loader_name)(raw[raw_key])

    dispatch(core_datasets)
    if links:
        results["director_of_links"] = self.load_politician_company_links(links)
    dispatch(extended_datasets)

    logger.success(f"[Loader] Load complete. Stats: {self.stats}")
    return {**results, "stats": self.stats}
# -- Phase 28 loaders (8 datasets) -------------------------------------------
def load_regulatory_orders(self, records: list) -> int:
    """MERGE one RegulatoryOrder node (dataset 'sebi') per titled record.

    Records without a title are skipped; the node id is ``make_id(title)``.
    The count is stored in ``stats["regulatory_orders"]`` and returned.
    """
    cypher = """
        MERGE (n:RegulatoryOrder {id:$id})
        SET n.title=$title, n.url=$url, n.order_type=$order_type,
            n.entity_name=$entity_name, n.violation=$violation,
            n.source=$source, n.dataset='sebi', n.scraped_at=$scraped_at
    """
    loaded = 0
    for rec in records:
        title = (rec.get("title") or "").strip()
        if title:
            order_id = make_id(title)
            try:
                self._run(cypher, {
                    "id": order_id,
                    "title": title,
                    "url": rec.get("url", ""),
                    "order_type": rec.get("order_type", ""),
                    "entity_name": rec.get("entity_name", rec.get("accused", "")),
                    "violation": rec.get("violation", ""),
                    "source": rec.get("source", "SEBI"),
                    "scraped_at": rec.get("scraped_at", ""),
                })
                loaded += 1
            except Exception as e:
                logger.warning(f"[Loader] RegulatoryOrder failed: {e}")
    self.stats["regulatory_orders"] = loaded
    logger.success(f"[Loader] Loaded {loaded} SEBI regulatory orders")
    return loaded
def load_enforcement_actions(self, records: list) -> int:
    """MERGE one EnforcementAction node (dataset 'ed') per titled record.

    When the record names an accused, every Politician whose lower-cased
    name contains that string gets a SUBJECT_OF edge to the action. The
    count is stored in ``stats["enforcement_actions"]`` and returned.
    """
    node_cypher = """
        MERGE (n:EnforcementAction {id:$id})
        SET n.title=$title, n.url=$url, n.date=$date,
            n.amount_crore=$amount_crore, n.case_type=$case_type,
            n.accused=$accused, n.source=$source,
            n.dataset='ed', n.scraped_at=$scraped_at
    """
    link_cypher = """
        MATCH (p:Politician)
        WHERE toLower(p.name) CONTAINS toLower($accused)
        MATCH (n:EnforcementAction {id:$id})
        MERGE (p)-[:SUBJECT_OF]->(n)
    """
    loaded = 0
    for rec in records:
        title = (rec.get("title") or "").strip()
        if title:
            action_id = make_id(title)
            try:
                self._run(node_cypher, {
                    "id": action_id,
                    "title": title,
                    "url": rec.get("url", ""),
                    "date": rec.get("date", ""),
                    "amount_crore": float(rec.get("amount_crore", 0) or 0),
                    "case_type": rec.get("case_type", ""),
                    "accused": rec.get("accused", ""),
                    "source": rec.get("source", "ED"),
                    "scraped_at": rec.get("scraped_at", ""),
                })
                loaded += 1
                accused = rec.get("accused", "").strip()
                if accused:
                    self._run(link_cypher, {"accused": accused, "id": action_id})
            except Exception as e:
                logger.warning(f"[Loader] EnforcementAction failed: {e}")
    self.stats["enforcement_actions"] = loaded
    logger.success(f"[Loader] Loaded {loaded} ED enforcement actions")
    return loaded
def load_electoral_bonds(self, records: list) -> int:
    """MERGE one ElectoralBond node per record and link purchaser and redeemer.

    Records without a ``purchaser_name`` are skipped. When a redeemer is
    present, the purchaser is MERGEd as a Company and the redeemer as a
    Party, connected as (Company)-[:DONATED_VIA]->(bond)-[:REDEEMED_BY]->(Party).

    Args:
        records: list of dicts (keys read: purchaser_name, bond_number,
            denomination_crore, purchase_date, redemption_date,
            redeemed_by, source, scraped_at).

    Returns:
        Number of bond nodes submitted; also stored in
        ``stats["electoral_bonds"]``.
    """
    node_query = """
        MERGE (n:ElectoralBond {id:$id})
        SET n.bond_number=$bond_no, n.purchaser_name=$purchaser,
            n.denomination_crore=$denom, n.purchase_date=$p_date,
            n.redemption_date=$r_date, n.redeemed_by=$redeemer,
            n.source=$source, n.dataset='electoral_bond',
            n.scraped_at=$scraped_at
    """
    # Cypher requires WITH between an updating clause (MERGE ... ON CREATE
    # SET) and a following MATCH; without it the statement is rejected and
    # no link is created. The two relationships are MERGEd separately so a
    # pre-existing DONATED_VIA edge is not duplicated when only REDEEMED_BY
    # is missing.
    link_query = """
        MERGE (c:Company {id:$co_id})
        ON CREATE SET c.name=$purchaser, c.source='electoral_bond'
        MERGE (p:Party {id:$pt_id})
        ON CREATE SET p.name=$redeemer, p.source='electoral_bond'
        WITH c, p
        MATCH (b:ElectoralBond {id:$b_id})
        MERGE (c)-[:DONATED_VIA]->(b)
        MERGE (b)-[:REDEEMED_BY]->(p)
    """
    count = 0
    for r in records:
        purchaser = (r.get("purchaser_name") or "").strip()
        if not purchaser:
            continue
        node_id = make_id(f"bond_{r.get('bond_number','')}_{purchaser}")
        try:
            self._run(node_query, {
                "id": node_id,
                "bond_no": r.get("bond_number", ""),
                "purchaser": purchaser,
                "denom": float(r.get("denomination_crore", 0) or 0),
                "p_date": r.get("purchase_date", ""),
                "r_date": r.get("redemption_date", ""),
                "redeemer": r.get("redeemed_by", ""),
                "source": r.get("source", "ECI"),
                "scraped_at": r.get("scraped_at", ""),
            })
            count += 1
            # `or ""` keeps a null redeemed_by from raising on .strip().
            redeemer = (r.get("redeemed_by") or "").strip()
            if redeemer:
                self._run(link_query, {
                    "co_id": make_id(purchaser),
                    "purchaser": purchaser,
                    "pt_id": make_id(redeemer),
                    "redeemer": redeemer,
                    "b_id": node_id,
                })
        except Exception as e:
            logger.warning(f"[Loader] ElectoralBond failed: {e}")
    self.stats["electoral_bonds"] = count
    logger.success(f"[Loader] Loaded {count} electoral bonds")
    return count
def load_insolvency_orders(self, records: list) -> int:
    """MERGE one InsolvencyOrder node (dataset 'ibbi') per company record.

    Every Company whose lower-cased name contains the record's company
    name gets a HAS_INSOLVENCY edge to the order. The count is stored in
    ``stats["insolvency_orders"]`` and returned.
    """
    node_cypher = """
        MERGE (n:InsolvencyOrder {id:$id})
        SET n.company_name=$company, n.cin=$cin,
            n.process_type=$process_type, n.admitted_date=$admitted_date,
            n.status=$status, n.admitted_claims=$claims,
            n.source=$source, n.dataset='ibbi', n.scraped_at=$scraped_at
    """
    link_cypher = """
        MATCH (c:Company)
        WHERE toLower(c.name) CONTAINS toLower($company)
        MATCH (n:InsolvencyOrder {id:$id})
        MERGE (c)-[:HAS_INSOLVENCY]->(n)
    """
    loaded = 0
    for rec in records:
        company = (rec.get("company_name") or "").strip()
        if company:
            order_id = make_id(f"ibbi_{company}")
            try:
                self._run(node_cypher, {
                    "id": order_id,
                    "company": company,
                    "cin": rec.get("cin", ""),
                    "process_type": rec.get("process_type", ""),
                    "admitted_date": rec.get("admitted_date", ""),
                    "status": rec.get("status", ""),
                    "claims": float(rec.get("admitted_claims", 0) or 0),
                    "source": rec.get("source", "IBBI"),
                    "scraped_at": rec.get("scraped_at", ""),
                })
                loaded += 1
                self._run(link_cypher, {"company": company, "id": order_id})
            except Exception as e:
                logger.warning(f"[Loader] InsolvencyOrder failed: {e}")
    self.stats["insolvency_orders"] = loaded
    logger.success(f"[Loader] Loaded {loaded} IBBI insolvency orders")
    return loaded
def load_ngos(self, records: list) -> int:
    """MERGE one NGO node (dataset 'ngo_darpan') per record.

    Records without an ``ngo_name`` are skipped. The node id is hashed
    from the Darpan id when the record has a non-empty one, otherwise from
    the NGO name.

    Args:
        records: list of dicts (keys read: ngo_name, darpan_id, state,
            district, registration_type, year_of_reg, key_issues,
            csr_receipts, govt_grants, source, scraped_at).

    Returns:
        Number of NGO nodes submitted; also stored in ``stats["ngos"]``.
    """
    query = """
        MERGE (n:NGO {id:$id})
        SET n.ngo_name=$name, n.darpan_id=$darpan_id,
            n.state=$state, n.district=$district,
            n.registration_type=$reg_type, n.year_of_reg=$year,
            n.key_issues=$issues, n.csr_receipts=$csr,
            n.govt_grants=$grants, n.source=$source,
            n.dataset='ngo_darpan', n.scraped_at=$scraped_at
    """
    count = 0
    for r in records:
        name = (r.get("ngo_name") or "").strip()
        if not name:
            continue
        # r.get('darpan_id', name) only falls back when the key is absent.
        # An empty or null darpan_id would hash "ngo_" / "ngo_None" and
        # collapse every such NGO into one node.
        node_id = make_id(f"ngo_{r.get('darpan_id') or name}")
        try:
            self._run(query, {
                "id": node_id,
                "name": name,
                "darpan_id": r.get("darpan_id", ""),
                "state": r.get("state", ""),
                "district": r.get("district", ""),
                "reg_type": r.get("registration_type", ""),
                "year": str(r.get("year_of_reg", "")),
                "issues": str(r.get("key_issues", [])),
                "csr": float(r.get("csr_receipts", 0) or 0),
                "grants": float(r.get("govt_grants", 0) or 0),
                "source": r.get("source", "NGO Darpan"),
                "scraped_at": r.get("scraped_at", ""),
            })
            count += 1
        except Exception as e:
            logger.warning(f"[Loader] NGO failed: {e}")
    self.stats["ngos"] = count
    logger.success(f"[Loader] Loaded {count} NGOs")
    return count
def load_tenders(self, records: list) -> int:
    """MERGE one Tender node (dataset 'cppp') per record and link its Ministry.

    Records without a ``title`` are skipped. The node id is hashed from
    the tender id when the record has a non-empty one, otherwise from the
    title. Single-bid tenders get ``risk_flag='SINGLE_BID'``.

    Args:
        records: list of dicts (keys read: title, tender_id, ministry,
            department, estimated_crore, tender_type, bid_submission_end,
            status, awarded_to, awarded_value, single_bid, source,
            scraped_at).

    Returns:
        Number of tender nodes submitted; also stored in ``stats["tenders"]``.
    """
    node_query = """
        MERGE (n:Tender {id:$id})
        SET n.tender_id=$tender_id, n.title=$title,
            n.ministry=$ministry, n.department=$dept,
            n.estimated_crore=$value, n.tender_type=$t_type,
            n.bid_end_date=$bid_end, n.status=$status,
            n.awarded_to=$awarded_to, n.awarded_value=$awarded_val,
            n.single_bid=$single_bid, n.source=$source,
            n.dataset='cppp', n.scraped_at=$scraped_at
    """
    # Cypher requires WITH between an updating clause (MERGE ... ON CREATE
    # SET) and a following MATCH; without it the statement is rejected and
    # the ISSUED_TENDER edge is never created.
    ministry_query = """
        MERGE (m:Ministry {id:$m_id})
        ON CREATE SET m.name=$ministry, m.source='cppp'
        WITH m
        MATCH (t:Tender {id:$t_id})
        MERGE (m)-[:ISSUED_TENDER]->(t)
    """
    count = 0
    for r in records:
        title = (r.get("title") or "").strip()
        if not title:
            continue
        # Fall back to the title when tender_id is present but empty or
        # null, so such tenders do not all hash to the same id.
        node_id = make_id(f"tender_{r.get('tender_id') or title}")
        try:
            self._run(node_query, {
                "id": node_id,
                "tender_id": r.get("tender_id", ""),
                "title": title,
                "ministry": r.get("ministry", ""),
                "dept": r.get("department", ""),
                "value": float(r.get("estimated_crore", 0) or 0),
                "t_type": r.get("tender_type", ""),
                "bid_end": r.get("bid_submission_end", ""),
                "status": r.get("status", ""),
                "awarded_to": r.get("awarded_to", ""),
                "awarded_val": float(r.get("awarded_value", 0) or 0),
                "single_bid": bool(r.get("single_bid", False)),
                "source": r.get("source", "CPPP"),
                "scraped_at": r.get("scraped_at", ""),
            })
            count += 1
            ministry = (r.get("ministry") or "").strip()
            if ministry:
                self._run(ministry_query, {
                    "m_id": make_id(ministry),
                    "ministry": ministry,
                    "t_id": node_id,
                })
            if r.get("single_bid"):
                self._run(
                    "MATCH (t:Tender {id:$id}) SET t.risk_flag='SINGLE_BID'",
                    {"id": node_id},
                )
        except Exception as e:
            logger.warning(f"[Loader] Tender failed: {e}")
    self.stats["tenders"] = count
    logger.success(f"[Loader] Loaded {count} CPPP tenders")
    return count
def load_parliament_questions(self, records: list) -> int:
    """MERGE one ParliamentQuestion node (dataset 'loksabha') per record.

    Records without a subject are skipped. The node id is hashed from the
    question number and member name. When a member name is present, every
    Politician whose lower-cased name contains it gets an ASKED_QUESTION
    edge. The count is stored in ``stats["parliament_questions"]``.
    """
    node_cypher = """
        MERGE (n:ParliamentQuestion {id:$id})
        SET n.subject=$subject, n.question_number=$qno,
            n.member_name=$member, n.ministry=$ministry,
            n.session=$session, n.source=$source,
            n.dataset='loksabha', n.scraped_at=$scraped_at
    """
    link_cypher = """
        MATCH (p:Politician)
        WHERE toLower(p.name) CONTAINS toLower($member)
        MATCH (q:ParliamentQuestion {id:$id})
        MERGE (p)-[:ASKED_QUESTION]->(q)
    """
    loaded = 0
    for rec in records:
        subject = (rec.get("subject") or "").strip()
        if subject:
            question_id = make_id(
                f"pq_{rec.get('question_number','')}_{rec.get('member_name','')}"
            )
            try:
                self._run(node_cypher, {
                    "id": question_id,
                    "subject": subject,
                    "qno": rec.get("question_number", ""),
                    "member": rec.get("member_name", ""),
                    "ministry": rec.get("ministry", ""),
                    "session": rec.get("session", ""),
                    "source": rec.get("source", "Lok Sabha"),
                    "scraped_at": rec.get("scraped_at", ""),
                })
                loaded += 1
                member = rec.get("member_name", "").strip()
                if member:
                    self._run(link_cypher, {"member": member, "id": question_id})
            except Exception as e:
                logger.warning(f"[Loader] ParliamentQuestion failed: {e}")
    self.stats["parliament_questions"] = loaded
    logger.success(f"[Loader] Loaded {loaded} Lok Sabha questions")
    return loaded
def load_vigilance_circulars(self, records: list) -> int:
    """MERGE one VigilanceCircular node (dataset 'cvc') per record.

    Records without a ``title`` are skipped. The node id is hashed from
    the circular number when the record has a non-empty one, otherwise
    from the title.

    Args:
        records: list of dicts (keys read: title, circular_number, date,
            ministry, subject, source, scraped_at).

    Returns:
        Number of nodes submitted; also stored in
        ``stats["vigilance_circulars"]``.
    """
    query = """
        MERGE (n:VigilanceCircular {id:$id})
        SET n.title=$title, n.circular_number=$cno,
            n.date=$date, n.ministry=$ministry,
            n.subject=$subject, n.source=$source,
            n.dataset='cvc', n.scraped_at=$scraped_at
    """
    count = 0
    for r in records:
        title = (r.get("title") or "").strip()
        if not title:
            continue
        # Fall back to the title when circular_number is present but empty
        # or null, so such circulars do not all hash to the same id.
        node_id = make_id(f"cvc_{r.get('circular_number') or title}")
        try:
            self._run(query, {
                "id": node_id,
                "title": title,
                "cno": r.get("circular_number", ""),
                "date": r.get("date", ""),
                "ministry": r.get("ministry", ""),
                "subject": r.get("subject", ""),
                "source": r.get("source", "CVC"),
                "scraped_at": r.get("scraped_at", ""),
            })
            count += 1
        except Exception as e:
            logger.warning(f"[Loader] VigilanceCircular failed: {e}")
    self.stats["vigilance_circulars"] = count
    logger.success(f"[Loader] Loaded {count} CVC circulars")
    return count
# -- BUG-2 FIX: 7 NEW loaders -- were scraped but never loaded ----------------
def load_icij_entities(self, records: list) -> int:
    """ICIJ Offshore Leaks entities -> ICIJEntity nodes.

    Records without a ``name``/``entity_name`` are skipped. The node id is
    hashed from the record's ``node_id`` when non-empty, otherwise from the
    name. Every Politician whose lower-cased name contains the entity name
    gets an ASSOCIATED_WITH edge.

    Args:
        records: list of dicts (keys read: name, entity_name, node_id,
            entity_type, type, jurisdiction, linked_to, sourceID,
            dataset_name, scraped_at).

    Returns:
        Number of nodes submitted; also stored in ``stats["icij_entities"]``.
    """
    node_query = """
        MERGE (n:ICIJEntity {id:$id})
        SET n.name=$name, n.entity_type=$entity_type,
            n.jurisdiction=$jurisdiction, n.linked_to=$linked_to,
            n.dataset_name=$dataset_name, n.source='ICIJ',
            n.dataset='icij', n.scraped_at=$scraped_at
    """
    link_query = """
        MATCH (p:Politician)
        WHERE toLower(p.name) CONTAINS toLower($name)
        MATCH (n:ICIJEntity {id:$id})
        MERGE (p)-[:ASSOCIATED_WITH]->(n)
    """
    count = 0
    for r in records:
        name = (r.get("name") or r.get("entity_name") or "").strip()
        if not name:
            continue
        # Fall back to the name when node_id is present but empty or null,
        # so such entities do not all hash to the same id.
        node_id = make_id(f"icij_{r.get('node_id') or name}")
        try:
            self._run(node_query, {
                "id": node_id,
                "name": name,
                "entity_type": r.get("entity_type", r.get("type", "")),
                "jurisdiction": r.get("jurisdiction", ""),
                "linked_to": str(r.get("linked_to", [])),
                "dataset_name": r.get("sourceID", r.get("dataset_name", "")),
                "scraped_at": r.get("scraped_at", ""),
            })
            count += 1
            # Cross-link to politicians only, using a case-insensitive
            # substring match (politician name CONTAINS entity name).
            # Companies are not linked by this query.
            self._run(link_query, {"name": name, "id": node_id})
        except Exception as e:
            logger.warning(f"[Loader] ICIJEntity failed: {e}")
    self.stats["icij_entities"] = count
    logger.success(f"[Loader] Loaded {count} ICIJ entities")
    return count
def load_sanctioned_entities(self, records: list) -> int:
    """OpenSanctions -> SanctionedEntity nodes.

    Records without a ``name``/``caption`` are skipped. The node id is
    hashed from the record's ``id`` when non-empty, otherwise from the
    name. Every Politician whose lower-cased name contains the entity name
    gets an ASSOCIATED_WITH edge.

    Args:
        records: list of dicts (keys read: name, caption, id, schema,
            topics, countries, datasets, scraped_at).

    Returns:
        Number of nodes submitted; also stored in
        ``stats["sanctioned_entities"]``.
    """
    node_query = """
        MERGE (n:SanctionedEntity {id:$id})
        SET n.name=$name, n.schema_type=$schema_type,
            n.topics=$topics, n.countries=$countries,
            n.sanctions_list=$sanctions_list,
            n.source='OpenSanctions', n.dataset='opensanctions',
            n.scraped_at=$scraped_at
    """
    link_query = """
        MATCH (p:Politician)
        WHERE toLower(p.name) CONTAINS toLower($name)
        MATCH (n:SanctionedEntity {id:$id})
        MERGE (p)-[:ASSOCIATED_WITH]->(n)
    """
    count = 0
    for r in records:
        name = (r.get("name") or r.get("caption") or "").strip()
        if not name:
            continue
        # Fall back to the name when id is present but empty or null, so
        # such entities do not all hash to the same node id.
        node_id = make_id(f"sanction_{r.get('id') or name}")
        try:
            self._run(node_query, {
                "id": node_id,
                "name": name,
                "schema_type": r.get("schema", ""),
                "topics": str(r.get("topics", [])),
                "countries": str(r.get("countries", [])),
                "sanctions_list": r.get("datasets", ""),
                "scraped_at": r.get("scraped_at", ""),
            })
            count += 1
            self._run(link_query, {"name": name, "id": node_id})
        except Exception as e:
            logger.warning(f"[Loader] SanctionedEntity failed: {e}")
    self.stats["sanctioned_entities"] = count
    logger.success(f"[Loader] Loaded {count} sanctioned entities")
    return count
def load_court_cases(self, records: list) -> int:
    """
    NJDG court pendency stats -> CourtCase nodes.

    One node is MERGEd per (court, year); the court label is ``court_name``
    or, when that is empty, ``state``. Records with neither are skipped.

    Args:
        records: iterable of dicts; fields are read with ``.get``.

    Returns:
        Number of CourtCase nodes written. The same value is stored in
        ``self.stats["court_cases"]``; failures are added to
        ``self.stats["errors"]``.
    """
    count = 0
    for r in records:
        court = (r.get("court_name") or r.get("state") or "").strip()
        if not court:
            continue
        node_id = make_id(f"njdg_{court}_{r.get('year', '')}")
        try:
            # The int() conversions sit inside the try on purpose: a
            # non-numeric count skips this record instead of aborting the load.
            params = {
                "id": node_id,
                "court": court,
                "state": r.get("state", ""),
                "pending": int(r.get("pending_cases", 0) or 0),
                "disposed": int(r.get("disposed_cases", 0) or 0),
                "year": str(r.get("year", "")),
                "scraped_at": r.get("scraped_at", ""),
            }
            self._run("""
                MERGE (n:CourtCase {id:$id})
                SET n.court_name=$court, n.state=$state,
                    n.pending_cases=$pending, n.disposed_cases=$disposed,
                    n.year=$year, n.source='NJDG',
                    n.dataset='njdg', n.scraped_at=$scraped_at
            """, params)
            count += 1
        except Exception as e:
            logger.warning(f"[Loader] CourtCase failed: {e}")
            # Surface the failure in the load summary, not only in the log.
            self.stats["errors"] = self.stats.get("errors", 0) + 1

    self.stats["court_cases"] = count
    logger.success(f"[Loader] Loaded {count} court case records")
    return count
def load_local_bodies(self, records: list) -> int:
    """
    LGD (Local Government Directory) -> LocalBody nodes.

    Every record with a non-empty ``name`` (or ``state_name``) is MERGEd as
    a LocalBody node keyed on its ``lgd_code``, falling back to the name
    when no usable code is present.

    Args:
        records: iterable of dicts; fields are read with ``.get``.

    Returns:
        Number of LocalBody nodes written. The same value is stored in
        ``self.stats["local_bodies"]``; failures are added to
        ``self.stats["errors"]``.
    """
    count = 0
    for r in records:
        name = (r.get("name") or r.get("state_name") or "").strip()
        if not name:
            continue
        # Treat a None/empty lgd_code like a missing one; otherwise all such
        # records hash to the same ID and the code is stored as "None".
        lgd_code = r.get("lgd_code") or ""
        node_id = make_id(f"lgd_{lgd_code or name}")
        try:
            self._run("""
                MERGE (n:LocalBody {id:$id})
                SET n.name=$name, n.lgd_code=$lgd_code,
                    n.entity_type=$entity_type, n.state=$state,
                    n.district=$district, n.source='LGD',
                    n.dataset='lgd', n.scraped_at=$scraped_at
            """, {
                "id": node_id,
                "name": name,
                "lgd_code": str(lgd_code),
                "entity_type": r.get("entity_type", r.get("type", "")),
                "state": r.get("state", ""),
                "district": r.get("district", ""),
                "scraped_at": r.get("scraped_at", ""),
            })
            count += 1
        except Exception as e:
            logger.warning(f"[Loader] LocalBody failed: {e}")
            self.stats["errors"] = self.stats.get("errors", 0) + 1

    self.stats["local_bodies"] = count
    logger.success(f"[Loader] Loaded {count} local bodies")
    return count
def load_crime_reports(self, records: list) -> int:
    """
    NCRB crime statistics -- stored as metadata nodes for context.

    One CrimeReport node is MERGEd per (state, year, crime_head). Records
    without a state or year are skipped.

    NOTE: records that carry a ``crime_head`` now include it in the node ID.
    Previously every crime head of one state/year shared a single ID, so
    only the last record survived. Records without a crime head keep the
    old ``state + year`` ID. Nodes written under the old scheme are not
    migrated by this method.

    Args:
        records: iterable of dicts; fields are read with ``.get``.

    Returns:
        Number of CrimeReport nodes written. The same value is stored in
        ``self.stats["crime_reports"]``; failures are added to
        ``self.stats["errors"]``.
    """
    count = 0
    for r in records:
        state = (r.get("state") or "").strip()
        # `or ""` before str(): str(None) would be the truthy text "None"
        # and slip past the emptiness check below.
        year = str(r.get("year") or "").strip()
        if not state or not year:
            continue
        head_key = str(r.get("crime_head") or "").strip()
        if head_key:
            node_id = make_id(f"ncrb_{state}_{year}_{head_key}")
        else:
            node_id = make_id(f"ncrb_{state}_{year}")
        try:
            self._run("""
                MERGE (n:CrimeReport {id:$id})
                SET n.state=$state, n.year=$year,
                    n.crime_head=$crime_head,
                    n.cases_registered=$cases,
                    n.source='NCRB', n.dataset='ncrb',
                    n.scraped_at=$scraped_at
            """, {
                "id": node_id,
                "state": state,
                "year": year,
                "crime_head": r.get("crime_head", ""),
                # int() inside the try: a non-numeric value skips the record.
                "cases": int(r.get("cases_registered", 0) or 0),
                "scraped_at": r.get("scraped_at", ""),
            })
            count += 1
        except Exception as e:
            logger.warning(f"[Loader] CrimeReport failed: {e}")
            self.stats["errors"] = self.stats.get("errors", 0) + 1

    self.stats["crime_reports"] = count
    logger.success(f"[Loader] Loaded {count} NCRB crime reports")
    return count
def load_wikidata_enrichments(self, records: list) -> int:
    """
    Wikidata -- enriches EXISTING Politician nodes; does NOT create new ones.
    Uses MATCH not MERGE to avoid phantom nodes.

    A Politician matches when its name contains the record name, the record
    name contains its name, or one of its aliases equals the record name
    (all case-insensitive). For each matched node, every field the record
    actually provides replaces the stored value; fields the record lacks
    are left untouched.

    Args:
        records: iterable of dicts; fields are read with ``.get``.

    Returns:
        Number of records that updated at least one Politician node. The
        same value is stored in ``self.stats["wikidata_enrichments"]``;
        failures are added to ``self.stats["errors"]``.
    """
    count = 0
    for r in records:
        name = (r.get("name") or "").strip()
        if not name:
            continue

        # Missing values must reach Cypher as null. coalesce() only falls
        # back on null, so "" / "None" / "[]" placeholders would overwrite
        # good existing data on the node.
        birth_year = r.get("birth_year")
        positions = r.get("positions")
        params = {
            "name": name,
            "wikidata_id": r.get("wikidata_id") or None,
            "birth_year": str(birth_year) if birth_year not in (None, "") else None,
            "gender": r.get("gender") or None,
            # Stored as the string repr of the list when non-empty.
            "positions": str(positions) if positions else None,
        }
        try:
            result = self._run("""
                MATCH (p:Politician)
                WHERE toLower(p.name) CONTAINS toLower($name) OR toLower($name) CONTAINS toLower(p.name)
                   OR any(a IN coalesce(p.aliases,[]) WHERE toLower(a) = toLower($name))
                SET p.wikidata_id = coalesce($wikidata_id, p.wikidata_id),
                    p.birth_year = coalesce($birth_year, p.birth_year),
                    p.gender = coalesce($gender, p.gender),
                    p.positions = coalesce($positions, p.positions),
                    p.wikidata_enriched = true
                RETURN count(p) AS updated
            """, params)
            # A falsy result is skipped rather than treated as an error.
            if result:
                row = result.single()
                if row and row["updated"] > 0:
                    count += 1
        except Exception as e:
            logger.warning(f"[Loader] Wikidata enrichment failed for {name}: {e}")
            self.stats["errors"] = self.stats.get("errors", 0) + 1

    self.stats["wikidata_enrichments"] = count
    logger.success(f"[Loader] Wikidata enriched {count} existing Politician nodes")
    return count
def load_datagov_documents(self, records: list) -> int:
    """
    data.gov.in datasets -- generic document nodes.

    Every record with a non-empty ``title`` (or ``resource_title``) is
    MERGEd as a DataGovDocument node keyed on its ``resource_id``, falling
    back to the title when no usable resource id is present.

    Args:
        records: iterable of dicts; fields are read with ``.get``.

    Returns:
        Number of DataGovDocument nodes written. The same value is stored
        in ``self.stats["datagov_documents"]``; failures are added to
        ``self.stats["errors"]``.
    """
    count = 0
    for r in records:
        title = (r.get("title") or r.get("resource_title") or "").strip()
        if not title:
            continue
        # `or title` instead of a .get() default: a resource_id that is
        # present but None/empty would otherwise collapse all such records
        # onto one node ID.
        node_id = make_id(f"datagov_{r.get('resource_id') or title}")
        try:
            self._run("""
                MERGE (n:DataGovDocument {id:$id})
                SET n.title=$title, n.dataset_name=$dataset_name,
                    n.resource_id=$resource_id,
                    n.ministry=$ministry,
                    n.source='data.gov.in', n.dataset='datagov',
                    n.scraped_at=$scraped_at
            """, {
                "id": node_id,
                "title": title,
                "dataset_name": r.get("_dataset", ""),
                "resource_id": r.get("resource_id", ""),
                "ministry": r.get("ministry", ""),
                "scraped_at": r.get("scraped_at", ""),
            })
            count += 1
        except Exception as e:
            logger.warning(f"[Loader] DataGovDocument failed: {e}")
            self.stats["errors"] = self.stats.get("errors", 0) + 1

    self.stats["datagov_documents"] = count
    logger.success(f"[Loader] Loaded {count} DataGov documents")
    return count
def close(self):
    """Close the attached Neo4j driver; do nothing when there is none."""
    if not self.driver:
        # Nothing to release when no driver is attached.
        return
    self.driver.close()
    logger.info("[Loader] Neo4j connection closed")
if __name__ == "__main__":
    # CLI entry point: load one pipeline output file into Neo4j and print
    # a summary of the loader's counters.
    parser = argparse.ArgumentParser(description="BharatGraph Graph Loader")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--file", type=str, default=None)
    args = parser.parse_args()

    print("=" * 55)
    print("BharatGraph - Graph Loader")
    print("=" * 55)

    loader = GraphLoader(dry_run=args.dry_run)
    try:
        if args.file:
            loader.load_from_pipeline_output(args.file)
        else:
            import glob

            # No explicit file: take the last pipeline output in sorted
            # (lexicographic) filename order.
            files = sorted(glob.glob("data/processed/pipeline_*.json"))
            if files:
                loader.load_from_pipeline_output(files[-1])
            else:
                print("\nNo pipeline output found. Run: python -m processing.pipeline")
    finally:
        # Release the Neo4j connection even when loading raises.
        loader.close()

    s = loader.stats
    print(f"\n{'='*55}\nLOAD SUMMARY\n{'='*55}")
    print(f" Nodes created: {s['nodes_created']}")
    print(f" Nodes merged: {s['nodes_merged']}")
    print(f" Relationships: {s['rels_created']}")
    print(f" Errors: {s['errors']}")
    if args.dry_run:
        print(" (DRY RUN -- nothing was written to Neo4j)")
    print("=" * 55)