Source code for wolframclient.evaluation.kernel.futuresession

# -*- coding: utf-8 -*-

from __future__ import absolute_import, print_function, unicode_literals

import logging
from subprocess import PIPE

from wolframclient.evaluation.kernel.kernelsession import (
    WolframLanguageSession)
from wolframclient.utils.api import futures

logger = logging.getLogger(__name__)

__all__ = ['WolframLanguageFutureSession']


[docs]class WolframLanguageFutureSession(WolframLanguageSession): """Evaluate expressions asynchronously and return Future instances. Asynchronous evaluations are backed by the :mod:`concurrent.futures` module, especially the :class:`~concurrent.futures.Future` class. Contrary to :class:`~wolframclient.evaluation.WolframLanguageAsyncSession`, none of the methods of this class is a coroutine. Evaluation methods are synchronous; they start the evaluation as a background task in a thread, and immediately return a :class:`~concurrent.futures.Future` objects, that can later be awaited. """ def __init__(self, kernel=None, consumer=None, initfile=None, in_socket=None, out_socket=None, kernel_loglevel=logging.NOTSET, stdin=PIPE, stdout=PIPE, stderr=PIPE, inputform_string_evaluation=True, wxf_bytes_evaluation=True, **kwargs): super().__init__( kernel=kernel, consumer=consumer, initfile=initfile, in_socket=in_socket, out_socket=out_socket, kernel_loglevel=kernel_loglevel, stdin=stdin, stdout=stdout, stderr=stderr, inputform_string_evaluation=inputform_string_evaluation, wxf_bytes_evaluation=wxf_bytes_evaluation, **kwargs) self.thread_pool_exec = None
[docs] def evaluate(self, expr, **kwargs): """Evaluate :meth:`~wolframclient.evaluation.kernel.kernelsession.WolframLanguageSession.evaluate` asynchronously and return a :class:`~concurrent.futures.Future` object.""" return self._do_in_thread(super().evaluate, self.normalize_input(expr), **kwargs)
[docs] def evaluate_wxf(self, expr, **kwargs): """Evaluate :meth:`~wolframclient.evaluation.kernel.kernelsession.WolframLanguageSession.evaluate_wxf` asynchronously and return a :class:`~concurrent.futures.Future` object.""" return self._do_in_thread(super().evaluate_wxf, self.normalize_input(expr), **kwargs)
[docs] def evaluate_wrap(self, expr, **kwargs): """Evaluate :meth:`~wolframclient.evaluation.kernel.kernelsession.WolframLanguageSession.evaluate_wrap` asynchronously and return a :class:`~concurrent.futures.Future` object.""" return self._do_in_thread(super().evaluate_wrap, self.normalize_input(expr), **kwargs)
def _do_in_thread(self, func, *args, **kwargs): try: self._get_exec_pool() return self.thread_pool_exec.submit(func, *args, **kwargs) except ImportError: logger.fatal('Module concurrent.futures is missing.') raise NotImplementedError( 'Asynchronous evaluation is not available on this Python interpreter.' ) def _get_exec_pool(self): if self.thread_pool_exec is None: self.thread_pool_exec = futures.ThreadPoolExecutor(max_workers=1) return self.thread_pool_exec
[docs] def terminate(self): """Terminate the current session. This is a synchronous method.""" # First terminate all executions. # Then use the socket to actually quit. Avoid crashes when freeing zmq resources still in use. if self.thread_pool_exec: try: self.thread_pool_exec.shutdown(wait=True) except Exception as e: logger.fatal('Failed to stop thread pool executor: %s' % e) super().terminate()