| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | __all__ = ['Scanner', 'ScannerError'] |
| |
|
| | from .error import MarkedYAMLError |
| | from .tokens import * |
| |
|
class ScannerError(MarkedYAMLError):
    """Error raised by the scanner when the input cannot be tokenized.

    Adds nothing to ``MarkedYAMLError``; it exists so callers can tell
    scanner failures apart from other marked YAML errors.
    """
| |
|
class SimpleKey:
    """Record describing a position where a simple key may start.

    Attributes:
        token_number: Absolute number of the token that would become the key.
        required: Whether a ':' must follow (a missing one is an error).
        index, line, column: Stream position where the candidate starts.
        mark: Mark object for that position, used in error reports and
            for the tokens inserted in front of the key.
    """

    def __init__(self, token_number, required, index, line, column, mark):
        # Bookkeeping used to decide whether the key is still valid.
        self.token_number, self.required = token_number, required
        # Position of the candidate key in the stream.
        self.index, self.line, self.column = index, line, column
        self.mark = mark
| |
|
class Scanner:
    """Tokenizer that turns a character stream into a queue of YAML tokens.

    The class does not read input itself.  It relies on the following
    members, which are not defined here and are presumably supplied by the
    class it is combined with (TODO confirm against the reader):
    ``peek(n=0)``, ``prefix(n)``, ``forward(n=1)``, ``get_mark()`` and the
    attributes ``index``, ``line``, ``column`` and ``encoding``.

    Public interface: ``check_token``, ``peek_token`` and ``get_token``.
    """

    def __init__(self):
        """Initialize the scanner."""
        # Set to True by fetch_stream_end() once the stream is exhausted.
        self.done = False

        # Nesting depth of flow collections ('[' and '{'); 0 means block
        # context.
        self.flow_level = 0

        # Queue of tokens that have been produced but not yet taken.
        self.tokens = []

        # Emit the stream-start token.  This runs before the remaining
        # attributes are set, exactly as in the original statement order.
        self.fetch_stream_start()

        # Number of tokens already handed out through get_token().
        self.tokens_taken = 0

        # Current block indentation column and the stack of enclosing ones.
        self.indent = -1
        self.indents = []

        # Whether a simple key may start at the current position.
        self.allow_simple_key = True

        # Candidate simple keys, keyed by flow level; at most one per level.
        self.possible_simple_keys = {}

    # Public methods.

    def check_token(self, *choices):
        """Return True if the next token is an instance of one of `choices`.

        With no `choices`, return True if any token is available.
        """
        while self.need_more_tokens():
            self.fetch_more_tokens()
        if self.tokens:
            if not choices:
                return True
            return isinstance(self.tokens[0], choices)
        return False

    def peek_token(self):
        """Return the next token without removing it, or None if there is none."""
        while self.need_more_tokens():
            self.fetch_more_tokens()
        return self.tokens[0] if self.tokens else None

    def get_token(self):
        """Remove and return the next token, or None if there is none."""
        while self.need_more_tokens():
            self.fetch_more_tokens()
        if self.tokens:
            self.tokens_taken += 1
            return self.tokens.pop(0)
        return None

    # Private methods.

    def need_more_tokens(self):
        """Return True if more input must be scanned before a token is usable."""
        if self.done:
            return False
        if not self.tokens:
            return True
        # The head of the queue may still turn out to be a simple key, in
        # which case a key token has to be inserted in front of it.
        self.stale_possible_simple_keys()
        return self.next_possible_simple_key() == self.tokens_taken

    def fetch_more_tokens(self):
        """Scan forward and append the next token(s) to the queue.

        Raises:
            ScannerError: if the next character cannot start any token.
        """
        # Skip whitespace and comments.
        self.scan_to_next_token()

        # Drop simple-key candidates that are no longer valid.
        self.stale_possible_simple_keys()

        # Close block collections that the current column has left.
        self.unwind_indent(self.column)

        ch = self.peek()

        # End of stream.
        if ch == '\0':
            return self.fetch_stream_end()

        # Directive.
        if ch == '%' and self.check_directive():
            return self.fetch_directive()

        # Document start / end indicators.
        if ch == '-' and self.check_document_start():
            return self.fetch_document_start()
        if ch == '.' and self.check_document_end():
            return self.fetch_document_end()

        # Flow collection indicators.
        if ch == '[':
            return self.fetch_flow_sequence_start()
        if ch == '{':
            return self.fetch_flow_mapping_start()
        if ch == ']':
            return self.fetch_flow_sequence_end()
        if ch == '}':
            return self.fetch_flow_mapping_end()
        if ch == ',':
            return self.fetch_flow_entry()

        # Block entry, explicit key and value indicators.
        if ch == '-' and self.check_block_entry():
            return self.fetch_block_entry()
        if ch == '?' and self.check_key():
            return self.fetch_key()
        if ch == ':' and self.check_value():
            return self.fetch_value()

        # Alias, anchor and tag.
        if ch == '*':
            return self.fetch_alias()
        if ch == '&':
            return self.fetch_anchor()
        if ch == '!':
            return self.fetch_tag()

        # Block scalars are only recognized outside flow collections.
        if ch == '|' and not self.flow_level:
            return self.fetch_literal()
        if ch == '>' and not self.flow_level:
            return self.fetch_folded()

        # Quoted scalars.
        if ch == '\'':
            return self.fetch_single()
        if ch == '\"':
            return self.fetch_double()

        # Anything else that may start a plain scalar.
        if self.check_plain():
            return self.fetch_plain()

        raise ScannerError("while scanning for the next token", None,
                "found character %r that cannot start any token" % ch,
                self.get_mark())

    # Simple keys treatment.

    def next_possible_simple_key(self):
        """Return the lowest token number among candidate keys, or None."""
        return min(
            (key.token_number for key in self.possible_simple_keys.values()),
            default=None,
        )

    def stale_possible_simple_keys(self):
        """Discard candidate keys that can no longer become simple keys.

        A candidate is dropped when it started on a different line than the
        current one or more than 1024 index positions back.

        Raises:
            ScannerError: if a dropped candidate was a required key.
        """
        # Iterate over a snapshot because entries are deleted in the loop.
        for level in list(self.possible_simple_keys):
            key = self.possible_simple_keys[level]
            if key.line != self.line \
                    or self.index-key.index > 1024:
                if key.required:
                    raise ScannerError("while scanning a simple key", key.mark,
                            "could not find expected ':'", self.get_mark())
                del self.possible_simple_keys[level]

    def save_possible_simple_key(self):
        """Record the current position as a simple-key candidate, if allowed."""
        # In block context a key at the current indentation column must be
        # followed by ':'.
        required = not self.flow_level and self.indent == self.column

        if self.allow_simple_key:
            # Only one candidate is kept per flow level.
            self.remove_possible_simple_key()
            token_number = self.tokens_taken+len(self.tokens)
            key = SimpleKey(token_number, required,
                    self.index, self.line, self.column, self.get_mark())
            self.possible_simple_keys[self.flow_level] = key

    def remove_possible_simple_key(self):
        """Forget the candidate key saved for the current flow level.

        Raises:
            ScannerError: if that candidate was a required key.
        """
        if self.flow_level in self.possible_simple_keys:
            key = self.possible_simple_keys[self.flow_level]

            if key.required:
                raise ScannerError("while scanning a simple key", key.mark,
                        "could not find expected ':'", self.get_mark())

            del self.possible_simple_keys[self.flow_level]

    # Indentation functions.

    def unwind_indent(self, column):
        """Emit a block-end token for every indentation level deeper than `column`."""
        # Indentation is not tracked inside flow collections.
        if self.flow_level:
            return

        while self.indent > column:
            mark = self.get_mark()
            self.indent = self.indents.pop()
            self.tokens.append(BlockEndToken(mark, mark))

    def add_indent(self, column):
        """Push a new indentation level if `column` is deeper than the current one.

        Returns:
            True if a level was pushed, False otherwise.
        """
        if self.indent < column:
            self.indents.append(self.indent)
            self.indent = column
            return True
        return False

    # Fetchers.

    def fetch_stream_start(self):
        """Append the stream-start token (carrying ``self.encoding``)."""
        mark = self.get_mark()
        self.tokens.append(StreamStartToken(mark, mark,
            encoding=self.encoding))

    def fetch_stream_end(self):
        """Close all open blocks, append the stream-end token and mark the scanner done."""
        # Indentation -1 closes every open block collection.
        self.unwind_indent(-1)

        # Reset simple keys.
        self.remove_possible_simple_key()
        self.allow_simple_key = False
        self.possible_simple_keys = {}

        mark = self.get_mark()
        self.tokens.append(StreamEndToken(mark, mark))

        # The stream is finished.
        self.done = True

    def fetch_directive(self):
        """Close all open blocks and append a directive token."""
        self.unwind_indent(-1)

        # Reset simple keys.
        self.remove_possible_simple_key()
        self.allow_simple_key = False

        self.tokens.append(self.scan_directive())

    def fetch_document_start(self):
        """Append a document-start token."""
        self.fetch_document_indicator(DocumentStartToken)

    def fetch_document_end(self):
        """Append a document-end token."""
        self.fetch_document_indicator(DocumentEndToken)

    def fetch_document_indicator(self, TokenClass):
        """Close all open blocks and append a three-character `TokenClass` token."""
        self.unwind_indent(-1)

        # A simple key cannot follow a document indicator.
        self.remove_possible_simple_key()
        self.allow_simple_key = False

        start_mark = self.get_mark()
        self.forward(3)
        end_mark = self.get_mark()
        self.tokens.append(TokenClass(start_mark, end_mark))

    def fetch_flow_sequence_start(self):
        """Append a flow-sequence-start token."""
        self.fetch_flow_collection_start(FlowSequenceStartToken)

    def fetch_flow_mapping_start(self):
        """Append a flow-mapping-start token."""
        self.fetch_flow_collection_start(FlowMappingStartToken)

    def fetch_flow_collection_start(self, TokenClass):
        """Enter a flow collection and append its one-character start token."""
        # The collection itself may be a simple key.
        self.save_possible_simple_key()

        self.flow_level += 1

        # A simple key may follow the opening indicator.
        self.allow_simple_key = True

        start_mark = self.get_mark()
        self.forward()
        end_mark = self.get_mark()
        self.tokens.append(TokenClass(start_mark, end_mark))

    def fetch_flow_sequence_end(self):
        """Append a flow-sequence-end token."""
        self.fetch_flow_collection_end(FlowSequenceEndToken)

    def fetch_flow_mapping_end(self):
        """Append a flow-mapping-end token."""
        self.fetch_flow_collection_end(FlowMappingEndToken)

    def fetch_flow_collection_end(self, TokenClass):
        """Leave a flow collection and append its one-character end token."""
        # Reset the candidate key on the level being closed.
        self.remove_possible_simple_key()

        self.flow_level -= 1

        # No simple key directly after the closing indicator.
        self.allow_simple_key = False

        start_mark = self.get_mark()
        self.forward()
        end_mark = self.get_mark()
        self.tokens.append(TokenClass(start_mark, end_mark))

    def fetch_flow_entry(self):
        """Append a flow-entry token for ','."""
        # A simple key may follow ','.
        self.allow_simple_key = True

        self.remove_possible_simple_key()

        start_mark = self.get_mark()
        self.forward()
        end_mark = self.get_mark()
        self.tokens.append(FlowEntryToken(start_mark, end_mark))

    def fetch_block_entry(self):
        """Append a block-entry token for '-', opening a block sequence if needed.

        Raises:
            ScannerError: in block context when a new entry is not allowed here.
        """
        if not self.flow_level:

            # A new entry may only start where a simple key could start.
            if not self.allow_simple_key:
                raise ScannerError(None, None,
                        "sequence entries are not allowed here",
                        self.get_mark())

            # A deeper column starts a new block sequence.
            if self.add_indent(self.column):
                mark = self.get_mark()
                self.tokens.append(BlockSequenceStartToken(mark, mark))

        # In flow context the token is still emitted; nothing is checked here.

        # A simple key may follow '-'.
        self.allow_simple_key = True

        self.remove_possible_simple_key()

        start_mark = self.get_mark()
        self.forward()
        end_mark = self.get_mark()
        self.tokens.append(BlockEntryToken(start_mark, end_mark))

    def fetch_key(self):
        """Append a key token for '?', opening a block mapping if needed.

        Raises:
            ScannerError: in block context when a key is not allowed here.
        """
        if not self.flow_level:

            # An explicit key may only start where a simple key could start.
            if not self.allow_simple_key:
                raise ScannerError(None, None,
                        "mapping keys are not allowed here",
                        self.get_mark())

            # A deeper column starts a new block mapping.
            if self.add_indent(self.column):
                mark = self.get_mark()
                self.tokens.append(BlockMappingStartToken(mark, mark))

        # After '?' a simple key is allowed only in block context.
        self.allow_simple_key = not self.flow_level

        self.remove_possible_simple_key()

        start_mark = self.get_mark()
        self.forward()
        end_mark = self.get_mark()
        self.tokens.append(KeyToken(start_mark, end_mark))

    def fetch_value(self):
        """Append a value token for ':', inserting the key token for a simple key.

        Raises:
            ScannerError: in block context when a value is not allowed here.
        """
        if self.flow_level in self.possible_simple_keys:

            # The saved candidate is a real key: insert the key token at the
            # queue position where the candidate started.
            key = self.possible_simple_keys[self.flow_level]
            del self.possible_simple_keys[self.flow_level]
            self.tokens.insert(key.token_number-self.tokens_taken,
                    KeyToken(key.mark, key.mark))

            # If the key opens a new block mapping, its start token goes in
            # front of the key token (same queue index, inserted afterwards).
            if not self.flow_level:
                if self.add_indent(key.column):
                    self.tokens.insert(key.token_number-self.tokens_taken,
                            BlockMappingStartToken(key.mark, key.mark))

            # There cannot be two simple keys one after another.
            self.allow_simple_key = False

        else:
            # No candidate: this ':' follows an explicit or empty key.
            if not self.flow_level:

                # A value may only start where a simple key could start.
                if not self.allow_simple_key:
                    raise ScannerError(None, None,
                            "mapping values are not allowed here",
                            self.get_mark())

                # A deeper column starts a new block mapping.
                if self.add_indent(self.column):
                    mark = self.get_mark()
                    self.tokens.append(BlockMappingStartToken(mark, mark))

            # After ':' a simple key is allowed only in block context.
            self.allow_simple_key = not self.flow_level

            self.remove_possible_simple_key()

        start_mark = self.get_mark()
        self.forward()
        end_mark = self.get_mark()
        self.tokens.append(ValueToken(start_mark, end_mark))

    def fetch_alias(self):
        """Append an alias token; the alias may itself be a simple key."""
        self.save_possible_simple_key()
        self.allow_simple_key = False
        self.tokens.append(self.scan_anchor(AliasToken))

    def fetch_anchor(self):
        """Append an anchor token; the anchor may start a simple key."""
        self.save_possible_simple_key()
        self.allow_simple_key = False
        self.tokens.append(self.scan_anchor(AnchorToken))

    def fetch_tag(self):
        """Append a tag token; the tag may start a simple key."""
        self.save_possible_simple_key()
        self.allow_simple_key = False
        self.tokens.append(self.scan_tag())

    def fetch_literal(self):
        """Append a literal ('|') block scalar token."""
        self.fetch_block_scalar(style='|')

    def fetch_folded(self):
        """Append a folded ('>') block scalar token."""
        self.fetch_block_scalar(style='>')

    def fetch_block_scalar(self, style):
        """Append a block scalar token of the given `style` ('|' or '>')."""
        # A simple key may follow a block scalar.
        self.allow_simple_key = True

        self.remove_possible_simple_key()

        self.tokens.append(self.scan_block_scalar(style))

    def fetch_single(self):
        """Append a single-quoted scalar token."""
        self.fetch_flow_scalar(style='\'')

    def fetch_double(self):
        """Append a double-quoted scalar token."""
        self.fetch_flow_scalar(style='"')

    def fetch_flow_scalar(self, style):
        """Append a quoted scalar token; the scalar may be a simple key."""
        self.save_possible_simple_key()
        self.allow_simple_key = False
        self.tokens.append(self.scan_flow_scalar(style))

    def fetch_plain(self):
        """Append a plain scalar token; the scalar may be a simple key."""
        self.save_possible_simple_key()

        # scan_plain() may turn this flag back on when the scalar ends with
        # a line break.
        self.allow_simple_key = False

        self.tokens.append(self.scan_plain())

    # Checkers.

    def check_directive(self):
        """Return True if a '%' at the current position starts a directive."""
        # Directives are only recognized at the start of a line.
        return self.column == 0

    def check_document_start(self):
        """Return True if the current position holds a '---' document start."""
        return (self.column == 0
                and self.prefix(3) == '---'
                and self.peek(3) in '\0 \t\r\n\x85\u2028\u2029')

    def check_document_end(self):
        """Return True if the current position holds a '...' document end."""
        return (self.column == 0
                and self.prefix(3) == '...'
                and self.peek(3) in '\0 \t\r\n\x85\u2028\u2029')

    def check_block_entry(self):
        """Return True if '-' is followed by whitespace, a break or the end."""
        return self.peek(1) in '\0 \t\r\n\x85\u2028\u2029'

    def check_key(self):
        """Return True if '?' at the current position is a key indicator."""
        # In flow context '?' is always an indicator.
        if self.flow_level:
            return True

        # In block context it must be followed by whitespace or the end.
        return self.peek(1) in '\0 \t\r\n\x85\u2028\u2029'

    def check_value(self):
        """Return True if ':' at the current position is a value indicator."""
        # In flow context ':' is always an indicator.
        if self.flow_level:
            return True

        # In block context it must be followed by whitespace or the end.
        return self.peek(1) in '\0 \t\r\n\x85\u2028\u2029'

    def check_plain(self):
        """Return True if the current character may start a plain scalar.

        That is the case when the character is not whitespace or an
        indicator, or when it is '-' (any context) or '?' / ':' (block
        context only) directly followed by a non-space character.
        """
        ch = self.peek()
        return ch not in '\0 \t\r\n\x85\u2028\u2029-?:,[]{}#&*!|>\'\"%@`'  \
                or (self.peek(1) not in '\0 \t\r\n\x85\u2028\u2029'
                        and (ch == '-' or (not self.flow_level and ch in '?:')))

    # Scanners.

    def scan_to_next_token(self):
        """Skip a leading BOM, spaces, comments and line breaks.

        A line break in block context re-enables simple keys.  Only the
        space character is skipped here; tabs are not.
        """
        # A byte order mark is only skipped at the very start of the stream.
        if self.index == 0 and self.peek() == '\uFEFF':
            self.forward()
        found = False
        while not found:
            while self.peek() == ' ':
                self.forward()
            if self.peek() == '#':
                while self.peek() not in '\0\r\n\x85\u2028\u2029':
                    self.forward()
            if self.scan_line_break():
                if not self.flow_level:
                    self.allow_simple_key = True
            else:
                found = True

    def scan_directive(self):
        """Scan a '%' directive line and return a DirectiveToken.

        The value is a (major, minor) pair for 'YAML', a (handle, prefix)
        pair for 'TAG', and None for any other directive name, whose
        remaining text is skipped.
        """
        start_mark = self.get_mark()
        self.forward()
        name = self.scan_directive_name(start_mark)
        value = None
        if name == 'YAML':
            value = self.scan_yaml_directive_value(start_mark)
            end_mark = self.get_mark()
        elif name == 'TAG':
            value = self.scan_tag_directive_value(start_mark)
            end_mark = self.get_mark()
        else:
            end_mark = self.get_mark()
            while self.peek() not in '\0\r\n\x85\u2028\u2029':
                self.forward()
        self.scan_directive_ignored_line(start_mark)
        return DirectiveToken(name, value, start_mark, end_mark)

    def scan_directive_name(self, start_mark):
        """Scan and return a directive name made of ASCII alphanumerics, '-' and '_'.

        Raises:
            ScannerError: if the name is empty or is not followed by a
                space, a line break or the end of the stream.
        """
        length = 0
        ch = self.peek(length)
        while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z'  \
                or ch in '-_':
            length += 1
            ch = self.peek(length)
        if not length:
            raise ScannerError("while scanning a directive", start_mark,
                    "expected alphabetic or numeric character, but found %r"
                    % ch, self.get_mark())
        value = self.prefix(length)
        self.forward(length)
        ch = self.peek()
        if ch not in '\0 \r\n\x85\u2028\u2029':
            raise ScannerError("while scanning a directive", start_mark,
                    "expected alphabetic or numeric character, but found %r"
                    % ch, self.get_mark())
        return value

    def scan_yaml_directive_value(self, start_mark):
        """Scan '<major>.<minor>' and return it as a tuple of two ints.

        Raises:
            ScannerError: if the version is malformed.
        """
        while self.peek() == ' ':
            self.forward()
        major = self.scan_yaml_directive_number(start_mark)
        if self.peek() != '.':
            raise ScannerError("while scanning a directive", start_mark,
                    "expected a digit or '.', but found %r" % self.peek(),
                    self.get_mark())
        self.forward()
        minor = self.scan_yaml_directive_number(start_mark)
        if self.peek() not in '\0 \r\n\x85\u2028\u2029':
            raise ScannerError("while scanning a directive", start_mark,
                    "expected a digit or ' ', but found %r" % self.peek(),
                    self.get_mark())
        return (major, minor)

    def scan_yaml_directive_number(self, start_mark):
        """Scan a run of ASCII digits and return it as an int.

        Raises:
            ScannerError: if the current character is not a digit.
        """
        ch = self.peek()
        if not ('0' <= ch <= '9'):
            raise ScannerError("while scanning a directive", start_mark,
                    "expected a digit, but found %r" % ch, self.get_mark())
        length = 0
        while '0' <= self.peek(length) <= '9':
            length += 1
        value = int(self.prefix(length))
        self.forward(length)
        return value

    def scan_tag_directive_value(self, start_mark):
        """Scan a TAG directive value and return ``(handle, prefix)``."""
        while self.peek() == ' ':
            self.forward()
        handle = self.scan_tag_directive_handle(start_mark)
        while self.peek() == ' ':
            self.forward()
        prefix = self.scan_tag_directive_prefix(start_mark)
        return (handle, prefix)

    def scan_tag_directive_handle(self, start_mark):
        """Scan the handle of a TAG directive; it must be followed by a space.

        Raises:
            ScannerError: if the handle is not followed by ' '.
        """
        value = self.scan_tag_handle('directive', start_mark)
        ch = self.peek()
        if ch != ' ':
            raise ScannerError("while scanning a directive", start_mark,
                    "expected ' ', but found %r" % ch, self.get_mark())
        return value

    def scan_tag_directive_prefix(self, start_mark):
        """Scan the URI prefix of a TAG directive.

        Raises:
            ScannerError: if the prefix is not followed by a space, a line
                break or the end of the stream.
        """
        value = self.scan_tag_uri('directive', start_mark)
        ch = self.peek()
        if ch not in '\0 \r\n\x85\u2028\u2029':
            raise ScannerError("while scanning a directive", start_mark,
                    "expected ' ', but found %r" % ch, self.get_mark())
        return value

    def scan_directive_ignored_line(self, start_mark):
        """Consume trailing spaces, an optional comment and the line break.

        Raises:
            ScannerError: if anything else is found before the line ends.
        """
        while self.peek() == ' ':
            self.forward()
        if self.peek() == '#':
            while self.peek() not in '\0\r\n\x85\u2028\u2029':
                self.forward()
        ch = self.peek()
        if ch not in '\0\r\n\x85\u2028\u2029':
            raise ScannerError("while scanning a directive", start_mark,
                    "expected a comment or a line break, but found %r"
                        % ch, self.get_mark())
        self.scan_line_break()

    def scan_anchor(self, TokenClass):
        """Scan an anchor ('&name') or alias ('*name') and return a `TokenClass`.

        The name consists of ASCII alphanumerics, '-' and '_'.

        Raises:
            ScannerError: if the name is empty or is followed by an
                unexpected character.
        """
        start_mark = self.get_mark()
        indicator = self.peek()
        # The indicator only selects the wording of error messages.
        if indicator == '*':
            name = 'alias'
        else:
            name = 'anchor'
        self.forward()
        length = 0
        ch = self.peek(length)
        while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z'  \
                or ch in '-_':
            length += 1
            ch = self.peek(length)
        if not length:
            raise ScannerError("while scanning an %s" % name, start_mark,
                    "expected alphabetic or numeric character, but found %r"
                    % ch, self.get_mark())
        value = self.prefix(length)
        self.forward(length)
        ch = self.peek()
        if ch not in '\0 \t\r\n\x85\u2028\u2029?:,]}%@`':
            raise ScannerError("while scanning an %s" % name, start_mark,
                    "expected alphabetic or numeric character, but found %r"
                    % ch, self.get_mark())
        end_mark = self.get_mark()
        return TokenClass(value, start_mark, end_mark)

    def scan_tag(self):
        """Scan a tag and return a TagToken whose value is ``(handle, suffix)``.

        Three forms are handled: verbatim '!<uri>' (handle None), a lone
        '!' (handle None, suffix '!'), and '!suffix' / '!handle!suffix'.

        Raises:
            ScannerError: if the tag is malformed or not followed by a
                space, a line break or the end of the stream.
        """
        start_mark = self.get_mark()
        ch = self.peek(1)
        if ch == '<':
            handle = None
            self.forward(2)
            suffix = self.scan_tag_uri('tag', start_mark)
            if self.peek() != '>':
                raise ScannerError("while parsing a tag", start_mark,
                        "expected '>', but found %r" % self.peek(),
                        self.get_mark())
            self.forward()
        elif ch in '\0 \t\r\n\x85\u2028\u2029':
            handle = None
            suffix = '!'
            self.forward()
        else:
            # Look ahead for a second '!' to decide whether the tag has a
            # named handle.
            length = 1
            use_handle = False
            while ch not in '\0 \r\n\x85\u2028\u2029':
                if ch == '!':
                    use_handle = True
                    break
                length += 1
                ch = self.peek(length)
            if use_handle:
                handle = self.scan_tag_handle('tag', start_mark)
            else:
                handle = '!'
                self.forward()
            suffix = self.scan_tag_uri('tag', start_mark)
        ch = self.peek()
        if ch not in '\0 \r\n\x85\u2028\u2029':
            raise ScannerError("while scanning a tag", start_mark,
                    "expected ' ', but found %r" % ch, self.get_mark())
        value = (handle, suffix)
        end_mark = self.get_mark()
        return TagToken(value, start_mark, end_mark)

    def scan_block_scalar(self, style):
        """Scan a literal ('|') or folded ('>') block scalar into a ScalarToken."""
        folded = (style == '>')

        chunks = []
        start_mark = self.get_mark()

        # Scan the header: indicator, optional modifiers, rest of the line.
        self.forward()
        chomping, increment = self.scan_block_scalar_indicators(start_mark)
        self.scan_block_scalar_ignored_line(start_mark)

        # Determine the indentation level of the content.
        min_indent = self.indent+1
        if min_indent < 1:
            min_indent = 1
        if increment is None:
            # No explicit indicator: detect it from the leading lines.
            breaks, max_indent, end_mark = self.scan_block_scalar_indentation()
            indent = max(min_indent, max_indent)
        else:
            indent = min_indent+increment-1
            breaks, end_mark = self.scan_block_scalar_breaks(indent)
        line_break = ''

        # Scan the content, one line per iteration.
        while self.column == indent and self.peek() != '\0':
            chunks.extend(breaks)
            leading_non_space = self.peek() not in ' \t'
            length = 0
            while self.peek(length) not in '\0\r\n\x85\u2028\u2029':
                length += 1
            chunks.append(self.prefix(length))
            self.forward(length)
            line_break = self.scan_line_break()
            breaks, end_mark = self.scan_block_scalar_breaks(indent)
            if self.column == indent and self.peek() != '\0':

                # In a folded scalar a single '\n' between two lines that
                # both start with a non-space character becomes a space (or
                # is dropped when empty lines follow); otherwise the break
                # is kept.
                if folded and line_break == '\n'    \
                        and leading_non_space and self.peek() not in ' \t':
                    if not breaks:
                        chunks.append(' ')
                else:
                    chunks.append(line_break)
            else:
                break

        # Chomping: False strips the final break, None keeps it, True also
        # keeps the trailing empty lines.
        if chomping is not False:
            chunks.append(line_break)
        if chomping is True:
            chunks.extend(breaks)

        return ScalarToken(''.join(chunks), False, start_mark, end_mark,
                style)

    def _scan_block_scalar_increment(self, ch, start_mark):
        """Consume the indentation-indicator digit `ch` and return it as an int.

        Raises:
            ScannerError: if the digit is 0.
        """
        increment = int(ch)
        if increment == 0:
            raise ScannerError("while scanning a block scalar", start_mark,
                    "expected indentation indicator in the range 1-9, but found 0",
                    self.get_mark())
        self.forward()
        return increment

    def scan_block_scalar_indicators(self, start_mark):
        """Scan the optional chomping and indentation indicators, in either order.

        Returns:
            ``(chomping, increment)`` where chomping is True for '+', False
            for '-' and None if absent, and increment is an int in 1-9 or
            None if absent.

        Raises:
            ScannerError: on a 0 indentation indicator or an unexpected
                character after the indicators.
        """
        chomping = None
        increment = None
        ch = self.peek()
        if ch in '+-':
            chomping = (ch == '+')
            self.forward()
            ch = self.peek()
            if ch in '0123456789':
                increment = self._scan_block_scalar_increment(ch, start_mark)
        elif ch in '0123456789':
            increment = self._scan_block_scalar_increment(ch, start_mark)
            ch = self.peek()
            if ch in '+-':
                chomping = (ch == '+')
                self.forward()
        ch = self.peek()
        if ch not in '\0 \r\n\x85\u2028\u2029':
            raise ScannerError("while scanning a block scalar", start_mark,
                    "expected chomping or indentation indicators, but found %r"
                    % ch, self.get_mark())
        return chomping, increment

    def scan_block_scalar_ignored_line(self, start_mark):
        """Consume trailing spaces, an optional comment and the line break.

        Raises:
            ScannerError: if anything else is found before the line ends.
        """
        while self.peek() == ' ':
            self.forward()
        if self.peek() == '#':
            while self.peek() not in '\0\r\n\x85\u2028\u2029':
                self.forward()
        ch = self.peek()
        if ch not in '\0\r\n\x85\u2028\u2029':
            raise ScannerError("while scanning a block scalar", start_mark,
                    "expected a comment or a line break, but found %r" % ch,
                    self.get_mark())
        self.scan_line_break()

    def scan_block_scalar_indentation(self):
        """Skip leading spaces and line breaks to detect the content indentation.

        Returns:
            ``(breaks, max_indent, end_mark)``: the line breaks consumed,
            the largest column reached while skipping spaces, and the mark
            taken after the last consumed line break.
        """
        chunks = []
        max_indent = 0
        end_mark = self.get_mark()
        while self.peek() in ' \r\n\x85\u2028\u2029':
            if self.peek() != ' ':
                chunks.append(self.scan_line_break())
                end_mark = self.get_mark()
            else:
                self.forward()
                if self.column > max_indent:
                    max_indent = self.column
        return chunks, max_indent, end_mark

    def scan_block_scalar_breaks(self, indent):
        """Consume indentation (up to `indent` columns) and empty lines.

        Returns:
            ``(breaks, end_mark)``: the line breaks consumed and the mark
            taken after the last of them.
        """
        chunks = []
        end_mark = self.get_mark()
        while self.column < indent and self.peek() == ' ':
            self.forward()
        while self.peek() in '\r\n\x85\u2028\u2029':
            chunks.append(self.scan_line_break())
            end_mark = self.get_mark()
            while self.column < indent and self.peek() == ' ':
                self.forward()
        return chunks, end_mark

    def scan_flow_scalar(self, style):
        """Scan a single- or double-quoted scalar and return a ScalarToken."""
        double = (style == '"')
        chunks = []
        start_mark = self.get_mark()
        quote = self.peek()
        self.forward()
        chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark))
        # Alternate between whitespace runs and non-space runs until the
        # closing quote is reached.
        while self.peek() != quote:
            chunks.extend(self.scan_flow_scalar_spaces(double, start_mark))
            chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark))
        self.forward()
        end_mark = self.get_mark()
        return ScalarToken(''.join(chunks), False, start_mark, end_mark,
                style)

    # Single-character escapes accepted after a backslash in double-quoted
    # scalars, mapped to the character they produce.
    ESCAPE_REPLACEMENTS = {
        '0':    '\0',
        'a':    '\x07',
        'b':    '\x08',
        't':    '\x09',
        '\t':   '\x09',
        'n':    '\x0A',
        'v':    '\x0B',
        'f':    '\x0C',
        'r':    '\x0D',
        'e':    '\x1B',
        ' ':    '\x20',
        '\"':   '\"',
        '\\':   '\\',
        '/':    '/',
        'N':    '\x85',
        '_':    '\xA0',
        'L':    '\u2028',
        'P':    '\u2029',
    }

    # Numeric escapes: escape letter -> number of hexadecimal digits.
    ESCAPE_CODES = {
        'x':    2,
        'u':    4,
        'U':    8,
    }

    def scan_flow_scalar_non_spaces(self, double, start_mark):
        """Scan quoted-scalar content up to the next whitespace or closing quote.

        Handles the '' escape in single-quoted scalars and backslash escapes
        (including escaped line breaks) in double-quoted scalars.

        Returns:
            A list of string chunks.

        Raises:
            ScannerError: on an unknown or malformed escape sequence, or a
                numeric escape above U+10FFFF.
        """
        chunks = []
        while True:
            length = 0
            while self.peek(length) not in '\'\"\\\0 \t\r\n\x85\u2028\u2029':
                length += 1
            if length:
                chunks.append(self.prefix(length))
                self.forward(length)
            ch = self.peek()
            if not double and ch == '\'' and self.peek(1) == '\'':
                # '' inside a single-quoted scalar is an escaped quote.
                chunks.append('\'')
                self.forward(2)
            elif (double and ch == '\'') or (not double and ch in '\"\\'):
                # The other quote style and, in single quotes, the backslash
                # are ordinary characters.
                chunks.append(ch)
                self.forward()
            elif double and ch == '\\':
                self.forward()
                ch = self.peek()
                if ch in self.ESCAPE_REPLACEMENTS:
                    chunks.append(self.ESCAPE_REPLACEMENTS[ch])
                    self.forward()
                elif ch in self.ESCAPE_CODES:
                    length = self.ESCAPE_CODES[ch]
                    self.forward()
                    for k in range(length):
                        if self.peek(k) not in '0123456789ABCDEFabcdef':
                            raise ScannerError("while scanning a double-quoted scalar", start_mark,
                                    "expected escape sequence of %d hexadecimal numbers, but found %r" %
                                        (length, self.peek(k)), self.get_mark())
                    code = int(self.prefix(length), 16)
                    # Eight hex digits can exceed the Unicode range; chr()
                    # would raise ValueError/OverflowError, so report it as
                    # a scanner error instead.
                    if code > 0x10FFFF:
                        raise ScannerError("while scanning a double-quoted scalar", start_mark,
                                "expected escape sequence within the Unicode range, but found %r"
                                    % self.prefix(length), self.get_mark())
                    chunks.append(chr(code))
                    self.forward(length)
                elif ch in '\r\n\x85\u2028\u2029':
                    # Escaped line break: the break itself is dropped.
                    self.scan_line_break()
                    chunks.extend(self.scan_flow_scalar_breaks(double, start_mark))
                else:
                    raise ScannerError("while scanning a double-quoted scalar", start_mark,
                            "found unknown escape character %r" % ch, self.get_mark())
            else:
                return chunks

    def scan_flow_scalar_spaces(self, double, start_mark):
        """Scan whitespace (and any line breaks) inside a quoted scalar.

        A single '\\n' break with no following empty lines folds to one
        space; otherwise the breaks are kept.  Whitespace before a break is
        dropped.

        Raises:
            ScannerError: if the stream ends inside the scalar.
        """
        chunks = []
        length = 0
        while self.peek(length) in ' \t':
            length += 1
        whitespaces = self.prefix(length)
        self.forward(length)
        ch = self.peek()
        if ch == '\0':
            raise ScannerError("while scanning a quoted scalar", start_mark,
                    "found unexpected end of stream", self.get_mark())
        elif ch in '\r\n\x85\u2028\u2029':
            line_break = self.scan_line_break()
            breaks = self.scan_flow_scalar_breaks(double, start_mark)
            if line_break != '\n':
                chunks.append(line_break)
            elif not breaks:
                chunks.append(' ')
            chunks.extend(breaks)
        else:
            chunks.append(whitespaces)
        return chunks

    def scan_flow_scalar_breaks(self, double, start_mark):
        """Consume blank lines inside a quoted scalar and return their breaks.

        Raises:
            ScannerError: if a '---' or '...' document separator is found.
        """
        chunks = []
        while True:
            # A document separator cannot appear inside a quoted scalar.
            prefix = self.prefix(3)
            if (prefix == '---' or prefix == '...')   \
                    and self.peek(3) in '\0 \t\r\n\x85\u2028\u2029':
                raise ScannerError("while scanning a quoted scalar", start_mark,
                        "found unexpected document separator", self.get_mark())
            while self.peek() in ' \t':
                self.forward()
            if self.peek() in '\r\n\x85\u2028\u2029':
                chunks.append(self.scan_line_break())
            else:
                return chunks

    def scan_plain(self):
        """Scan a plain (unquoted) scalar and return a ScalarToken.

        The scalar ends at a '#' that follows whitespace, at ':' followed
        by whitespace (or by a flow indicator in flow context), at a flow
        indicator in flow context, or when the indentation drops below the
        current block level.
        """
        chunks = []
        start_mark = self.get_mark()
        end_mark = start_mark
        indent = self.indent+1
        spaces = []
        while True:
            length = 0
            if self.peek() == '#':
                break
            while True:
                ch = self.peek(length)
                if ch in '\0 \t\r\n\x85\u2028\u2029'    \
                        or (ch == ':' and
                                self.peek(length+1) in '\0 \t\r\n\x85\u2028\u2029'
                                      + (',[]{}' if self.flow_level else ''))\
                        or (self.flow_level and ch in ',?[]{}'):
                    break
                length += 1
            if length == 0:
                break
            self.allow_simple_key = False
            # Whitespace collected before this run is only kept once more
            # content is known to follow it.
            chunks.extend(spaces)
            chunks.append(self.prefix(length))
            self.forward(length)
            end_mark = self.get_mark()
            spaces = self.scan_plain_spaces(indent, start_mark)
            if not spaces or self.peek() == '#' \
                    or (not self.flow_level and self.column < indent):
                break
        return ScalarToken(''.join(chunks), True, start_mark, end_mark)

    def scan_plain_spaces(self, indent, start_mark):
        """Scan spaces and line breaks that follow a run of plain-scalar text.

        Only the space character counts as whitespace here; tabs are not
        consumed.  Consuming a line break re-enables simple keys.

        Returns:
            A list of string chunks (possibly empty), or None when a '---'
            or '...' document separator follows a line break.
        """
        chunks = []
        length = 0
        while self.peek(length) in ' ':
            length += 1
        whitespaces = self.prefix(length)
        self.forward(length)
        ch = self.peek()
        if ch in '\r\n\x85\u2028\u2029':
            line_break = self.scan_line_break()
            self.allow_simple_key = True
            prefix = self.prefix(3)
            if (prefix == '---' or prefix == '...')   \
                    and self.peek(3) in '\0 \t\r\n\x85\u2028\u2029':
                return None
            breaks = []
            while self.peek() in ' \r\n\x85\u2028\u2029':
                if self.peek() == ' ':
                    self.forward()
                else:
                    breaks.append(self.scan_line_break())
                    prefix = self.prefix(3)
                    if (prefix == '---' or prefix == '...')   \
                            and self.peek(3) in '\0 \t\r\n\x85\u2028\u2029':
                        return None
            # A single '\n' with no empty lines after it folds to a space.
            if line_break != '\n':
                chunks.append(line_break)
            elif not breaks:
                chunks.append(' ')
            chunks.extend(breaks)
        elif whitespaces:
            chunks.append(whitespaces)
        return chunks

    def scan_tag_handle(self, name, start_mark):
        """Scan a tag handle: '!', '!!' or '!word!'.

        `name` is only used in error messages.

        Raises:
            ScannerError: if the handle does not start with '!' or a word
                is not terminated by '!'.
        """
        ch = self.peek()
        if ch != '!':
            raise ScannerError("while scanning a %s" % name, start_mark,
                    "expected '!', but found %r" % ch, self.get_mark())
        length = 1
        ch = self.peek(length)
        if ch != ' ':
            while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z'  \
                    or ch in '-_':
                length += 1
                ch = self.peek(length)
            if ch != '!':
                # Advance first so the error mark points at the offender.
                self.forward(length)
                raise ScannerError("while scanning a %s" % name, start_mark,
                        "expected '!', but found %r" % ch, self.get_mark())
            length += 1
        value = self.prefix(length)
        self.forward(length)
        return value

    def scan_tag_uri(self, name, start_mark):
        """Scan a tag URI, decoding '%xx' escapes, and return it as a string.

        `name` is only used in error messages.

        Raises:
            ScannerError: if no URI characters are found.
        """
        chunks = []
        length = 0
        ch = self.peek(length)
        while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z'  \
                or ch in '-;/?:@&=+$,_.!~*\'()[]%':
            if ch == '%':
                # Flush the literal text seen so far, then decode the escapes.
                chunks.append(self.prefix(length))
                self.forward(length)
                length = 0
                chunks.append(self.scan_uri_escapes(name, start_mark))
            else:
                length += 1
            ch = self.peek(length)
        if length:
            chunks.append(self.prefix(length))
            self.forward(length)
        if not chunks:
            raise ScannerError("while parsing a %s" % name, start_mark,
                    "expected URI, but found %r" % ch, self.get_mark())
        return ''.join(chunks)

    def scan_uri_escapes(self, name, start_mark):
        """Decode a run of '%xx' escapes as UTF-8 and return the text.

        Raises:
            ScannerError: if an escape is not two hex digits or the bytes
                are not valid UTF-8.
        """
        codes = []
        mark = self.get_mark()
        while self.peek() == '%':
            self.forward()
            for k in range(2):
                if self.peek(k) not in '0123456789ABCDEFabcdef':
                    raise ScannerError("while scanning a %s" % name, start_mark,
                            "expected URI escape sequence of 2 hexadecimal numbers, but found %r"
                            % self.peek(k), self.get_mark())
            codes.append(int(self.prefix(2), 16))
            self.forward(2)
        try:
            value = bytes(codes).decode('utf-8')
        except UnicodeDecodeError as exc:
            raise ScannerError("while scanning a %s" % name, start_mark, str(exc), mark) from exc
        return value

    def scan_line_break(self):
        """Consume one line break, if present, and return its normalized form.

        '\\r\\n', '\\r', '\\n' and '\\x85' are returned as '\\n'; '\\u2028'
        and '\\u2029' are returned unchanged; anything else consumes nothing
        and returns ''.
        """
        ch = self.peek()
        if ch in '\r\n\x85':
            if self.prefix(2) == '\r\n':
                self.forward(2)
            else:
                self.forward()
            return '\n'
        elif ch in '\u2028\u2029':
            self.forward()
            return ch
        return ''
| |
|