# bharatgraph/ai/graph_analytics.py
# Last commit by abinazebinoy (8a0fba4):
#   fix(NEW-A3 part1): replace em-dashes/smart-quotes in 56+ Python files
#   -- CI requires pure ASCII source
# File size: 9.83 kB
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from datetime import datetime
from loguru import logger
class GraphAnalytics:
    """Network analytics (betweenness, PageRank, communities) for BharatGraph.

    Edges are read from Neo4j through ``driver`` when one is supplied, loaded
    into an in-memory NetworkX graph and analysed there. Centrality scores
    can be written back onto the Neo4j nodes as properties. With
    ``driver=None`` the ``compute_*`` / ``detect_communities`` methods still
    work on caller-supplied lists.

    Node list format: ``[(node_id, {"name": ..., "type": ...}), ...]``
    Edge list format: ``[(src_id, tgt_id, {"rel": ...}), ...]``
    """

    def __init__(self, driver=None):
        """Keep the (optional) Neo4j driver and import NetworkX.

        Args:
            driver: object exposing ``session()``, used as a context manager
                whose ``run(query, **params)`` is called; may be None.

        Raises:
            ImportError: if NetworkX is not installed.
        """
        self.driver = driver
        self._nx = None
        self._load_networkx()

    def _load_networkx(self):
        """Import NetworkX lazily and store the module on ``self._nx``."""
        try:
            import networkx as nx
        except ImportError:
            logger.error("[GraphAnalytics] NetworkX not installed. Run: pip install networkx")
            raise
        self._nx = nx
        logger.success(f"[GraphAnalytics] NetworkX {nx.__version__} loaded")

    def _fetch_graph_from_neo4j(self, entity_id: str = None,
                                depth: int = 3) -> tuple:
        """Fetch nodes and edges from Neo4j.

        Args:
            entity_id: when truthy, only relationships lying on paths of
                1..``depth`` hops around the node whose ``id`` property equals
                this value are fetched (at most 500 paths are expanded).
                Otherwise up to 2000 relationships from the whole graph.
            depth: maximum path length for the entity-scoped query; coerced
                with ``int()``.

        Returns:
            ``(nodes, edges)`` in the formats described on the class;
            ``([], [])`` when no driver is configured.

        Raises:
            ValueError: if ``depth`` cannot be converted to an int >= 1.
        """
        if not self.driver:
            return [], []
        if entity_id:
            # Cypher cannot parameterise variable-length bounds, so the hop
            # count has to be interpolated; forcing it through int() keeps the
            # query injection-safe.
            hops = int(depth)
            if hops < 1:
                raise ValueError(f"depth must be >= 1, got {depth!r}")
            # Flatten each path into its relationships so the rows have the
            # same src/tgt columns as the whole-graph query below (the old
            # query returned raw paths, which _rows_to_graph could not read).
            query = f"""
                MATCH path = (origin {{id: $id}})-[*1..{hops}]-()
                WITH path LIMIT 500
                UNWIND relationships(path) AS r
                WITH DISTINCT r
                WITH startNode(r) AS a, r, endNode(r) AS b
                RETURN a.id AS src, a.name AS src_name,
                       labels(a)[0] AS src_type,
                       type(r) AS rel,
                       b.id AS tgt, b.name AS tgt_name,
                       labels(b)[0] AS tgt_type
            """
            params = {"id": entity_id}
        else:
            query = """
                MATCH (a)-[r]->(b)
                RETURN a.id AS src, a.name AS src_name,
                       labels(a)[0] AS src_type,
                       type(r) AS rel,
                       b.id AS tgt, b.name AS tgt_name,
                       labels(b)[0] AS tgt_type
                LIMIT 2000
            """
            params = {}
        with self.driver.session() as session:
            rows = session.run(query, **params).data()
        return self._rows_to_graph(rows)

    @staticmethod
    def _rows_to_graph(rows) -> tuple:
        """Convert edge rows into ``(nodes, edges)`` lists.

        Each row is a mapping with the keys ``src, src_name, src_type, rel,
        tgt, tgt_name, tgt_type``. Rows missing either endpoint id are
        skipped. A null/missing name falls back to the node id, a null/missing
        type to ``"Unknown"`` and a null relationship type to ``""``.

        Returns:
            nodes: list of ``(node_id, {"name", "type"})`` in first-seen order
                (later rows overwrite the attributes of an already-seen id).
            edges: list of ``(src, tgt, {"rel": ...})``, one per kept row.
        """
        nodes = {}
        edges = []
        for row in rows:
            src = row.get("src")
            tgt = row.get("tgt")
            if not (src and tgt):
                continue
            # ``or`` rather than a .get() default: Neo4j returns the key with
            # a None value when the property/label is absent.
            nodes[src] = {
                "name": row.get("src_name") or src,
                "type": row.get("src_type") or "Unknown",
            }
            nodes[tgt] = {
                "name": row.get("tgt_name") or tgt,
                "type": row.get("tgt_type") or "Unknown",
            }
            edges.append((src, tgt, {"rel": row.get("rel") or ""}))
        return list(nodes.items()), edges

    def _build_nx_graph(self, nodes: list, edges: list,
                        directed: bool = True):
        """Build a NetworkX ``DiGraph`` (or ``Graph`` if not ``directed``).

        Edge endpoints absent from ``nodes`` are added without attributes.
        """
        nx = self._nx
        G = nx.DiGraph() if directed else nx.Graph()
        for node_id, attrs in nodes:
            G.add_node(node_id, **attrs)
        for src, tgt, attrs in edges:
            G.add_edge(src, tgt, **attrs)
        return G

    def compute_betweenness_centrality(self, nodes: list, edges: list,
                                       top_n: int = 20) -> list:
        """Return the ``top_n`` nodes by normalised betweenness centrality.

        The graph is treated as undirected. Fewer than 3 nodes yields ``[]``.
        Each result dict has ``entity_id, name, type, betweenness_centrality``
        (rounded to 6 places) and a textual ``interpretation`` (score > 0.1 is
        reported as a bridge).
        """
        nx = self._nx
        G = self._build_nx_graph(nodes, edges, directed=False)
        if G.number_of_nodes() < 3:
            logger.warning("[GraphAnalytics] Too few nodes for centrality")
            return []
        centrality = nx.betweenness_centrality(G, normalized=True)
        ranked = sorted(centrality.items(), key=lambda x: x[1], reverse=True)
        results = []
        for node_id, score in ranked[:top_n]:
            attrs = G.nodes.get(node_id, {})
            results.append({
                "entity_id": node_id,
                "name": attrs.get("name", node_id),
                "type": attrs.get("type", "Unknown"),
                "betweenness_centrality": round(score, 6),
                "interpretation": (
                    "High betweenness: entity acts as a key bridge "
                    "between institutional networks"
                    if score > 0.1 else
                    "Low betweenness: entity is not a primary network bridge"
                ),
            })
        if results:
            top = results[0]
            logger.success(
                f"[GraphAnalytics] Betweenness computed for {len(centrality)} nodes. "
                f"Top: {top['name']} ({top['betweenness_centrality']:.4f})"
            )
        else:
            logger.success("[GraphAnalytics] Betweenness computed -- no results")
        return results

    def compute_pagerank(self, nodes: list, edges: list,
                         top_n: int = 20) -> list:
        """Return the ``top_n`` nodes by PageRank (alpha=0.85, directed graph).

        Fewer than 2 nodes, or a power iteration that fails to converge,
        yields ``[]``. Each result dict has ``entity_id, name, type,
        pagerank`` (rounded to 6 places).
        """
        nx = self._nx
        G = self._build_nx_graph(nodes, edges, directed=True)
        if G.number_of_nodes() < 2:
            return []
        try:
            pagerank = nx.pagerank(G, alpha=0.85)
        except nx.PowerIterationFailedConvergence as exc:
            # Don't let one metric abort the whole analysis run.
            logger.warning(f"[GraphAnalytics] PageRank did not converge: {exc}")
            return []
        ranked = sorted(pagerank.items(), key=lambda x: x[1], reverse=True)
        results = []
        for node_id, score in ranked[:top_n]:
            attrs = G.nodes.get(node_id, {})
            results.append({
                "entity_id": node_id,
                "name": attrs.get("name", node_id),
                "type": attrs.get("type", "Unknown"),
                "pagerank": round(score, 6),
            })
        logger.success(
            f"[GraphAnalytics] PageRank computed for {len(pagerank)} nodes"
        )
        return results

    def detect_communities(self, nodes: list, edges: list) -> list:
        """Group nodes into communities on the undirected graph.

        Uses greedy modularity maximisation, falling back to connected
        components if that fails. Fewer than 4 nodes yields ``[]``;
        communities with a single member are dropped. ``community_id`` is the
        1-based position in the algorithm's output, so ids of dropped
        singletons leave gaps. Members are sorted by ``str(id)`` so the output
        is reproducible across runs.
        """
        nx = self._nx
        G = self._build_nx_graph(nodes, edges, directed=False)
        if G.number_of_nodes() < 4:
            return []
        try:
            from networkx.algorithms.community import greedy_modularity_communities
            communities = list(greedy_modularity_communities(G))
        except Exception as exc:
            # Deliberate best-effort fallback, but record why it happened.
            logger.warning(
                f"[GraphAnalytics] Modularity communities failed ({exc}); "
                "falling back to connected components"
            )
            communities = list(nx.connected_components(G))
        results = []
        for i, community in enumerate(communities):
            if len(community) < 2:
                continue
            members = []
            for node_id in sorted(community, key=str):
                attrs = G.nodes.get(node_id, {})
                members.append({
                    "id": node_id,
                    "name": attrs.get("name", node_id),
                    "type": attrs.get("type", "Unknown"),
                })
            results.append({
                "community_id": i + 1,
                "size": len(community),
                "members": members,
                "interpretation": (
                    "Large community detected -- may indicate a procurement "
                    "cluster or shared-director network warranting review"
                    if len(community) >= 5 else
                    "Small community -- limited network cluster"
                ),
            })
        logger.success(
            f"[GraphAnalytics] {len(results)} communities detected"
        )
        return results

    @staticmethod
    def _check_property_name(name: str) -> str:
        """Return ``name`` if it is a plain ASCII identifier, else raise.

        Property names cannot be Cypher parameters, so they are interpolated
        into the query text; this guard prevents query injection.

        Raises:
            ValueError: if ``name`` is not an ASCII identifier.
        """
        if not (isinstance(name, str) and name.isascii() and name.isidentifier()):
            raise ValueError(f"Unsafe Neo4j property name: {name!r}")
        return name

    def write_centrality_to_graph(self, results: list, metric: str):
        """Store ``result[metric]`` on each matching Neo4j node as property ``metric``.

        Nodes are matched on their ``id`` property against
        ``result["entity_id"]``; a missing score is written as 0.0. All rows
        are sent in one UNWIND query (one transaction). No-op without a
        driver or with empty ``results``.

        Raises:
            ValueError: if ``metric`` is not a safe property name.
        """
        if not self.driver or not results:
            return
        prop = self._check_property_name(metric)
        rows = [
            {"id": r["entity_id"], "score": r.get(metric, 0.0)}
            for r in results
        ]
        with self.driver.session() as session:
            # consume() forces execution so errors surface here.
            session.run(
                f"UNWIND $rows AS row "
                f"MATCH (n {{id: row.id}}) SET n.{prop} = row.score",
                rows=rows,
            ).consume()
        logger.success(
            f"[GraphAnalytics] Wrote {metric} scores to {len(results)} nodes"
        )

    def run_full_analysis(self, entity_id: str = None, depth: int = 3) -> dict:
        """Fetch the graph, run all analyses and write scores back to Neo4j.

        Args:
            entity_id: optional node id to scope the analysis around.
            depth: hop radius used when ``entity_id`` is given.

        Returns:
            ``{"status": "no_data", "analyzed_at": ...}`` when nothing was
            fetched. Otherwise ``status="ok"`` plus node/edge counts, the top
            5 betweenness and PageRank entries and up to 10 communities.
        """
        logger.info("[GraphAnalytics] Running full graph analysis")
        nodes, edges = self._fetch_graph_from_neo4j(entity_id, depth=depth)
        if not nodes:
            logger.warning("[GraphAnalytics] No graph data from Neo4j")
            return {"status": "no_data", "analyzed_at": datetime.now().isoformat()}
        betweenness = self.compute_betweenness_centrality(nodes, edges)
        pagerank = self.compute_pagerank(nodes, edges)
        communities = self.detect_communities(nodes, edges)
        # write_centrality_to_graph is itself a no-op without a driver.
        self.write_centrality_to_graph(betweenness, "betweenness_centrality")
        self.write_centrality_to_graph(pagerank, "pagerank")
        return {
            "status": "ok",
            "node_count": len(nodes),
            "edge_count": len(edges),
            "top_betweenness": betweenness[:5],
            "top_pagerank": pagerank[:5],
            "communities": communities[:10],
            "analyzed_at": datetime.now().isoformat(),
        }
if __name__ == "__main__":
    # Offline smoke test: run every analysis on a tiny hand-built graph
    # (no Neo4j driver needed) and print the leading results.
    banner = "=" * 55
    print(banner)
    print("BharatGraph - Graph Analytics Test")
    print(banner)

    sample_nodes = [
        ("P001", {"name": "Politician A", "type": "Politician"}),
        ("P002", {"name": "Politician B", "type": "Politician"}),
        ("C001", {"name": "Company X", "type": "Company"}),
        ("C002", {"name": "Company Y", "type": "Company"}),
        ("C003", {"name": "Company Z", "type": "Company"}),
        ("CT01", {"name": "Contract 1", "type": "Contract"}),
        ("CT02", {"name": "Contract 2", "type": "Contract"}),
        ("M001", {"name": "Ministry A", "type": "Ministry"}),
    ]
    sample_edges = [
        ("P001", "C001", {"rel": "DIRECTOR_OF"}),
        ("P001", "C002", {"rel": "DIRECTOR_OF"}),
        ("P002", "C003", {"rel": "DIRECTOR_OF"}),
        ("C001", "CT01", {"rel": "WON_CONTRACT"}),
        ("C002", "CT01", {"rel": "WON_CONTRACT"}),
        ("C003", "CT02", {"rel": "WON_CONTRACT"}),
        ("M001", "CT01", {"rel": "AWARDED_BY"}),
        ("M001", "CT02", {"rel": "AWARDED_BY"}),
        ("P001", "P002", {"rel": "MEMBER_OF"}),
    ]

    analytics = GraphAnalytics(driver=None)

    # Each ranking metric: section heading, method to call, score key to show.
    ranking_sections = (
        ("\n Betweenness Centrality:",
         analytics.compute_betweenness_centrality, "betweenness_centrality"),
        ("\n PageRank:", analytics.compute_pagerank, "pagerank"),
    )
    for heading, compute, score_key in ranking_sections:
        print(heading)
        ranked = compute(sample_nodes, sample_edges)
        for entry in ranked[:3]:
            print(f" {entry['name']:20s} {entry[score_key]:.4f}")

    print("\n Communities:")
    for group in analytics.detect_communities(sample_nodes, sample_edges):
        member_names = [member["name"] for member in group["members"]]
        print(f" Community {group['community_id']}: {member_names}")
    print("\nDone!")