Source code for jdcloud_sdk.core.signer

# coding=utf8

# Copyright 2018 JDCLOUD.COM
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import time
import hashlib
import hmac
import re
import uuid
from . import const
from .logger import INFO
from .util import quote, unquote_plus, parse_url


[docs]class Signer(object): ignored_headers = ['authorization', 'user-agent'] def __init__(self, logger): self.__logger = logger
[docs] def sign(self, method, service, region, uri, headers, data, credential, security_token): uri_dict = self.__url_path_to_dict(uri) host = uri_dict['host'] port = uri_dict['port'] path = uri_dict['path'] query = uri_dict['query'] if port and port not in ['80', '443']: full_host = host + ':' + port else: full_host = host return self.signV3(method, service, region, full_host, path, query, headers, data, credential, security_token)
[docs] def signV3(self, method, service, region, host, path, query, headers, data, credential, security_token): canonical_host = self.__build_canonical_host(host) now = self.__now() nonce = str(uuid.uuid4()) jdcloud_date = now.strftime('%Y%m%dT%H%M%SZ') if const.JDCLOUD_DATE in headers: jdcloud_date = headers[const.JDCLOUD_DATE] if const.JDCLOUD_NONCE in headers: nonce = headers[const.JDCLOUD_NONCE] date_str = jdcloud_date[:8] canonical_querystring = self.__normalize_query_string(query) canonical_headers, signed_headers = self.__build_canonical_headers(headers, security_token, canonical_host) payload_hash = self.__sha256_hash(data) canonical_request = (method + '\n' + self.__build_canonical_uri(path) + '\n' + canonical_querystring + '\n' + canonical_headers + '\n' + signed_headers + '\n' + payload_hash) algorithm = const.JDCLOUD3_ALGORITHM credential_scope = (date_str + '/' + region + '/' + service + '/' + const.JDCLOUD3_REQUEST) string_to_sign = (algorithm + '\n' + jdcloud_date + '\n' + credential_scope + '\n' + self.__sha256_hash(canonical_request)) self.__logger.log(INFO, '---canonical_request---\n' + canonical_request) self.__logger.log(INFO, '----string_to_sign---\n' + string_to_sign) signing_key = self.__get_signature_key(credential.secret_key, date_str, region, service) encoded = string_to_sign.encode('utf-8') signature = hmac.new(signing_key, encoded, hashlib.sha256).hexdigest() authorization_header = ( algorithm + ' ' + 'Credential=' + credential.access_key + '/' + credential_scope + ', ' + 'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature ) headers.update({ const.JDCLOUD_AUTH: authorization_header, const.JDCLOUD_DATE: jdcloud_date, const.JDCLOUD_CONTENT_SHA256: payload_hash, const.JDCLOUD3_ALGORITHM: const.JDCLOUD3_ALGORITHM, const.JDCLOUD_NONCE: nonce }) if security_token: headers.update({const.JDCLOUD_SECURITY_TOKEN: security_token})
def __build_canonical_host(self, full_host): if full_host.lower().find('http://') == 0: full_host = full_host[7:] elif full_host.lower().find('https://') == 0: full_host = full_host[8:] return full_host def __normalize_query_string(self, query): params = [] if isinstance(query, str): for s in query.split('&'): if len(s) <= 0: continue list = [] for val in s.split('='): list.append(self.__urlencode(self.__urldecode(val))) params.append(list) elif isinstance(query, dict): for key in query.keys(): list = [] list.append(self.__urlencode(self.__urldecode(key))) list.append(self.__urlencode(self.__urldecode(query[key]))) params.append(list) normalized = '' for p in sorted(params): if p[0] == '': continue elif len(p) == 2: normalized += '%s=%s&' % (p[0], p[1]) elif len(p) > 2: normalized += '%3D'.join(p[1:]) + '&' if normalized.endswith('&'): normalized = normalized[:normalized.__len__()-1] return normalized def __now(self): return datetime.datetime.utcfromtimestamp(time.time()) def __url_path_to_dict(self, path): """http://stackoverflow.com/a/17892757/142207""" # pattern = (r'^' # r'((?P<schema>.+?)://)?' # r'((?P<user>.+?)(:(?P<password>.*?))?@)?' # r'(?P<host>.*?)' # r'(:(?P<port>\d+?))?' # r'(?P<path>/.*?)?' # r'(\?(?P<query>.*?))?' # r'$') # regex = re.compile(pattern) # match = regex.match(path) # group_dict = match.groupdict() if match is not None else None # # if group_dict['path'] is None: # group_dict['path'] = '/' # # if group_dict['query'] is None: # group_dict['query'] = '' # return group_dict return parse_url(path) def __sign(self, key, msg): return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest() def __get_signature_key(self, key, date_stamp, region_name, service_name): k_date = self.__sign((const.JDCLOUD3 + key).encode('utf-8'), date_stamp) k_region = self.__sign(k_date, region_name) k_service = self.__sign(k_region, service_name) k_signing = self.__sign(k_service, const.JDCLOUD3_REQUEST) return k_signing def __sha256_hash(self, val): return hashlib.sha256(val.encode('utf-8')).hexdigest() def __build_canonical_uri(self, path): reg = re.compile('/+') decoded_path = reg.sub('/', self.__urldecode(path)) if decoded_path == '': return '/' encoded_path = self.__urlencode_ignore_slashes(decoded_path) if encoded_path.startswith('/'): return encoded_path return '/' + encoded_path def __urlencode(self, value): return quote(value, '~') def __urlencode_ignore_slashes(self, value): return quote(value, '/~') def __urldecode(self, value): return unquote_plus(value) def __build_canonical_headers(self, req_headers, security_token, full_host): headers = ['host'] # add host header first signed_values = {} for key in req_headers.keys(): value = req_headers[key] lower_key = key.lower() if lower_key in Signer.ignored_headers: continue headers.append(lower_key) signed_values[lower_key] = value headers.sort() signed_headers = ';'.join(headers) canonical_values = [] for key in headers: if key == 'host': canonical_values.append('host:' + full_host.strip()) else: # canonical_values.append(key + ':' + ) header_value = signed_values[key] if isinstance(header_value, str): canonical_values.append(key + ':' + header_value.strip()) elif isinstance(header_value, list): arr = [] for val in header_value: arr.append(val.strip()) canonical_values.append(key + ':' + ','.join(arr)) canonical_headers = '\n'.join(canonical_values) + '\n' return canonical_headers, signed_headers