# AnalyzrAI — apps/copilot/graph.py
"""Mock supply chain graph and traversal helpers for news correlation."""
from collections import deque
# Static mock of a supply-chain knowledge graph used for news-impact demos.
# - "nodes": entities with a stable "id", display "name", and a "type" of
#   "Commodity", "Supplier", or "Company".
# - "edges": directed links traversed "from" -> "to" by traverse_impacts();
#   "ImpactedBy" ties a commodity to suppliers exposed to it, and "Supplies"
#   follows the flow of goods downstream toward companies.
SUPPLY_CHAIN_GRAPH = {
    "nodes": [
        {"id": "lithium", "name": "Lithium", "type": "Commodity"},
        {"id": "nickel", "name": "Nickel", "type": "Commodity"},
        {"id": "neon-gas", "name": "Neon Gas", "type": "Commodity"},
        {"id": "sqm", "name": "SQM", "type": "Supplier"},
        {"id": "albemarle", "name": "Albemarle", "type": "Supplier"},
        {"id": "tsmc", "name": "TSMC", "type": "Supplier"},
        {"id": "foxconn", "name": "Foxconn", "type": "Supplier"},
        {"id": "catl", "name": "CATL", "type": "Supplier"},
        {"id": "tesla", "name": "Tesla", "type": "Company"},
        {"id": "apple", "name": "Apple", "type": "Company"},
        {"id": "nvidia", "name": "NVIDIA", "type": "Company"},
    ],
    "edges": [
        {"from": "lithium", "to": "sqm", "type": "ImpactedBy"},
        {"from": "lithium", "to": "albemarle", "type": "ImpactedBy"},
        {"from": "nickel", "to": "catl", "type": "ImpactedBy"},
        {"from": "neon-gas", "to": "tsmc", "type": "ImpactedBy"},
        {"from": "sqm", "to": "catl", "type": "Supplies"},
        {"from": "albemarle", "to": "catl", "type": "Supplies"},
        {"from": "catl", "to": "tesla", "type": "Supplies"},
        {"from": "tsmc", "to": "apple", "type": "Supplies"},
        {"from": "tsmc", "to": "nvidia", "type": "Supplies"},
        {"from": "foxconn", "to": "apple", "type": "Supplies"},
    ],
}
# Lowercase headline substring -> graph node id used to seed the traversal.
# Values should match ids in SUPPLY_CHAIN_GRAPH["nodes"]; unknown ids are
# silently skipped by traverse_impacts().
NEWS_TRIGGER_KEYWORDS = {
    "lithium": "lithium",
    "nickel": "nickel",
    "neon": "neon-gas",
    "taiwan": "tsmc",
    "chile": "lithium",
    "semiconductor": "tsmc",
}
def _nodes_by_id():
    """Index the graph's node records by their ``id`` field."""
    index = {}
    for record in SUPPLY_CHAIN_GRAPH["nodes"]:
        index[record["id"]] = record
    return index
def _outgoing_edges():
    """Group the graph's edge records by their source (``from``) node id."""
    adjacency = {}
    for edge in SUPPLY_CHAIN_GRAPH["edges"]:
        source = edge["from"]
        if source not in adjacency:
            adjacency[source] = []
        adjacency[source].append(edge)
    return adjacency
def resolve_start_nodes(headline: str, extracted_entities: list[str]) -> list[str]:
    """Map a headline and extracted entities onto graph node ids.

    Two matching passes seed the traversal:
    1. keyword triggers — a lowercase substring match of each
       NEWS_TRIGGER_KEYWORDS key against the headline;
    2. entity triggers — a case-insensitive exact match of each graph node's
       display name against *extracted_entities*.

    Falls back to ``["lithium"]`` when nothing matches so the downstream
    traversal always has a seed.

    Args:
        headline: News headline text; ``None`` is treated as empty.
        extracted_entities: Entity names pulled from the article; ``None`` is
            treated as empty (mirrors the ``headline`` guard).

    Returns:
        Sorted list of node ids. Sorting makes the result deterministic;
        ``list(set(...))`` ordering varies per process because str hashing is
        randomized (PYTHONHASHSEED).
    """
    headline_lc = (headline or "").lower()
    start_nodes: set[str] = set()
    for keyword, node_id in NEWS_TRIGGER_KEYWORDS.items():
        if keyword in headline_lc:
            start_nodes.add(node_id)
    node_map = _nodes_by_id()
    # Guard against None so a caller without NER output doesn't crash here.
    extracted_entities_lc = {entity.lower() for entity in (extracted_entities or [])}
    for node in node_map.values():
        if node["name"].lower() in extracted_entities_lc:
            start_nodes.add(node["id"])
    if not start_nodes:
        start_nodes.add("lithium")
    return sorted(start_nodes)
def traverse_impacts(headline: str, extracted_entities: list[str]) -> list[dict]:
    """Run BFS from resolved start nodes and return downstream impact flow."""
    node_map = _nodes_by_id()
    adjacency = _outgoing_edges()

    def _tier(depth: int) -> tuple[str, float, str]:
        # Canned (severity, probability, timeframe) per BFS depth tier:
        # direct hit, one hop out, and everything further downstream.
        if depth == 1:
            return "High", 0.78, "0-30d"
        if depth == 2:
            return "Medium", 0.61, "1-2q"
        return "Low", 0.44, "2-4q"

    frontier = deque(
        (start_id, 1)
        for start_id in resolve_start_nodes(headline, extracted_entities)
    )
    seen: set[str] = set()
    impact_flow: list[dict] = []
    while frontier:
        current_id, depth = frontier.popleft()
        if current_id in seen:
            continue
        seen.add(current_id)
        record = node_map.get(current_id)
        if record is None:
            # Edge points at an id with no node entry; skip it quietly.
            continue
        severity, probability, timeframe = _tier(depth)
        impact_flow.append(
            {
                "step": depth,
                "nodeName": record["name"],
                "description": f"{record['name']} is exposed through the supply chain propagation path.",
                "financialImpact": severity,
                "probability": probability,
                "timeframe": timeframe,
                "nodeType": record["type"],
            }
        )
        for edge in adjacency.get(current_id, []):
            frontier.append((edge["to"], depth + 1))
    return sorted(impact_flow, key=lambda entry: (entry["step"], entry["nodeName"]))