| | |
| | |
| | |
| | |
| | |
| | |
| | import warnings |
| |
|
| | from pyasn1 import error |
| | from pyasn1.codec.ber import encoder |
| | from pyasn1.type import univ |
| | from pyasn1.type import useful |
| |
|
| | __all__ = ['Encoder', 'encode'] |
| |
|
| |
|
class BooleanEncoder(encoder.IntegerEncoder):
    """Encode a boolean as a single content octet: 0 or 255."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Return the content octet for *value*.

        Anything comparing equal to ``0`` yields octet 0; every other value
        yields octet 255.  ``asn1Spec``, ``encodeFun`` and ``options`` are
        accepted for interface compatibility and are not consulted.

        Returns:
            ``(substrate, False, False)`` where ``substrate`` is a
            one-element tuple of ints.  The two flags are presumably the
            base encoder's "is constructed" / "is octets" markers --
            TODO confirm against the BER encoder.
        """
        octet = 0 if value == 0 else 255
        return (octet,), False, False
| |
|
| |
|
class RealEncoder(encoder.RealEncoder):
    """Real encoder that always picks its base via ``_dropFloatingPoint``."""

    def _chooseEncBase(self, value):
        """Return ``self._dropFloatingPoint(...)`` for the unpacked *value*.

        *value* must unpack into exactly three items; they are forwarded
        positionally (presumably mantissa, base and exponent -- confirm
        against the base class).
        """
        mantissa, base, exponent = value
        return self._dropFloatingPoint(mantissa, base, exponent)
| |
|
| |
|
| | |
| |
|
class TimeEncoderMixIn(object):
    """Validate and normalise a time string, then encode it as octets.

    Meant to be mixed into an ``encoder.OctetStringEncoder`` subclass;
    ``encodeValue`` delegates to ``encoder.OctetStringEncoder.encodeValue``
    explicitly once the value has been checked.
    """

    # Character codes compared against ``value.asNumbers()``.
    Z_CHAR = ord('Z')
    PLUS_CHAR = ord('+')
    MINUS_CHAR = ord('-')
    COMMA_CHAR = ord(',')
    DOT_CHAR = ord('.')
    ZERO_CHAR = ord('0')

    # Exclusive bounds on the length of the normalised value; subclasses
    # override them.
    MIN_LENGTH = 12
    MAX_LENGTH = 19

    @classmethod
    def _normalizeFraction(cls, numbers):
        """Return *numbers* as a list with a canonical fractional part.

        *numbers* must contain ``DOT_CHAR`` and end with ``Z_CHAR``.  Only
        the zeros immediately preceding the final ``Z`` are removed; zeros
        elsewhere in the fraction are significant (``.099`` is not
        ``.99``) and are kept.  If no fractional digit remains, the dot
        itself is removed too.
        """
        digits = list(numbers)
        dotIndex = digits.index(cls.DOT_CHAR)
        zIndex = len(digits) - 1

        # ``cut`` ends up as the index of the first element to discard.
        cut = zIndex
        while cut > dotIndex + 1 and digits[cut - 1] == cls.ZERO_CHAR:
            cut -= 1

        if cut == dotIndex + 1:
            # nothing but the dot is left of the fraction: drop it as well
            cut = dotIndex

        del digits[cut:zIndex]
        return digits

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Check the time value, normalise its fraction and encode it.

        Constraints enforced (cf. X.690 section 11.7):

        - no ``+``/``-`` offset may appear: the time must be UTC
        - the value must end with ``Z``
        - a comma is not accepted as the fraction separator
        - trailing fractional zeros and a hanging dot are removed
        - the resulting length must lie strictly between ``MIN_LENGTH``
          and ``MAX_LENGTH``

        Args:
            value: time value; cloned through *asn1Spec* when one is given.
                Must provide ``asNumbers()`` and ``clone()``.
            asn1Spec: optional schema object used to build the value.
            encodeFun: forwarded to the octet-string encoder.
            **options: forwarded as well; ``maxChunkSize`` is forced to 1000.

        Returns:
            Whatever ``encoder.OctetStringEncoder.encodeValue`` returns.

        Raises:
            error.PyAsn1Error: if any constraint above is violated,
                including an empty value.
        """
        if asn1Spec is not None:
            value = asn1Spec.clone(value)

        numbers = value.asNumbers()

        if self.PLUS_CHAR in numbers or self.MINUS_CHAR in numbers:
            raise error.PyAsn1Error('Must be UTC time: %r' % value)

        # An empty value has no last character; report it through the same
        # error instead of letting an IndexError escape.
        if not numbers or numbers[-1] != self.Z_CHAR:
            raise error.PyAsn1Error('Missing "Z" time zone specifier: %r' % value)

        if self.COMMA_CHAR in numbers:
            raise error.PyAsn1Error('Comma in fractions disallowed: %r' % value)

        if self.DOT_CHAR in numbers:
            normalized = self._normalizeFraction(numbers)

            # Re-create the value only if something was actually removed.
            if len(normalized) != len(numbers):
                value = value.clone(normalized)

            numbers = normalized

        if not self.MIN_LENGTH < len(numbers) < self.MAX_LENGTH:
            raise error.PyAsn1Error('Length constraint violated: %r' % value)

        options.update(maxChunkSize=1000)

        return encoder.OctetStringEncoder.encodeValue(
            self, value, asn1Spec, encodeFun, **options
        )
| |
|
| |
|
class GeneralizedTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
    """Time encoder with the length bounds used for GeneralizedTime."""

    # Bounds are exclusive: TimeEncoderMixIn accepts a value only when
    # MIN_LENGTH < len(value) < MAX_LENGTH, i.e. 13 to 19 characters here.
    MIN_LENGTH = 12
    MAX_LENGTH = 20
| |
|
| |
|
class UTCTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder):
    """Time encoder with the length bounds used for UTCTime."""

    # Bounds are exclusive: TimeEncoderMixIn accepts a value only when
    # MIN_LENGTH < len(value) < MAX_LENGTH, i.e. 11 to 13 characters here.
    MIN_LENGTH = 10
    MAX_LENGTH = 14
| |
|
| |
|
class SetOfEncoder(encoder.SequenceOfEncoder):
    """Encode SET OF components in ascending order of their encodings."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Encode every component, order the encodings and concatenate them.

        Encodings are compared as if each were right-padded with zero
        octets to the length of the longest one; the padding is used for
        comparison only and never emitted.  Equal keys keep their original
        relative order.

        Returns:
            ``(substrate, True, True)`` with the joined encodings.
        """
        encoded = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        # zero or one component: nothing to order
        if len(encoded) > 1:
            width = max(len(item) for item in encoded)
            encoded = sorted(
                encoded, key=lambda item: item.ljust(width, b'\x00'))

        return b''.join(encoded), True, True
| |
|
| |
|
class SequenceOfEncoder(encoder.SequenceOfEncoder):
    """SEQUENCE OF encoder that can emit nothing for an empty value."""

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Concatenate the encodings of all components of *value*.

        When the ``ifNotEmpty`` option is truthy and *value* has no
        components, an empty substrate is returned without encoding
        anything.

        Returns:
            ``(substrate, True, True)``.
        """
        skipWhenEmpty = options.get('ifNotEmpty', False)
        if skipWhenEmpty and len(value) == 0:
            return b'', True, True

        encoded = self._encodeComponents(
            value, asn1Spec, encodeFun, **options)

        return b''.join(encoded), True, True
| |
|
| |
|
class SetEncoder(encoder.SequenceEncoder):
    """Encode SET components ordered by tag."""

    @staticmethod
    def _componentSortKey(componentAndType):
        """Sort SET components by tag

        Sort regardless of the Choice value (static sort)

        Args:
            componentAndType: ``(component, asn1Spec)`` pair; when
                ``asn1Spec`` is ``None`` the component itself serves as
                the spec.

        Returns:
            The spec's ``tagSet``, or -- for a Choice that has no tags of
            its own -- the ``minTagSet`` of its component type.
        """
        component, asn1Spec = componentAndType

        if asn1Spec is None:
            asn1Spec = component

        if asn1Spec.typeId == univ.Choice.typeId and not asn1Spec.tagSet:
            return asn1Spec.componentType.minTagSet

        return asn1Spec.tagSet

    def encodeValue(self, value, asn1Spec, encodeFun, **options):
        """Encode the components of *value* in sort-key order.

        Args:
            value: an ASN.1 object (when *asn1Spec* is ``None``) or a
                mapping-like Python value indexed by component name.
            asn1Spec: optional schema describing the components.
            encodeFun: callable used to encode each component.
            **options: forwarded to *encodeFun*; ``ifNotEmpty`` is set from
                each named component's ``isOptional`` flag.

        Returns:
            ``(substrate, True, True)`` with the joined component encodings.

        Raises:
            error.PyAsn1Error: if the ASN.1 object is inconsistent, or if a
                non-optional component is missing from *value*.
        """
        # Each entry is (component, component spec, named type or None).
        # The named type travels with its component instead of being looked
        # up through id(component): distinct fields holding the very same
        # Python object (e.g. two fields both set to 1) would otherwise
        # share a single map entry.
        entries = []

        if asn1Spec is None:
            # instance of an ASN.1 schema
            inconsistency = value.isInconsistent
            if inconsistency:
                raise error.PyAsn1Error(
                    f"ASN.1 object {value.__class__.__name__} is inconsistent")

            namedTypes = value.componentType

            for idx, component in enumerate(value.values()):
                namedType = None

                if namedTypes:
                    namedType = namedTypes[idx]

                    if namedType.isOptional and not component.isValue:
                        continue

                    if namedType.isDefaulted and component == namedType.asn1Object:
                        continue

                entries.append((component, asn1Spec, namedType))

        else:
            # bare Python value + ASN.1 schema
            for idx, namedType in enumerate(asn1Spec.componentType.namedTypes):

                try:
                    component = value[namedType.name]

                except KeyError as exc:
                    # An absent optional component is simply not encoded;
                    # only a missing mandatory one is an error.
                    if namedType.isOptional:
                        continue

                    raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value)) from exc

                if namedType.isOptional and namedType.name not in value:
                    continue

                if namedType.isDefaulted and component == namedType.asn1Object:
                    continue

                entries.append((component, asn1Spec[idx], namedType))

        # _componentSortKey takes a (component, spec) pair; subclasses may
        # override it, so keep handing it exactly that shape.
        ordered = sorted(
            entries, key=lambda entry: self._componentSortKey(entry[:2]))

        chunks = []

        for comp, compType, namedType in ordered:
            if namedType:
                options.update(ifNotEmpty=namedType.isOptional)

            chunk = encodeFun(comp, compType, **options)

            # wrap open type blob if needed
            if namedType and namedType.openType:
                wrapType = namedType.asn1Object
                if wrapType.tagSet and not wrapType.isSameTypeWith(comp):
                    chunk = encodeFun(chunk, wrapType, **options)

            chunks.append(chunk)

        return b''.join(chunks), True, True
| |
|
| |
|
class SequenceEncoder(encoder.SequenceEncoder):
    """Base SEQUENCE encoder with ``omitEmptyOptionals`` switched on."""

    # Consumed by the base class; presumably makes it leave out optional
    # components that are empty -- confirm in encoder.SequenceEncoder.
    omitEmptyOptionals = True
| |
|
| |
|
# Codec lookup tables: start from copies of the base encoder's maps and
# override the entries for the types specialised in this module.  Each
# table gets its own encoder instances.

TAG_MAP = encoder.TAG_MAP.copy()

TAG_MAP[univ.Boolean.tagSet] = BooleanEncoder()
TAG_MAP[univ.Real.tagSet] = RealEncoder()
TAG_MAP[useful.GeneralizedTime.tagSet] = GeneralizedTimeEncoder()
TAG_MAP[useful.UTCTime.tagSet] = UTCTimeEncoder()
# Sequence & Set presumably share tags with SequenceOf & SetOf, hence a
# single tag-based entry for the set flavour -- TODO confirm.
TAG_MAP[univ.SetOf.tagSet] = SetOfEncoder()
# NOTE(review): this key is a typeId although every other key of this table
# is a tagSet; kept as is so lookups behave exactly as before.
TAG_MAP[univ.Sequence.typeId] = SequenceEncoder()

TYPE_MAP = encoder.TYPE_MAP.copy()

TYPE_MAP[univ.Boolean.typeId] = BooleanEncoder()
TYPE_MAP[univ.Real.typeId] = RealEncoder()
TYPE_MAP[useful.GeneralizedTime.typeId] = GeneralizedTimeEncoder()
TYPE_MAP[useful.UTCTime.typeId] = UTCTimeEncoder()
# Constructed types are told apart by typeId, so each gets its own entry.
TYPE_MAP[univ.Set.typeId] = SetEncoder()
TYPE_MAP[univ.SetOf.typeId] = SetOfEncoder()
TYPE_MAP[univ.Sequence.typeId] = SequenceEncoder()
TYPE_MAP[univ.SequenceOf.typeId] = SequenceOfEncoder()
| |
|
| |
|
class SingleItemEncoder(encoder.SingleItemEncoder):
    """Single-item encoder wired to this module's codec tables."""

    # Both settings are read by the base class; presumably they force
    # indefinite-length mode and a 1000-octet chunk size -- confirm in
    # encoder.SingleItemEncoder.
    fixedDefLengthMode = False
    fixedChunkSize = 1000

    # Lookup tables built at module level above.
    TAG_MAP = TAG_MAP
    TYPE_MAP = TYPE_MAP
| |
|
| |
|
class Encoder(encoder.Encoder):
    """Top-level encoder that uses this module's ``SingleItemEncoder``."""

    SINGLE_ITEM_ENCODER = SingleItemEncoder
| |
|
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
# Ready-to-use module-level encoder instance, exported through ``__all__``.
encode = Encoder()
| |
|
| | |
| |
|
def __getattr__(attr: str):
    """Resolve the deprecated module attributes ``tagMap`` and ``typeMap``.

    Called by Python only when normal module attribute lookup fails.  The
    legacy camel-case names are mapped to ``TAG_MAP`` / ``TYPE_MAP`` and a
    ``DeprecationWarning`` is issued.

    Raises:
        AttributeError: for any other name.
    """
    renamed = {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}

    if newAttr := renamed.get(attr):
        # stacklevel=2 attributes the warning to the code that accessed the
        # old name rather than to this module, so default warning filters
        # can surface it to the caller.
        warnings.warn(
            f"{attr} is deprecated. Please use {newAttr} instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return globals()[newAttr]

    raise AttributeError(f"module {__name__!r} has no attribute {attr!r}")
| |
|