|
|
from typing import List, Tuple |
|
|
|
|
|
|
|
|
def _normalize_table(table): |
|
|
if table is None: |
|
|
return [] |
|
|
return [list(row) for row in table if row and any(str(c).strip() for c in row)] |
|
|
|
|
|
|
|
|
def _upsert_row(table: List[List[str]], row: List[str], key_index: int) -> List[List[str]]: |
|
|
""" |
|
|
Insert or update a row in a table based on a key column index. |
|
|
""" |
|
|
key = row[key_index] |
|
|
updated = False |
|
|
new_table = [] |
|
|
for existing in table: |
|
|
if len(existing) > key_index and existing[key_index] == key: |
|
|
new_table.append(row) |
|
|
updated = True |
|
|
else: |
|
|
new_table.append(existing) |
|
|
if not updated: |
|
|
new_table.append(row) |
|
|
return new_table |
|
|
|
|
|
|
|
|
def _parse_params(tokens: list) -> dict: |
|
|
""" |
|
|
Parse simple key=value tokens, where value may be quoted. |
|
|
Example: id=event_in label="Incoming Event" type=event |
|
|
""" |
|
|
params = {} |
|
|
for t in tokens: |
|
|
if "=" not in t: |
|
|
continue |
|
|
k, v = t.split("=", 1) |
|
|
params[k.strip().lower()] = v.strip() |
|
|
return params |
|
|
|
|
|
|
|
|
def apply_command(
    command: str,
    nodes_table,
    edges_table,
) -> Tuple[list, list, str]:
    """
    Interpret a very small SQL-ish DSL to modify nodes and edges.

    Node rows are laid out as ``[id, label, type, style]`` and edge rows as
    ``[source, target, label, style]``.

    Supported commands (case-insensitive):

      ADD NODE id=<id> label="<label>" type=<type> [style="<style>"]
      ADD EDGE source=<id> target=<id> [label="<label>"] [style="<style>"]
      DELETE NODE id=<id>
      DELETE EDGE source=<id> target=<id>
      STYLE NODE id=<id> style="<style>"
      STYLE EDGE source=<id> target=<id> style="<style>"

    Example:
      ADD NODE id=event_in label="Incoming Event" type=event
      ADD EDGE source=event_in target=handler label="route"
      DELETE NODE id=log
      DELETE EDGE source=handler target=log
      STYLE NODE id=handler style="fill:#0ea5e9,stroke:#0369a1"
      STYLE EDGE source=handler target=log style="stroke:#f97316,stroke-width:2px"

    Args:
      command: The command text. ``None`` or blank text is reported as
        "No command provided." and the input tables are returned untouched.
      nodes_table: Current node rows (or ``None``).
      edges_table: Current edge rows (or ``None``).

    Returns:
      new_nodes_table, new_edges_table, message

      For any non-blank command the returned tables are the versions
      produced by ``_normalize_table``, even when the command fails to
      parse or is not recognized. Problems are reported through the
      message; this function does not raise for malformed commands.
    """
    import shlex

    # A missing command is treated like a blank one rather than crashing
    # on ``None.strip()``.
    cmd = (command or "").strip()
    if not cmd:
        return nodes_table, edges_table, "No command provided."

    nodes = _normalize_table(nodes_table)
    edges = _normalize_table(edges_table)

    # shlex handles the quoting, so label="Incoming Event" arrives as the
    # single token ``label=Incoming Event``.
    try:
        tokens = shlex.split(cmd)
    except ValueError as e:
        return nodes, edges, f"Command parse error: {e}"

    if not tokens:
        return nodes, edges, "Empty command."

    action = tokens[0].upper()
    subject = tokens[1].upper() if len(tokens) >= 2 else ""
    operation = (action, subject)
    params = _parse_params(tokens[2:])

    if operation == ("ADD", "NODE"):
        node_id = params.get("id")
        if not node_id:
            return nodes, edges, "ADD NODE requires id=<id>."
        label = params.get("label", node_id)
        ntype = params.get("type", "")
        style = params.get("style", "")

        new_row = [node_id, label, ntype, style]
        nodes = _upsert_row(nodes, new_row, 0)
        return nodes, edges, f"Node `{node_id}` added/updated."

    if operation == ("ADD", "EDGE"):
        src = params.get("source")
        dst = params.get("target")
        if not src or not dst:
            return nodes, edges, "ADD EDGE requires source=<id> and target=<id>."
        label = params.get("label", "")
        style = params.get("style", "")

        # Edges are always appended; parallel edges between the same pair
        # of nodes are allowed.
        edges.append([src, dst, label, style])
        return nodes, edges, f"Edge {src} -> {dst} added."

    if operation == ("DELETE", "NODE"):
        node_id = params.get("id")
        if not node_id:
            return nodes, edges, "DELETE NODE requires id=<id>."
        nodes = [r for r in nodes if [str(c) for c in r[:1]] != [node_id]]
        # Only the first two cells (source, target) are inspected; slicing
        # keeps this safe for edge rows that have fewer than two cells.
        edges = [r for r in edges if node_id not in [str(c) for c in r[:2]]]
        return nodes, edges, f"Node `{node_id}` and attached edges deleted."

    if operation == ("DELETE", "EDGE"):
        src = params.get("source")
        dst = params.get("target")
        if not src or not dst:
            return nodes, edges, "DELETE EDGE requires source=<id> and target=<id>."
        # Rows with fewer than two cells can never equal [src, dst].
        kept = [r for r in edges if [str(c) for c in r[:2]] != [src, dst]]
        removed = len(edges) - len(kept)
        edges = kept
        if removed:
            return nodes, edges, f"{removed} edge(s) {src} -> {dst} deleted."
        return nodes, edges, f"No edge {src} -> {dst} found."

    if operation == ("STYLE", "NODE"):
        node_id = params.get("id")
        style = params.get("style", "")
        if not node_id or not style:
            return nodes, edges, "STYLE NODE requires id=<id> and style=\"...\"."
        styled_nodes = []
        updated = False
        for r in nodes:
            # Pad short rows so the style column exists, then keep exactly
            # four columns.
            row = (list(r) + ["", "", "", ""])[:4]
            # Compare as strings, matching how DELETE NODE matches ids.
            if str(row[0]) == node_id:
                row[3] = style
                updated = True
            styled_nodes.append(row)
        if not updated:
            return nodes, edges, f"Node `{node_id}` not found."
        return styled_nodes, edges, f"Style applied to node `{node_id}`."

    if operation == ("STYLE", "EDGE"):
        src = params.get("source")
        dst = params.get("target")
        style = params.get("style", "")
        if not src or not dst or not style:
            return nodes, edges, "STYLE EDGE requires source=<id>, target=<id>, and style=\"...\"."
        styled_edges = []
        updated = False
        for r in edges:
            # Pad short rows so the style column exists, then keep exactly
            # four columns.
            row = (list(r) + ["", "", "", ""])[:4]
            # Compare as strings, matching how DELETE EDGE matches ids.
            if str(row[0]) == src and str(row[1]) == dst:
                row[3] = style
                updated = True
            styled_edges.append(row)
        if not updated:
            return nodes, edges, f"Edge {src} -> {dst} not found."
        return nodes, styled_edges, f"Style applied to edge {src} -> {dst}."

    return nodes, edges, f"Unrecognized command: `{cmd}`."
|
|
|