| |
| |
| |
| |
| import sys |
| if sys.version_info[1] > 5: |
| from typing import TextIO |
| else: |
| from typing.io import TextIO |
| from antlr4.BufferedTokenStream import TokenStream |
| from antlr4.CommonTokenFactory import TokenFactory |
| from antlr4.error.ErrorStrategy import DefaultErrorStrategy |
| from antlr4.InputStream import InputStream |
| from antlr4.Recognizer import Recognizer |
| from antlr4.RuleContext import RuleContext |
| from antlr4.ParserRuleContext import ParserRuleContext |
| from antlr4.Token import Token |
| from antlr4.Lexer import Lexer |
| from antlr4.atn.ATNDeserializer import ATNDeserializer |
| from antlr4.atn.ATNDeserializationOptions import ATNDeserializationOptions |
| from antlr4.error.Errors import UnsupportedOperationException, RecognitionException |
| from antlr4.tree.ParseTreePatternMatcher import ParseTreePatternMatcher |
| from antlr4.tree.Tree import ParseTreeListener, TerminalNode, ErrorNode |
|
|
class TraceListener(ParseTreeListener):
    """Parse-tree listener that prints a line for every rule entry, rule exit
    and consumed terminal to the owning parser's ``_output`` stream."""
    __slots__ = '_parser'

    def __init__(self, parser):
        # The parser supplies the rule names, the token stream and the output
        # stream used by every callback below.
        self._parser = parser

    def enterEveryRule(self, ctx):
        """Print ``enter <rule>, LT(1)=<text>`` for the rule context *ctx*."""
        owner = self._parser
        rule_name = owner.ruleNames[ctx.getRuleIndex()]
        lookahead_text = owner._input.LT(1).text
        line = "enter " + rule_name + ", LT(1)=" + lookahead_text
        print(line, file=owner._output)

    def visitTerminal(self, node):
        """Print ``consume <token> rule <rule>`` for the terminal *node*.

        The rule name is taken from the parser's current context, not from
        the node itself.
        """
        owner = self._parser
        symbol_text = str(node.symbol)
        rule_name = owner.ruleNames[owner._ctx.getRuleIndex()]
        line = "consume " + symbol_text + " rule " + rule_name
        print(line, file=owner._output)

    def visitErrorNode(self, node):
        """Error nodes are deliberately not traced."""
        pass

    def exitEveryRule(self, ctx):
        """Print ``exit <rule>, LT(1)=<text>`` for the rule context *ctx*."""
        owner = self._parser
        rule_name = owner.ruleNames[ctx.getRuleIndex()]
        lookahead_text = owner._input.LT(1).text
        line = "exit " + rule_name + ", LT(1)=" + lookahead_text
        print(line, file=owner._output)
|
|
|
|
| |
class Parser(Recognizer):
    """Base class for token-stream recognizers that can build a parse tree.

    The class keeps track of the current rule context, a precedence stack for
    left-recursive rules, the error-handling strategy, the registered parse
    listeners and a running count of reported syntax errors.

    Attributes such as ``state``, ``_interp``, ``ruleNames``, ``literalNames``
    and ``symbolicNames`` are used here but not defined here; presumably they
    are provided by ``Recognizer`` or by the generated subclass.
    """
    __slots__ = (
        '_input', '_output', '_errHandler', '_precedenceStack', '_ctx',
        'buildParseTrees', '_tracer', '_parseListeners', '_syntaxErrors'
    )

    # Class-level cache (shared by every parser instance) mapping a serialized
    # ATN to the ATN deserialized with rule-bypass transitions enabled.
    bypassAltsAtnCache = dict()

    def __init__(self, input:TokenStream, output:TextIO = sys.stdout):
        """Create a parser reading from *input*.

        :param input: the token stream to parse.
        :param output: stream that receives trace and DFA dump output.
        """
        super().__init__()
        # The token stream; assigned for real by setInputStream() below.
        self._input = None
        self._output = output
        # Strategy object used for error reporting and recovery.
        self._errHandler = DefaultErrorStrategy()
        # Precedence stack for left-recursive rules; always starts with 0.
        self._precedenceStack = [0]
        # Rule context currently being parsed.
        self._ctx = None
        # When True, nodes are attached to the parse tree while parsing.
        self.buildParseTrees = True
        # The TraceListener installed by setTrace(True), if any.
        self._tracer = None
        # List of registered parse listeners, or None when there are none.
        self._parseListeners = None
        # Number of errors reported through notifyErrorListeners().
        self._syntaxErrors = 0
        self.setInputStream(input)

    def reset(self):
        """Return the parser to its initial state.

        Rewinds the input (when one is set), resets the error handler, clears
        the current context and the error count, disables tracing, restores
        the precedence stack and resets the interpreter when one is present.
        """
        if self._input is not None:
            self._input.seek(0)
        self._errHandler.reset(self)
        self._ctx = None
        self._syntaxErrors = 0
        self.setTrace(False)
        self._precedenceStack = [0]
        if self._interp is not None:
            self._interp.reset()

    def match(self, ttype:int):
        """Match the current token against token type *ttype*.

        On success the match is reported to the error handler and the token is
        consumed. Otherwise the error handler's inline recovery supplies the
        token; when parse trees are being built and that token has
        ``tokenIndex == -1`` it is added to the current context as an error
        node.

        :return: the matched token, or the token produced by recovery.
        """
        t = self.getCurrentToken()
        if t.type == ttype:
            self._errHandler.reportMatch(self)
            self.consume()
        else:
            t = self._errHandler.recoverInline(self)
            if self.buildParseTrees and t.tokenIndex == -1:
                # A token index of -1 is taken to mean the token did not come
                # from the input stream, so consume() never added it.
                self._ctx.addErrorNode(t)
        return t

    def matchWildcard(self):
        """Match the current token as a wildcard.

        Any token whose type is greater than zero matches. The failure path is
        the same as in :meth:`match`.

        :return: the matched token, or the token produced by recovery.
        """
        t = self.getCurrentToken()
        if t.type > 0:
            self._errHandler.reportMatch(self)
            self.consume()
        else:
            t = self._errHandler.recoverInline(self)
            if self.buildParseTrees and t.tokenIndex == -1:
                self._ctx.addErrorNode(t)
        return t

    def getParseListeners(self):
        """Return the registered parse listeners.

        When listeners are registered the internal list itself is returned
        (not a copy); otherwise a new empty list is returned.
        """
        return list() if self._parseListeners is None else self._parseListeners

    def addParseListener(self, listener:ParseTreeListener):
        """Register *listener* to be notified of parse events.

        :raises ReferenceError: if *listener* is None.
        """
        if listener is None:
            raise ReferenceError("listener")
        if self._parseListeners is None:
            self._parseListeners = []
        self._parseListeners.append(listener)

    def removeParseListener(self, listener:ParseTreeListener):
        """Unregister *listener*.

        Does nothing when *listener* is None or was never registered. This
        matters because reset() and setTrace(False) call this method with a
        tracer that is usually None; an unguarded ``list.remove`` would raise
        ValueError as soon as any other listener is registered.
        """
        if self._parseListeners is None:
            return
        if listener in self._parseListeners:
            self._parseListeners.remove(listener)
        if len(self._parseListeners) == 0:
            # Keep the "no listeners" state represented by None.
            self._parseListeners = None

    def removeParseListeners(self):
        """Unregister every parse listener."""
        self._parseListeners = None

    def triggerEnterRuleEvent(self):
        """Notify every listener, in registration order, that the current
        context's rule is being entered."""
        if self._parseListeners is not None:
            for listener in self._parseListeners:
                listener.enterEveryRule(self._ctx)
                self._ctx.enterRule(listener)

    def triggerExitRuleEvent(self):
        """Notify every listener that the current context's rule is being
        exited; listeners are visited in reverse registration order."""
        if self._parseListeners is not None:
            for listener in reversed(self._parseListeners):
                self._ctx.exitRule(listener)
                listener.exitEveryRule(self._ctx)

    def getNumberOfSyntaxErrors(self):
        """Return the number of errors reported via notifyErrorListeners()
        since construction or the last reset()."""
        return self._syntaxErrors

    def getTokenFactory(self):
        """Return the token factory of the input stream's token source."""
        return self._input.tokenSource._factory

    def setTokenFactory(self, factory:TokenFactory):
        """Install *factory* on the input stream's token source."""
        self._input.tokenSource._factory = factory

    def getATNWithBypassAlts(self):
        """Return the ATN deserialized with rule-bypass transitions enabled.

        The result is cached in the class-level ``bypassAltsAtnCache`` keyed
        by the serialized ATN.

        :raises UnsupportedOperationException: if the parser has no
            serialized ATN.
        """
        serializedAtn = self.getSerializedATN()
        if serializedAtn is None:
            raise UnsupportedOperationException("The current parser does not support an ATN with bypass alternatives.")
        result = self.bypassAltsAtnCache.get(serializedAtn, None)
        if result is None:
            deserializationOptions = ATNDeserializationOptions()
            deserializationOptions.generateRuleBypassTransitions = True
            result = ATNDeserializer(deserializationOptions).deserialize(serializedAtn)
            self.bypassAltsAtnCache[serializedAtn] = result
        return result

    def compileParseTreePattern(self, pattern:str, patternRuleIndex:int, lexer:Lexer = None):
        """Compile *pattern* into a parse-tree pattern for rule *patternRuleIndex*.

        When *lexer* is omitted, the token stream's token source is used if it
        is a ``Lexer``.

        :raises UnsupportedOperationException: if no lexer was given and none
            can be discovered.
        """
        if lexer is None:
            tokenStream = self.getTokenStream()
            if tokenStream is not None:
                tokenSource = tokenStream.tokenSource
                if isinstance(tokenSource, Lexer):
                    lexer = tokenSource
        if lexer is None:
            raise UnsupportedOperationException("Parser can't discover a lexer to use")

        m = ParseTreePatternMatcher(lexer, self)
        return m.compile(pattern, patternRuleIndex)

    def getInputStream(self):
        """Alias for :meth:`getTokenStream`."""
        return self.getTokenStream()

    def setInputStream(self, input:InputStream):
        """Alias for :meth:`setTokenStream`."""
        self.setTokenStream(input)

    def getTokenStream(self):
        """Return the token stream currently being parsed."""
        return self._input

    def setTokenStream(self, input:TokenStream):
        """Replace the token stream and reset the parser.

        The old stream is detached before reset() so that reset() does not
        seek it; the new stream is installed afterwards and is not rewound.
        """
        self._input = None
        self.reset()
        self._input = input

    def getCurrentToken(self):
        """Return the current lookahead token, ``LT(1)`` of the input."""
        return self._input.LT(1)

    def notifyErrorListeners(self, msg:str, offendingToken:Token = None, e:RecognitionException = None):
        """Report a syntax error to the error-listener dispatcher.

        :param msg: the error message.
        :param offendingToken: token the error refers to; defaults to the
            current token.
        :param e: the associated recognition exception, if any.
        """
        if offendingToken is None:
            offendingToken = self.getCurrentToken()
        self._syntaxErrors += 1
        line = offendingToken.line
        column = offendingToken.column
        listener = self.getErrorListenerDispatch()
        listener.syntaxError(self, offendingToken, line, column, msg, e)

    def consume(self):
        """Consume and return the current token.

        The input is advanced unless the token is EOF. When parse trees are
        being built or listeners are registered, the token is added to the
        current context (as an error node while the error handler is in
        recovery mode, otherwise as a token node) and the listeners are told
        about the new node.
        """
        o = self.getCurrentToken()
        if o.type != Token.EOF:
            self.getInputStream().consume()
        hasListener = self._parseListeners is not None and len(self._parseListeners) > 0
        if self.buildParseTrees or hasListener:
            if self._errHandler.inErrorRecoveryMode(self):
                node = self._ctx.addErrorNode(o)
            else:
                node = self._ctx.addTokenNode(o)
            if hasListener:
                for listener in self._parseListeners:
                    # ErrorNode is tested first, as in the original ordering.
                    if isinstance(node, ErrorNode):
                        listener.visitErrorNode(node)
                    elif isinstance(node, TerminalNode):
                        listener.visitTerminal(node)
        return o

    def addContextToParseTree(self):
        """Attach the current context to its parent, when it has one."""
        if self._ctx.parentCtx is not None:
            self._ctx.parentCtx.addChild(self._ctx)

    def enterRule(self, localctx:ParserRuleContext , state:int , ruleIndex:int):
        """Make *localctx* the current context on rule entry.

        Records *state*, sets the context's start token to ``LT(1)``, links the
        context into the tree when trees are being built and fires the
        enter-rule event. *ruleIndex* is accepted but not used here.
        """
        self.state = state
        self._ctx = localctx
        self._ctx.start = self._input.LT(1)
        if self.buildParseTrees:
            self.addContextToParseTree()
        if self._parseListeners is not None:
            self.triggerEnterRuleEvent()

    def exitRule(self):
        """Leave the current rule.

        Sets the context's stop token to ``LT(-1)``, fires the exit-rule event
        (before the context is switched), then restores the invoking state and
        makes the parent context current.
        """
        self._ctx.stop = self._input.LT(-1)
        if self._parseListeners is not None:
            self.triggerExitRuleEvent()
        self.state = self._ctx.invokingState
        self._ctx = self._ctx.parentCtx

    def enterOuterAlt(self, localctx:ParserRuleContext, altNum:int):
        """Record alternative *altNum* on *localctx* and make it current.

        When trees are being built and *localctx* differs from the current
        context, the parent's last child is replaced by *localctx*.
        """
        localctx.setAltNumber(altNum)
        if self.buildParseTrees and self._ctx != localctx:
            if self._ctx.parentCtx is not None:
                self._ctx.parentCtx.removeLastChild()
                self._ctx.parentCtx.addChild(localctx)
        self._ctx = localctx

    def getPrecedence(self):
        """Return the precedence on top of the stack, or -1 if it is empty."""
        if not self._precedenceStack:
            return -1
        return self._precedenceStack[-1]

    def enterRecursionRule(self, localctx:ParserRuleContext, state:int, ruleIndex:int, precedence:int):
        """Enter a left-recursive rule.

        Pushes *precedence*, makes *localctx* current with start token
        ``LT(1)`` and fires the enter-rule event. Unlike enterRule(), the
        context is not attached to the parse tree here.
        """
        self.state = state
        self._precedenceStack.append(precedence)
        self._ctx = localctx
        self._ctx.start = self._input.LT(1)
        if self._parseListeners is not None:
            self.triggerEnterRuleEvent()

    def pushNewRecursionContext(self, localctx:ParserRuleContext, state:int, ruleIndex:int):
        """Wrap the current context in *localctx*.

        The previous context becomes a child of *localctx* (when trees are
        being built); *localctx* inherits its start token and becomes the
        current context.
        """
        previous = self._ctx
        previous.parentCtx = localctx
        previous.invokingState = state
        previous.stop = self._input.LT(-1)

        self._ctx = localctx
        self._ctx.start = previous.start
        if self.buildParseTrees:
            self._ctx.addChild(previous)

        if self._parseListeners is not None:
            self.triggerEnterRuleEvent()

    def unrollRecursionContexts(self, parentCtx:ParserRuleContext):
        """Finish a left-recursive rule and return to *parentCtx*.

        Pops the precedence stack and sets the stop token. With listeners
        registered, the exit-rule event is fired for each context on the way
        up to *parentCtx*. Finally the resulting context is hooked under
        *parentCtx*.
        """
        self._precedenceStack.pop()
        self._ctx.stop = self._input.LT(-1)
        retCtx = self._ctx  # the context to hook under parentCtx afterwards

        if self._parseListeners is not None:
            while self._ctx is not parentCtx:
                self.triggerExitRuleEvent()
                self._ctx = self._ctx.parentCtx
        else:
            self._ctx = parentCtx

        retCtx.parentCtx = parentCtx

        if self.buildParseTrees and parentCtx is not None:
            parentCtx.addChild(retCtx)

    def getInvokingContext(self, ruleIndex:int):
        """Return the nearest context, starting at the current one and walking
        up through parents, whose rule index is *ruleIndex*; None if absent."""
        ctx = self._ctx
        while ctx is not None:
            if ctx.getRuleIndex() == ruleIndex:
                return ctx
            ctx = ctx.parentCtx
        return None

    def precpred(self, localctx:RuleContext , precedence:int):
        """Return True if *precedence* is at least the precedence on top of
        the stack. *localctx* is not used."""
        return precedence >= self._precedenceStack[-1]

    def inContext(self, context:str):
        """Not implemented: always returns False."""
        return False

    def isExpectedToken(self, symbol:int):
        """Return True if *symbol* can follow the current state.

        The current state's next-token set is checked first. While that set
        contains EPSILON, the invoking contexts are followed outwards and the
        set after each invocation is checked. EOF is accepted if EPSILON is
        still present once the walk ends.
        """
        atn = self._interp.atn
        ctx = self._ctx
        s = atn.states[self.state]
        following = atn.nextTokens(s)
        if symbol in following:
            return True
        if Token.EPSILON not in following:
            return False

        while ctx is not None and ctx.invokingState >= 0 and Token.EPSILON in following:
            invokingState = atn.states[ctx.invokingState]
            rt = invokingState.transitions[0]
            following = atn.nextTokens(rt.followState)
            if symbol in following:
                return True
            ctx = ctx.parentCtx

        return Token.EPSILON in following and symbol == Token.EOF

    def getExpectedTokens(self):
        """Return the tokens the ATN expects for the current state and
        context."""
        return self._interp.atn.getExpectedTokens(self.state, self._ctx)

    def getExpectedTokensWithinCurrentRule(self):
        """Return the next-token set of the current ATN state, without
        consulting the invoking contexts."""
        atn = self._interp.atn
        s = atn.states[self.state]
        return atn.nextTokens(s)

    def getRuleIndex(self, ruleName:str):
        """Return the index of rule *ruleName*, or -1 if it is unknown."""
        ruleIndex = self.getRuleIndexMap().get(ruleName, None)
        return ruleIndex if ruleIndex is not None else -1

    def getRuleInvocationStack(self, p:RuleContext=None):
        """Return the rule names from *p* (default: the current context) up to
        the root, innermost first. A negative rule index yields ``"n/a"``."""
        if p is None:
            p = self._ctx
        stack = list()
        while p is not None:
            ruleIndex = p.getRuleIndex()
            if ruleIndex < 0:
                stack.append("n/a")
            else:
                stack.append(self.ruleNames[ruleIndex])
            p = p.parentCtx
        return stack

    def getDFAStrings(self):
        """Return ``str(dfa)`` for every decision DFA of the interpreter."""
        return [str(dfa) for dfa in self._interp.decisionToDFA]

    def dumpDFA(self):
        """Print every non-empty decision DFA to the output stream, with a
        blank line between consecutive DFAs."""
        seenOne = False
        for dfa in self._interp.decisionToDFA:
            if len(dfa.states) > 0:
                if seenOne:
                    print(file=self._output)
                print("Decision " + str(dfa.decision) + ":", file=self._output)
                print(dfa.toString(self.literalNames, self.symbolicNames), end='', file=self._output)
                seenOne = True

    def getSourceName(self):
        """Return the source name of the input stream."""
        return self._input.sourceName

    def setTrace(self, trace:bool):
        """Enable or disable tracing of parse events.

        Any previously installed tracer is removed first; when *trace* is true
        a fresh TraceListener is registered.
        """
        if self._tracer is not None:
            # Only remove a tracer that exists; there is nothing to
            # unregister when tracing was never enabled.
            self.removeParseListener(self._tracer)
            self._tracer = None
        if trace:
            self._tracer = TraceListener(self)
            self.addParseListener(self._tracer)
|
|