ESGen / esgen /parser.py
Bohui Zhang
Update the second version
41bef2b
import re
from typing import Optional, Union
from loguru import logger
from pyshexc.parser_impl.generate_shexj import parse
from jsonasobj import as_json
from ShExJSG import ShExJ
from esgen.ShExC import ShExC
from pyjsg.jsglib import loads as jsg_loads
from rdflib import Graph
from rdflib.namespace import NamespaceManager
from esgen.utils import locate_comment, position_start_line
def shexc_to_shexj(shexc_text: str) -> tuple[str, Optional[str], Optional[Union[NamespaceManager, Graph]], Optional[list[dict]]]:
    """
    Convert ShExC text to ShExJ text.

    Besides the JSON serialisation of the parsed schema, the base URI, the
    namespace bindings and the comments are extracted from the same ShExC
    text by the helper functions of this module and returned alongside it.

    :param shexc_text: schema in ShExC (compact) syntax
    :return: a 4-tuple ``(shexj_text, base, namespaces, comments)``
    """
    # Parse first so that a parsing failure surfaces before any helper runs.
    parsed_schema = parse(shexc_text)
    return (
        as_json(parsed_schema),
        base_uri_parser_helper(shexc_text),
        namespaces_parser_helper(shexc_text),
        comment_parser_helper(shexc_text),
    )
def shexj_to_shexc(
        shexj_text: str,
        base: Optional[str],
        namespaces: Optional[Union[NamespaceManager, Graph]],
        comments: Optional[list[dict]]) -> str:
    """
    Convert ShExJ text to ShExC text.

    The JSON text is loaded into a ``ShExJ.Schema`` object, rendered through
    the ``ShExC`` class together with the base and the namespaces, and the
    recorded comments are finally put back with ``insert_comments``.

    :param shexj_text: schema in ShExJ (JSON) syntax
    :param base: base URI handed to the ``ShExC`` renderer, or None
    :param namespaces: namespace bindings handed to the ``ShExC`` renderer, or None
    :param comments: comment records to re-insert into the result, or None
    :return: the schema in ShExC syntax
    """
    schema = jsg_loads(shexj_text, ShExJ)  # <class 'ShExJSG.ShExJ.Schema'>
    rendered = str(ShExC(schema, base, namespaces))
    return insert_comments(rendered, comments)
def base_uri_parser_helper(shexc_text: str) -> Optional[str]:
    """
    Extract the base URI declared by the first ``BASE <...>`` directive.

    The keyword is matched case-insensitively. Leading/trailing whitespace
    (including the ``\\r`` left over from CRLF line endings) and a trailing
    ``# comment`` on the directive line are tolerated.

    :param shexc_text: schema in ShExC (compact) syntax
    :return: the text between the angle brackets of the first BASE
        directive, or None when the text declares no base
    """
    # [^<>]+ instead of a greedy .+ so the capture cannot run past the
    # closing bracket of the IRI (e.g. into a trailing comment).
    base_pattern = re.compile(r'^[Bb][Aa][Ss][Ee]\s*<([^<>]+)>\s*(?:#.*)?$')
    for line in shexc_text.split("\n"):
        # Strip the line, as the prefix parser does, so indentation and
        # stray carriage returns do not hide the directive.
        match = base_pattern.match(line.strip())
        if match:
            return match.group(1)
    return None
def namespaces_parser_helper(inputs: str | dict) -> Optional[Union[NamespaceManager, Graph]]:
    """
    Build a namespace manager from ShExC text or from a prefix mapping.

    When ``inputs`` is a string, every ``PREFIX name: <uri>`` directive found
    on its own line is bound. The keyword is matched case-insensitively, the
    default (empty) prefix ``PREFIX : <uri>`` is accepted, and a trailing
    ``# comment`` on the directive line is tolerated. When ``inputs`` is a
    dict, each ``prefix -> uri`` item is bound as is. Any other type is
    reported through the logger and yields a manager without extra bindings.

    :param inputs: ShExC text, or a dict mapping prefixes to namespace URIs
    :return: a ``NamespaceManager`` wrapping a fresh ``Graph`` that holds
        the bindings
    """
    g = Graph()
    if isinstance(inputs, str):
        # Prefix names may be empty and may contain '-' or '.'; whitespace
        # between the colon and the IRI is optional.
        prefix_pattern = re.compile(
            r'^[Pp][Rr][Ee][Ff][Ii][Xx]\s+([\w.-]*):\s*<([^<>]*)>\s*(?:#.*)?$'
        )
        for line in inputs.split("\n"):
            match = prefix_pattern.match(line.strip())
            if match:
                prefix, uri = match.groups()
                g.bind(prefix, uri)
    elif isinstance(inputs, dict):
        for prefix, uri in inputs.items():
            g.bind(prefix, uri)
    else:
        logger.error("TypeError: Incorrect inputs type for namespaces parser.")
    return NamespaceManager(g)
def _comment_start(line: str) -> int:
    """Return the index of the ``#`` that opens a comment in ``line``, or -1.

    A ``#`` inside an IRI (``<...>``) or inside a quoted string is not a
    comment marker and is skipped. Only a single line is inspected, so
    strings spanning several lines and ``#`` inside ``/regex/`` facets are
    not recognised.
    """
    closing = None       # delimiter that ends the IRI/string being scanned
    escaped = False      # previous character was a backslash inside a span
    for pos, char in enumerate(line):
        if escaped:
            escaped = False
        elif closing is not None:
            if char == "\\":
                escaped = True
            elif char == closing:
                closing = None
        elif char == "#":
            return pos
        elif char == "<":
            closing = ">"
        elif char in "\"'":
            closing = char
    return -1


def comment_parser_helper(shexc_text: str) -> list[dict]:
    """
    Collect the comments of a ShExC text.

    Two types of comments are accepted by this helper function:
    - general comments: whole-line comments before the start line (as
      reported by ``position_start_line``) are added as general comments
    - constraint comments (from the start line on)
        - case 1: single line comments
        - case 2: comments after the constraint

    A ``#`` that is part of an IRI such as ``<http://ex.org/ns#name>`` or of
    a quoted string does not start a comment.

    :param shexc_text: schema in ShExC (compact) syntax
    :return: one dict per comment with the keys ``comment`` (the comment
        text; the full line for single line comments), ``type``
        (``"general"`` or ``"constraint"``) and ``location`` (the value
        returned by ``locate_comment`` for the lines starting at the comment)
    """
    comments = []
    start_line_num = position_start_line(shexc_text)
    shexc_lines = shexc_text.split("\n")
    for idx, line in enumerate(shexc_lines):
        is_general = idx < start_line_num
        if line.strip().startswith("#"):
            # Whole-line comment: keep the line verbatim, indentation included.
            comment_text = line
        elif is_general:
            # Trailing comments before the start line are not recorded.
            continue
        else:
            hash_pos = _comment_start(line)
            if hash_pos < 0:
                continue
            comment_text = line[hash_pos:]
        comment_type = "general" if is_general else "constraint"
        comments.append({
            "comment": comment_text,
            "type": comment_type,
            "location": locate_comment(shexc_lines[idx:], comment_type),
        })
    return comments
def insert_comments(shexc_text: str, comments: Optional[list[dict]]) -> str:
    """
    Put recorded comments back into a ShExC text.

    Each record is a dict with the keys ``comment``, ``type`` and
    ``location``. A ``location`` equal to 0 puts the comment at the very top.
    Otherwise the first line equal to ``location`` (also after dropping
    trailing spaces and semicolons) is the anchor: a ``"general"`` comment is
    inserted as a new line before it, any other type is appended to the end
    of the anchor line. Records whose location is not found are ignored.

    :param shexc_text: ShExC text to annotate
    :param comments: comment records, or None / empty for no change
    :return: the text with the comments inserted
    """
    lines = shexc_text.split("\n")
    if not comments:
        return shexc_text

    # Walk the records backwards: 'location' names the line that follows the
    # comment, so later comments must be placed before earlier ones.
    for record in reversed(comments):
        anchor = record["location"]
        if anchor == 0:
            lines.insert(0, record["comment"])
            continue

        hit = next(
            (pos for pos, text in enumerate(lines)
             if text == anchor or text.rstrip(' ;') == anchor),
            None,
        )
        if hit is None:
            continue

        if record["type"] == "general":
            lines.insert(hit, record["comment"])
        else:
            lines[hit] = lines[hit].rstrip() + ' ' + record["comment"].lstrip()

    return '\n'.join(lines)