Source code for wolframclient.serializers.encoder.encoder

# -*- coding: utf-8 -*-

from __future__ import absolute_import, print_function, unicode_literals

import logging
import sys
from collections import defaultdict

import pkg_resources

from wolframclient.serializers.utils import safe_len
from wolframclient.utils import six
from wolframclient.utils.api import multiprocessing
from wolframclient.utils.dispatch import Dispatch
from wolframclient.utils.functional import (composition, is_iterable, iterate,
                                            map)
from wolframclient.utils.importutils import safe_import_string

logger = logging.getLogger(__name__)

__all__ = ['wolfram_encoder', 'Encoder']

wolfram_encoder = Dispatch()
""" Instance of :class:`~wolframclient.serializers.encoder.WolframEncoder` used by default during serialization. """


# for now, this method name is fixed and must match the one in the wolfram_encoder wrapper.
@wolfram_encoder.dispatch(object)
def encode(serializer, o):
    if is_iterable(o):
        return serializer.serialize_iterable(
            map(serializer.encode, o), length=safe_len(o))
    if serializer.allow_external_objects:
        return serializer.serialize_external_object(o)

    raise NotImplementedError(
        'Cannot serialize object of class %s' % o.__class__)


class DispatchUpdater(object):
    def __init__(self, dispatch):
        self.registry = defaultdict(list)
        self.modules = set()
        self.plugins_registry = defaultdict(list)
        self.dispatch = dispatch

    def register_modules(self, **handlers):
        for module, _handlers in handlers.items():
            self.modules.add(module)
            self.registry[module].extend(iterate(_handlers))

    def register_plugins(self, name='wolframclient_serializers_encoder'):
        if logger.isEnabledFor(logging.INFO):
            logger.info(
                'Registering Wolfram encoders plugins associated to entrypoint %s.'
                % name)
        for entry_point in pkg_resources.iter_entry_points(group=name):
            self.plugins_registry[entry_point.name].extend(
                entry_point.module_name)

    def _update_dispatch(self):
        if self.modules:
            installed_modules = sys.modules.keys()
            for module in self.modules.intersection(installed_modules):
                for handler in self.registry[module]:
                    self.dispatch.update(safe_import_string(handler))

                del self.registry[module]
                self.modules.remove(module)

    def _update_plugins(self):
        if self.plugins_registry:
            for plugins_name, handler in self.plugins_registry.items():
                handler = ''.join(handler)
                try:
                    self.dispatch.update(safe_import_string(handler))
                except TypeError as e:
                    logger.fatal(
                        'Failed to load encoder associated to plugins %s.' %
                        plugins_name)
                    raise e
            self.plugins_registry = defaultdict(list)

    if not six.JYTHON:
        # global lock to avoid multiple dispatcher updating in multithreaded programs.
        _lock = multiprocessing.Lock()

        def update_dispatch(self):
            with self._lock:
                self._update_dispatch()
                self._update_plugins()

    else:

        def update_dispatch(self):
            self._update_dispatch()
            self._update_plugins()


wolfram_encoder_updater = DispatchUpdater(wolfram_encoder)
wolfram_encoder_updater.register_modules(

    #builtin libraries
    sys='wolframclient.serializers.encoder.builtin.encoder',
    decimal='wolframclient.serializers.encoder.decimal.encoder',
    datetime='wolframclient.serializers.encoder.datetime.encoder',
    fractions='wolframclient.serializers.encoder.fractions.encoder',

    #wolfram language support
    wolframclient='wolframclient.serializers.encoder.wolfram.encoder',
)

wolfram_encoder_updater.register_plugins()


[docs]class Encoder(object): """ A generic class exposing an :meth:`~wolframclient.serializers.encode.Encoder.encode` method applying an optional normalizer function, followed the most relevant encoding available for a given type. """ default_encoder = wolfram_encoder.as_method() default_updater = wolfram_encoder_updater def __init__(self, normalizer=None, allow_external_objects=False, target_kernel_version=None, **kwargs): self.encode = self.chain_normalizer(normalizer) self.allow_external_objects = allow_external_objects self.target_kernel_version = target_kernel_version or 11.3 self._properties = kwargs
[docs] def chain_normalizer(self, func): self.default_updater.update_dispatch() return composition(*map(safe_import_string, iterate(func or (), self.default_encoder)))
[docs] def get_property(self, key, d=None): return self._properties.get(key, d)