# File size: 8,635 Bytes
# 72c0672
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
from collections import defaultdict, abc
from typing import cast

from docutils import nodes
from docutils.parsers.rst import Directive

import sphinx
from sphinx.locale import _
from sphinx.util.docutils import SphinxDirective
from sphinx.errors import ExtensionError

from conf import languages as LANGUAGES

# Logger obtained through Sphinx's logging helper; the warnings emitted in
# this module pass a ``location=`` keyword to it.
logger = sphinx.util.logging.getLogger(__name__)

# Scope key under which entities flagged as global are stored, as opposed to
# entities stored under the name of the document that declares them.
GLOBALNAME = "$GLOBAL$"


def update(d, u):
    """Recursively merge mapping *u* into mapping *d* and return *d*.

    Values of *u* that are themselves mappings are merged key by key into
    the matching entry of *d*; every other value simply overwrites the
    entry. *d* is modified in place.

    When *d* holds a non-mapping value (or nothing) under a key for which
    *u* provides a mapping, the merge starts from a fresh ``dict`` instead
    of trying to recurse into the non-mapping value.

    :param d: the mutable mapping to update.
    :param u: the mapping providing the new values.
    :returns: *d* itself.
    """
    for key, value in u.items():
        if isinstance(value, abc.Mapping):
            current = d.get(key)
            if not isinstance(current, abc.MutableMapping):
                # Nothing mergeable is stored yet: start from an empty dict
                # rather than recursing into a scalar (which would raise).
                current = {}
            d[key] = update(current, value)
        else:
            d[key] = value
    return d


class EntityNode(nodes.General, nodes.Element):
    """Placeholder node produced by the ``entity`` role.

    ``entity_role`` stores the referenced entity name in the ``entity``
    attribute; ``process_entity_nodes`` later swaps the placeholder for the
    entity content.
    """


class EntitiesNode(nodes.General, nodes.Element):
    """Node type registered by ``setup``.

    NOTE(review): nothing in this module instantiates it; it is only passed
    to ``app.add_node``.
    """


class AllEntities:
    """Registry of the entities declared through the ``entities`` directive.

    ``self.entities`` is a two-level mapping::

        scope -> "<language>-<name>" -> {"docname": ..., "content": ...}

    where ``scope`` is either the name of the declaring document or
    ``GLOBALNAME`` for entities flagged as global.
    """

    def __init__(self):
        self.entities = defaultdict(dict)

    @classmethod
    def install(cls, env):
        """Return the registry attached to *env*, creating it on first use.

        The registry is stored as the ``entity_all_entities`` attribute of
        the environment.
        """
        if not hasattr(env, "entity_all_entities"):
            env.entity_all_entities = cls()
        return env.entity_all_entities

    def merge(self, other):
        """Fold the entities of *other* into this registry.

        Scopes are merged entry by entry, so entities already known for a
        scope (in particular the global one) are kept unless *other*
        redefines the very same name.
        """
        for scope, scoped in other.entities.items():
            self.entities[scope].update(scoped)

    def purge(self, docname):
        """Drop every entity declared by *docname*.

        Both the global scope and the document's own scope are filtered.
        """
        for scope in (GLOBALNAME, docname):
            self.entities[scope] = {
                name: entity
                for name, entity in self.entities[scope].items()
                if entity["docname"] != docname
            }

    def _extract_options(self, nodes):
        """Placeholder kept for interface compatibility; does nothing."""

    def _add_entities(self, entities, language, is_global, docname):
        """Store *entities* under the global scope or under *docname*.

        Each entity is keyed by ``"<language>-<name>"``. Redefining a name
        within the same scope logs a warning and overwrites the previous
        definition.
        """
        scope = GLOBALNAME if is_global else docname
        for entity in entities:
            name = f'{language}-{entity["name"]}'
            content = entity["content"]

            if name in self.entities[scope]:
                logger.warning(
                    f'Entity "{name}" has already been defined{" globally" if is_global else ""}',
                    location=docname,
                )

            self.entities[scope][name] = {"docname": docname, "content": content}

    def _extract_global(self, nodes):
        """Tell whether a field list carries a ``global`` field.

        :param nodes: the children of a ``field_list`` node.
        :returns: ``True`` if a field named ``global`` is found, ``False``
            otherwise.
        :raises Exception: if a child is not a ``field`` or does not start
            with a ``field_name``.
        """
        for node in nodes:
            if node.tagname != "field":
                raise Exception(f"Expected a field, found {node.tagname}")

            name, _body = node.children
            if name.tagname != "field_name":
                raise Exception(f"Expected a field name here, found {name.tagname}")

            if str(name.children[0]) == "global":
                return True
        return False

    def _extract_entities(self, nodes):
        """Turn definition list items into ``{"name", "content"}`` dicts.

        The term gives the entity name. When the definition consists of a
        single paragraph, only that paragraph's first child is kept as
        content; otherwise the whole definition node is kept.

        :param nodes: the children of a ``definition_list`` node.
        :raises Exception: if an item is not a term/definition pair.
        """
        entities = []
        for node in nodes:
            if node.tagname != "definition_list_item":
                raise Exception(f"Expected a list item here, found {node.tagname}")

            name_node, content_node = node.children
            if name_node.tagname != "term":
                raise Exception(f"Expected a term here, found {name_node.tagname}")
            if content_node.tagname != "definition":
                raise Exception(f"Expected a definition here, found {content_node.tagname}")

            name = str(name_node.children[0])
            if len(content_node.children) == 1 and content_node.children[0].tagname == "paragraph":
                content = content_node.children[0].children[0]
            else:
                content = content_node

            entities.append({"name": name, "content": content})
        return entities

    def extract(self, node, docname):
        """Register the entities described by a parsed ``entities`` directive.

        The first child of *node* must be a paragraph holding the language
        name; the remaining children are field lists (options) and
        definition lists (entities).

        :param node: container holding the parsed directive content.
        :param docname: name of the document declaring the entities.
        :raises Exception: if the content does not follow that layout or the
            language is not one of ``LANGUAGES``.
        """
        is_global = False
        entities = []

        language = None
        for child in node.children:
            if language is None:
                # The very first child has to be the language paragraph.
                if child.tagname != "paragraph":
                    raise Exception("Expected language name:\n.. entities:: <LANGUAGE>")
                language = str(child.children[0])
                if language not in LANGUAGES:
                    raise Exception(
                        f'Unknown language "{language}". Might be missing a newline after language'
                    )
            elif child.tagname == "field_list":
                is_global = self._extract_global(child.children)
            elif child.tagname == "definition_list":
                entities.extend(self._extract_entities(child.children))
            else:
                raise Exception(f"Expected a list of terms/options, found {child.tagname}")

        self._add_entities(entities, language, is_global, docname)

    def resolve_pendings(self, app):
        """Resolve the pending cross-references found in entity contents.

        Each ``pending_xref`` is handed to the domain named by its
        ``refdomain`` attribute. When there is no such domain, or the
        domain raises ``NoUri``, the reference falls back to a copy of its
        first child. A reference that *is* the entity content replaces the
        stored content; a reference nested inside a larger content node is
        replaced in place so the surrounding content is preserved.
        """
        # Local imports: ``NoUri`` is not imported at module level.
        from sphinx import addnodes
        from sphinx.errors import NoUri

        env = app.builder.env

        # Collected first and applied after the loops, so the registry is
        # not rewritten while it is being iterated.
        updates = defaultdict(dict)
        for scope, scoped in self.entities.items():
            for name, entity in scoped.items():
                docname = entity["docname"]
                content = entity["content"]

                for pending in content.traverse(addnodes.pending_xref):
                    contnode = cast(nodes.TextElement, pending[0].deepcopy())
                    newnode = None

                    typ = pending["reftype"]
                    target = pending["reftarget"]
                    refdoc = pending.get("refdoc", docname)

                    try:
                        if "refdomain" in pending and pending["refdomain"]:
                            # let the domain try to resolve the reference
                            try:
                                domain = env.domains[pending["refdomain"]]
                            except KeyError as exc:
                                raise NoUri(target, typ) from exc
                            newnode = domain.resolve_xref(
                                env, refdoc, app.builder, typ, target, pending, contnode
                            )
                    except NoUri:
                        newnode = contnode

                    resolved = newnode or contnode
                    if pending is content:
                        content = resolved
                    else:
                        pending.replace_self(resolved)

                if content is not entity["content"]:
                    updates[scope][name] = {"docname": docname, "content": content}

        update(self.entities, updates)

    def get(self, language, name, docname):
        """Look an entity up, preferring the document scope over the global one.

        :returns: the stored ``{"docname", "content"}`` dict, or ``None``
            when the entity is unknown in both scopes.
        """
        key = f"{language}-{name}"
        for scope in (docname, GLOBALNAME):
            # ``.get`` avoids creating an empty scope in the defaultdict for
            # every document that merely looks an entity up.
            scoped = self.entities.get(scope)
            if scoped and key in scoped:
                return scoped[key]
        return None


class EntitiesDirective(SphinxDirective):
    """Directive ``entities``: declare entities without producing output.

    The directive content is parsed into a container node which is then
    handed to ``AllEntities.extract`` together with the current document
    name.
    """

    has_content = True

    def run(self):
        """Parse the content, register the entities and return no nodes.

        Any exception raised while registering is reported as a directive
        error prefixed with ``Malformed directive "entities"``.
        """
        container = nodes.definition_list()
        self.state.nested_parse(self.content, self.content_offset, container)

        try:
            AllEntities.install(self.env).extract(container, self.env.docname)
        except Exception as problem:
            raise self.error(f'Malformed directive "entities": {problem}')

        return []


def entity_role(name, rawtext, text, lineno, inliner, options={}, content=[]):
    """Role ``entity``: emit a placeholder for the entity named by *text*.

    Only *text* is used; the other parameters are accepted and ignored.
    The default ``options``/``content`` objects are never mutated here.

    :returns: a ``([node], [])`` pair holding a single ``EntityNode`` whose
        ``entity`` attribute is *text*, and an empty second list.
    """
    placeholder = EntityNode()
    placeholder.entity = text
    return [placeholder], []


def process_entity_nodes(app, doctree, docname):
    """Replace every ``EntityNode`` of *doctree* by the content of its entity.

    The language is the first entry of ``LANGUAGES`` that is also set as a
    build tag. Without such a tag a warning is logged and each entity node
    is replaced by its bare (translated) name. With a language, an unknown
    entity is also replaced by its name and reported with a warning.

    :param app: the Sphinx application.
    :param doctree: the document tree being processed.
    :param docname: name of the document *doctree* belongs to.
    """
    env = app.builder.env

    entities = AllEntities.install(env)
    entities.resolve_pendings(app)

    # ``next`` with a default avoids a blanket ``except Exception`` that
    # would also hide unrelated errors.
    language = next((lang for lang in LANGUAGES if lang in app.tags), None)
    if language is None:
        logger.warning(f"No language tag specified, not resolving entities in {docname}")

    for node in doctree.traverse(EntityNode):
        entity = None if language is None else entities.get(language, node.entity, docname)
        if entity is None:
            if language is not None:
                logger.warning(f'Entity "{node.entity}" has not been defined', location=node)
            node.replace_self(nodes.Text(_(node.entity)))
        else:
            # Insert a copy: the stored node is reused for every occurrence
            # of the entity and a node can only have one parent.
            node.replace_self(entity["content"].deepcopy())


def purge_entities(app, env, docname):
    """Remove from *env*'s registry every entity declared by *docname*."""
    AllEntities.install(env).purge(docname)


def merge_entities(app, env, docnames, other):
    """Merge the entity registry of the *other* environment into *env*'s."""
    target = AllEntities.install(env)
    target.merge(AllEntities.install(other))


def setup(app):
    """Register the nodes, directive, role and event handlers on *app*.

    :returns: the extension metadata (version and parallel-safety flags).
    """
    for node_class in (EntityNode, EntitiesNode):
        app.add_node(node_class)
    app.add_directive("entities", EntitiesDirective)
    app.add_role("entity", entity_role)

    handlers = (
        ("doctree-resolved", process_entity_nodes),
        ("env-merge-info", merge_entities),
        ("env-purge-doc", purge_entities),
    )
    for event, handler in handlers:
        app.connect(event, handler)

    metadata = dict(version="0.1", parallel_read_safe=True, parallel_write_safe=True)
    return metadata