File size: 4,795 Bytes
41bef2b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import re
from typing import Optional, Union

from loguru import logger
from pyshexc.parser_impl.generate_shexj import parse
from jsonasobj import as_json

from ShExJSG import ShExJ
from esgen.ShExC import ShExC
from pyjsg.jsglib import loads as jsg_loads

from rdflib import Graph
from rdflib.namespace import NamespaceManager

from esgen.utils import locate_comment, position_start_line


def shexc_to_shexj(shexc_text: str) -> tuple[str, Optional[str], Optional[Union[NamespaceManager, Graph]], Optional[list[dict]]]:
    """
    Convert ShExC text to ShExJ text and collect the side information that
    the JSON form does not carry.

    :param shexc_text: schema in ShExC (compact) syntax
    :return: a 4-tuple of (ShExJ text, result of ``base_uri_parser_helper``,
             result of ``namespaces_parser_helper``,
             result of ``comment_parser_helper``)
    """
    # Parse first so that an invalid schema fails before any helper runs.
    parsed_schema = parse(shexc_text)
    return (
        as_json(parsed_schema),
        base_uri_parser_helper(shexc_text),
        namespaces_parser_helper(shexc_text),
        comment_parser_helper(shexc_text),
    )


def shexj_to_shexc(
        shexj_text: str,
        base: Optional[str],
        namespaces: Optional[Union[NamespaceManager, Graph]],
        comments: Optional[list[dict]]) -> str:
    """
    Convert ShExJ text to ShExC text and put the recorded comments back.

    :param shexj_text: schema in ShExJ (JSON) syntax
    :param base: base URI handed to the ``ShExC`` emitter, may be None
    :param namespaces: prefix bindings handed to the ``ShExC`` emitter, may be None
    :param comments: comment records passed on to ``insert_comments``, may be None
    :return: the ShExC text with comments inserted
    """
    # jsg_loads yields a ShExJSG.ShExJ.Schema instance.
    schema = jsg_loads(shexj_text, ShExJ)
    emitter = ShExC(schema, base, namespaces)
    return insert_comments(str(emitter), comments)


def base_uri_parser_helper(shexc_text: str) -> Optional[str]:
    """
    Extract the base URI from the first ``BASE <...>`` declaration in the text.

    The keyword is matched case-insensitively. Each line is stripped before
    matching, so indentation, trailing blanks and the ``\\r`` of CRLF line
    endings do not hide a declaration (this mirrors how
    ``namespaces_parser_helper`` treats ``PREFIX`` lines).

    :param shexc_text: schema in ShExC (compact) syntax
    :return: the text between ``<`` and ``>`` of the first matching line,
             or None when no line is a BASE declaration
    """
    base_pattern = re.compile(r'^[Bb][Aa][Ss][Ee]\s+<(.+)>$')
    for line in shexc_text.split("\n"):
        match = base_pattern.match(line.strip())
        if match:
            return match.group(1)
    return None


def namespaces_parser_helper(inputs: str | dict) -> Optional[Union[NamespaceManager, Graph]]:
    """
    Build a namespace manager from ShExC text or from a prefix mapping.

    When ``inputs`` is a string, every line that (after stripping) is a
    ``PREFIX name: <uri>`` declaration is bound on a fresh ``Graph``. The
    keyword is matched case-insensitively, the prefix name may be empty
    (``PREFIX : <uri>``) or contain ``-``/``.``, and whitespace between the
    colon and ``<`` is optional.

    When ``inputs`` is a dict, each ``prefix -> uri`` item is bound as is.

    :param inputs: ShExC text, or a dict mapping prefix to namespace URI
    :return: a ``NamespaceManager`` wrapping the graph the prefixes were bound
             on; for any other input type an error is logged and nothing from
             ``inputs`` is bound
    """
    g = Graph()
    if isinstance(inputs, str):
        prefix_pattern = re.compile(r'^PREFIX\s+([\w.-]*):\s*<(.+)>$', re.IGNORECASE)
        for line in inputs.split("\n"):
            match = prefix_pattern.match(line.strip())
            if match:
                g.bind(match.group(1), match.group(2))
    elif isinstance(inputs, dict):
        for prefix, uri in inputs.items():
            g.bind(prefix, uri)
    else:
        # Best effort: report the misuse but still return a usable manager.
        logger.error("TypeError: Incorrect inputs type for namespaces parser.")
    return NamespaceManager(g)


def comment_parser_helper(shexc_text: str) -> list[dict]:
    """
    Collect the comments of a ShExC text so they can be re-inserted later.

    Two types of comments are accepted by this helper function:
    - general comments: whole-line comments located before the line index
      returned by ``position_start_line``
    - constraint comments: comments on or after that line
      - case 1: whole-line comments
      - case 2: comments trailing a constraint on the same line

    A ``#`` that sits inside an IRI reference (``<#Shape>``,
    ``<http://example.org/ns#p>``) or inside a quoted string is not a comment
    marker and is skipped when looking for a trailing comment.

    :param shexc_text: schema in ShExC (compact) syntax
    :return: one dict per comment with the keys ``comment`` (the comment text;
             the full line for whole-line comments), ``type`` (``"general"``
             or ``"constraint"``) and ``location`` (whatever
             ``locate_comment`` returns for the lines from the comment onward)
    """
    # Alternatives are tried left to right at each position, so IRIs and
    # strings are consumed as a unit and only a bare '#' reaches group 1.
    token_pattern = re.compile(r"""
        <[^<>\s]*>              # IRI reference
      | "(?:[^"\\]|\\.)*"       # double-quoted string
      | '(?:[^'\\]|\\.)*'       # single-quoted string
      | (\#)                    # bare comment marker
    """, re.VERBOSE)

    def trailing_comment(text: str) -> Optional[str]:
        """Return the comment that ends ``text``, or None if there is none."""
        for token in token_pattern.finditer(text):
            if token.group(1):
                return text[token.start():]
        return None

    comments = []
    start_line_num = position_start_line(shexc_text)
    shexc_lines = shexc_text.split("\n")
    for idx, line in enumerate(shexc_lines):
        comment_type = "general" if idx < start_line_num else "constraint"
        if line.strip().startswith("#"):
            # Whole-line comment: keep the line as written, indentation included.
            comment_text = line
        elif comment_type == "constraint":
            # Trailing comments are only collected from the start line onward.
            comment_text = trailing_comment(line)
            if comment_text is None:
                continue
        else:
            continue
        comments.append({
            "comment": comment_text,
            "type": comment_type,
            "location": locate_comment(shexc_lines[idx:], comment_type)
        })
    return comments


def insert_comments(shexc_text: str, comments: Optional[list[dict]]) -> str:
    """
    Re-insert previously collected comments into generated ShExC text.

    Each comment record is a dict with the keys ``comment``, ``type`` and
    ``location``. A ``location`` of 0 puts the comment at the very top.
    Otherwise ``location`` is compared with each output line, both verbatim
    and with trailing blanks/semicolons removed, and the first matching line
    is used: a ``"general"`` comment is inserted as a new line before it, any
    other type is appended to the end of that line. Records whose location
    matches no line are dropped.

    :param shexc_text: generated ShExC text
    :param comments: comment records, or None/empty for "nothing to insert"
    :return: the text with comments inserted (the input itself when there are
             no comments)
    """
    if not comments:
        return shexc_text

    output_lines = shexc_text.split("\n")
    # Walk backwards: 'location' refers to the line following the comment, so
    # inserting last-to-first keeps earlier comments above later ones.
    for entry in reversed(comments):
        target = entry["location"]
        if target == 0:
            output_lines.insert(0, entry["comment"])
            continue

        position = next(
            (
                i for i, candidate in enumerate(output_lines)
                if candidate == target or candidate.rstrip(' ;') == target
            ),
            None,
        )
        if position is None:
            continue

        if entry["type"] == "general":
            output_lines.insert(position, entry["comment"])
        else:
            anchored = output_lines[position].rstrip()
            output_lines[position] = anchored + '  ' + entry["comment"].lstrip()
    return '\n'.join(output_lines)