|
|
import re |
|
|
from typing import Optional, Union |
|
|
|
|
|
from loguru import logger |
|
|
from pyshexc.parser_impl.generate_shexj import parse |
|
|
from jsonasobj import as_json |
|
|
|
|
|
from ShExJSG import ShExJ |
|
|
from esgen.ShExC import ShExC |
|
|
from pyjsg.jsglib import loads as jsg_loads |
|
|
|
|
|
from rdflib import Graph |
|
|
from rdflib.namespace import NamespaceManager |
|
|
|
|
|
from esgen.utils import locate_comment, position_start_line |
|
|
|
|
|
|
|
|
def shexc_to_shexj(shexc_text: str) -> tuple[str, Optional[str], Optional[Union[NamespaceManager, Graph]], Optional[list[dict]]]:
    """
    Convert ShExC text to ShExJ text and collect the side information
    (base IRI, prefixes, comments) gathered from the ShExC source.

    The four results are produced in this order: the schema is parsed and
    serialised with ``as_json`` first, then the base, namespace and comment
    helpers are each run over the same input text.

    :param shexc_text: ShExC source text
    :return: a 4-tuple ``(shexj_text, base, namespaces, comments)``
    """
    return (
        as_json(parse(shexc_text)),
        base_uri_parser_helper(shexc_text),
        namespaces_parser_helper(shexc_text),
        comment_parser_helper(shexc_text),
    )
|
|
|
|
|
|
|
|
def shexj_to_shexc(
        shexj_text: str,
        base: Optional[str],
        namespaces: Optional[Union[NamespaceManager, Graph]],
        comments: Optional[list[dict]]) -> str:
    """
    Convert ShExJ text to ShExC text and put recorded comments back in.

    :param shexj_text: ShExJ (JSON) text, loaded with ``jsg_loads`` against ``ShExJ``
    :param base: base IRI handed to the ``ShExC`` emitter, or None
    :param namespaces: prefix bindings handed to the ``ShExC`` emitter, or None
    :param comments: comment records passed to ``insert_comments``, or None
    :return: the generated ShExC text after comment insertion
    """
    schema = jsg_loads(shexj_text, ShExJ)
    generated_shexc = str(ShExC(schema, base, namespaces))
    return insert_comments(generated_shexc, comments)
|
|
|
|
|
|
|
|
def base_uri_parser_helper(shexc_text: str) -> Optional[str]:
    """
    Extract the IRI of the first BASE declaration found in ShExC text.

    The text is scanned line by line. Each line is stripped of surrounding
    whitespace before matching, so an indented declaration, one followed by
    trailing blanks, or one ending in ``\\r`` (CRLF input) is still
    recognised. The ``BASE`` keyword is matched in any letter case.

    :param shexc_text: ShExC source text
    :return: the text between ``<`` and ``>`` on the first matching line,
        or None when no line matches
    """
    # Compiled once, outside the loop; the pattern text itself is unchanged.
    base_pattern = re.compile(r'^[Bb][Aa][Ss][Ee]\s+<(.+)>$')
    for line in shexc_text.split("\n"):
        # strip() mirrors what namespaces_parser_helper does for PREFIX lines.
        match = base_pattern.match(line.strip())
        if match:
            return match.group(1)
    return None
|
|
|
|
|
|
|
|
def namespaces_parser_helper(inputs: str | dict) -> Optional[Union[NamespaceManager, Graph]]:
    """
    Build a namespace manager from PREFIX declarations or a prefix mapping.

    When ``inputs`` is a string it is treated as ShExC text and scanned line
    by line (each line stripped first) for ``PREFIX name: <iri>``
    declarations. The keyword is matched in any letter case, the prefix name
    may be empty (the default ``:`` prefix) and may contain ``-`` and ``.``,
    and whitespace between ``:`` and ``<`` is optional. When ``inputs`` is a
    dict, each ``prefix -> uri`` item is bound directly.

    Any other input type is reported through ``logger.error`` and no bindings
    are added; a manager is still returned in that case.

    :param inputs: ShExC text, or a dict mapping prefix names to IRIs
    :return: a ``NamespaceManager`` wrapping a graph that carries the bindings
    """
    g = Graph()
    if isinstance(inputs, str):
        # [\w.-]* approximates the PN_PREFIX production and also admits the
        # empty default prefix, which the previous (\w+) rejected.
        prefix_pattern = re.compile(r'^PREFIX\s+([\w.-]*):\s*<(.+)>$', re.IGNORECASE)
        for line in inputs.split("\n"):
            match = prefix_pattern.match(line.strip())
            if match:
                g.bind(match.group(1), match.group(2))
    elif isinstance(inputs, dict):
        for prefix, uri in inputs.items():
            g.bind(prefix, uri)
    else:
        # Best-effort: log and fall through to return a manager with no added bindings.
        logger.error("TypeError: Incorrect inputs type for namespaces parser.")

    return NamespaceManager(g)
|
|
|
|
|
|
|
|
def _comment_start(line: str) -> int:
    """
    Return the index of the ``#`` that starts a trailing comment, or -1.

    A ``#`` inside an ``<...>`` IRI or inside a single- or double-quoted
    string is skipped, since there it is part of a term (e.g. an IRI
    fragment) rather than a comment marker. If an IRI or string is still
    open at the end of the line, the scan cannot be trusted and the index of
    the first ``#`` on the line is returned instead (the pre-existing
    behaviour).
    """
    closer = None  # character that ends the IRI/string currently being skipped
    escaped = False
    for pos, char in enumerate(line):
        if closer is not None:
            if escaped:
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == closer:
                closer = None
        elif char == "#":
            return pos
        elif char == "<":
            closer = ">"
        elif char in "\"'":
            closer = char
    if closer is not None:
        return line.find("#")
    return -1


def comment_parser_helper(shexc_text: str) -> list[dict]:
    """
    Two types of comments are accepted by this helper function:
    - general comments: comment-only lines before the start line
    - constraint comments (from the start line onwards)
        - case 1: single line comments (the whole line is kept)
        - case 2: comments after the constraint (text from ``#`` onwards)

    For case 2, a ``#`` that sits inside an ``<...>`` IRI or a quoted string
    is not treated as the start of a comment.

    Each record is a dict with the keys ``"comment"``, ``"type"``
    (``"general"`` or ``"constraint"``) and ``"location"``. The location is
    whatever ``locate_comment`` returns for the lines from the comment's line
    to the end of the text.

    :param shexc_text: ShExC source text
    :return: list of comment records, in source order
    """
    comments = []
    # NOTE(review): presumably the index of the schema's start line — confirm
    # against esgen.utils.position_start_line.
    start_line_num = position_start_line(shexc_text)
    shexc_lines = shexc_text.split("\n")
    for idx, line in enumerate(shexc_lines):
        comment_type = "general" if idx < start_line_num else "constraint"
        if line.strip().startswith("#"):
            comment = line
        elif comment_type == "constraint":
            # Trailing comments are only collected from the start line onwards.
            hash_pos = _comment_start(line)
            if hash_pos < 0:
                continue
            comment = line[hash_pos:]
        else:
            continue
        comments.append({
            "comment": comment,
            "type": comment_type,
            "location": locate_comment(shexc_lines[idx:], comment_type)
        })
    return comments
|
|
|
|
|
|
|
|
def insert_comments(shexc_text: str, comments: Optional[list[dict]]) -> str:
    """
    Re-insert recorded comments into ShExC text.

    Comments are processed last to first. A record whose ``"location"`` is
    ``0`` is placed at the very top of the text. Otherwise the first line
    equal to the location (either as-is or after trailing spaces and
    semicolons are removed) is the target: a ``"general"`` comment is
    inserted as its own line before the target, any other type is appended
    to the end of the target line. Records with no matching line are dropped.

    :param shexc_text: ShExC text to annotate
    :param comments: comment records with ``"comment"``, ``"type"`` and
        ``"location"`` keys; None or an empty list leaves the text untouched
    :return: the ShExC text with the comments put back
    """
    output_lines = shexc_text.split("\n")
    if not comments:
        return shexc_text

    for entry in reversed(comments):
        anchor = entry["location"]
        if anchor == 0:
            output_lines.insert(0, entry["comment"])
            continue

        # Index of the first line that matches the recorded location, if any.
        target = next(
            (pos for pos, text in enumerate(output_lines)
             if text == anchor or text.rstrip(' ;') == anchor),
            None,
        )
        if target is None:
            continue

        if entry["type"] == "general":
            output_lines.insert(target, entry["comment"])
        else:
            output_lines[target] = (
                output_lines[target].rstrip() + ' ' + entry["comment"].lstrip()
            )

    return '\n'.join(output_lines)
|
|
|