| | |
| | |
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | from io import StringIO |
| | from antlr4.Token import Token |
| | from antlr4.error.Errors import IllegalStateException |
| |
|
| | |
| | Lexer = None |
| |
|
| | |
| | class TokenStream(object): |
| |
|
| | pass |
| |
|
| |
|
| | class BufferedTokenStream(TokenStream): |
| | __slots__ = ('tokenSource', 'tokens', 'index', 'fetchedEOF') |
| |
|
| | def __init__(self, tokenSource:Lexer): |
| | |
| | self.tokenSource = tokenSource |
| |
|
| | |
| | |
| | |
| | self.tokens = [] |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | self.index = -1 |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | self.fetchedEOF = False |
| |
|
| | def mark(self): |
| | return 0 |
| |
|
| | def release(self, marker:int): |
| | |
| | pass |
| |
|
| | def reset(self): |
| | self.seek(0) |
| |
|
| | def seek(self, index:int): |
| | self.lazyInit() |
| | self.index = self.adjustSeekIndex(index) |
| |
|
| | def get(self, index:int): |
| | self.lazyInit() |
| | return self.tokens[index] |
| |
|
| | def consume(self): |
| | skipEofCheck = False |
| | if self.index >= 0: |
| | if self.fetchedEOF: |
| | |
| | |
| | skipEofCheck = self.index < len(self.tokens) - 1 |
| | else: |
| | |
| | skipEofCheck = self.index < len(self.tokens) |
| | else: |
| | |
| | skipEofCheck = False |
| |
|
| | if not skipEofCheck and self.LA(1) == Token.EOF: |
| | raise IllegalStateException("cannot consume EOF") |
| |
|
| | if self.sync(self.index + 1): |
| | self.index = self.adjustSeekIndex(self.index + 1) |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | def sync(self, i:int): |
| | n = i - len(self.tokens) + 1 |
| | if n > 0 : |
| | fetched = self.fetch(n) |
| | return fetched >= n |
| | return True |
| |
|
| | |
| | |
| | |
| | |
| | def fetch(self, n:int): |
| | if self.fetchedEOF: |
| | return 0 |
| | for i in range(0, n): |
| | t = self.tokenSource.nextToken() |
| | t.tokenIndex = len(self.tokens) |
| | self.tokens.append(t) |
| | if t.type==Token.EOF: |
| | self.fetchedEOF = True |
| | return i + 1 |
| | return n |
| |
|
| |
|
| | |
| | def getTokens(self, start:int, stop:int, types:set=None): |
| | if start<0 or stop<0: |
| | return None |
| | self.lazyInit() |
| | subset = [] |
| | if stop >= len(self.tokens): |
| | stop = len(self.tokens)-1 |
| | for i in range(start, stop): |
| | t = self.tokens[i] |
| | if t.type==Token.EOF: |
| | break |
| | if types is None or t.type in types: |
| | subset.append(t) |
| | return subset |
| |
|
| | def LA(self, i:int): |
| | return self.LT(i).type |
| |
|
| | def LB(self, k:int): |
| | if (self.index-k) < 0: |
| | return None |
| | return self.tokens[self.index-k] |
| |
|
| | def LT(self, k:int): |
| | self.lazyInit() |
| | if k==0: |
| | return None |
| | if k < 0: |
| | return self.LB(-k) |
| | i = self.index + k - 1 |
| | self.sync(i) |
| | if i >= len(self.tokens): |
| | |
| | return self.tokens[len(self.tokens)-1] |
| | return self.tokens[i] |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | def adjustSeekIndex(self, i:int): |
| | return i |
| |
|
| | def lazyInit(self): |
| | if self.index == -1: |
| | self.setup() |
| |
|
| | def setup(self): |
| | self.sync(0) |
| | self.index = self.adjustSeekIndex(0) |
| |
|
| | |
| | def setTokenSource(self, tokenSource:Lexer): |
| | self.tokenSource = tokenSource |
| | self.tokens = [] |
| | self.index = -1 |
| | self.fetchedEOF = False |
| |
|
| |
|
| | |
| | |
| | |
| | |
| | def nextTokenOnChannel(self, i:int, channel:int): |
| | self.sync(i) |
| | if i>=len(self.tokens): |
| | return len(self.tokens) - 1 |
| | token = self.tokens[i] |
| | while token.channel!=channel: |
| | if token.type==Token.EOF: |
| | return i |
| | i += 1 |
| | self.sync(i) |
| | token = self.tokens[i] |
| | return i |
| |
|
| | |
| | |
| | |
| | def previousTokenOnChannel(self, i:int, channel:int): |
| | while i>=0 and self.tokens[i].channel!=channel: |
| | i -= 1 |
| | return i |
| |
|
| | |
| | |
| | |
| | def getHiddenTokensToRight(self, tokenIndex:int, channel:int=-1): |
| | self.lazyInit() |
| | if tokenIndex<0 or tokenIndex>=len(self.tokens): |
| | raise Exception(str(tokenIndex) + " not in 0.." + str(len(self.tokens)-1)) |
| | from antlr4.Lexer import Lexer |
| | nextOnChannel = self.nextTokenOnChannel(tokenIndex + 1, Lexer.DEFAULT_TOKEN_CHANNEL) |
| | from_ = tokenIndex+1 |
| | |
| | to = (len(self.tokens)-1) if nextOnChannel==-1 else nextOnChannel |
| | return self.filterForChannel(from_, to, channel) |
| |
|
| |
|
| | |
| | |
| | |
| | def getHiddenTokensToLeft(self, tokenIndex:int, channel:int=-1): |
| | self.lazyInit() |
| | if tokenIndex<0 or tokenIndex>=len(self.tokens): |
| | raise Exception(str(tokenIndex) + " not in 0.." + str(len(self.tokens)-1)) |
| | from antlr4.Lexer import Lexer |
| | prevOnChannel = self.previousTokenOnChannel(tokenIndex - 1, Lexer.DEFAULT_TOKEN_CHANNEL) |
| | if prevOnChannel == tokenIndex - 1: |
| | return None |
| | |
| | from_ = prevOnChannel+1 |
| | to = tokenIndex-1 |
| | return self.filterForChannel(from_, to, channel) |
| |
|
| |
|
| | def filterForChannel(self, left:int, right:int, channel:int): |
| | hidden = [] |
| | for i in range(left, right+1): |
| | t = self.tokens[i] |
| | if channel==-1: |
| | from antlr4.Lexer import Lexer |
| | if t.channel!= Lexer.DEFAULT_TOKEN_CHANNEL: |
| | hidden.append(t) |
| | elif t.channel==channel: |
| | hidden.append(t) |
| | if len(hidden)==0: |
| | return None |
| | return hidden |
| |
|
| | def getSourceName(self): |
| | return self.tokenSource.getSourceName() |
| |
|
| | |
| | def getText(self, start:int=None, stop:int=None): |
| | self.lazyInit() |
| | self.fill() |
| | if isinstance(start, Token): |
| | start = start.tokenIndex |
| | elif start is None: |
| | start = 0 |
| | if isinstance(stop, Token): |
| | stop = stop.tokenIndex |
| | elif stop is None or stop >= len(self.tokens): |
| | stop = len(self.tokens) - 1 |
| | if start < 0 or stop < 0 or stop < start: |
| | return "" |
| | with StringIO() as buf: |
| | for i in range(start, stop+1): |
| | t = self.tokens[i] |
| | if t.type==Token.EOF: |
| | break |
| | buf.write(t.text) |
| | return buf.getvalue() |
| |
|
| |
|
| | |
| | def fill(self): |
| | self.lazyInit() |
| | while self.fetch(1000)==1000: |
| | pass |
| |
|