Source code for wolframclient.evaluation.cloud.request_adapter

from __future__ import absolute_import, print_function, unicode_literals

from wolframclient.utils.api import aiohttp, requests
from wolframclient.utils.encoding import force_text

__all__ = ["wrap_response"]


class HTTPResponseAdapterBase(object):
    """ Unify various request classes as a unique API. """

    asynchronous = False

    def __init__(self, httpresponse):
        self.response = httpresponse

    def response_object(self):
        return self.response

    def status(self):
        """ HTTP status code """
        return self.response.status_code

    def json(self):
        """ Request body as a json object """
        return self.response.json()

    def text(self):
        """ Request body as decoded text. """
        return self.response.text

    def content(self):
        """ Request body as raw bytes """
        return self.response.content

    def url(self):
        """ String URL. """
        return force_text(self.response.url)

    def headers(self):
        """ Headers as a dict. """
        return self.response.headers


class RequestsHTTPRequestAdapter(HTTPResponseAdapterBase):
    pass


class AIOHttpHTTPRequestAdapter(HTTPResponseAdapterBase):

    asynchronous = True

    def status(self):
        return self.response.status

    async def json(self):
        return await self.response.json()

    async def text(self):
        return await self.response.text()

    async def content(self):
        return await self.response.read()


[docs]def wrap_response(response): if isinstance(response, requests.Response): return RequestsHTTPRequestAdapter(response) elif isinstance(response, aiohttp.ClientResponse): return AIOHttpHTTPRequestAdapter(response) else: raise ValueError("No adapter found for HTTP response class %s" % response.__class__)