"""Mock supply chain graph and traversal helpers for news correlation.""" from collections import deque SUPPLY_CHAIN_GRAPH = { "nodes": [ {"id": "lithium", "name": "Lithium", "type": "Commodity"}, {"id": "nickel", "name": "Nickel", "type": "Commodity"}, {"id": "neon-gas", "name": "Neon Gas", "type": "Commodity"}, {"id": "sqm", "name": "SQM", "type": "Supplier"}, {"id": "albemarle", "name": "Albemarle", "type": "Supplier"}, {"id": "tsmc", "name": "TSMC", "type": "Supplier"}, {"id": "foxconn", "name": "Foxconn", "type": "Supplier"}, {"id": "catl", "name": "CATL", "type": "Supplier"}, {"id": "tesla", "name": "Tesla", "type": "Company"}, {"id": "apple", "name": "Apple", "type": "Company"}, {"id": "nvidia", "name": "NVIDIA", "type": "Company"}, ], "edges": [ {"from": "lithium", "to": "sqm", "type": "ImpactedBy"}, {"from": "lithium", "to": "albemarle", "type": "ImpactedBy"}, {"from": "nickel", "to": "catl", "type": "ImpactedBy"}, {"from": "neon-gas", "to": "tsmc", "type": "ImpactedBy"}, {"from": "sqm", "to": "catl", "type": "Supplies"}, {"from": "albemarle", "to": "catl", "type": "Supplies"}, {"from": "catl", "to": "tesla", "type": "Supplies"}, {"from": "tsmc", "to": "apple", "type": "Supplies"}, {"from": "tsmc", "to": "nvidia", "type": "Supplies"}, {"from": "foxconn", "to": "apple", "type": "Supplies"}, ], } NEWS_TRIGGER_KEYWORDS = { "lithium": "lithium", "nickel": "nickel", "neon": "neon-gas", "taiwan": "tsmc", "chile": "lithium", "semiconductor": "tsmc", } def _nodes_by_id(): return {node["id"]: node for node in SUPPLY_CHAIN_GRAPH["nodes"]} def _outgoing_edges(): adjacency = {} for edge in SUPPLY_CHAIN_GRAPH["edges"]: adjacency.setdefault(edge["from"], []).append(edge) return adjacency def resolve_start_nodes(headline: str, extracted_entities: list[str]) -> list[str]: headline_lc = (headline or "").lower() start_nodes = set() for keyword, node_id in NEWS_TRIGGER_KEYWORDS.items(): if keyword in headline_lc: start_nodes.add(node_id) node_map = _nodes_by_id() extracted_entities_lc = {entity.lower() for entity in extracted_entities} for node in node_map.values(): if node["name"].lower() in extracted_entities_lc: start_nodes.add(node["id"]) if not start_nodes: start_nodes.add("lithium") return list(start_nodes) def traverse_impacts(headline: str, extracted_entities: list[str]) -> list[dict]: """Run BFS from resolved start nodes and return downstream impact flow.""" node_map = _nodes_by_id() adjacency = _outgoing_edges() queue = deque((start_id, 1) for start_id in resolve_start_nodes(headline, extracted_entities)) visited = set() impact_flow = [] while queue: node_id, depth = queue.popleft() if node_id in visited: continue visited.add(node_id) node = node_map.get(node_id) if node is None: continue severity = "High" if depth == 1 else "Medium" if depth == 2 else "Low" probability = 0.78 if depth == 1 else 0.61 if depth == 2 else 0.44 timeframe = "0-30d" if depth == 1 else "1-2q" if depth == 2 else "2-4q" impact_flow.append( { "step": depth, "nodeName": node["name"], "description": f"{node['name']} is exposed through the supply chain propagation path.", "financialImpact": severity, "probability": probability, "timeframe": timeframe, "nodeType": node["type"], } ) for edge in adjacency.get(node_id, []): queue.append((edge["to"], depth + 1)) return sorted(impact_flow, key=lambda item: (item["step"], item["nodeName"]))