import re
from typing import Optional, Union

from jsonasobj import as_json
from loguru import logger
from pyjsg.jsglib import loads as jsg_loads
from pyshexc.parser_impl.generate_shexj import parse
from rdflib import Graph
from rdflib.namespace import NamespaceManager
from ShExJSG import ShExJ

from esgen.ShExC import ShExC
from esgen.utils import locate_comment, position_start_line

# "BASE <iri>" directive; the keyword is matched case-insensitively.
_BASE_PATTERN = re.compile(r'^[Bb][Aa][Ss][Ee]\s+<(.+)>$')

# "PREFIX name: <iri>" directive; keyword case-insensitive like BASE above.
_PREFIX_PATTERN = re.compile(r'^[Pp][Rr][Ee][Ff][Ii][Xx]\s+(\w+):\s+<(.+)>$')

# Scans a line left to right, consuming IRI references and quoted strings so
# that a '#' inside them is not mistaken for the start of a comment. Group 1
# is set only when a bare '#' is reached.
_COMMENT_SCAN = re.compile(
    r'''<[^\s<>"]*>|"(?:[^"\\]|\\.)*"|'(?:[^'\\]|\\.)*'|(#)'''
)


def shexc_to_shexj(shexc_text: str) -> tuple[str,
                                             Optional[str],
                                             Optional[Union[NamespaceManager, Graph]],
                                             Optional[list[dict]]]:
    """
    Convert ShExC text to ShExJ text.

    Besides the converted schema, the base URI, the prefix bindings and the
    comments are extracted from the ShExC text and returned alongside it so
    that they can be passed back to :func:`shexj_to_shexc`.

    :param shexc_text: schema in ShExC (compact) syntax
    :return: tuple of (ShExJ text, base URI or None, namespace manager,
        list of comment records)
    """
    shexj_text = as_json(parse(shexc_text))
    base = base_uri_parser_helper(shexc_text)
    namespaces = namespaces_parser_helper(shexc_text)
    comments = comment_parser_helper(shexc_text)
    return shexj_text, base, namespaces, comments


def shexj_to_shexc(
        shexj_text: str,
        base: Optional[str],
        namespaces: Optional[Union[NamespaceManager, Graph]],
        comments: Optional[list[dict]]) -> str:
    """
    Convert ShExJ text to ShExC text.

    :param shexj_text: schema in ShExJ (JSON) syntax
    :param base: base URI handed to the ShExC serializer, or None
    :param namespaces: prefix bindings handed to the ShExC serializer, or None
    :param comments: comment records (as produced by
        :func:`comment_parser_helper`) to re-insert into the output, or None
    :return: schema in ShExC syntax with the comments re-inserted
    """
    shex_json: ShExJ.Schema = jsg_loads(shexj_text, ShExJ)
    # The serialized text must exist before comments can be merged into it.
    shexc_text = str(ShExC(shex_json, base, namespaces))
    return insert_comments(shexc_text, comments)


def base_uri_parser_helper(shexc_text: str) -> Optional[str]:
    """
    Extract the base URI from the first ``BASE <...>`` line of a ShExC text.

    :param shexc_text: schema in ShExC syntax
    :return: the URI between the angle brackets, or None if no line matches
    """
    for line in shexc_text.split("\n"):
        # Strip so that indentation or a trailing '\r' does not hide the line.
        match = _BASE_PATTERN.match(line.strip())
        if match:
            return match.group(1)
    return None


def namespaces_parser_helper(inputs: str | dict) -> Optional[Union[NamespaceManager, Graph]]:
    """
    Build a namespace manager from prefix declarations.

    :param inputs: either a ShExC text whose ``PREFIX name: <uri>`` lines are
        parsed, or a dict mapping prefix to URI
    :return: a NamespaceManager over a fresh Graph carrying the bindings; for
        any other input type an error is logged and no binding is added
    """
    g = Graph()
    if isinstance(inputs, str):
        for line in inputs.split("\n"):
            match = _PREFIX_PATTERN.match(line.strip())
            if match:
                g.bind(match.group(1), match.group(2))
    elif isinstance(inputs, dict):
        for prefix, uri in inputs.items():
            g.bind(prefix, uri)
    else:
        logger.error("TypeError: Incorrect inputs type for namespaces parser.")
    return NamespaceManager(g)


def _comment_start(line: str) -> int:
    """
    Find where a trailing comment starts in a line.

    :param line: a single line of ShExC text
    :return: index of the first '#' that is not inside an ``<...>`` IRI
        reference or a quoted string, or -1 if there is none
    """
    for token in _COMMENT_SCAN.finditer(line):
        if token.group(1) is not None:
            return token.start()
    return -1


def comment_parser_helper(shexc_text: str) -> list[dict]:
    """
    Collect the comments of a ShExC text.

    Two types of comments are accepted by this helper function:
     - general comments: full-line comments before the start line (as reported
       by ``position_start_line``)
     - constraint comments, from the start line onwards
        - case 1: single line comments
        - case 2: comments after the constraint on the same line

    :param shexc_text: schema in ShExC syntax
    :return: list of records with the keys ``"comment"`` (the comment text),
        ``"type"`` (``"general"`` or ``"constraint"``) and ``"location"``
        (whatever ``locate_comment`` returns for the lines from the comment on)
    """
    comments = []
    start_line_num = position_start_line(shexc_text)
    shexc_lines = shexc_text.split("\n")
    for idx, line in enumerate(shexc_lines):
        is_full_line = line.strip().startswith("#")
        if idx < start_line_num:
            # Before the start line only full-line comments are recorded.
            if not is_full_line:
                continue
            text, kind = line, "general"
        elif is_full_line:
            text, kind = line, "constraint"
        else:
            # A '#' inside an IRI or a string literal is not a comment.
            start = _comment_start(line)
            if start < 0:
                continue
            text, kind = line[start:], "constraint"
        comments.append({
            "comment": text,
            "type": kind,
            "location": locate_comment(shexc_lines[idx:], kind)
        })
    return comments


def insert_comments(shexc_text: str, comments: Optional[list[dict]]) -> str:
    """
    Re-insert previously collected comments into a ShExC text.

    A record whose ``"location"`` is 0 is put on the very first line. For any
    other record the first line equal to the location (optionally ignoring
    trailing spaces and semicolons) is looked up: a general comment is
    inserted before it, a constraint comment is appended to it. Records whose
    location is not found are dropped.

    :param shexc_text: schema in ShExC syntax
    :param comments: comment records as produced by
        :func:`comment_parser_helper`, or None
    :return: the text with the comments inserted; unchanged if there are none
    """
    if not comments:
        return shexc_text
    shexc_lines = shexc_text.split("\n")
    # reverse the list during insertion since 'location' is the next line
    for comment in reversed(comments):
        location = comment["location"]
        if location == 0:
            shexc_lines.insert(0, comment["comment"])
            continue
        for idx, line in enumerate(shexc_lines):
            if line == location or line.rstrip(' ;') == location:
                if comment["type"] == "general":
                    shexc_lines.insert(idx, comment["comment"])
                else:
                    shexc_lines[idx] = line.rstrip() + ' ' + comment["comment"].lstrip()
                # Only the first matching line receives the comment.
                break
    return '\n'.join(shexc_lines)