File size: 4,795 Bytes
41bef2b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
import re
from typing import Optional, Union
from loguru import logger
from pyshexc.parser_impl.generate_shexj import parse
from jsonasobj import as_json
from ShExJSG import ShExJ
from esgen.ShExC import ShExC
from pyjsg.jsglib import loads as jsg_loads
from rdflib import Graph
from rdflib.namespace import NamespaceManager
from esgen.utils import locate_comment, position_start_line
def shexc_to_shexj(shexc_text: str) -> tuple[str, Optional[str], Optional[Union[NamespaceManager, Graph]], Optional[list[dict]]]:
    """
    Convert ShExC text to ShExJ text, alongside the base IRI, prefix
    bindings and comments extracted from the same ShExC text.

    :param shexc_text: schema written in ShExC syntax
    :return: a 4-tuple ``(shexj_text, base, namespaces, comments)`` holding the
        JSON serialisation of the parsed schema followed by the results of
        ``base_uri_parser_helper``, ``namespaces_parser_helper`` and
        ``comment_parser_helper``
    """
    # Tuple items are evaluated left to right, so the schema is parsed before
    # any of the text-scanning helpers run.
    return (
        as_json(parse(shexc_text)),
        base_uri_parser_helper(shexc_text),
        namespaces_parser_helper(shexc_text),
        comment_parser_helper(shexc_text),
    )
def shexj_to_shexc(
        shexj_text: str,
        base: Optional[str],
        namespaces: Optional[Union[NamespaceManager, Graph]],
        comments: Optional[list[dict]]) -> str:
    """
    Convert ShExJ text to ShExC text and put the given comments back in.

    :param shexj_text: schema serialised as ShExJ (JSON) text
    :param base: base IRI handed to the ShExC generator, or None
    :param namespaces: prefix bindings handed to the ShExC generator, or None
    :param comments: comment records passed on to ``insert_comments``, or None
    :return: the generated ShExC text after comment insertion
    """
    schema: ShExJ.Schema = jsg_loads(shexj_text, ShExJ)  # <class 'ShExJSG.ShExJ.Schema'>
    rendered = str(ShExC(schema, base, namespaces))
    return insert_comments(rendered, comments)
def base_uri_parser_helper(shexc_text: str) -> Optional[str]:
    """
    Return the IRI of the first BASE declaration found in ShExC text.

    The text is scanned line by line. Each line is stripped before matching,
    so indentation, trailing blanks and a carriage return left over from
    CRLF line endings do not hide the declaration. The keyword is matched
    case-insensitively and a trailing ``# comment`` after the IRI is allowed.

    :param shexc_text: schema written in ShExC syntax
    :return: the text between ``<`` and ``>`` of the first BASE line,
        or None when no line holds a BASE declaration
    """
    # The IRI may not contain '<', '>' or whitespace, which keeps the capture
    # from running across several bracketed IRIs on the same line.
    base_pattern = re.compile(r'^[Bb][Aa][Ss][Ee]\s*<([^<>\s]+)>\s*(?:#.*)?$')
    for line in shexc_text.split("\n"):
        match = base_pattern.match(line.strip())
        if match:
            return match.group(1)
    return None
def namespaces_parser_helper(inputs: str | dict) -> Optional[Union[NamespaceManager, Graph]]:
    """
    Build a namespace manager from ShExC text or from a prefix mapping.

    When ``inputs`` is a string it is scanned line by line for PREFIX
    declarations. The keyword is matched case-insensitively (as the BASE
    parser does), the prefix name may be empty (``PREFIX : <...>``) or contain
    ``-`` and ``.``, whitespace after the colon is optional and a trailing
    ``# comment`` is tolerated. When ``inputs`` is a dict, every
    ``prefix -> namespace IRI`` item is bound as given.

    Any other input type is reported through the logger and yields a manager
    with no bindings added by this function.

    :param inputs: ShExC text, or a dict mapping prefix to namespace IRI
    :return: a ``NamespaceManager`` wrapping a fresh graph with the bindings
    """
    # NOTE(review): Graph.bind is called with its default override/replace
    # settings; confirm that prefixes clashing with bindings rdflib may add to
    # a new Graph on its own are handled as intended.
    g = Graph()
    if isinstance(inputs, str):
        prefix_pattern = re.compile(
            r'^[Pp][Rr][Ee][Ff][Ii][Xx]\s+([\w.-]*):\s*<([^<>\s]+)>\s*(?:#.*)?$'
        )
        for line in inputs.split("\n"):
            match = prefix_pattern.match(line.strip())
            if match:
                g.bind(match.group(1), match.group(2))
    elif isinstance(inputs, dict):
        for prefix, uri in inputs.items():
            g.bind(prefix, uri)
    else:
        # Deliberately best-effort: log and fall through to an empty manager.
        logger.error("TypeError: Incorrect inputs type for namespaces parser.")
    return NamespaceManager(g)
def _inline_comment_start(line: str) -> int:
    """
    Return the index of the ``#`` that starts a comment in ``line``, or -1.

    A ``#`` inside an IRI (``<http://example.org/ns#name>``) or inside a
    single- or double-quoted string is part of the token, not a comment, and
    is skipped. A ``#`` inside a ``/regex/`` pattern is not recognised and is
    still reported as a comment start.
    """
    closing = None  # delimiter ending the IRI/string currently being skipped
    escaped = False
    for pos, char in enumerate(line):
        if closing is None:
            if char == "#":
                return pos
            if char == "<":
                closing = ">"
            elif char in "\"'":
                closing = char
        elif escaped:
            escaped = False
        elif char == "\\":
            escaped = True
        elif char == closing or (closing == ">" and char.isspace()):
            # IRIs cannot contain whitespace, so a blank also ends an
            # unterminated '<' and keeps the rest of the line scannable.
            closing = None
    return -1


def comment_parser_helper(shexc_text: str) -> list[dict]:
    """
    Collect the comments of a ShExC document as a list of records.

    Two types of comments are accepted by this helper function:
    - general comments: comment lines before the start line (the index
      returned by ``position_start_line``)
    - constraint comments: from the start line onwards
        - case 1: single line comments (the whole line is stored)
        - case 2: comments after the constraint (stored from the ``#`` on)

    A ``#`` that sits inside an IRI or a quoted string is not treated as the
    start of a comment.

    :param shexc_text: schema written in ShExC syntax
    :return: list of dicts with the keys ``comment`` (comment text), ``type``
        (``"general"`` or ``"constraint"``) and ``location`` (the value
        ``locate_comment`` returns for the lines from the comment onwards)
    """
    comments = []
    start_line_num = position_start_line(shexc_text)
    shexc_lines = shexc_text.split("\n")
    for idx, line in enumerate(shexc_lines):
        if line.strip().startswith("#"):
            # whole-line comment: its type depends on which side of the
            # start line it sits
            comment_type = "general" if idx < start_line_num else "constraint"
            comments.append({
                "comment": line,
                "type": comment_type,
                "location": locate_comment(shexc_lines[idx:], comment_type)
            })
        elif idx >= start_line_num:
            # trailing comment after a constraint
            comment_start = _inline_comment_start(line)
            if comment_start != -1:
                comments.append({
                    "comment": line[comment_start:],
                    "type": "constraint",
                    "location": locate_comment(shexc_lines[idx:], "constraint")
                })
    return comments
def insert_comments(shexc_text: str, comments: Optional[list[dict]]) -> str:
    """
    Put previously collected comments back into ShExC text.

    Each record is looked up by its ``location``: the value ``0`` means the
    very top of the text, anything else is compared with every line (as is,
    and with trailing blanks and semicolons removed). At the first matching
    line a ``general`` comment is inserted as its own line above it, any other
    comment is appended to the end of that line. Records whose location
    matches no line are left out.

    :param shexc_text: ShExC text to annotate
    :param comments: records with the keys ``comment``, ``type`` and
        ``location``; None or an empty list returns the text unchanged
    :return: the ShExC text with the comments inserted
    """
    output_lines = shexc_text.split("\n")
    if not comments:
        return shexc_text

    # Walk the records back to front: 'location' names the line that follows
    # a comment, so later comments have to be placed before earlier ones.
    for entry in reversed(comments):
        anchor = entry["location"]
        if anchor == 0:
            output_lines.insert(0, entry["comment"])
            continue

        position = next(
            (
                pos
                for pos, current in enumerate(output_lines)
                if current == anchor or current.rstrip(' ;') == anchor
            ),
            None,
        )
        if position is None:
            continue

        if entry["type"] == "general":
            output_lines.insert(position, entry["comment"])
        else:
            target = output_lines[position]
            output_lines[position] = target.rstrip() + ' ' + entry["comment"].lstrip()

    return '\n'.join(output_lines)
|