# PeacebinfLow's picture
# Create sql_adapter.py
# 678fe1a verified
from typing import List, Tuple
def _normalize_table(table):
if table is None:
return []
return [list(row) for row in table if row and any(str(c).strip() for c in row)]
def _upsert_row(table: List[List[str]], row: List[str], key_index: int) -> List[List[str]]:
"""
Insert or update a row in a table based on a key column index.
"""
key = row[key_index]
updated = False
new_table = []
for existing in table:
if len(existing) > key_index and existing[key_index] == key:
new_table.append(row)
updated = True
else:
new_table.append(existing)
if not updated:
new_table.append(row)
return new_table
def _parse_params(tokens: list) -> dict:
"""
Parse simple key=value tokens, where value may be quoted.
Example: id=event_in label="Incoming Event" type=event
"""
params = {}
for t in tokens:
if "=" not in t:
continue
k, v = t.split("=", 1)
params[k.strip().lower()] = v.strip()
return params
def apply_command(
    command: str,
    nodes_table,
    edges_table,
) -> Tuple[list, list, str]:
    """
    Interpret a very small SQL-ish DSL to modify nodes and edges.

    Node rows are laid out as [id, label, type, style] and edge rows as
    [source, target, label, style]. The command is tokenized with
    ``shlex.split``, so values containing spaces must be quoted. The two
    leading keywords are case-insensitive, as are parameter names.

    Supported commands:
        ADD NODE id=<id> [label="<label>"] [type=<type>] [style="<style>"]
            Inserts the node, or replaces an existing row with the same id.
            ``label`` defaults to the id; ``type`` and ``style`` default to "".
        ADD EDGE source=<id> target=<id> [label="<label>"] [style="<style>"]
            Appends an edge row (existing edges are not checked).
        DELETE NODE id=<id>
            Removes the node and every edge whose source or target is the id.
        DELETE EDGE source=<id> target=<id>
            Removes every edge with that source and target.
        STYLE NODE id=<id> style="<style>"
        STYLE EDGE source=<id> target=<id> style="<style>"
            Sets the style column of the matching rows; on success all rows
            of the affected table are padded/truncated to four columns.

    Example:
        ADD NODE id=event_in label="Incoming Event" type=event
        ADD EDGE source=event_in target=handler label="route"
        DELETE NODE id=log
        DELETE EDGE source=handler target=log
        STYLE NODE id=handler style="fill:#0ea5e9,stroke:#0369a1"
        STYLE EDGE source=handler target=log style="stroke:#f97316,stroke-width:2px"

    Ids stored in the tables are compared by their string form in the DELETE
    and STYLE branches.

    Returns:
        new_nodes_table, new_edges_table, message

        For a missing or blank command the tables are returned exactly as
        passed in. In every other case they are the normalized tables (blank
        rows dropped), modified by the command if it succeeded.
    """
    import shlex

    # `command or ""` tolerates None instead of failing on .strip().
    cmd = (command or "").strip()
    if not cmd:
        return nodes_table, edges_table, "No command provided."

    nodes = _normalize_table(nodes_table)
    edges = _normalize_table(edges_table)

    # Tokenize in a very simple way: split by spaces, keep quoted chunks
    try:
        tokens = shlex.split(cmd)
    except ValueError as e:
        # Raised by shlex for malformed input such as an unclosed quote.
        return nodes, edges, f"Command parse error: {e}"
    if not tokens:
        return nodes, edges, "Empty command."

    action = tokens[0].upper()
    target = tokens[1].upper() if len(tokens) >= 2 else ""
    params = _parse_params(tokens[2:])

    # --- ADD NODE ------------------------------------------------------------
    if action == "ADD" and target == "NODE":
        node_id = params.get("id")
        if not node_id:
            return nodes, edges, "ADD NODE requires id=<id>."
        # nodes table: [id, label, type, style]
        new_row = [
            node_id,
            params.get("label", node_id),
            params.get("type", ""),
            params.get("style", ""),
        ]
        nodes = _upsert_row(nodes, new_row, 0)
        return nodes, edges, f"Node `{node_id}` added/updated."

    # --- ADD EDGE ------------------------------------------------------------
    if action == "ADD" and target == "EDGE":
        src = params.get("source")
        dst = params.get("target")
        if not src or not dst:
            return nodes, edges, "ADD EDGE requires source=<id> and target=<id>."
        # edges table: [source, target, label, style]
        edges.append([src, dst, params.get("label", ""), params.get("style", "")])
        return nodes, edges, f"Edge {src} -> {dst} added."

    # --- DELETE NODE ---------------------------------------------------------
    if action == "DELETE" and target == "NODE":
        node_id = params.get("id")
        if not node_id:
            return nodes, edges, "DELETE NODE requires id=<id>."
        nodes = [r for r in nodes if str(r[0]) != node_id]
        # Slice instead of indexing r[1]: an edge row with a single column
        # must not raise IndexError; it is then matched on its source only.
        edges = [r for r in edges if node_id not in [str(c) for c in r[:2]]]
        return nodes, edges, f"Node `{node_id}` and attached edges deleted."

    # --- DELETE EDGE ---------------------------------------------------------
    if action == "DELETE" and target == "EDGE":
        src = params.get("source")
        dst = params.get("target")
        if not src or not dst:
            return nodes, edges, "DELETE EDGE requires source=<id> and target=<id>."
        kept = [
            r
            for r in edges
            if not (len(r) >= 2 and str(r[0]) == src and str(r[1]) == dst)
        ]
        removed = len(edges) - len(kept)
        if removed:
            return nodes, kept, f"{removed} edge(s) {src} -> {dst} deleted."
        return nodes, kept, f"No edge {src} -> {dst} found."

    # --- STYLE NODE ----------------------------------------------------------
    if action == "STYLE" and target == "NODE":
        node_id = params.get("id")
        style = params.get("style", "")
        if not node_id or not style:
            return nodes, edges, "STYLE NODE requires id=<id> and style=\"...\"."
        new_nodes = []
        updated = False
        for r in nodes:
            # Pad short rows so the style column (index 3) always exists.
            row = (list(r) + ["", "", "", ""])[:4]
            if str(row[0]) == node_id:
                row[3] = style
                updated = True
            new_nodes.append(row)
        if not updated:
            return nodes, edges, f"Node `{node_id}` not found."
        return new_nodes, edges, f"Style applied to node `{node_id}`."

    # --- STYLE EDGE ----------------------------------------------------------
    if action == "STYLE" and target == "EDGE":
        src = params.get("source")
        dst = params.get("target")
        style = params.get("style", "")
        if not src or not dst or not style:
            return nodes, edges, "STYLE EDGE requires source=<id>, target=<id>, and style=\"...\"."
        new_edges = []
        updated = False
        for r in edges:
            # Pad short rows so the style column (index 3) always exists.
            row = (list(r) + ["", "", "", ""])[:4]
            if str(row[0]) == src and str(row[1]) == dst:
                row[3] = style
                updated = True
            new_edges.append(row)
        if not updated:
            return nodes, edges, f"Edge {src} -> {dst} not found."
        return nodes, new_edges, f"Style applied to edge {src} -> {dst}."

    return nodes, edges, f"Unrecognized command: `{cmd}`."