Spaces:
Runtime error
Runtime error
| # -*- coding: utf-8; -*- | |
| # GraphaLogue Analyzer | |
| # Marco Kuhlmann <marco.kuhlmann@liu.se> | |
| # Stephan Oepen <oe@ifi.uio.no> | |
| from datetime import datetime; | |
| import html; | |
| import operator; | |
| from pathlib import Path; | |
| import sys; | |
| import score.core; | |
| # | |
| # default values on edge attributes, which will be removed in normalization. | |
| # because all constants are normalized to lowercase strings prior to testing | |
| # for default values, we need to deal in the normalized values here. | |
| # | |
| ATTRIBUTE_DEFAULTS = {"remote": "false", | |
| "effective": "false", "member": "false"}; | |
| FLAVORS = {"dm": 0, "psd": 0, "ptg": 0, | |
| "eds": 1, "ptg": 1, "ucca": 1, | |
| "amr": 2, "drg": 2}; | |
| class Node(object): | |
| def __init__(self, id, label = None, properties = None, values = None, | |
| anchors = None, top = False, type = 1, anchorings = None): | |
| self.id = id | |
| self.type = type; | |
| self.label = label; | |
| self.properties = properties; | |
| self.values = values; | |
| self.anchorings = anchorings; | |
| self.incoming_edges = set() | |
| self.outgoing_edges = set() | |
| self.anchors = anchors; | |
| self.is_top = top | |
| def set_property(self, name, value): | |
| if self.properties and self.values: | |
| try: | |
| i = self.properties.index(name); | |
| self.values[i] = value; | |
| except ValueError: | |
| self.properties.append(name); | |
| self.values.append(value); | |
| else: | |
| self.properties = [name]; | |
| self.values = [value]; | |
| def set_anchoring(self, name, value): | |
| # | |
| # _fix_me_ | |
| # this (currently only used in the AMR overlay) will not work in the | |
| # general case, where all three arrays should correspond in order. | |
| # (22-jun-20; oe) | |
| if self.properties and self.anchorings: | |
| try: | |
| i = self.properties.index(name); | |
| self.anchorings[i] = value; | |
| except ValueError: | |
| self.properties.append(name); | |
| self.anchorings.append(value); | |
| else: | |
| self.properties = [name]; | |
| self.anchorings = [value]; | |
| def add_anchor(self, anchor): | |
| if anchor is not None: | |
| if self.anchors is None: self.anchors = [anchor]; | |
| elif anchor not in self.anchors: self.anchors.append(anchor); | |
| def is_root(self): | |
| return len(self.incoming_edges) == 0 | |
| def is_leaf(self): | |
| return len(self.outgoing_edges) == 0 | |
| def is_singleton(self): | |
| return self.is_root() and self.is_leaf() and not self.is_top | |
| def normalize(self, actions, input = None, trace = 0): | |
| def union(anchors): | |
| characters = set(); | |
| for anchor in anchors: | |
| if "from" in anchor and "to" in anchor: | |
| for i in range(anchor["from"], anchor["to"]): | |
| characters.add(i); | |
| result = []; | |
| last = start = None; | |
| for i in sorted(characters): | |
| if start is None: start = i; | |
| if last is None: | |
| last = i; | |
| continue; | |
| elif i == last + 1 \ | |
| or all(c in score.core.SPACE for c in input[last:i]): | |
| last = i; | |
| continue; | |
| else: | |
| result.append({"from": start, "to": last + 1}); | |
| last = start = i; | |
| if len(characters) > 0: | |
| result.append({"from": start, "to": i + 1}); | |
| if anchors != result: | |
| old = [anchor for anchor in anchors if anchor not in result]; | |
| new = [anchor for anchor in result if anchor not in anchors]; | |
| print("{} ==> {} [{}]".format(old, new, input), | |
| file = sys.stderr); | |
| return result; | |
| def trim(anchor, input): | |
| if "from" in anchor and "to" in anchor: | |
| i = max(anchor["from"], 0); | |
| j = min(anchor["to"], len(input)); | |
| while i < j and input[i] in score.core.PUNCTUATION: i += 1; | |
| while j > i and input[j - 1] in score.core.PUNCTUATION: j -= 1; | |
| if trace and (i != anchor["from"] or j != anchor["to"]): | |
| print("{} ({}) --> <{}:{}> ({})" | |
| "".format(anchor, | |
| input[anchor["from"]:anchor["to"]], | |
| i, j, input[i:j]), | |
| file = sys.stderr); | |
| anchor["from"] = i; | |
| anchor["to"] = j; | |
| if self.anchors is not None and "anchors" in actions: | |
| self.anchors = union(self.anchors); | |
| if self.anchors is not None and len(self.anchors) > 0 and input: | |
| for anchor in self.anchors: trim(anchor, input); | |
| elif isinstance(self.anchors, list) and len(self.anchors) == 0: | |
| self.anchors = None; | |
| if "case" in actions: | |
| if self.label is not None: | |
| self.label = str(self.label).lower(); | |
| if self.properties and self.values: | |
| for i in range(len(self.properties)): | |
| self.properties[i] = str(self.properties[i]).lower(); | |
| self.values[i] = str(self.values[i]).lower(); | |
| def compare(self, node): | |
| # | |
| # keep track of node-local pieces of information that either occur in | |
| # both nodes (i.e. match), or only in the first or second of them. in | |
| # guiding the MCES search, we (apparently) use the net gain of matching | |
| # pieces /minus/ those not matching on either side. that does not lead | |
| # to monotonicity, in the sense of cumulative scores moving either up | |
| # or down as more node correspondences are fixed, but for guiding the | |
| # MCES search monotonicity fortunately is not a requirement either. | |
| # | |
| count1 = both = count2 = 0; | |
| if node is None: | |
| if self.is_top: | |
| count1 += 1; | |
| if self.label is not None: | |
| count1 += 1; | |
| if self.properties is not None: | |
| count1 += len(self.properties); | |
| return both - count1 - count2, count1, both, count2; | |
| if self.is_top: | |
| if node.is_top: both += 1; | |
| else: count1 += 1; | |
| else: | |
| if node.is_top: count2 += 1; | |
| else: both += 1; | |
| if self.label is not None: | |
| if self.label == node.label: | |
| both += 1; | |
| else: | |
| count1 += 1; | |
| if node.label is not None: count2 += 1; | |
| if self.properties is not None: | |
| if node.properties is None: | |
| count1 += len(self.properties); | |
| else: | |
| properties1 = {(property, self.values[i]) | |
| for i, property in enumerate(self.properties)}; | |
| properties2 = {(property, node.values[i]) | |
| for i, property in enumerate(node.properties)}; | |
| n = len(properties1 & properties2); | |
| count1 += len(properties1) - n; | |
| both += n; | |
| count2 += len(properties2) - n; | |
| elif node.properties is not None: | |
| count2 += len(node.properties); | |
| return both - count1 - count2, count1, both, count2; | |
| def encode(self): | |
| json = {"id": self.id}; | |
| if self.label: | |
| json["label"] = self.label; | |
| if self.properties and self.values or self.anchorings: | |
| json["properties"] = self.properties; | |
| if self.values: | |
| json["values"] = self.values; | |
| if self.anchorings: | |
| json["anchorings"] = self.anchorings; | |
| if self.anchors: | |
| json["anchors"] = self.anchors; | |
| return json; | |
| def decode(json): | |
| id = json["id"] | |
| label = json.get("label", None) | |
| properties = json.get("properties", None) | |
| values = json.get("values", None) | |
| anchorings = json.get("anchorings", None) | |
| anchors = json.get("anchors", None) | |
| return Node(id=id, label=label, properties=properties, values=values, anchors=anchors, anchorings=anchorings) | |
| def dot(self, stream, input = None, ids = False, strings = False, | |
| errors = None, overlay = False): | |
| shapes = ["square", "oval", "diamond", "triangle"]; | |
| if errors is not None and "correspondences" in errors: | |
| correspondences = {g: s for g, s in errors["correspondences"]}; | |
| else: | |
| correspondences = None; | |
| missing = [None, [], [], None]; | |
| surplus = [None, [], [], None]; | |
| if errors is not None: | |
| if "labels" in errors and "missing" in errors["labels"]: | |
| for id, label in errors["labels"]["missing"]: | |
| if id == self.id: missing[0] = label; | |
| if "properties" in errors and "missing" in errors["properties"]: | |
| for id, property, value in errors["properties"]["missing"]: | |
| if id == self.id: | |
| missing[1].append(property); missing[2].append(value); | |
| if "anchors" in errors and "missing" in errors["anchors"]: | |
| for id, anchor in errors["anchors"]["missing"]: | |
| if id == self.id: missing[3] = anchor; | |
| if correspondences is not None and self.id in correspondences: | |
| key = correspondences[self.id]; | |
| if "labels" in errors and "surplus" in errors["labels"]: | |
| for id, label in errors["labels"]["surplus"]: | |
| if id == key: surplus[0] = label; | |
| if "properties" in errors and "surplus" in errors["properties"]: | |
| for id, property, value in errors["properties"]["surplus"]: | |
| if id == key: | |
| surplus[1].append(property); surplus[2].append(value); | |
| if "anchors" in errors and "surplus" in errors["anchors"]: | |
| for id, anchor in errors["anchors"]["surplus"]: | |
| if id == key: surplus[3] = anchor; | |
| if self.label \ | |
| or ids and not overlay \ | |
| or self.properties and self.values \ | |
| or self.anchors \ | |
| or missing[0] is not None or len(missing[1]) > 0 \ | |
| or missing[3] is not None \ | |
| or surplus[0] is not None or len(surplus[1]) > 0 \ | |
| or surplus[3] is not None: | |
| if self.type in {0, 1, 2, 3}: | |
| shape = "shape={}, ".format(shapes[self.type]); | |
| else: | |
| shape = ""; | |
| color = "color=blue, " if overlay else ""; | |
| print(" {} [ {}{}label=<<table align=\"center\" border=\"0\" cellspacing=\"0\">" | |
| "".format(self.id, shape, color), end = "", file = stream); | |
| if ids and not overlay: | |
| print("<tr><td colspan=\"2\">#{}</td></tr>" | |
| "".format(self.id), end = "", file = stream); | |
| if self.label: | |
| if missing[0]: font = "<font color=\"red\">"; | |
| elif overlay: font = "<font color=\"blue\">"; | |
| else: font = "<font>"; | |
| print("<tr><td colspan=\"2\">{}{}</font></td></tr>" | |
| "".format(font, html.escape(self.label, False)), | |
| end = "", file = stream); | |
| if surplus[0]: | |
| font = "<font color=\"blue\">"; | |
| print("<tr><td colspan=\"2\">{}{}</font></td></tr>" | |
| "".format(font, html.escape(surplus[0], False)), | |
| end = "", file = stream); | |
| def __anchors__(anchors, color): | |
| print("<tr><td colspan=\"2\"><font color=\"{}\">{{" | |
| "".format(color), end = "", file = stream); | |
| for index in anchors: | |
| print("{}{}".format(" " if index != anchors[0] else "", index), | |
| end = "", file = stream); | |
| print("}</font></td></tr>", end = "", file = stream); | |
| if self.anchors is not None: | |
| if overlay: | |
| __anchors__(self.anchors, "blue"); | |
| else: | |
| print("<tr><td colspan=\"2\">", end = "", file = stream); | |
| for anchor in self.anchors: | |
| if strings and input: | |
| print("{}<font face=\"Courier\">{}</font>" | |
| "".format(", " if anchor != self.anchors[0] else "", | |
| html.escape(input[anchor["from"]:anchor["to"]])), | |
| end = "", file = stream); | |
| else: | |
| print("{}〈{}:{}〉" | |
| "".format(" " if anchor != self.anchors[0] else "", | |
| anchor["from"], anchor["to"]), | |
| end = "", file = stream); | |
| print("</td></tr>", end = "", file = stream); | |
| if missing[3]: __anchors__(missing[3], "red"); | |
| if surplus[3]: __anchors__(surplus[3], "blue"); | |
| def __properties__(names, values, color): | |
| font = "<font color=\"{}\">".format(color); | |
| for name, value in zip(names, values): | |
| print("<tr><td sides=\"l\" border=\"1\" align=\"left\">{}{}</font>" | |
| "</td><td sides=\"r\" border=\"1\" align=\"left\">{}{}</font></td></tr>" | |
| "".format(font, html.escape(name, False), | |
| font, html.escape(value), False), | |
| end = "", file = stream); | |
| if self.properties and self.values: | |
| if not overlay: | |
| for name, value in zip(self.properties, self.values): | |
| i = None; | |
| try: | |
| i = missing[1].index(name); | |
| except: | |
| pass; | |
| if i is None or missing[2][i] != value: | |
| __properties__([name], [value], "black"); | |
| else: | |
| __properties__(self.properties, self.values, "blue"); | |
| if len(missing[1]) > 0: __properties__(missing[1], missing[2], "red"); | |
| if len(surplus[1]) > 0: __properties__(surplus[1], surplus[2], "blue"); | |
| print("</table>> ];", file = stream); | |
| elif overlay is None or self.id < 0: | |
| shape = "{}, label=\" \"".format(shapes[0]) if self.type == 0 else "point"; | |
| print(" {} [ shape={}, width=0.2 ];" | |
| "".format(self.id, shape), file = stream); | |
| def __key(self): | |
| return self.id | |
| def __eq__(self, other): | |
| return self.__key() == other.__key() | |
| def __lt__(self, other): | |
| return self.__key() < other.__key() | |
| def __hash__(self): | |
| return hash(self.__key()) | |
| class Edge(object): | |
| def __init__(self, id, src, tgt, lab, normal = None, | |
| attributes = None, values = None, anchors = None): | |
| self.id = id; | |
| self.src = src; | |
| self.tgt = tgt; | |
| self.lab = lab; | |
| self.normal = normal; | |
| self.attributes = attributes; | |
| self.values = values; | |
| self.anchors = anchors; | |
| def is_loop(self): | |
| return self.src == self.tgt | |
| def min(self): | |
| return min(self.src, self.tgt) | |
| def max(self): | |
| return max(self.src, self.tgt) | |
| def endpoints(self): | |
| return self.min(), self.max() | |
| def length(self): | |
| return self.max() - self.min() | |
| def normalize(self, actions, trace = 0): | |
| if "edges" in actions: | |
| if self.normal is None \ | |
| and self.lab is not None: | |
| label = self.lab; | |
| if label == "mod": | |
| self.normal = "domain"; | |
| elif label.endswith("-of-of") \ | |
| or label.endswith("-of") \ | |
| and label not in {"consist-of" "subset-of"} \ | |
| and not label.startswith("prep-"): | |
| self.normal = label[:-3]; | |
| if self.normal: | |
| target = self.src; | |
| self.src = self.tgt; | |
| self.tgt = target; | |
| self.lab = self.normal; | |
| self.normal = None; | |
| if "case" in actions: | |
| if self.lab is not None: | |
| self.lab = str(self.lab).lower(); | |
| if self.normal is not None: | |
| self.normal = str(self.normal).lower(); | |
| if self.attributes and self.values: | |
| for i in range(len(self.attributes)): | |
| self.attributes[i] = str(self.attributes[i]).lower(); | |
| self.values[i] = str(self.values[i]).lower(); | |
| if "attributes" in actions and self.attributes and self.values: | |
| # | |
| # drop (attribute, value) pairs whose value is the default value | |
| # | |
| attribute_value_pairs = [ | |
| (attribute, value) for attribute, value | |
| in zip(self.attributes, self.values) | |
| if attribute not in ATTRIBUTE_DEFAULTS | |
| or ATTRIBUTE_DEFAULTS[attribute] != value] | |
| self.attributes, self.values \ | |
| = tuple(map(list, zip(*attribute_value_pairs))) or ([], []) | |
| def encode(self): | |
| json = {"id": self.id}; | |
| if self.src is not None: json["source"] = self.src; | |
| if self.tgt is not None: json["target"] = self.tgt; | |
| if self.lab: json["label"] = self.lab; | |
| if self.normal: json["normal"] = self.normal; | |
| if self.attributes and self.values: | |
| json["attributes"] = self.attributes; | |
| json["values"] = self.values; | |
| if self.anchors: json["anchors"] = self.anchors; | |
| return json; | |
| def decode(json): | |
| id = json.get("id", None); | |
| src = json.get("source", None); | |
| tgt = json.get("target", None); | |
| lab = json.get("label", None); | |
| if lab == "": lab = None; | |
| normal = json.get("normal", None) | |
| attributes = json.get("attributes", None) | |
| if attributes is None: | |
| attributes = json.get("properties", None) | |
| if attributes is not None: | |
| print("Edge.decode(): " | |
| "interpreting deprecated ‘properties’ on edge object.", | |
| file = sys.stderr); | |
| values = json.get("values", None) | |
| anchors = json.get("anchors", None) | |
| return Edge(id, src, tgt, lab, normal, attributes, values, anchors) | |
| def dot(self, stream, input = None, strings = False, | |
| errors = None, overlay = False): | |
| def __missing__(): | |
| if errors is not None and "edges" in errors \ | |
| and "missing" in errors["edges"]: | |
| for source, target, label in errors["edges"]["missing"]: | |
| if source == self.src and target == self.tgt and label == self.lab: | |
| return True; | |
| return False; | |
| if self.attributes and self.values: | |
| style = ", style=dashed"; | |
| label = "<<table align=\"center\" border=\"0\" cellspacing=\"0\">"; | |
| if self.lab: label += "<tr><td colspan=\"1\">{}</td></tr>".format(self.lab); | |
| # | |
| # _fix_me_ | |
| # currently assuming that all values are boolean where presence of | |
| # the attribute means True. (oe; 21-apr-20) | |
| # | |
| if self.attributes and self.values: | |
| for attribute, _ in zip(self.attributes, self.values): | |
| label += "<tr><td>{}</td></tr>".format(attribute); | |
| label += "</table>>"; | |
| else: | |
| label = self.lab; | |
| if label and self.normal: | |
| if label[:-3] == self.normal: | |
| label = "(" + self.normal + ")-of"; | |
| else: | |
| label = label + " (" + self.normal + ")"; | |
| if label: label = "\"{}\"".format(label); | |
| style = ""; | |
| if overlay: | |
| color = ", color=blue, fontcolor=blue"; | |
| elif __missing__(): | |
| color = ", color=red, fontcolor=red"; | |
| else: | |
| color = ""; | |
| print(" {} -> {} [ label={}{}{} ];" | |
| "".format(self.src, self.tgt, label if label else "\"\"", | |
| style, color), | |
| file = stream); | |
| def __key(self): | |
| return self.tgt, self.src, self.lab | |
| def __eq__(self, other): | |
| return self.__key() == other.__key() | |
| def __lt__(self, other): | |
| return self.__key() < other.__key() | |
| def __hash__(self): | |
| return hash(self.__key()) | |
| class Graph(object): | |
| def __init__(self, id, flavor = None, framework = None): | |
| self.id = id; | |
| self.time = datetime.utcnow(); | |
| self._language = None; | |
| self._provenance = None; | |
| self._source = None; | |
| self._targets = None; | |
| self.input = None; | |
| self.nodes = []; | |
| self.edges = set(); | |
| self.flavor = FLAVORS.get(framework) if flavor is None else flavor; | |
| self.framework = framework; | |
| def language(self, value = None): | |
| if value is not None: self._language = value; | |
| return self._language; | |
| def provenance(self, value = None): | |
| if value is not None: self._provenance = value; | |
| return self._provenance; | |
| def source(self, value = None): | |
| if value is not None: self._source = value; | |
| return self._source; | |
| def targets(self, value = None): | |
| if value is not None: self._targets = value; | |
| return self._targets; | |
| def size(self): | |
| return len(self.nodes); | |
| def inject(self, information): | |
| if isinstance(information, str): information = eval(information); | |
| for key, value in information.items(): | |
| if key == "id": self.id = value; | |
| elif key == "time": self.item = value; | |
| elif key == "language": self._language = value; | |
| elif key == "provenance": self._provenance = value; | |
| elif key == "source": self._source = value; | |
| elif key == "targets": self._targets = value; | |
| elif key == "input": self.input = value; | |
| elif key == "flavor": self.flavor = value; | |
| elif key == "framework": self.framework = value; | |
| else: | |
| print("Graph.inject(): ignoring invalid key ‘{}’" | |
| "".format(key), file = sys.stderr); | |
| def add_node(self, id = None, label = None, | |
| properties = None, values = None, | |
| anchors = None, top = False, type = 1, anchorings = None): | |
| node = Node(id if id is not None else len(self.nodes), | |
| label = label, properties = properties, values = values, | |
| anchors = anchors, top = top, type = type, | |
| anchorings = anchorings); | |
| self.nodes.append(node) | |
| return node | |
| def find_node(self, id): | |
| for node in self.nodes: | |
| if node.id == id: return node; | |
| def add_edge(self, src, tgt, lab, normal = None, | |
| attributes = None, values = None, anchors = None): | |
| self.store_edge(Edge(id=len(self.edges), src=src, tgt=tgt, lab=lab, normal=normal, | |
| attributes=attributes, values=values, anchors=anchors)); | |
| def store_edge(self, edge, robust = False): | |
| self.edges.add(edge) | |
| source = self.find_node(edge.src); | |
| if source is None and not robust: | |
| raise ValueError("Graph.add_edge(): graph #{}: " | |
| "invalid source node {}." | |
| "".format(self.id, self.src)) | |
| if source: source.outgoing_edges.add(edge) | |
| target = self.find_node(edge.tgt); | |
| if target is None and not robust: | |
| raise ValueError("Graph.add_edge(): graph #{}: " | |
| "invalid target node {}." | |
| "".format(self.id, self.tgt)) | |
| if target: target.incoming_edges.add(edge) | |
| return edge | |
| def add_input(self, text, id = None, quiet = False): | |
| if not id: id = self.id; | |
| if isinstance(text, str): | |
| self.input = text; | |
| elif isinstance(text, Path): | |
| file = text / (str(id) + ".txt"); | |
| if not file.exists() and not quiet: | |
| print("add_input(): no text for {}.".format(file), | |
| file = sys.stderr); | |
| else: | |
| with file.open() as stream: | |
| input = stream.readline(); | |
| if input.endswith("\n"): input = input[:len(input) - 1]; | |
| self.input = input; | |
| else: | |
| input = text.get(id); | |
| if input: | |
| self.input = input; | |
| elif not quiet: | |
| print("add_input(): no text for key {}.".format(id), | |
| file = sys.stderr); | |
| def anchor(self): | |
| n = len(self.input); | |
| i = 0; | |
| def skip(): | |
| nonlocal i; | |
| while i < n and self.input[i] in {" ", "\t"}: | |
| i += 1; | |
| def scan(candidates): | |
| for candidate in candidates: | |
| if self.input.startswith(candidate, i): | |
| return len(candidate); | |
| skip(); | |
| for node in self.nodes: | |
| for j in range(len(node.anchors) if node.anchors else 0): | |
| if isinstance(node.anchors[j], str): | |
| form = node.anchors[j]; | |
| m = None; | |
| if self.input.startswith(form, i): | |
| m = len(form); | |
| else: | |
| for old, new in {("‘", "`"), ("’", "'"), ("`", "'"), | |
| ("“", "\""), ("”", "\"")}: | |
| form = form.replace(old, new); | |
| if self.input.startswith(form, i): | |
| m = len(form); | |
| break; | |
| # | |
| # _fix_me_ | |
| # the block below looks weird: it would seem to accept any | |
| # of the punctuation marks given to scan(), irrespective | |
| # of the current .form. value? (oe; 27-apr-20) | |
| # | |
| if not m: | |
| m = scan({"“", "\"", "``"}) or scan({"‘", "`"}) \ | |
| or scan({"”", "\"", "''"}) or scan({"’", "'"}) \ | |
| or scan({"—", "—", "---", "--"}) \ | |
| or scan({"…", "...", ". . ."}); | |
| if m: | |
| node.anchors[j] = {"from": i, "to": i + m}; | |
| i += m; | |
| skip(); | |
| else: | |
| raise Exception("failed to anchor |{}| in |{}| ({})" | |
| "".format(form, self.input, i)); | |
| def normalize(self, actions, trace = 0): | |
| for node in self.nodes: | |
| node.normalize(actions, self.input, trace); | |
| for edge in self.edges: | |
| edge.normalize(actions, trace); | |
| # | |
| # recompute cached edge relations, to reflect the new state of affairs | |
| # | |
| if "edges" in actions: | |
| for node in self.nodes: | |
| node.outgoing_edges.clear(); | |
| node.incoming_edges.clear(); | |
| for edge in self.edges: | |
| self.find_node(edge.src).outgoing_edges.add(edge); | |
| self.find_node(edge.tgt).incoming_edges.add(edge); | |
| def prettify(self, trace = 0): | |
| if self.framework == "drg": | |
| boxes = {"IMP", "DIS", "DUP", "NOT", "POS", "NEC", | |
| "ALTERNATION", "ATTRIBUTION", "BACKGROUND", | |
| "COMMENTARY", "CONDITION", "CONTINUATION", "CONTRAST", | |
| "CONSEQUENCE", "ELABORATION", "EXPLANATION", "INSTANCE", | |
| "NARRATION", "NEGATION", "NECESSITY", | |
| "POSSIBILITY", "PARALLEL", "PRECONDITION", | |
| "RESULT", "TOPIC", "PRESUPPOSITION"}; | |
| for node in self.nodes: | |
| if node.is_top or node.is_root(): | |
| node.type = 0; | |
| # | |
| # _fix_me_ | |
| # but what about more deeply nested boxes? (24-aug-20; oe) | |
| # | |
| for edge in node.outgoing_edges: | |
| if edge.lab in boxes: | |
| self.find_node(edge.tgt).type = 0; | |
| elif len(node.incoming_edges) == len(node.outgoing_edges) == 1: | |
| if next(iter(node.incoming_edges)).lab is None \ | |
| and next(iter(node.outgoing_edges)).lab is None: | |
| node.type = 2; | |
| def score(self, graph, correspondences, errors = None): | |
| # | |
| # accommodate the various conventions for node correspondence matrices; | |
| # anyway, entries are indices into the .nodes. list, not identifiers. | |
| # _fix_me_ | |
| # double-check for correspondences from SMATCH. (oe; 19-apr-20) | |
| # | |
| if isinstance(correspondences, list) and len(correspondences) > 0: | |
| if isinstance(correspondences[0], tuple): | |
| correspondences = {i: j if j is not None else -1 | |
| for i, j in correspondences}; | |
| elif isinstance(correspondences[0], int): | |
| correspondences = {i: j if j is not None else -1 | |
| for i, j in enumerate(correspondences)}; | |
| # | |
| # all tuples use node identifiers from the gold graph, where there is | |
| # a correspondence; otherwise we (appear to) synthesize new unique | |
| # identifiers for remaining nodes from both graphs. | |
| # | |
| identities1 = dict(); | |
| identities2 = dict(); | |
| for i, pair in enumerate(correspondences.items()): | |
| identities1[self.nodes[pair[0]].id] = i; | |
| if pair[1] >= 0: | |
| identities2[graph.nodes[pair[1]].id] = i; | |
| i = len(correspondences); | |
| for node in self.nodes: | |
| if node.id not in identities1: | |
| identities1[node.id] = i; | |
| i += 1; | |
| for node in graph.nodes: | |
| if node.id not in identities2: | |
| identities2[node.id] = i; | |
| i += 1; | |
| # | |
| # map 'corresponding' identifiers back to the original graphs | |
| # | |
| def native(id, identities): | |
| for key, value in identities.items(): | |
| if id == value: return key; | |
| def tuples(graph, identities): | |
| # | |
| # .identities. is a hash table mapping node identifiers into the | |
| # 'corresponding' identifier space, such that paired nodes (and | |
| # only these) share the same identifier. | |
| # | |
| def identify(id): | |
| return identities[id] if identities is not None else id; | |
| tops = set(); | |
| labels = set(); | |
| properties = set(); | |
| anchors = set(); | |
| edges = set(); | |
| attributes = set(); | |
| for node in graph.nodes: | |
| identity = identify(node.id); | |
| if node.is_top: tops.add(identity); | |
| if node.label is not None: labels.add((identity, node.label)); | |
| if node.properties is not None: | |
| for property, value in zip(node.properties, node.values): | |
| properties.add((identity, property, value.lower())); | |
| if node.anchors is not None: | |
| anchor = score.core.anchor(node); | |
| if graph.input: | |
| anchor = score.core.explode(graph.input, anchor); | |
| else: | |
| anchor = tuple(anchor); | |
| anchors.add((identity, anchor)); | |
| for edge in graph.edges: | |
| identity \ | |
| = (identify(edge.src), identify(edge.tgt), edge.lab); | |
| edges.add(identity); | |
| if edge.attributes and edge.values: | |
| for attribute, value in zip(edge.attributes, edge.values): | |
| attributes.add(tuple(list(identity) + [attribute, value])); | |
| return tops, labels, properties, anchors, edges, attributes; | |
| def count(gold, system, key): | |
| if errors is not None: | |
| missing = gold - system; | |
| surplus = system - gold; | |
| if len(missing) > 0 or len(surplus) > 0 and key not in errors: | |
| errors[key] = dict(); | |
| if key == "tops": | |
| if missing: | |
| errors[key]["missing"] \ | |
| = [native(id, identities1) for id in missing]; | |
| if surplus: | |
| errors[key]["surplus"] \ | |
| = [native(id, identities2) for id in surplus]; | |
| elif key == "labels": | |
| if missing: | |
| errors[key]["missing"] \ | |
| = [(native(id, identities1), label) | |
| for id, label in missing]; | |
| if surplus: | |
| errors[key]["surplus"] \ | |
| = [(native(id, identities2), label) | |
| for id, label in surplus]; | |
| elif key == "properties": | |
| if missing: | |
| errors[key]["missing"] \ | |
| = [(native(id, identities1), property, value) | |
| for id, property,value in missing]; | |
| if surplus: | |
| errors[key]["surplus"] \ | |
| = [(native(id, identities2), property, value) | |
| for id, property, value in surplus]; | |
| elif key == "anchors": | |
| if missing: | |
| errors[key]["missing"] \ | |
| = [(native(id, identities1), list(sorted(anchor))) | |
| for id, anchor in missing]; | |
| if surplus: | |
| errors[key]["surplus"] \ | |
| = [(native(id, identities2), list(sorted(anchor))) | |
| for id, anchor in surplus]; | |
| elif key == "edges": | |
| if missing: | |
| errors[key]["missing"] \ | |
| = [(native(source, identities1), | |
| native(target, identities1), label) | |
| for source, target, label in missing]; | |
| if surplus: | |
| errors[key]["surplus"] \ | |
| = [(native(source, identities2), | |
| native(target, identities2), label) | |
| for source, target, label in surplus]; | |
| elif key == "attributes": | |
| if missing: | |
| errors[key]["missing"] \ | |
| = [(native(source, identities1), | |
| native(target, identities1), label, | |
| attribute, value) | |
| for source, target, label, attribute, value | |
| in missing]; | |
| if surplus: | |
| errors[key]["surplus"] \ | |
| = [(native(source, identities2), | |
| native(target, identities2), label, | |
| attribute, value) | |
| for source, target, label, attribute, value | |
| in surplus]; | |
| return {"g": len(gold), "s": len(system), "c": len(gold & system)}; | |
| if correspondences is None or len(correspondences) == 0: | |
| return count(set(), set()), count(set(), set()), \ | |
| count(set(), set()), count(set(), set()), \ | |
| count(set(), set()), count(set(), set()); | |
| gtops, glabels, gproperties, ganchors, gedges, gattributes \ | |
| = tuples(self, identities1); | |
| stops, slabels, sproperties, sanchors, sedges, sattributes \ | |
| = tuples(graph, identities2); | |
| if errors is not None: | |
| errors[self.framework][self.id] = errors \ | |
| = {"correspondences": [(self.nodes[g].id, graph.nodes[s].id) | |
| for g, s in correspondences.items() | |
| if s >= 0]} | |
| return count(gtops, stops, "tops"), \ | |
| count(glabels, slabels, "labels"), \ | |
| count(gproperties, sproperties, "properties"), \ | |
| count(ganchors, sanchors, "anchors"), \ | |
| count(gedges, sedges, "edges"), \ | |
| count(gattributes, sattributes, "attributes"); | |
| def encode(self, version = 1.1): | |
| json = {"id": self.id}; | |
| if self.flavor is not None: | |
| json["flavor"] = self.flavor; | |
| if self.framework: | |
| json["framework"] = self.framework; | |
| json["version"] = version; | |
| if self.time is not None: | |
| json["time"] = self.time.strftime("%Y-%m-%d"); | |
| else: | |
| json["time"] = datetime.now().strftime("%Y-%m-%d"); | |
| if self._language is not None: json["language"] = self._language; | |
| if self._source is not None: json["source"] = self._source; | |
| if self._provenance is not None: json["provenance"] = self._provenance; | |
| if self._targets is not None: json["targets"] = self._targets; | |
| if self.input: | |
| json["input"] = self.input; | |
| if self.nodes: | |
| tops = [node.id for node in self.nodes if node.is_top]; | |
| if len(tops): | |
| json["tops"] = tops; | |
| json["nodes"] = [node.encode() for node in self.nodes]; | |
| if self.edges: | |
| json["edges"] = [edge.encode() for edge in | |
| sorted(self.edges, key = operator.attrgetter("id"))]; | |
| return json; | |
| def decode(json, robust = False): | |
| graph = Graph(json["id"], json.get("flavor"), json.get("framework")) | |
| try: | |
| graph.time = datetime.strptime(json["time"], "%Y-%m-%d") | |
| except: | |
| graph.time = datetime.strptime(json["time"], "%Y-%m-%d (%H:%M)") | |
| graph.input = json.get("input") | |
| graph.language(json.get("language")) | |
| graph.source(json.get("source")) | |
| graph.provenance(json.get("provenance")) | |
| graph.targets(json.get("targets")) | |
| nodes = json.get("nodes") | |
| if nodes is not None: | |
| for j in nodes: | |
| node = Node.decode(j) | |
| graph.add_node(node.id, node.label, node.properties, | |
| node.values, node.anchors, top = False, anchorings=node.anchorings) | |
| edges = json.get("edges") | |
| if edges is not None: | |
| for j in edges: | |
| edge = Edge.decode(j); | |
| if edge.id is None: edge.id = len(graph.edges); | |
| graph.store_edge(edge, robust = robust); | |
| tops = json.get("tops") | |
| if tops is not None: | |
| for i in tops: | |
| node = graph.find_node(i) | |
| if node is not None: | |
| node.is_top = True | |
| else: | |
| raise ValueError("Graph.decode(): graph #{}: " | |
| "invalid top node {}." | |
| "".format(graph.id, i)) | |
| return graph | |
| def copy(self): | |
| return Graph.decode(self.encode()) | |
| def dot(self, stream, ids = False, strings = False, | |
| errors = None, overlay = False): | |
| if not overlay: | |
| print("digraph \"{}\" {{\n top [ style=invis ];" | |
| "".format(self.id), | |
| file = stream); | |
| for node in self.nodes: | |
| if node.is_top: | |
| if overlay: | |
| color = " [ color=blue ]"; | |
| elif errors is not None and "tops" in errors \ | |
| and "missing" in errors["tops"] and node.id in errors["tops"]["missing"]: | |
| color = " [ color=red ]"; | |
| else: | |
| color = ""; | |
| print(" top -> {}{};".format(node.id, color), file = stream); | |
| n = -1; | |
| for node in self.nodes: | |
| node.dot(stream, self.input, ids, strings, errors, overlay); | |
| for edge in self.edges: | |
| if node.id == edge.src: | |
| edge.dot(stream, self.input, strings, errors, overlay); | |
| if errors is not None: | |
| surplus = Graph(self.id, flavor = self.flavor, framework = self.framework); | |
| surplus.add_input(self.input); | |
| mapping = dict(); | |
| correspondences = {s: g for g, s in errors["correspondences"]}; | |
| if "labels" in errors and "surplus" in errors["labels"]: | |
| for id, label in errors["labels"]["surplus"]: | |
| if id not in correspondences: | |
| mapping[id] = surplus.add_node(id = n, label = label); | |
| n -= 1; | |
| if "properties" in errors and "surplus" in errors["properties"]: | |
| for id, property, value in errors["properties"]["surplus"]: | |
| if id not in correspondences: | |
| if id in mapping: | |
| mapping[id].set_property(property, value); | |
| else: | |
| mapping[id] = surplus.add_node(id = n, | |
| properties = [property], | |
| values = [value]); | |
| n -= 1; | |
| if "anchors" in errors and "surplus" in errors["anchors"]: | |
| for id, anchor in errors["anchors"]["surplus"]: | |
| if id not in correspondences: | |
| if id in mapping: | |
| mapping[id].anchors = anchor; | |
| else: | |
| mapping[id] = surplus.add_node(id = n, anchors = anchor); | |
| n -= 1; | |
| if "tops" in errors and "surplus" in errors["tops"]: | |
| for id in errors["tops"]["surplus"]: | |
| if id in correspondences: | |
| print(" top -> {} [ color=blue ];" | |
| "".format(correspondences[id]), file = stream); | |
| elif id not in mapping: | |
| mapping[id] = surplus.add_node(id = n, top = True); | |
| n -= 1; | |
| else: | |
| mapping[id].is_root = True; | |
| if "edges" in errors and "surplus" in errors["edges"]: | |
| for source, target, label in errors["edges"]["surplus"]: | |
| if source not in mapping: | |
| try: | |
| mapping[source] = surplus.add_node(correspondences[source]); | |
| except KeyError: | |
| mapping[source] = surplus.add_node(n); | |
| n -= 1; | |
| if target not in mapping: | |
| try: | |
| mapping[target] = surplus.add_node(correspondences[target]); | |
| except KeyError: | |
| mapping[target] = surplus.add_node(n); | |
| n -= 1; | |
| surplus.add_edge(mapping[source].id, mapping[target].id, label); | |
| surplus.dot(stream, ids = ids, strings = strings, errors = None, overlay = True); | |
| if not overlay: print("}", file = stream); | |
| def tikz(self, stream): | |
| if self.flavor != 0: # bi-lexical: use tikz-dependency | |
| raise ValueError("TikZ visualization is currently only for flavor-0 graphs.") | |
| graph = self._full_sentence_recovery() # a copy of self with nodes covering all tokens | |
| print(r"\documentclass{article}", file=stream) | |
| print(r"\usepackage[T1]{fontenc}", file=stream) | |
| print(r"\usepackage[utf8]{inputenc}", file=stream) | |
| print(r"\usepackage{tikz-dependency}", file=stream) | |
| print(r"\begin{document}", file=stream) | |
| print(r"\begin{dependency}", file=stream) | |
| print(r"\begin{deptext}", file=stream) | |
| print(r"% id = " + str(graph.id), file=stream) | |
| if graph.input is not None: | |
| print(r"% input = " + str(graph.input), file=stream) | |
| sorted_nodes = sorted((node.id, node) for node in graph.nodes) | |
| id2i = {id: i for i, (id, _) in enumerate(sorted_nodes, start=1)} | |
| print(r" \& ".join(" ".join(graph.input[anchor["from"]:anchor["to"]] for anchor in node.anchors or ()) | |
| or node.label for _, node in sorted_nodes) + r" \\", file=stream) | |
| print(r"\end{deptext}", file=stream) | |
| for id, node in sorted_nodes: | |
| if node.is_top: | |
| print(r"\deproot{" + str(id2i[id]) + r"}{TOP}", file=stream) | |
| for edge in graph.edges: | |
| if node.id == edge.tgt: | |
| print(r"\depedge{" + str(id2i[edge.src]) + r"}{" + str(id2i[id]) + r"}{" + str(edge.lab) + r"}", file=stream) | |
| print(r"\end{dependency}", file=stream) | |
| print(r"\end{document}", file=stream) | |
| def displacy(self, stream=None, format="svg", **kwargs): | |
| """ | |
| Use displacy to present dependency graph over sentence. | |
| :param format: can be either "svg" or "html". | |
| kwargs are passed to displacy.render method, see https://spacy.io/usage/visualizers | |
| for possible options. | |
| One can omit the stream argument if specifying `jupyter=True` - this will render the visualization directly | |
| to the jupyter notebook. | |
| """ | |
| assert stream or kwargs.get("jupyter"), "Either `stream` is given or `jupyter=True` must hold." | |
| assert format in ("svg", "html"), 'format can be either "svg" or "html"' | |
| try: | |
| from spacy import displacy | |
| except ModuleNotFoundError as e: | |
| print("You must install SpaCy in order to use the displacy visualization. \nTry running `pip install spacy`.") | |
| raise e | |
| if self.flavor != 0: # currently supporting only bi-lexical graphs | |
| raise ValueError("displacy visualization is currently only for flavor-0 graphs.") | |
| graph = self._full_sentence_recovery() # a copy of self with nodes covering all tokens | |
| # prepare displacy_dep_input, composed of `words` list and `arcs` list | |
| words = [{"text": n.label, "tag": ""} for n in graph.nodes] | |
| def get_arc(edge: Edge): | |
| src, tgt = edge.src, edge.tgt | |
| direction = u'right' if src < tgt else u'left' | |
| return {'dir': direction, | |
| 'start': min(src, tgt), | |
| 'end': max(src, tgt), | |
| 'label': edge.lab} | |
| arcs = [get_arc(edge) for edge in graph.edges] | |
| displacy_dep_input = {'words': words, 'arcs': arcs} | |
| # render to stream as svg or html | |
| kwargs["page"] = format=="html" | |
| markdown = displacy.render(displacy_dep_input, style='dep', manual=True, **kwargs) | |
| # write svg text to a file | |
| if stream: | |
| stream.write(markdown) | |
| def _full_sentence_recovery(self): | |
| """ | |
| graph nodes may sometimes only include non-singleton nodes, for example when taking the graph from | |
| a model prediction. For this reason, we need to use anchors and the input sentence in order to recover | |
| the original tokenization (thus node-ids and their corresponding text spans). | |
| Here, when necessary, we assume the original tokenization is encoded with spaces in self.input. | |
| But we mainly look for missing character segments (i.e. spans that are not included in anchors) | |
| and produce singleton nodes for them. | |
| The function returns a new Graph, in which recovered nodes are included and thus nodes correspond to | |
| input tokens. | |
| """ | |
| graph = self.copy() # don't change | |
| length = len(graph.input) | |
| def rm_all(lst, items_to_remove): | |
| for item in items_to_remove: | |
| if item in lst: | |
| lst.remove(item) | |
| return lst | |
| def group_consecutive(lst): | |
| # get list of integers, return list of lists, each the maximal consecutive (increasing) set from lst | |
| if not lst: | |
| return [] | |
| groups = [] | |
| cur_group=[lst[0]] | |
| for i,item in enumerate(lst[1:]): | |
| if item-1 == cur_group[-1]: | |
| cur_group.append(item) | |
| else: | |
| groups.append(cur_group) | |
| cur_group = [item] | |
| groups.append(cur_group) | |
| return groups | |
| # iterate missing ids | |
| node_ids = [n.id for n in graph.nodes] | |
| id2node = {n.id : n for n in graph.nodes} | |
| max_id = max(node_ids) | |
| missing_ids = rm_all(list(range(max_id)), node_ids) | |
| missing_id_groups = group_consecutive(missing_ids) | |
| for id_group in missing_id_groups: | |
| # id_group is a list of consecutive missing ids | |
| if id_group[0]==0: | |
| begin_char = 0 | |
| else: | |
| prev_id = id_group[0]-1 # the id of the existing node preceding the missing-id group | |
| prev_node = id2node[prev_id] | |
| begin_char = prev_node.anchors[0]['to'] | |
| next_id = id_group[-1]+1 | |
| if next_id in id2node: | |
| next_node = id2node[next_id] | |
| end_char = next_node.anchors[0]['from'] | |
| else: | |
| end_char = length | |
| omitted_span = graph.input[begin_char:end_char] | |
| # we need to create len(id_group) new nodes for the omitted span. | |
| # Try to align singleton node (i.e. one id) to a token; if num of tokens in omitted_span | |
| # don't match num of missing ids, generate all these nodes with the same anchors to the whole span | |
| tokens = omitted_span.strip().split() | |
| if len(tokens) == len(id_group): | |
| for token, new_id in zip(tokens, id_group): | |
| tok_begin_char = begin_char + omitted_span.find(token) | |
| tok_end_char = tok_begin_char + len(token) | |
| # add new node corresponding to omitted token | |
| graph.add_node(new_id, label=token, anchors=[{"from":tok_begin_char, "to":tok_end_char}]) | |
| else: | |
| # add new nodes, all corresponding to omitted span | |
| for new_id in id_group: | |
| graph.add_node(new_id, label=omitted_span, anchors=[{"from": begin_char, "to": end_char}]) | |
| # special treatment is required for missing tokens after the last existing node | |
| # (if there are tokens left in self.input not covered by node anchors) | |
| last_end_char_of_nodes = max([n.anchors[0]['to'] for n in graph.nodes]) | |
| if last_end_char_of_nodes < length: | |
| # the meaning is that there is some span of the sentence not covered; | |
| # we will add nodes according to num of tokens in this last span | |
| omitted_span = graph.input[last_end_char_of_nodes:] | |
| for i,token in enumerate(omitted_span.strip().split()): | |
| new_id = max_id+1+i | |
| tok_begin_char = last_end_char_of_nodes + omitted_span.find(token) | |
| tok_end_char = tok_begin_char + len(token) | |
| graph.add_node(new_id, label=token, anchors=[{"from":tok_begin_char, "to":tok_end_char}]) | |
| # as a finish, sort nodes in graph so that they will again be ordered by id (& realization location) | |
| graph.nodes = list(sorted(graph.nodes)) | |
| return graph | |