|
|
| """upload_docs
|
|
|
| Implements a Distutils 'upload_docs' subcommand (upload documentation to
|
| sites other than PyPI such as devpi).
|
| """
|
|
|
| from base64 import standard_b64encode
|
| from distutils import log
|
| from distutils.errors import DistutilsOptionError
|
| import os
|
| import socket
|
| import zipfile
|
| import tempfile
|
| import shutil
|
| import itertools
|
| import functools
|
| import http.client
|
| import urllib.parse
|
| import warnings
|
|
|
| from .._importlib import metadata
|
| from .. import SetuptoolsDeprecationWarning
|
|
|
| from .upload import upload
|
|
|
|
|
| def _encode(s):
|
| return s.encode('utf-8', 'surrogateescape')
|
|
|
|
|
class upload_docs(upload):
    """Command that zips a documentation directory and POSTs it to a repository.

    The command is deprecated: ``finalize_options`` logs a notice and ``run``
    emits a ``SetuptoolsDeprecationWarning``.
    """

    # Shadows the ``DEFAULT_REPOSITORY`` attribute provided by ``upload``.
    DEFAULT_REPOSITORY = 'https://pypi.python.org/pypi/'

    description = 'Upload documentation to sites other than PyPi such as devpi'

    # NOTE(review): the help text below interpolates
    # ``upload.DEFAULT_REPOSITORY`` rather than the ``DEFAULT_REPOSITORY``
    # defined on this class -- confirm which default should be advertised.
    user_options = [
        ('repository=', 'r',
         "url of repository [default: %s]" % upload.DEFAULT_REPOSITORY),
        ('show-response', None,
         'display full response text from server'),
        ('upload-dir=', None, 'directory to upload'),
    ]
    boolean_options = upload.boolean_options

    def has_sphinx(self):
        """Tell whether the ``build_sphinx`` sub-command should run.

        True only when no explicit upload directory was given and a
        ``build_sphinx`` entry point is registered in the
        ``distutils.commands`` group.
        """
        return bool(
            self.upload_dir is None
            and metadata.entry_points(
                group='distutils.commands', name='build_sphinx')
        )

    sub_commands = [('build_sphinx', has_sphinx)]

    def initialize_options(self):
        """Initialise the inherited options plus ``upload_dir``/``target_dir``."""
        upload.initialize_options(self)
        self.upload_dir = None
        self.target_dir = None

    def finalize_options(self):
        """Log the deprecation notice and resolve ``self.target_dir``.

        Resolution order: an explicit ``upload_dir`` (validated with
        ``ensure_dirname``); otherwise the ``html`` entry of the
        ``build_sphinx`` command's ``builder_target_dirs`` when
        ``has_sphinx()`` is true; otherwise ``<build_base>/docs``.
        """
        log.warn(
            "Upload_docs command is deprecated. Use Read the Docs "
            "(https://readthedocs.org) instead.")
        upload.finalize_options(self)
        if self.upload_dir is None:
            if self.has_sphinx():
                build_sphinx = self.get_finalized_command('build_sphinx')
                self.target_dir = dict(build_sphinx.builder_target_dirs)['html']
            else:
                build = self.get_finalized_command('build')
                self.target_dir = os.path.join(build.build_base, 'docs')
        else:
            self.ensure_dirname('upload_dir')
            self.target_dir = self.upload_dir
        self.announce('Using upload directory %s' % self.target_dir)

    def create_zipfile(self, filename):
        """Zip every file found under ``self.target_dir`` into *filename*.

        Archive member names are relative to ``self.target_dir``.

        Raises:
            DistutilsOptionError: the top level of ``self.target_dir``
                contains no files.
        """
        # The context manager closes the archive on success and on error.
        with zipfile.ZipFile(filename, "w") as zip_file:
            self.mkpath(self.target_dir)
            for root, _dirs, files in os.walk(self.target_dir):
                if root == self.target_dir and not files:
                    tmpl = "no files found in upload directory '%s'"
                    raise DistutilsOptionError(tmpl % self.target_dir)
                # Same for every file in this directory, so compute it once.
                relative = root[len(self.target_dir):].lstrip(os.path.sep)
                for name in files:
                    full = os.path.join(root, name)
                    dest = os.path.join(relative, name)
                    zip_file.write(full, dest)

    def run(self):
        """Run the sub-commands, then zip the docs and upload the archive.

        The archive is built in a fresh temporary directory that is removed
        afterwards whether or not the upload succeeds.
        """
        warnings.warn(
            "upload_docs is deprecated and will be removed in a future "
            "version. Use tools like httpie or curl instead.",
            SetuptoolsDeprecationWarning,
        )

        for cmd_name in self.get_sub_commands():
            self.run_command(cmd_name)

        tmp_dir = tempfile.mkdtemp()
        name = self.distribution.metadata.get_name()
        zip_file = os.path.join(tmp_dir, "%s.zip" % name)
        try:
            self.create_zipfile(zip_file)
            self.upload_file(zip_file)
        finally:
            shutil.rmtree(tmp_dir)

    @staticmethod
    def _build_part(item, sep_boundary):
        """Yield the byte chunks of the form-data part(s) for one field.

        Args:
            item: ``(key, values)`` pair. ``values`` is a single value or a
                list of values; each value is either a ``str`` or a
                ``(filename, bytes)`` tuple.
            sep_boundary: bytes emitted before every part.
        """
        key, values = item
        title = '\nContent-Disposition: form-data; name="%s"' % key
        # A field may carry several values; treat the scalar case uniformly.
        if not isinstance(values, list):
            values = [values]
        for value in values:
            # Build the header per part so that one part's filename is not
            # carried over into the headers of the parts that follow it.
            header = title
            if isinstance(value, tuple):
                header += '; filename="%s"' % value[0]
                value = value[1]
            else:
                value = _encode(value)
            yield sep_boundary
            yield _encode(header)
            yield b"\n\n"
            yield value
            if value and value[-1:] == b'\r':
                # Keep a trailing CR from sitting directly before the
                # newline that starts the next boundary.
                yield b'\n'

    @classmethod
    def _build_multipart(cls, data):
        """
        Build up the MIME payload for the POST data

        Returns a ``(body, content_type)`` pair: the encoded body as bytes
        and the matching ``multipart/form-data`` header value.
        """
        # NOTE(review): lines are separated with bare LF rather than CRLF;
        # left unchanged so the bytes sent on the wire stay the same.
        boundary = '--------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
        sep_boundary = b'\n--' + boundary.encode('ascii')
        end_boundary = sep_boundary + b'--'
        end_items = end_boundary, b"\n",
        builder = functools.partial(
            cls._build_part,
            sep_boundary=sep_boundary,
        )
        part_groups = map(builder, data.items())
        parts = itertools.chain.from_iterable(part_groups)
        body_items = itertools.chain(parts, end_items)
        content_type = 'multipart/form-data; boundary=%s' % boundary
        return b''.join(body_items), content_type

    def upload_file(self, filename):
        """POST the archive at *filename* to ``self.repository``.

        The request uses HTTP Basic authentication built from
        ``self.username`` and ``self.password``. A socket error while
        sending is announced at ERROR level and the method returns; the
        server's status is otherwise announced (200 and 301 as success,
        anything else as failure).

        Raises:
            AssertionError: the repository URL has params, a query or a
                fragment, or its scheme is neither ``http`` nor ``https``.
        """
        with open(filename, 'rb') as f:
            content = f.read()
        meta = self.distribution.metadata
        data = {
            ':action': 'doc_upload',
            'name': meta.get_name(),
            'content': (os.path.basename(filename), content),
        }

        credentials = _encode(self.username + ':' + self.password)
        credentials = standard_b64encode(credentials).decode('ascii')
        auth = "Basic " + credentials

        body, ct = self._build_multipart(data)

        msg = "Submitting documentation to %s" % (self.repository)
        self.announce(msg, log.INFO)

        schema, netloc, url, params, query, fragments = \
            urllib.parse.urlparse(self.repository)
        # Raised explicitly (not via ``assert``) so the check survives -O.
        if params or query or fragments:
            raise AssertionError("Incompatible url %s" % self.repository)
        if schema == 'http':
            conn = http.client.HTTPConnection(netloc)
        elif schema == 'https':
            conn = http.client.HTTPSConnection(netloc)
        else:
            raise AssertionError("unsupported schema " + schema)

        try:
            try:
                conn.connect()
                conn.putrequest("POST", url)
                content_type = ct
                conn.putheader('Content-type', content_type)
                conn.putheader('Content-length', str(len(body)))
                conn.putheader('Authorization', auth)
                conn.endheaders()
                conn.send(body)
            except socket.error as e:
                self.announce(str(e), log.ERROR)
                return

            r = conn.getresponse()
            if r.status == 200:
                msg = 'Server response (%s): %s' % (r.status, r.reason)
                self.announce(msg, log.INFO)
            elif r.status == 301:
                location = r.getheader('Location')
                if location is None:
                    location = 'https://pythonhosted.org/%s/' % meta.get_name()
                msg = 'Upload successful. Visit %s' % location
                self.announce(msg, log.INFO)
            else:
                msg = 'Upload failed (%s): %s' % (r.status, r.reason)
                self.announce(msg, log.ERROR)
            if self.show_response:
                print('-' * 75, r.read(), '-' * 75)
        finally:
            # Release the socket on every path, including the early return.
            conn.close()
|
|
|