Spaces:
Paused
Paused
File size: 3,945 Bytes
5b01a63 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
from urllib.request import parse_keqv_list, parse_http_list
from authlib.common.urls import (
urlparse, extract_params, url_decode,
)
from .signature import (
SIGNATURE_TYPE_QUERY,
SIGNATURE_TYPE_BODY,
SIGNATURE_TYPE_HEADER
)
from .errors import (
InsecureTransportError,
DuplicatedOAuthProtocolParameterError
)
from .util import unescape
class OAuth1Request:
def __init__(self, method, uri, body=None, headers=None):
InsecureTransportError.check(uri)
self.method = method
self.uri = uri
self.body = body
self.headers = headers or {}
# states namespaces
self.client = None
self.credential = None
self.user = None
self.query = urlparse.urlparse(uri).query
self.query_params = url_decode(self.query)
self.body_params = extract_params(body) or []
self.auth_params, self.realm = _parse_authorization_header(headers)
self.signature_type, self.oauth_params = _parse_oauth_params(
self.query_params, self.body_params, self.auth_params)
params = []
params.extend(self.query_params)
params.extend(self.body_params)
params.extend(self.auth_params)
self.params = params
@property
def client_id(self):
return self.oauth_params.get('oauth_consumer_key')
@property
def client_secret(self):
if self.client:
return self.client.get_client_secret()
@property
def rsa_public_key(self):
if self.client:
return self.client.get_rsa_public_key()
@property
def timestamp(self):
return self.oauth_params.get('oauth_timestamp')
@property
def redirect_uri(self):
return self.oauth_params.get('oauth_callback')
@property
def signature(self):
return self.oauth_params.get('oauth_signature')
@property
def signature_method(self):
return self.oauth_params.get('oauth_signature_method')
@property
def token(self):
return self.oauth_params.get('oauth_token')
@property
def token_secret(self):
if self.credential:
return self.credential.get_oauth_token_secret()
def _filter_oauth(params):
for k, v in params:
if k.startswith('oauth_'):
yield (k, v)
def _parse_authorization_header(headers):
"""Parse an OAuth authorization header into a list of 2-tuples"""
authorization_header = headers.get('Authorization')
if not authorization_header:
return [], None
auth_scheme = 'oauth '
if authorization_header.lower().startswith(auth_scheme):
items = parse_http_list(authorization_header[len(auth_scheme):])
try:
items = parse_keqv_list(items).items()
auth_params = [(unescape(k), unescape(v)) for k, v in items]
realm = dict(auth_params).get('realm')
return auth_params, realm
except (IndexError, ValueError):
pass
raise ValueError('Malformed authorization header')
def _parse_oauth_params(query_params, body_params, auth_params):
oauth_params_set = [
(SIGNATURE_TYPE_QUERY, list(_filter_oauth(query_params))),
(SIGNATURE_TYPE_BODY, list(_filter_oauth(body_params))),
(SIGNATURE_TYPE_HEADER, list(_filter_oauth(auth_params)))
]
oauth_params_set = [params for params in oauth_params_set if params[1]]
if len(oauth_params_set) > 1:
found_types = [p[0] for p in oauth_params_set]
raise DuplicatedOAuthProtocolParameterError(
'"oauth_" params must come from only 1 signature type '
'but were found in {}'.format(','.join(found_types))
)
if oauth_params_set:
signature_type = oauth_params_set[0][0]
oauth_params = dict(oauth_params_set[0][1])
else:
signature_type = None
oauth_params = {}
return signature_type, oauth_params
|