Spaces:
Paused
Paused
| """Provides a general interface to a `physical` OPC package, such as a zip file.""" | |
| import os | |
| from zipfile import ZIP_DEFLATED, ZipFile, is_zipfile | |
| from docx.opc.exceptions import PackageNotFoundError | |
| from docx.opc.packuri import CONTENT_TYPES_URI | |
| class PhysPkgReader: | |
| """Factory for physical package reader objects.""" | |
| def __new__(cls, pkg_file): | |
| # if `pkg_file` is a string, treat it as a path | |
| if isinstance(pkg_file, str): | |
| if os.path.isdir(pkg_file): | |
| reader_cls = _DirPkgReader | |
| elif is_zipfile(pkg_file): | |
| reader_cls = _ZipPkgReader | |
| else: | |
| raise PackageNotFoundError("Package not found at '%s'" % pkg_file) | |
| else: # assume it's a stream and pass it to Zip reader to sort out | |
| reader_cls = _ZipPkgReader | |
| return super(PhysPkgReader, cls).__new__(reader_cls) | |
| class PhysPkgWriter: | |
| """Factory for physical package writer objects.""" | |
| def __new__(cls, pkg_file): | |
| return super(PhysPkgWriter, cls).__new__(_ZipPkgWriter) | |
| class _DirPkgReader(PhysPkgReader): | |
| """Implements |PhysPkgReader| interface for an OPC package extracted into a | |
| directory.""" | |
| def __init__(self, path): | |
| """`path` is the path to a directory containing an expanded package.""" | |
| super(_DirPkgReader, self).__init__() | |
| self._path = os.path.abspath(path) | |
| def blob_for(self, pack_uri): | |
| """Return contents of file corresponding to `pack_uri` in package directory.""" | |
| path = os.path.join(self._path, pack_uri.membername) | |
| with open(path, "rb") as f: | |
| blob = f.read() | |
| return blob | |
| def close(self): | |
| """Provides interface consistency with |ZipFileSystem|, but does nothing, a | |
| directory file system doesn't need closing.""" | |
| pass | |
| def content_types_xml(self): | |
| """Return the `[Content_Types].xml` blob from the package.""" | |
| return self.blob_for(CONTENT_TYPES_URI) | |
| def rels_xml_for(self, source_uri): | |
| """Return rels item XML for source with `source_uri`, or None if the item has no | |
| rels item.""" | |
| try: | |
| rels_xml = self.blob_for(source_uri.rels_uri) | |
| except IOError: | |
| rels_xml = None | |
| return rels_xml | |
| class _ZipPkgReader(PhysPkgReader): | |
| """Implements |PhysPkgReader| interface for a zip file OPC package.""" | |
| def __init__(self, pkg_file): | |
| super(_ZipPkgReader, self).__init__() | |
| self._zipf = ZipFile(pkg_file, "r") | |
| def blob_for(self, pack_uri): | |
| """Return blob corresponding to `pack_uri`. | |
| Raises |ValueError| if no matching member is present in zip archive. | |
| """ | |
| return self._zipf.read(pack_uri.membername) | |
| def close(self): | |
| """Close the zip archive, releasing any resources it is using.""" | |
| self._zipf.close() | |
| def content_types_xml(self): | |
| """Return the `[Content_Types].xml` blob from the zip package.""" | |
| return self.blob_for(CONTENT_TYPES_URI) | |
| def rels_xml_for(self, source_uri): | |
| """Return rels item XML for source with `source_uri` or None if no rels item is | |
| present.""" | |
| try: | |
| rels_xml = self.blob_for(source_uri.rels_uri) | |
| except KeyError: | |
| rels_xml = None | |
| return rels_xml | |
| class _ZipPkgWriter(PhysPkgWriter): | |
| """Implements |PhysPkgWriter| interface for a zip file OPC package.""" | |
| def __init__(self, pkg_file): | |
| super(_ZipPkgWriter, self).__init__() | |
| self._zipf = ZipFile(pkg_file, "w", compression=ZIP_DEFLATED) | |
| def close(self): | |
| """Close the zip archive, flushing any pending physical writes and releasing any | |
| resources it's using.""" | |
| self._zipf.close() | |
| def write(self, pack_uri, blob): | |
| """Write `blob` to this zip package with the membername corresponding to | |
| `pack_uri`.""" | |
| self._zipf.writestr(pack_uri.membername, blob) | |