Spaces:
Running
Running
| import itertools | |
| import logging | |
| import re | |
| import struct | |
| from hashlib import md5, sha256, sha384, sha512 | |
| from typing import ( | |
| Any, | |
| Callable, | |
| Dict, | |
| Iterable, | |
| Iterator, | |
| KeysView, | |
| List, | |
| Optional, | |
| Sequence, | |
| Tuple, | |
| Type, | |
| Union, | |
| cast, | |
| ) | |
| from cryptography.hazmat.backends import default_backend | |
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
| from pdf2zh import settings | |
| from pdf2zh.arcfour import Arcfour | |
| from pdf2zh.data_structures import NumberTree | |
| from pdf2zh.pdfexceptions import ( | |
| PDFException, | |
| PDFKeyError, | |
| PDFObjectNotFound, | |
| PDFTypeError, | |
| ) | |
| from pdf2zh.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError | |
| from pdf2zh.pdftypes import ( | |
| DecipherCallable, | |
| PDFStream, | |
| decipher_all, | |
| dict_value, | |
| int_value, | |
| list_value, | |
| str_value, | |
| stream_value, | |
| uint_value, | |
| ) | |
| from pdf2zh.psexceptions import PSEOF | |
| from pdf2zh.psparser import KWD, LIT, literal_name | |
| from pdf2zh.utils import ( | |
| choplist, | |
| decode_text, | |
| format_int_alpha, | |
| format_int_roman, | |
| nunpack, | |
| ) | |
# Module-level logger, named after this module.
log: logging.Logger = logging.getLogger(__name__)
class PDFNoValidXRef(PDFSyntaxError):
    """Raised when a cross-reference table or stream cannot be read."""
class PDFNoValidXRefWarning(SyntaxWarning):
    """Legacy warning category for a missing xref.

    Kept only for compatibility: warnings.warn has been replaced by
    logger.Logger.warn, so nothing emits this category anymore.
    """
class PDFNoOutlines(PDFException):
    """Raised when the document catalog has no /Outlines entry."""
class PDFNoPageLabels(PDFException):
    """Raised when page labels cannot be built from the document catalog."""
class PDFDestinationNotFound(PDFException):
    """Raised when a named destination cannot be resolved."""
class PDFEncryptionError(PDFException):
    """Raised for unsupported or inconsistent encryption parameters."""
class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password does not yield an encryption key."""
class PDFEncryptionWarning(UserWarning):
    """Legacy warning category for failed decryption.

    Kept only for compatibility: warnings.warn has been replaced by
    logger.Logger.warn, so nothing emits this category anymore.
    """
class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Legacy warning category for a PDF that does not allow extraction.

    Kept only for compatibility: warnings.warn has been replaced by
    logger.Logger.warn, so nothing emits this category anymore.
    """
class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Error type for documents whose text extraction is not permitted."""
# Predefined /Type name literals; this module compares against them with `is`.
LITERAL_OBJSTM, LITERAL_XREF, LITERAL_CATALOG = (
    LIT(type_name) for type_name in ("ObjStm", "XRef", "Catalog")
)
class PDFBaseXRef:
    """Common interface of the cross-reference implementations in this module."""

    def get_trailer(self) -> Dict[str, Any]:
        """Return the trailer dictionary; subclasses must override this."""
        raise NotImplementedError

    def get_objids(self) -> Iterable[int]:
        """Return the object ids this xref knows about (none by default)."""
        return list()

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Locate *objid*.

        Implementations must return ``(strmid, index, genno)`` or
        ``(None, pos, genno)``. The base class knows no objects and always
        raises PDFKeyError.
        """
        raise PDFKeyError(objid)

    def load(self, parser: PDFParser) -> None:
        """Read the xref data from *parser*; subclasses must override this."""
        raise NotImplementedError
class PDFXRef(PDFBaseXRef):
    """Cross-reference data read from a classic ``xref`` table and its trailer."""

    def __init__(self) -> None:
        # objid -> (None, byte offset, generation number); load() always
        # stores None in the first slot.
        self.offsets: Dict[int, Tuple[Optional[int], int, int]] = {}
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRef: offsets=%r>" % (self.offsets.keys())

    def load(self, parser: PDFParser) -> None:
        """Read xref subsections from *parser* up to the ``trailer`` keyword.

        Each subsection starts with a ``<first objid> <count>`` line followed
        by ``count`` entries of the form ``<offset> <genno> <n|f>``. Only
        entries flagged ``n`` are recorded. The trailer is loaded afterwards.

        :raises PDFNoValidXRef: on premature EOF or on any malformed header
            or entry line, including non-numeric offset/generation fields.
        """
        while True:
            try:
                (pos, line) = parser.nextline()
            except PSEOF:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
            line = line.strip()
            if not line:
                continue
            if line.startswith(b"trailer"):
                # Rewind so load_trailer() sees the keyword itself.
                parser.seek(pos)
                break
            f = line.split(b" ")
            if len(f) != 2:
                error_msg = f"Trailer not found: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            try:
                (start, nobjs) = map(int, f)
            except ValueError:
                error_msg = f"Invalid line: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            for objid in range(start, start + nobjs):
                try:
                    (_, line) = parser.nextline()
                except PSEOF:
                    raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
                line = line.strip()
                f = line.split(b" ")
                if len(f) != 3:
                    error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                (pos_b, genno_b, use_b) = f
                if use_b != b"n":
                    # Anything other than an in-use entry is skipped.
                    continue
                try:
                    entry = (None, int(pos_b), int(genno_b))
                except ValueError:
                    # Report garbage numbers the same way as a bad header
                    # line, so callers catching PDFNoValidXRef can recover.
                    error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                self.offsets[objid] = entry
        self.load_trailer(parser)

    def load_trailer(self, parser: PDFParser) -> None:
        """Parse the ``trailer`` keyword and merge its dictionary into self.trailer.

        :raises PDFNoValidXRef: if EOF is hit and the parser has nothing
            left to pop.
        """
        try:
            (_, kwd) = parser.nexttoken()
            assert kwd is KWD(b"trailer"), str(kwd)
            # nextobject() is unpacked as (x, (y, obj)); only obj is used.
            _, (_, dic) = parser.nextobject()
        except PSEOF:
            # On EOF, fall back to whatever parser.pop(1) still returns.
            x = parser.pop(1)
            if not x:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted")
            (_, dic) = x[0]
        self.trailer.update(dict_value(dic))

    def get_trailer(self) -> Dict[str, Any]:
        """Return the trailer dictionary collected by load_trailer()."""
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        """Return a view of the object ids recorded in self.offsets."""
        return self.offsets.keys()

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return the stored location tuple for *objid*.

        :raises KeyError: if *objid* is not in self.offsets.
        """
        return self.offsets[objid]
class PDFXRefFallback(PDFXRef):
    """Xref rebuilt by scanning the file for ``<objid> <genno> obj`` lines."""

    def __repr__(self) -> str:
        return "<PDFXRefFallback: offsets=%r>" % (self.offsets.keys())

    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def load(self, parser: PDFParser) -> None:
        """Scan from offset 0, recording each object header that is found.

        Scanning stops at EOF or at the first line starting with ``trailer``
        (which is then loaded). Objects that turn out to be object streams
        also get their member objects registered.
        """
        parser.seek(0)
        while True:
            try:
                pos, raw_line = parser.nextline()
            except PSEOF:
                break
            if raw_line.startswith(b"trailer"):
                parser.seek(pos)
                self.load_trailer(parser)
                break
            # latin-1 is used as the default pdf encoding
            cue = self.PDFOBJ_CUE.match(raw_line.decode("latin-1"))
            if cue is None:
                continue
            objid, genno = (int(group) for group in cue.groups())
            self.offsets[objid] = (None, pos, genno)
            # Re-read the object itself so an ObjStm can be expanded.
            parser.seek(pos)
            _, (_, obj) = parser.nextobject()
            if not (isinstance(obj, PDFStream) and obj.get("Type") is LITERAL_OBJSTM):
                continue
            stream = stream_value(obj)
            try:
                n = stream["N"]
            except KeyError:
                if settings.STRICT:
                    raise PDFSyntaxError("N is not defined: %r" % stream)
                n = 0
            sub_parser = PDFStreamParser(stream.get_data())
            numbers: List[int] = []
            try:
                while True:
                    _, (_, item) = sub_parser.nextobject()
                    numbers.append(cast(int, item))
            except PSEOF:
                pass
            # Never trust N beyond the number of pairs actually parsed.
            n = min(n, len(numbers) // 2)
            # Every even slot holds a member object id.
            for index, member_id in zip(range(n), numbers[::2]):
                self.offsets[member_id] = (objid, index, 0)
class PDFXRefStream(PDFBaseXRef):
    """Cross-reference data read from an xref stream (``/Type /XRef``)."""

    def __init__(self) -> None:
        self.data: Optional[bytes] = None
        # Total byte width of one entry (fl1 + fl2 + fl3).
        self.entlen: Optional[int] = None
        # Byte widths of the three entry fields, taken from /W.
        self.fl1: Optional[int] = None
        self.fl2: Optional[int] = None
        self.fl3: Optional[int] = None
        # (first objid, count) pairs taken from /Index.
        self.ranges: List[Tuple[int, int]] = []
        # Defined up front so get_trailer() works before load().
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRefStream: ranges=%r>" % (self.ranges)

    def load(self, parser: PDFParser) -> None:
        """Read the xref stream object at the parser's current position.

        :raises PDFNoValidXRef: if the object is not a stream of type XRef.
        :raises PDFSyntaxError: if /Index has an odd number of items.
        """
        # Three leading tokens (objid, genno, keyword) are read and ignored.
        for _ in range(3):
            parser.nexttoken()
        _, (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        # Without /Index the stream is one subsection covering 0..Size-1.
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[Tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs

    def get_trailer(self) -> Dict[str, Any]:
        """Return the stream attributes, which act as the trailer."""
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the ids of all entries whose type field is 1 or 2.

        Entries are packed back to back across all subsections, so the
        position of an entry is its index within its subsection plus the
        sizes of all earlier subsections (same rule as get_pos()).
        """
        preceding = 0  # entries belonging to earlier subsections
        for start, nobjs in self.ranges:
            for i in range(nobjs):
                assert self.entlen is not None
                assert self.data is not None
                offset = self.entlen * (preceding + i)
                ent = self.data[offset : offset + self.entlen]
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 == 1 or f1 == 2:
                    yield start + i
            preceding += nobjs

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return the location tuple for *objid*.

        Type 1 entries give ``(None, f2, f3)`` and type 2 entries give
        ``(f2, f3, 0)``, where f2 and f3 are the second and third fields.

        :raises PDFKeyError: if *objid* is outside every subsection or its
            entry has any other type.
        """
        index = 0
        for start, nobjs in self.ranges:
            if start <= objid < start + nobjs:
                index += objid - start
                break
            index += nobjs
        else:
            raise PDFKeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise PDFKeyError(objid)
class PDFStandardSecurityHandler:
    """Standard security handler for revisions 2 and 3 (MD5 + RC4)."""

    PASSWORD_PADDING = (
        b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08"
        b"..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz"
    )
    supported_revisions: Tuple[int, ...] = (2, 3)

    def __init__(
        self,
        docid: Sequence[bytes],
        param: Dict[str, Any],
        password: str = "",
    ) -> None:
        """Store the inputs and immediately run init()."""
        self.docid = docid
        self.param = param
        self.password = password
        self.init()

    def init(self) -> None:
        """Read the parameters, check the revision, then derive the key.

        :raises PDFEncryptionError: if the revision is not supported.
        """
        self.init_params()
        if self.r not in self.supported_revisions:
            raise PDFEncryptionError("Unsupported revision: param=%r" % self.param)
        self.init_key()

    def init_params(self) -> None:
        """Copy the /V, /R, /P, /O, /U and /Length entries onto the instance."""
        params = self.param
        self.v = int_value(params.get("V", 0))
        self.r = int_value(params["R"])
        self.p = uint_value(params["P"], 32)
        self.o = str_value(params["O"])
        self.u = str_value(params["U"])
        self.length = int_value(params.get("Length", 40))

    def init_key(self) -> None:
        """Authenticate with the stored password and keep the resulting key.

        :raises PDFPasswordIncorrect: if authentication yields no key.
        """
        key = self.authenticate(self.password)
        self.key = key
        if key is None:
            raise PDFPasswordIncorrect

    def is_printable(self) -> bool:
        """Report whether bit value 4 of /P is set."""
        return (self.p & 4) != 0

    def is_modifiable(self) -> bool:
        """Report whether bit value 8 of /P is set."""
        return (self.p & 8) != 0

    def is_extractable(self) -> bool:
        """Report whether bit value 16 of /P is set."""
        return (self.p & 16) != 0

    def compute_u(self, key: bytes) -> bytes:
        """Compute the /U value that *key* would produce."""
        if self.r == 2:
            # Algorithm 3.4
            return Arcfour(key).encrypt(self.PASSWORD_PADDING)
        # Algorithm 3.5
        digest = md5(self.PASSWORD_PADDING)
        digest.update(self.docid[0])
        result = Arcfour(key).encrypt(digest.digest())
        for i in range(1, 20):
            # Each round uses the key with every byte XOR-ed by the round no.
            round_key = bytes(c ^ i for c in key)
            result = Arcfour(round_key).encrypt(result)
        # The 16-byte result is doubled to 32 bytes.
        return result + result

    def compute_encryption_key(self, password: bytes) -> bytes:
        """Derive the encryption key from *password* (Algorithm 3.2)."""
        padded = (password + self.PASSWORD_PADDING)[:32]
        digest = md5(padded)
        # /P goes in as a 4-byte little-endian unsigned integer.
        for chunk in (self.o, struct.pack("<L", self.p), self.docid[0]):
            digest.update(chunk)
        if (
            self.r >= 4
            and not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata
        ):
            digest.update(b"\xff\xff\xff\xff")
        result = digest.digest()
        if self.r < 3:
            return result[:5]
        n = self.length // 8
        for _ in range(50):
            result = md5(result[:n]).digest()
        return result[:n]

    def authenticate(self, password: str) -> Optional[bytes]:
        """Try *password* as user password, then as owner password."""
        raw = password.encode("latin1")
        user_key = self.authenticate_user_password(raw)
        if user_key is not None:
            return user_key
        return self.authenticate_owner_password(raw)

    def authenticate_user_password(self, password: bytes) -> Optional[bytes]:
        """Return the key derived from *password* if it verifies, else None."""
        key = self.compute_encryption_key(password)
        return key if self.verify_encryption_key(key) else None

    def verify_encryption_key(self, key: bytes) -> bool:
        """Check *key* against the stored /U value (Algorithm 3.6)."""
        computed = self.compute_u(key)
        if self.r == 2:
            return computed == self.u
        # Otherwise only the first 16 bytes are compared.
        return computed[:16] == self.u[:16]

    def authenticate_owner_password(self, password: bytes) -> Optional[bytes]:
        """Recover the user password from /O and authenticate with it.

        Implements Algorithm 3.7.
        """
        padded = (password + self.PASSWORD_PADDING)[:32]
        digest = md5(padded)
        if self.r >= 3:
            for _ in range(50):
                digest = md5(digest.digest())
            n = self.length // 8
        else:
            n = 5
        key = digest.digest()[:n]
        if self.r == 2:
            user_password = Arcfour(key).decrypt(self.o)
        else:
            user_password = self.o
            for i in range(19, -1, -1):
                round_key = bytes(c ^ i for c in key)
                user_password = Arcfour(round_key).decrypt(user_password)
        return self.authenticate_user_password(user_password)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
    ) -> bytes:
        """Decrypt *data* belonging to object (objid, genno); attrs is unused."""
        return self.decrypt_rc4(objid, genno, data)

    def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes:
        """RC4-decrypt *data* with a key salted by the object identifier."""
        assert self.key is not None
        # Salt: low 3 bytes of objid and low 2 bytes of genno, little-endian.
        salted = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2]
        object_key = md5(salted).digest()[: min(len(salted), 16)]
        return Arcfour(object_key).decrypt(data)
class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Revision 4 handler: crypt filters with RC4 (V2) or AES-128 (AESV2)."""

    supported_revisions: Tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt-filter parameters on top of the base ones.

        :raises PDFEncryptionError: if /StmF and /StrF differ, if a filter
            uses an unknown method, or if the selected filter is undefined.
        """
        super().init_params()
        self.length = 128
        self.cf = dict_value(self.param.get("CF"))
        self.stmf = literal_name(self.param["StmF"])
        self.strf = literal_name(self.param["StrF"])
        self.encrypt_metadata = bool(self.param.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            raise PDFEncryptionError(
                "Unsupported crypt filter: param=%r" % self.param
            )
        # Maps each crypt filter name to the method that decrypts with it.
        self.cfm = {}
        for filter_name, filter_dict in self.cf.items():
            method = self.get_cfm(literal_name(filter_dict["CFM"]))
            if method is None:
                raise PDFEncryptionError(
                    "Unknown crypt filter method: param=%r" % self.param
                )
            self.cfm[filter_name] = method
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            raise PDFEncryptionError("Undefined crypt filter: param=%r" % self.param)

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Return the decryptor for crypt filter method *name*, or None."""
        if name == "V2":
            return self.decrypt_rc4
        if name == "AESV2":
            return self.decrypt_aes128
        return None

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
    ) -> bytes:
        """Decrypt *data* with the crypt filter *name* (default: /StrF).

        Metadata objects are returned untouched when the document says its
        metadata is not encrypted.
        """
        if not self.encrypt_metadata and attrs is not None:
            obj_type = attrs.get("Type")
            if obj_type is not None and literal_name(obj_type) == "Metadata":
                return data
        decryptor = self.cfm[self.strf if name is None else name]
        return decryptor(objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        """Return *data* unchanged (the Identity filter)."""
        return data

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC-decrypt *data*; its first 16 bytes are taken as the IV."""
        assert self.key is not None
        # Same salting as RC4 plus the constant b"sAlT" suffix.
        salted = b"".join(
            (
                self.key,
                struct.pack("<L", objid)[:3],
                struct.pack("<L", genno)[:2],
                b"sAlT",
            )
        )
        object_key = md5(salted).digest()[: min(len(salted), 16)]
        initialization_vector, ciphertext = data[:16], data[16:]
        cipher = Cipher(
            algorithms.AES(object_key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(ciphertext)  # type: ignore
class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Revision 5/6 handler: SHA-2 password hashes and AES-256 (AESV3)."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        """Read /OE and /UE and split /O and /U into hash and salts."""
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        # /O and /U layout: 32-byte hash, 8-byte validation salt, key salt.
        self.o_hash = self.o[:32]
        self.o_validation_salt = self.o[32:40]
        self.o_key_salt = self.o[40:]
        self.u_hash = self.u[:32]
        self.u_validation_salt = self.u[32:40]
        self.u_key_salt = self.u[40:]

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Return the decryptor for crypt filter method *name*, or None."""
        if name == "AESV3":
            return self.decrypt_aes256
        else:
            return None

    def authenticate(self, password: str) -> Optional[bytes]:
        """Return the file key if *password* is the owner or user password.

        The owner password is tried first; its hashes also cover /U.
        Returns None when neither check matches.
        """
        password_b = self._normalize_password(password)
        digest = self._password_hash(password_b, self.o_validation_salt, self.u)
        if digest == self.o_hash:
            digest = self._password_hash(password_b, self.o_key_salt, self.u)
            return self._unwrap_file_key(digest, self.oe)
        digest = self._password_hash(password_b, self.u_validation_salt)
        if digest == self.u_hash:
            digest = self._password_hash(password_b, self.u_key_salt)
            return self._unwrap_file_key(digest, self.ue)
        return None

    def _unwrap_file_key(self, key: bytes, wrapped: bytes) -> bytes:
        """AES-CBC-decrypt *wrapped* (/OE or /UE) with *key* and a zero IV."""
        cipher = Cipher(
            algorithms.AES(key),
            modes.CBC(b"\0" * 16),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(wrapped)  # type: ignore

    def _normalize_password(self, password: str) -> bytes:
        """Encode *password* as UTF-8, truncated to 127 bytes.

        For revision 6 a non-empty password is passed through saslprep
        first; an empty one yields ``b""`` directly.
        """
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from pdf2zh._saslprep import saslprep

            password = saslprep(password)
        return password.encode("utf-8")[:127]

    def _password_hash(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute password hash depending on revision number"""
        if self.r == 5:
            return self._r5_password(password, salt, vector)
        return self._r6_password(password, salt[0:8], vector)

    def _r5_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 5"""
        digest = sha256(password)
        digest.update(salt)
        if vector is not None:
            digest.update(vector)
        return digest.digest()

    def _r6_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 6"""
        initial_hash = sha256(password)
        initial_hash.update(salt)
        if vector is not None:
            initial_hash.update(vector)
        k = initial_hash.digest()
        hashes = (sha256, sha384, sha512)
        round_no = last_byte_val = 0
        # At least 64 rounds, then continue while the last byte of the
        # previous round's output exceeds round_no - 32.
        while round_no < 64 or last_byte_val > round_no - 32:
            k1 = (password + k + (vector or b"")) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=k1)
            # compute the first 16 bytes of e,
            # interpreted as an unsigned integer mod 3
            next_hash = hashes[self._bytes_mod_3(e[:16])]
            k = next_hash(e).digest()
            last_byte_val = e[len(e) - 1]
            round_no += 1
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        """Return *input_bytes*, read as a big unsigned integer, modulo 3."""
        # 256 is 1 mod 3, so we can just sum 'em
        return sum(b % 3 for b in input_bytes) % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        """AES-CBC-encrypt *data* with *key* and *iv*."""
        cipher = Cipher(
            algorithms.AES(key),
            modes.CBC(iv),
            backend=default_backend(),
        )  # type: ignore
        encryptor = cipher.encryptor()  # type: ignore
        return encryptor.update(data) + encryptor.finalize()  # type: ignore

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC-decrypt *data* with the file key; first 16 bytes are the IV.

        objid and genno are accepted for signature compatibility only.
        """
        initialization_vector = data[:16]
        ciphertext = data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(ciphertext)  # type: ignore
class PDFDocument:
    """A PDF document whose objects are loaded on demand.

    A PDF file can be very large, so it is normally not read all at once.
    The document works together with a PDF parser and pulls data in as it
    is needed.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)
    """

    security_handler_registry: Dict[int, Type[PDFStandardSecurityHandler]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Attach *parser*, read the xrefs and locate the catalog.

        :raises PDFSyntaxError: if no trailer carries a /Root entry, or (in
            strict mode) if the catalog does not have type Catalog.
        """
        self.caching = caching
        self.xrefs: List[PDFBaseXRef] = []
        self.info = []
        self.catalog: Dict[str, Any] = {}
        self.encryption: Optional[Tuple[Any, Any]] = None
        self.decipher: Optional[DecipherCallable] = None
        self._parser = None
        self._cached_objs: Dict[int, Tuple[object, int]] = {}
        self._parsed_objs: Dict[int, Tuple[List[object], int]] = {}
        self._parser = parser
        self._parser.set_document(self)
        self.is_printable = True
        self.is_modifiable = True
        self.is_extractable = True
        # Collect every xref section reachable from the last "startxref".
        try:
            self.pos = self.find_xref(parser)
            self.read_xref_from(parser, self.pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                rebuilt = PDFXRefFallback()
                rebuilt.load(parser)
                self.xrefs.append(rebuilt)
        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                # Some documents have no /ID; two empty byte strings are
                # used in its place.
                id_value = (
                    list_value(trailer["ID"]) if "ID" in trailer else (b"", b"")
                )
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG and settings.STRICT:
            raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    def _initialize_password(self, password: str = "") -> None:
        """Set up decryption and the permission flags from self.encryption.

        :raises PDFEncryptionError: for a non-Standard filter or a /V value
            missing from security_handler_registry.
        """
        assert self.encryption is not None
        docid, param = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError("Unknown filter: param=%r" % param)
        version = int_value(param.get("V", 0))
        handler_cls = self.security_handler_registry.get(version)
        if handler_cls is None:
            raise PDFEncryptionError("Unknown algorithm: param=%r" % param)
        handler = handler_cls(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return the *index*-th object stored inside object stream *stream*.

        :raises PDFSyntaxError: if the stream holds too few objects.
        """
        parsed = self._parsed_objs.get(stream.objid)
        if parsed is None:
            parsed = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = parsed
        objs, n = parsed
        # The first 2 * n parsed items are skipped to reach the objects.
        try:
            return objs[n * 2 + index]
        except IndexError:
            raise PDFSyntaxError("index too big: %r" % index)

    def _get_objects(self, stream: PDFStream) -> Tuple[List[object], int]:
        """Parse everything in *stream*; return (parsed items, its /N value)."""
        if stream.get("Type") is not LITERAL_OBJSTM and settings.STRICT:
            raise PDFSyntaxError("Not a stream object: %r" % stream)
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError("N is not defined: %r" % stream)
            n = 0
        sub_parser = PDFStreamParser(stream.get_data())
        sub_parser.set_document(self)
        items: List[object] = []
        try:
            while True:
                _, (_, item) = sub_parser.nextobject()
                items.append(item)
        except PSEOF:
            pass
        return (items, n)

    def _getobj_parse(self, pos: int, objid: int) -> object:
        """Parse the indirect object at byte offset *pos*.

        Returns the pair ``(end, obj)`` taken from the parser's nextobject().

        :raises PDFSyntaxError: if the object id found does not match
            *objid* or the ``obj`` keyword is missing.
        """
        assert self._parser is not None
        parser = self._parser
        parser.seek(pos)
        (_, found_id) = parser.nexttoken()  # objid
        parser.nexttoken()  # genno, not needed here
        (_, kwd) = parser.nexttoken()
        # Workaround for malformed files: scan ahead to the next "obj"
        # keyword and take the token two places before it as the object id.
        if found_id != objid:
            seen = []
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = parser.nexttoken()
                seen.append(kwd)
            if len(seen) >= 2:
                found_id = seen[-2]
        if found_id != objid:
            raise PDFSyntaxError(f"objid mismatch: {found_id!r}={objid!r}")
        if kwd != KWD(b"obj"):
            raise PDFSyntaxError("Invalid object spec: offset=%r" % pos)
        end, (_, obj) = parser.nextobject()
        return end, obj

    def getobj(self, objid: int) -> object:
        """Get object from PDF

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
            return obj
        # Ask each xref in turn; the first one that yields the object wins.
        for xref in self.xrefs:
            try:
                (strmid, index, genno) = xref.get_pos(objid)
            except KeyError:
                continue
            try:
                if strmid is not None:
                    container = stream_value(self.getobj(strmid))
                    obj = self._getobj_objstm(container, index, objid)
                else:
                    _end, obj = self._getobj_parse(index, objid)
                    if self.decipher:
                        obj = decipher_all(self.decipher, objid, genno, obj)
                if isinstance(obj, PDFStream):
                    obj.set_objid(objid, genno)
                break
            except (PSEOF, PDFSyntaxError):
                continue
        else:
            raise PDFObjectNotFound(objid)
        if self.caching:
            self._cached_objs[objid] = (obj, genno)
        return obj

    OutlineType = Tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over outline entries as (level, title, dest, action, se).

        :raises PDFNoOutlines: if the catalog has no /Outlines entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            node = dict_value(entry)
            # Only titled entries that have an action or destination count.
            if "Title" in node and ("A" in node or "Dest" in node):
                title = decode_text(str_value(node["Title"]))
                yield (level, title, node.get("Dest"), node.get("A"), node.get("SE"))
            if "First" in node and "Last" in node:
                yield from search(node["First"], level + 1)
            if "Next" in node:
                yield from search(node["Next"], level)

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """Return the ``labels`` attribute of the catalog's /PageLabels tree.

        :raises PDFNoPageLabels: if /PageLabels is missing or building the
            PageLabels tree raises PDFTypeError.
        """
        assert self.catalog is not None
        try:
            tree = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError):
            raise PDFNoPageLabels
        return tree.labels

    def lookup_name(self, cat: str, key: Union[str, bytes]) -> Any:
        """Look up *key* in the name tree *cat* under the catalog's /Names.

        :raises PDFKeyError: if /Names is unusable or the key is not found.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError):
            raise PDFKeyError((cat, key))
        # may raise KeyError
        root = dict_value(names[cat])

        def lookup(d: Dict[str, Any]) -> Any:
            if "Limits" in d:
                (low, high) = list_value(d["Limits"])
                if key < low or high < key:
                    return None
            if "Names" in d:
                # Leaf node: a flat [key1, value1, key2, value2, ...] list.
                pairs = list_value(d["Names"])
                leaf = dict(
                    cast(Iterator[Tuple[Union[str, bytes], Any]], choplist(2, pairs)),
                )
                return leaf[key]
            if "Kids" in d:
                for kid in list_value(d["Kids"]):
                    found = lookup(dict_value(kid))
                    if found:
                        return found
            raise PDFKeyError((cat, key))

        return lookup(root)

    def get_dest(self, name: Union[str, bytes]) -> Any:
        """Resolve the named destination *name*.

        :raises PDFDestinationNotFound: if neither lookup finds *name*.
        """
        try:
            # PDF-1.2 or later
            return self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name)
            dests = dict_value(self.catalog["Dests"])
            if name not in dests:
                raise PDFDestinationNotFound(name)
            return dests[name]

    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef."""
        # Scan backwards; the non-empty line met just before "startxref"
        # (the one following it in the file) holds the xref offset.
        prev = b""
        for raw in parser.revreadlines():
            line = raw.strip()
            if line == b"startxref":
                if not prev.isdigit():
                    raise PDFNoValidXRef(f"Invalid xref position: {prev!r}")
                start = int(prev)
                if start < 0:
                    raise PDFNoValidXRef(f"Invalid negative xref position: {start}")
                return start
            if line:
                prev = line
        raise PDFNoValidXRef("Unexpected EOF")

    def read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: List[PDFBaseXRef],
    ) -> None:
        """Reads XRefs from the given location."""
        parser.seek(start)
        parser.reset()
        try:
            (pos, token) = parser.nexttoken()
        except PSEOF:
            raise PDFNoValidXRef("Unexpected EOF")
        xref: PDFBaseXRef
        if isinstance(token, int):
            # XRefStream: PDF-1.5
            parser.seek(pos)
            parser.reset()
            xref = PDFXRefStream()
        else:
            if token is parser.KEYWORD_XREF:
                parser.nextline()
            xref = PDFXRef()
        xref.load(parser)
        xrefs.append(xref)
        trailer = xref.get_trailer()
        # Follow the /XRefStm link first, then the previous xref (/Prev).
        for link in ("XRefStm", "Prev"):
            if link in trailer:
                self.read_xref_from(parser, int_value(trailer[link]), xrefs)
class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Generate one label string per page.

        The last label range has no end, so the iteration is unbounded.

        :raises PDFSyntaxError: in strict mode, if the tree does not start
            at page index 0.
        """
        # NOTE(review): assumes self.values is an attribute/property holding
        # a list of (page index, label dict) pairs - confirm in NumberTree.
        ranges = self.values
        # The tree must begin with page index 0
        if len(ranges) == 0 or ranges[0][0] != 0:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))
        for next_index, (start, label_dict_unchecked) in enumerate(ranges, 1):
            label_dict = dict_value(label_dict_unchecked)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))
            if next_index == len(ranges):
                # This is the last specified range. It continues until the end
                # of the document.
                values: Iterable[int] = itertools.count(first_value)
            else:
                end, _ = ranges[next_index]
                range_length = end - start
                values = range(first_value, first_value + range_length)
            for value in values:
                yield prefix + self._format_page_label(value, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            label = ""
        elif style is LIT("D"):  # Decimal arabic numerals
            label = str(value)
        elif style is LIT("R"):  # Uppercase roman numerals
            label = format_int_roman(value).upper()
        elif style is LIT("r"):  # Lowercase roman numerals
            label = format_int_roman(value)
        elif style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            label = format_int_alpha(value).upper()
        elif style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            label = format_int_alpha(value)
        else:
            log.warning("Unknown page label style: %r", style)
            label = ""
        return label