import json
try:
from exceptions import BaseException
except Exception:
pass
from urllib3.filepost import encode_multipart_formdata
from urllib3.fields import RequestField, guess_content_type
from sdklib.util.files import guess_filename_stream
from sdklib.util.structures import to_key_val_list, to_key_val_dict
from sdklib.compat import urlencode, quote_plus, basestring, str
java_strings = {"true": "true", "false": "false", "null": "null"}
python_strings = {"true": "True", "false": "False", "null": "None"}
csharp_strings = {"true": "True", "false": "False", "null": "Null"}
[docs]def to_string(value, lang='javascript'):
if lang in ['javascript', 'java', 'php']:
return get_primitive_as_string(java_strings, value)
elif lang in ['python']:
return get_primitive_as_string(python_strings, value)
elif lang in ['csharp', 'dotnet']:
return get_primitive_as_string(csharp_strings, value)
else:
return get_primitive_as_string(python_strings, value)
[docs]def get_primitive_as_string(strings_dict, value):
if isinstance(value, bool) and value:
return strings_dict["true"]
elif isinstance(value, bool):
return strings_dict["false"]
elif value is None:
return strings_dict["null"]
else:
return str(value)
[docs]class BaseRenderer(object):
DEFAULT_CONTENT_TYPE = ""
[docs] def encode_params(self, data=None, **kwargs):
"""
Build the body for a request.
"""
raise BaseException("Not implemented yet")
[docs]class MultiPartRenderer(BaseRenderer):
def __init__(self, boundary="----------ThIs_Is_tHe_bouNdaRY", output_str='javascript'):
self.boundary = boundary
self.output_str = output_str
[docs] def encode_params(self, data=None, files=None, **kwargs):
"""
Build the body for a multipart/form-data request.
Will successfully encode files when passed as a dict or a list of
tuples. Order is retained if data is a list of tuples but arbitrary
if parameters are supplied as a dict.
The tuples may be string (filepath), 2-tuples (filename, fileobj), 3-tuples
(filename, fileobj, contentype) or 4-tuples (filename, fileobj, contentype, custom_headers).
"""
if isinstance(data, basestring):
raise ValueError("Data must not be a string.")
# optional args
boundary = kwargs.get("boundary", None)
output_str = kwargs.get("output_str", self.output_str)
new_fields = []
fields = to_key_val_list(data or {})
files = to_key_val_list(files or {})
for field, value in fields:
ctype = None
if isinstance(value, (tuple, list)) and len(value) == 2:
val, ctype = value
else:
val = value
if isinstance(val, basestring) or not hasattr(val, '__iter__'):
val = [val]
for v in val:
# Don't call str() on bytestrings: in Py3 it all goes wrong.
if not isinstance(v, bytes):
v = to_string(v, lang=output_str)
field = field.decode('utf-8') if isinstance(field, bytes) else field
v = v.encode('utf-8') if isinstance(v, str) else v
rf = RequestField(name=field, data=v)
rf.make_multipart(content_type=ctype)
new_fields.append(rf)
for (k, v) in files:
fn, fdata, ft, fh = guess_file_name_stream_type_header(v)
rf = RequestField(name=k, data=fdata, filename=fn, headers=fh)
rf.make_multipart(content_type=ft)
new_fields.append(rf)
if boundary is None:
boundary = self.boundary
body, content_type = encode_multipart_formdata(new_fields, boundary=boundary)
return body, content_type
[docs]class PlainTextRenderer(BaseRenderer):
VALID_COLLECTION_FORMATS = ['multi', 'csv', 'ssv', 'tsv', 'pipes', 'plain']
COLLECTION_SEPARATORS = {"csv": ",", "ssv": " ", "tsv": "\t", "pipes": "|"}
def __init__(self, charset=None, collection_format='multi', output_str='javascript'):
self.charset = charset or 'utf-8'
self.collection_format = collection_format
self.output_str = output_str
@property
def collection_format(self):
return self._collection_format
@collection_format.setter
def collection_format(self, value):
assert value in self.VALID_COLLECTION_FORMATS
self._collection_format = value
[docs] def get_content_type(self, charset=None):
if charset is None:
charset = self.charset
if charset:
return "text/plain; charset=%s" % self.charset
return 'text/plain'
@staticmethod
def _encode(data, charset=None, output_str='javascript'):
return to_string(data, lang=output_str).encode(charset) \
if charset else to_string(data, lang=output_str).encode()
[docs] def encode_params(self, data=None, **kwargs):
"""
Build the body for a text/plain request.
Will successfully encode parameters when passed as a dict or a list of
2-tuples. Order is retained if data is a list of 2-tuples but arbitrary
if parameters are supplied as a dict.
"""
charset = kwargs.get("charset", self.charset)
collection_format = kwargs.get("collection_format", self.collection_format)
output_str = kwargs.get("output_str", self.output_str)
if data is None:
return b"", self.get_content_type(charset)
elif isinstance(data, bytes):
return data, self.get_content_type(charset)
elif isinstance(data, str):
return self._encode(data, charset), self.get_content_type(charset)
elif hasattr(data, 'read'):
return data, self.get_content_type(charset)
elif collection_format == 'multi' and hasattr(data, '__iter__'):
result = []
for k, vs in to_key_val_list(data):
if isinstance(vs, basestring) or not hasattr(vs, '__iter__'):
vs = [vs]
for v in vs:
result.append(
b"=".join([self._encode(k, charset), self._encode(v, charset, output_str)])
)
return b'\n'.join(result), self.get_content_type(charset)
elif collection_format == 'plain' and hasattr(data, '__iter__'):
results = []
for k, vs in to_key_val_dict(data).items():
results.append(
b"=".join([self._encode(k, charset), self._encode(vs, charset, output_str)])
)
return b'\n'.join(results), self.get_content_type(charset)
elif hasattr(data, '__iter__'):
results = []
for k, vs in to_key_val_dict(data).items():
if isinstance(vs, list):
v = self.COLLECTION_SEPARATORS[collection_format].join(e for e in vs)
key = k + '[]'
else:
v = vs
key = k
results.append(
b"=".join([self._encode(key, charset), self._encode(v, charset, output_str)])
)
return b"\n".join(results), self.get_content_type(charset)
else:
return str(data).encode(charset) if charset else str(data),\
self.get_content_type(charset)
[docs]class JSONRenderer(BaseRenderer):
DEFAULT_CONTENT_TYPE = "application/json"
def __init__(self):
self.content_type = self.DEFAULT_CONTENT_TYPE
[docs] def encode_params(self, data=None, **kwargs):
"""
Build the body for a application/json request.
"""
if isinstance(data, basestring):
raise ValueError("Data must not be a string.")
if data is None:
return b"", self.content_type
try:
fields = to_key_val_dict(data)
except Exception:
fields = data
try:
body = json.dumps(fields)
except Exception:
body = json.dumps(fields, encoding='latin-1')
return str(body).encode(), self.content_type
[docs]class XMLRenderer(BaseRenderer):
DEFAULT_CONTENT_TYPE = "application/xml"
def __init__(self):
self.content_type = self.DEFAULT_CONTENT_TYPE
[docs] def encode_params(self, data=None, **kwargs):
"""
Build the body for a application/xml request.
"""
raise BaseException("Not implemented yet")
[docs]class CustomRenderer(BaseRenderer):
def __init__(self, content_type):
self.content_type = content_type
[docs] def encode_params(self, data=None, **kwargs):
"""
Build the body for a custom request.
"""
return data, self.content_type
default_renderer = JSONRenderer()
[docs]def get_renderer(name=None, mime_type=None):
if name == 'json' or mime_type == JSONRenderer.DEFAULT_CONTENT_TYPE:
return JSONRenderer()
elif name == 'form' or mime_type == FormRenderer.DEFAULT_CONTENT_TYPE:
return FormRenderer()
elif name == 'multipart' or mime_type == MultiPartRenderer.DEFAULT_CONTENT_TYPE:
return MultiPartRenderer()
elif name == 'plain' or mime_type == PlainTextRenderer.DEFAULT_CONTENT_TYPE:
return PlainTextRenderer()
else:
return default_renderer
[docs]def url_encode(params, sort=False):
r = get_renderer('form')
return r.encode_params(params, sort=sort)[0]