| import re; |
| import sys |
| import xml.etree.ElementTree as ET |
| from itertools import groupby |
| from operator import attrgetter; |
| from pathlib import Path; |
|
|
| from graph import Graph; |
| from ucca import core, layer0, layer1, textutil; |
| from ucca.convert import to_standard |
| from ucca.ioutil import get_passages; |
|
|
|
|
| def convert_id(id, prefix): |
| m = re.search(r'wsj_([0-9]+)\.([0-9]+)', id); |
| if m: |
| return "2%04d%03d" % (int(m.group(1)), int(m.group(2))); |
| elif prefix: |
| return prefix + id; |
| else: |
| return id; |
|
|
|
|
| def passage2graph(passage, text=None, prefix=None): |
| graph = Graph(convert_id(passage.ID, prefix), flavor=1, framework="ucca"); |
| l0 = passage.layer(layer0.LAYER_ID); |
| l1 = passage.layer(layer1.LAYER_ID); |
| unit_id_to_node_id = {}; |
|
|
| n = None; |
| if text: |
| graph.add_input(text); |
| n = len(graph.input); |
| i = 0; |
|
|
| def skip(): |
| nonlocal i; |
| while i < n and graph.input[i] in {" ", "\t"}: |
| i += 1; |
|
|
| def scan(candidates): |
| for candidate in candidates: |
| if graph.input.startswith(candidate, i): |
| return len(candidate); |
|
|
| def anchor(form): |
| nonlocal i; |
| skip(); |
| m = None; |
| if graph.input.startswith(form, i): |
| m = len(form); |
| else: |
| for old, new in {("β", "`"), ("β", "'")}: |
| form = form.replace(old, new); |
| if graph.input.startswith(form, i): |
| m = len(form); |
| break; |
| if not m: |
| m = scan({"β", "\"", "``"}) or scan({"β", "`"}) \ |
| or scan({"β", "\"", "''"}) or scan({"β", "'"}) \ |
| or scan({"β", "β", "---", "--"}) \ |
| or scan({"β¦", "...", ". . ."}); |
| if m: |
| anchor = {"from": i, "to": i + m}; |
| i += m; |
| skip(); |
| return anchor; |
| else: |
| raise Exception("{}: failed to anchor |{}| in |{}| ({})" |
| "".format(graph.id, form, graph.input, i)); |
|
|
| non_terminals = [unit for unit in l1.all if unit.tag in (layer1.NodeTags.Foundational, layer1.NodeTags.Punctuation)] |
| for token in sorted(l0.all, key=attrgetter("position")): |
| for unit in non_terminals: |
| if not unit.attrib.get("implicit"): |
| for edge in unit: |
| if "Terminal" in edge.tags and token.ID == edge.child.ID: |
| if unit.ID in unit_id_to_node_id: |
| node = graph.find_node(unit_id_to_node_id[unit.ID]); |
| if graph.input: |
| node.anchors.append(anchor(token.text)); |
| else: |
| node = graph.add_node(anchors=[anchor(token.text)] if graph.input else None); |
| unit_id_to_node_id[unit.ID] = node.id; |
| for unit in sorted(non_terminals, key=attrgetter("start_position", "end_position")): |
| if not unit.attrib.get("implicit") and unit.ID not in unit_id_to_node_id: |
| node = graph.add_node(); |
| unit_id_to_node_id[unit.ID] = node.id; |
| for unit in non_terminals: |
| for edge in unit: |
| for tag in edge.tags: |
| if tag != "Terminal": |
| if edge.child.ID in unit_id_to_node_id: |
| attributes, values = None, None; |
| if edge.attrib.get("remote"): |
| attributes = ["remote"]; |
| values = [True]; |
| graph.add_edge(unit_id_to_node_id[unit.ID], |
| unit_id_to_node_id[edge.child.ID], |
| tag, |
| attributes=attributes, |
| values=values); |
| else: |
| |
| |
| |
| pass; |
| for unit in l1.heads: |
| node_id = unit_id_to_node_id.get(unit.ID) |
| if node_id is not None: |
| graph.nodes[node_id].is_top = True; |
| return graph |
|
|
|
|
| def read(fp, text=None, prefix=None): |
| parent = Path(fp.name).parent; |
| paths = [parent / file.strip() for file in fp]; |
| for passage in get_passages(map(str, paths)): |
| try: |
| graph = passage2graph(passage, text, prefix); |
| except Exception as exception: |
| print(exception); |
| continue; |
| yield graph, None; |
|
|
|
|
| def is_punct(node): |
| for edge in node.incoming_edges or (): |
| if edge.lab.upper() == "U": |
| return True |
| return False |
|
|
|
|
| def is_remote(edge): |
| for attribute, value in zip(edge.attributes or (), edge.values or ()): |
| if attribute == "remote" and value != "false": |
| return True |
| return False |
|
|
|
|
| def is_implicit(node): |
| for prop, value in zip(node.properties or (), node.values or ()): |
| if prop == "implicit" and value != "false": |
| return True |
| return False |
|
|
| def is_primary_root(node): |
| return all(is_remote(edge) for edge in node.incoming_edges) |
|
|
| def graph2passage(graph, input): |
| passage = core.Passage(graph.id) |
| l0 = layer0.Layer0(passage) |
| anchors = {(anchor["from"], anchor["to"], is_punct(node)) for node in graph.nodes for anchor in node.anchors or ()} |
| terminals = {(i, j): l0.add_terminal(text=input[i:j], punct=punct) for i, j, punct in sorted(anchors)} |
|
|
| l1 = layer1.Layer1(passage) |
| queue = [(node, None if node.is_top else layer1.FoundationalNode(root=l1.root, |
| tag=layer1.NodeTags.Foundational, |
| ID=l1.next_id())) |
| for node in graph.nodes if is_primary_root(node)] |
|
|
|
|
| id_to_unit = {node.id: unit for (node, unit) in queue} |
| remotes = [] |
| while queue: |
| parent, parent_unit = queue.pop(0) |
| for tgt, edges in groupby(sorted(parent.outgoing_edges, key=attrgetter("tgt")), key=attrgetter("tgt")): |
| edges = list(edges) |
| labels = [edge.lab for edge in edges] |
| if is_remote(edges[0]): |
| remotes.append((parent_unit, labels, tgt)) |
| else: |
| child = graph.find_node(tgt) |
| child_unit = id_to_unit[tgt] = l1.add_fnode_multiple(parent_unit, labels, implicit=is_implicit(child)) |
| queue.append((child, child_unit)) |
| for anchor in parent.anchors or (): |
| if parent_unit is None: |
| print("graph2passage(): anchors of the root node converted to Terminal children in β{}β." |
| "".format(graph.id), file=sys.stderr) |
| parent_unit = l1.heads[0] |
| parent_unit.add(layer1.EdgeTags.Terminal, terminals[anchor["from"], anchor["to"]]) |
| for parent, labels, tgt in remotes: |
| l1.add_remote_multiple(parent, labels, id_to_unit[tgt]) |
| return passage |
|
|
|
|
| def write(graph, input, file): |
| passage = graph2passage(graph, input) |
| root = to_standard(passage) |
| xml_string = ET.tostring(root).decode() |
| output = textutil.indent_xml(xml_string) |
| file.write(output) |
|
|