Spaces:
Running
Running
| """ | |
| BharatGraph - Cypher Query Library | |
| Pre-built queries for corruption pattern detection. | |
| Each query answers a real investigative question: | |
| Q: Which politicians are directors of companies that won contracts? | |
| Q: Which ministries have the most CAG audit flags? | |
| Q: Which companies won contracts repeatedly from the same ministry? | |
| Q: Show all entities linked to a named politician | |
| """ | |
| import os | |
| import sys | |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| from loguru import logger | |
# Registry of pre-built Cypher queries, keyed by query name.
#
# Each entry is a dict with:
#   "description" - one-line, human-readable summary of the query
#   "cypher"      - the Cypher text passed to the Neo4j session
#   "risk_level"  - "HIGH", "MEDIUM" or "INFO"
#   "params"      - (optional) the $parameters the Cypher text expects
QUERIES = {
    # -- Core corruption pattern ---------------------------
    "politician_company_contracts": {
        "description": "Find politicians linked to companies that won govt contracts",
        "cypher": """
            MATCH (p:Politician)-[:DIRECTOR_OF]->(c:Company)-[:WON_CONTRACT]->(ct:Contract)
            RETURN p.name AS politician,
                   p.party AS party,
                   p.state AS state,
                   c.name AS company,
                   ct.order_id AS contract_id,
                   ct.amount_crore AS amount_crore,
                   ct.buyer_org AS buyer_org,
                   ct.order_date AS order_date
            ORDER BY ct.amount_crore DESC
            LIMIT 50
        """,
        "risk_level": "HIGH",
    },
    # -- Repeated contract winners -------------------------
    "repeated_contract_winners": {
        "description": "Companies that won multiple contracts - concentration risk",
        "cypher": """
            MATCH (c:Company)-[:WON_CONTRACT]->(ct:Contract)
            WITH c, count(ct) AS contract_count, sum(ct.amount_crore) AS total_crore
            WHERE contract_count >= 2
            RETURN c.name AS company,
                   contract_count,
                   total_crore
            ORDER BY total_crore DESC
            LIMIT 20
        """,
        "risk_level": "MEDIUM",
    },
    # -- Ministry audit flags ------------------------------
    "ministry_audit_flags": {
        "description": "Ministries most flagged by CAG audit reports",
        "cypher": """
            MATCH (a:AuditReport)-[:FLAGS]->(m:Ministry)
            WITH m, count(a) AS audit_count, sum(a.amount_crore) AS total_flagged
            RETURN m.name AS ministry,
                   audit_count,
                   total_flagged AS total_crore_flagged
            ORDER BY total_crore_flagged DESC
            LIMIT 20
        """,
        "risk_level": "MEDIUM",
    },
    # -- Scheme irregularities -----------------------------
    # NOTE(review): this query sums amount_crore from the FLAGS relationship
    # (r), while ministry_audit_flags sums it from the AuditReport node (a).
    # Confirm against the loader which of the two actually carries the amount.
    "scheme_irregularities": {
        "description": "Government schemes with largest CAG-flagged irregularities",
        "cypher": """
            MATCH (a:AuditReport)-[r:FLAGS]->(s:Scheme)
            WITH s, count(a) AS report_count, sum(r.amount_crore) AS total_crore
            RETURN s.name AS scheme,
                   report_count,
                   total_crore
            ORDER BY total_crore DESC
            LIMIT 20
        """,
        "risk_level": "HIGH",
    },
    # -- Politicians with criminal cases -------------------
    "politicians_with_cases": {
        "description": "Politicians with declared criminal cases",
        "cypher": """
            MATCH (p:Politician)
            WHERE toInteger(p.criminal_cases) > 0
            RETURN p.name AS name,
                   p.party AS party,
                   p.state AS state,
                   p.criminal_cases AS criminal_cases,
                   p.total_assets AS total_assets
            ORDER BY toInteger(p.criminal_cases) DESC
            LIMIT 50
        """,
        "risk_level": "MEDIUM",
    },
    # -- Full profile: one politician ----------------------
    # Contracts are collected as distinct *nodes* and both `contracts` and
    # `contract_amounts` are projected from that single list, so the two
    # columns stay index-aligned. Collecting DISTINCT amounts separately
    # would collapse contracts that share the same value.
    # The MEMBER_OF match is kept from the original query; its Party node is
    # not returned (the `party` column comes from the p.party property).
    "politician_profile": {
        "description": "Full graph profile for a named politician",
        "cypher": """
            MATCH (p:Politician)
            WHERE toLower(p.name) CONTAINS toLower($name)
            OPTIONAL MATCH (p)-[:MEMBER_OF]->(:Party)
            OPTIONAL MATCH (p)-[:DIRECTOR_OF]->(co:Company)
            OPTIONAL MATCH (co)-[:WON_CONTRACT]->(ct:Contract)
            WITH p.name AS politician,
                 p.party AS party,
                 p.state AS state,
                 p.criminal_cases AS criminal_cases,
                 p.total_assets AS total_assets,
                 collect(DISTINCT co.name) AS companies,
                 collect(DISTINCT ct) AS contract_nodes
            RETURN politician,
                   party,
                   state,
                   criminal_cases,
                   total_assets,
                   companies,
                   [c IN contract_nodes | c.order_id] AS contracts,
                   [c IN contract_nodes | c.amount_crore] AS contract_amounts
        """,
        "risk_level": "INFO",
        "params": {"name": "politician name to search"},
    },
    # -- High value contracts ------------------------------
    "high_value_contracts": {
        "description": "All contracts above threshold crore value",
        "cypher": """
            MATCH (c:Company)-[:WON_CONTRACT]->(ct:Contract)-[:AWARDED_BY]->(m:Ministry)
            WHERE ct.amount_crore >= $min_crore
            RETURN ct.order_id AS order_id,
                   c.name AS seller,
                   ct.amount_crore AS crore,
                   m.name AS ministry,
                   ct.order_date AS date
            ORDER BY ct.amount_crore DESC
            LIMIT 50
        """,
        "risk_level": "HIGH",
        "params": {"min_crore": 1.0},
    },
    # -- Node counts (health check) ------------------------
    "node_counts": {
        "description": "Count of all node types in the graph",
        "cypher": """
            MATCH (n)
            RETURN labels(n)[0] AS node_type, count(n) AS count
            ORDER BY count DESC
        """,
        "risk_level": "INFO",
    },
    # -- Relationship counts (health check) ----------------
    "relationship_counts": {
        "description": "Count of all relationship types in the graph",
        "cypher": """
            MATCH ()-[r]->()
            RETURN type(r) AS relationship_type, count(r) AS count
            ORDER BY count DESC
        """,
        "risk_level": "INFO",
    },
}
class QueryRunner:
    """
    Runs pre-built Cypher queries against the Neo4j graph.

    Connection settings are read from the environment after loading a
    ``.env`` file: NEO4J_URI, NEO4J_USER (default "neo4j") and
    NEO4J_PASSWORD. The connection is opened and verified in the
    constructor; any failure is logged and re-raised.

    Instances can be used as context managers so the driver is closed even
    when the body raises::

        with QueryRunner() as runner:
            rows = runner.run("node_counts")
    """

    def __init__(self):
        self.driver = None
        self._connect()

    def __enter__(self):
        """Return the runner itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the driver on exit; exceptions from the body propagate."""
        self.close()
        return False

    def _connect(self):
        """Create the Neo4j driver and verify that the server is reachable.

        Raises:
            Exception: whatever the import, driver construction or
                connectivity check raised; it is logged and re-raised.
        """
        driver = None
        try:
            # Imported lazily so the module can be imported (e.g. to list
            # queries) without these packages being exercised.
            from neo4j import GraphDatabase
            from dotenv import load_dotenv

            load_dotenv()
            uri = os.getenv("NEO4J_URI", "")
            user = os.getenv("NEO4J_USER", "neo4j")
            pwd = os.getenv("NEO4J_PASSWORD", "")
            driver = GraphDatabase.driver(uri, auth=(user, pwd))
            driver.verify_connectivity()
        except Exception as e:
            # A driver that failed the connectivity check must still be
            # closed, otherwise it is leaked when we re-raise.
            if driver is not None:
                try:
                    driver.close()
                except Exception as close_err:
                    logger.warning(f"[Queries] Driver cleanup failed: {close_err}")
            logger.error(f"[Queries] Connection failed: {e}")
            raise
        # Only publish the driver once it is known to be usable.
        self.driver = driver
        logger.success("[Queries] Connected to Neo4j")

    def run(self, query_name: str, params: dict = None) -> list:
        """Run a named query and return results as list of dicts.

        Args:
            query_name: key of the query in ``QUERIES``.
            params: values for the ``$parameters`` used by the query text;
                ``None`` is treated as no parameters.

        Returns:
            One dict per result record. An empty list is returned when the
            query name is unknown or when execution fails; both cases are
            logged as errors rather than raised.
        """
        query = QUERIES.get(query_name)
        if query is None:
            logger.error(f"[Queries] Unknown query: {query_name}")
            return []
        logger.info(f"[Queries] Running: {query_name} ({query['risk_level']})")
        try:
            with self.driver.session() as session:
                result = session.run(query["cypher"], params or {})
                # Materialise inside the session: records are consumed
                # while the session is still open.
                rows = [dict(record) for record in result]
        except Exception as e:
            logger.error(f"[Queries] {query_name} failed: {e}")
            return []
        logger.success(f"[Queries] {query_name}: {len(rows)} rows")
        return rows

    def run_all_checks(self) -> dict:
        """Run the graph health checks and return their rows by query name.

        Only the two parameter-free counting queries are executed:
        ``node_counts`` and ``relationship_counts``.
        """
        return {
            "node_counts": self.run("node_counts"),
            "relationship_counts": self.run("relationship_counts"),
        }

    def close(self):
        """Close the driver if one is open. Safe to call more than once."""
        # Detach first so a second call (or a failing close) never tries to
        # close the same driver again.
        driver, self.driver = self.driver, None
        if driver is not None:
            driver.close()
def print_query_library(queries=None):
    """Print all available queries.

    For each query this prints a risk marker and the query name, then its
    description and, when present, its declared params. A total count is
    printed at the end.

    Args:
        queries: mapping of query name to query definition (each with
            "description" and "risk_level", optionally "params").
            Defaults to the module-level ``QUERIES`` registry.
    """
    if queries is None:
        queries = QUERIES
    # Distinct ASCII markers per risk level, so levels can be told apart on
    # any terminal encoding. Unknown levels fall back to "?".
    markers = {"HIGH": "[!]", "MEDIUM": "[*]", "INFO": "[i]"}
    print("=" * 55)
    print(" BharatGraph -- Cypher Query Library")
    print("=" * 55)
    for name, q in queries.items():
        marker = markers.get(q["risk_level"], "?")
        print(f"\n {marker} {name}")
        print(f" {q['description']}")
        if "params" in q:
            print(f" Params: {q['params']}")
    print(f"\n Total: {len(queries)} queries")
if __name__ == "__main__":
    # Script entry point: list the query library, then connect to Neo4j and
    # print the node / relationship counts as a smoke test.
    print_query_library()
    print("\n\nTesting Neo4j connection...")
    runner = None
    try:
        runner = QueryRunner()
        print("\nRunning health checks...")
        checks = runner.run_all_checks()
        print("\nNode counts:")
        for row in checks.get("node_counts", []):
            print(f" ({row['node_type']}): {row['count']}")
        print("\nRelationship counts:")
        for row in checks.get("relationship_counts", []):
            print(f" [{row['relationship_type']}]: {row['count']}")
        print("\nAll queries ready.")
    except Exception as e:
        print(f"\nCould not connect: {e}")
        print("Check NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD in .env")
    finally:
        # Close the driver on every path, including when a health check or
        # the printing above raises after a successful connection.
        if runner is not None:
            runner.close()