Source code for sdklib.http.authorization

# -*- coding: utf-8 -*-

import base64
import hmac
import binascii

from hashlib import sha1

from sdklib.compat import convert_str_to_bytes, convert_bytes_to_str
from sdklib.http import url_encode
from sdklib.http.renderers import FormRenderer, guess_file_name_stream_type_header
from sdklib.http.headers import (
    AUTHORIZATION_HEADER_NAME, X_11PATHS_DATE_HEADER_NAME, X_11PATHS_BODY_HASH_HEADER_NAME,
    X_11PATHS_FILE_HASH_HEADER_NAME
)
from sdklib.util.times import get_current_utc
from sdklib.util.urls import ensure_url_path_starts_with_slash
from sdklib.util.structures import to_key_val_list
from sdklib.http.methods import PUT_METHOD, POST_METHOD
from sdklib.http.headers import CONTENT_TYPE_HEADER_NAME


X_11PATHS_HEADER_PREFIX = "X-11paths-"
X_11PATHS_HEADER_SEPARATOR = ":"
AUTHORIZATION_HEADER_FIELD_SEPARATOR = " "
AUTHORIZATION_METHOD = "11PATHS"


[docs]def basic_authorization(username, password): combined_username_password = username + ":" + password b64_combined = base64.b64encode(convert_str_to_bytes(combined_username_password)) return "Basic " + convert_bytes_to_str(b64_combined)
[docs]def x_11paths_authorization(app_id, secret, context, utc=None): """ Calculate the authentication headers to be sent with a request to the API. :param app_id: :param secret: :param context :param utc: :return: array a map with the Authorization and Date headers needed to sign a Latch API request """ utc = utc or context.headers[X_11PATHS_DATE_HEADER_NAME] url_path = ensure_url_path_starts_with_slash(context.url_path) url_path_query = url_path if context.query_params: url_path_query += "?%s" % (url_encode(context.query_params, sort=True)) string_to_sign = (context.method.upper().strip() + "\n" + utc + "\n" + _get_11paths_serialized_headers(context.headers) + "\n" + url_path_query.strip()) if context.body_params and isinstance(context.renderer, FormRenderer): string_to_sign = string_to_sign + "\n" + \ url_encode(context.body_params, sort=True).replace("&", "") authorization_header_value = ( AUTHORIZATION_METHOD + AUTHORIZATION_HEADER_FIELD_SEPARATOR + app_id + AUTHORIZATION_HEADER_FIELD_SEPARATOR + _sign_data(secret, string_to_sign) ) return authorization_header_value
def _sign_data(secret, data): """ Sign data. :param data: the string to sign :return: string base64 encoding of the HMAC-SHA1 hash of the data parameter using {@code secretKey} as cipher key. """ sha1_hash = hmac.new(secret.encode(), data.encode(), sha1) return binascii.b2a_base64(sha1_hash.digest())[:-1].decode('utf8') def _hash_body(context): body, _ = context.renderer.encode_params(context.body_params, files=context.files) return sha1(convert_str_to_bytes(body)).hexdigest() def _hash_file(context): files = context.files assert len(files) == 1, "This method only can sign requests with one file" for param in files: _, fdata, _, _ = guess_file_name_stream_type_header(files[param]) return sha1(convert_str_to_bytes(fdata)).hexdigest() def _get_utc(): utc = get_current_utc() return utc.strip() def _get_11paths_serialized_headers(x_headers): """ Prepares and returns a string ready to be signed from the 11-paths specific HTTP headers received. :param x_headers: a non necessarily ordered map (array without duplicates) of the HTTP headers to be ordered. :return: string The serialized headers, an empty string if no headers are passed, or None if there's a problem such as non 11paths specific headers """ if x_headers: headers = to_key_val_list(x_headers, sort=True, insensitive=True) serialized_headers = "" for key, value in headers: if key.lower().startswith(X_11PATHS_HEADER_PREFIX.lower()) and \ key.lower() != X_11PATHS_DATE_HEADER_NAME.lower(): serialized_headers += key.lower() + X_11PATHS_HEADER_SEPARATOR + \ value.replace("\n", " ") + " " return serialized_headers.strip() else: return ""
[docs]class AbstractAuthentication(object):
[docs] def apply_authentication(self, context): return context
[docs]class X11PathsAuthentication(AbstractAuthentication): def __init__(self, app_id, secret, utc=None): self.app_id = app_id self.secret = secret self.utc = utc
[docs] def apply_authentication(self, context): context.headers[X_11PATHS_DATE_HEADER_NAME] = self.utc or _get_utc() if context.method == POST_METHOD or context.method == PUT_METHOD: if CONTENT_TYPE_HEADER_NAME in context.headers and \ context.headers[CONTENT_TYPE_HEADER_NAME].lower().startswith( "multipart/form-data" ): context.headers[X_11PATHS_FILE_HASH_HEADER_NAME] = _hash_file(context) elif CONTENT_TYPE_HEADER_NAME in context.headers and \ context.headers[CONTENT_TYPE_HEADER_NAME].lower() == "application/json": context.headers[X_11PATHS_BODY_HASH_HEADER_NAME] = _hash_body(context) elif CONTENT_TYPE_HEADER_NAME not in context.headers and not context.body_params: # 11paths bug: server validate that content-type header exists in POST and PUT # requests context.headers[CONTENT_TYPE_HEADER_NAME] = "application/x-www-form-urlencoded" context.headers[AUTHORIZATION_HEADER_NAME] = x_11paths_authorization( self.app_id, self.secret, context ) return context
[docs]class BasicAuthentication(AbstractAuthentication): def __init__(self, username, password): self.username = username self.password = password
[docs] def apply_authentication(self, context): context.headers[AUTHORIZATION_HEADER_NAME] = basic_authorization( self.username, self.password ) return context