# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, unicode_literals
import logging
from concurrent import futures
from subprocess import PIPE
from wolframclient.evaluation.base import WolframEvaluator
from wolframclient.evaluation.kernel.kernelcontroller import WolframKernelController
from wolframclient.serializers import export
logger = logging.getLogger(__name__)
__all__ = ["WolframLanguageSession"]
# Callback functions used internally to post-process evaluation results.
def do_get_wxf(result):
    """Return the ``wxf`` attribute of the evaluation result object `result`."""
    wxf_payload = result.wxf
    return wxf_payload
def do_get_result(result):
    """Return the value produced by calling ``result.get()`` on `result`."""
    value = result.get()
    return value
class WolframLanguageSession(WolframEvaluator):
    """A session to a Wolfram kernel enabling evaluation of Wolfram Language expressions.

    Start a new session and send an expression for evaluation::

        with WolframLanguageSession() as session:
            session.evaluate('Range[3]')

    Set `timeout` to a number to set an evaluation timeout in seconds. If the evaluation
    time exceeds the timeout, a :class:`~concurrent.futures.TimeoutError` is raised.

    Evaluate an expression taking 10 seconds to return using a 5-second timeout::

        long_evaluation = wl.Pause(10)
        with WolframLanguageSession() as session:
            session.evaluate(long_evaluation, timeout=5)

    The asynchronous evaluation method
    :meth:`~wolframclient.evaluation.kernel.localsession.WolframLanguageSession.evaluate_future`
    returns an instance of :class:`~concurrent.futures.Future` class wrapping the evaluation result::

        with WolframLanguageSession() as session:
            future = session.evaluate_future('1+1')
            result = future.result()

    When `consumer` is set to a :class:`~wolframclient.deserializers.WXFConsumer` instance, this instance is passed to
    :func:`~wolframclient.deserializers.binary_deserialize` when deserializing the WXF output.

    By default, packed arrays are deserialized as :class:`list`. Specify a consumer instance that supports NumPy arrays
    :class:`~wolframclient.deserializers.WXFConsumerNumpy`::

        from wolframclient.deserializers import WXFConsumerNumpy

        with WolframLanguageSession(consumer=WXFConsumerNumpy()) as session:
            numpy_array = session.evaluate('Range[3]')

    Communication with a given kernel is based on ZMQ sockets:

    * one `PUSH` socket to send expressions for evaluation
    * one `PULL` socket to receive evaluation results

    Kernel logging is disabled by default and is done through a third socket (type `SUB`). The initial log level is
    specified by the parameter `kernel_loglevel`.
    If the log level was not set at initialization, logging is not available for the entire session.

    The kernel associated with a given session provides the following logging functions:

    * ``ClientLibrary`debug`` corresponding to :py:meth:`logging.Logger.debug`
    * ``ClientLibrary`info`` corresponding to :py:meth:`logging.Logger.info`
    * ``ClientLibrary`warn`` corresponding to :py:meth:`logging.Logger.warning`
    * ``ClientLibrary`error`` corresponding to :py:meth:`logging.Logger.error`
    * ``ClientLibrary`SetDebugLogLevel[]`` send debug messages and above
    * ``ClientLibrary`SetInfoLogLevel[]`` send info messages and above
    * ``ClientLibrary`SetWarnLogLevel[]`` send warning messages and above
    * ``ClientLibrary`SetErrorLogLevel[]`` only send error messages
    * ``ClientLibrary`DisableKernelLogging[]`` stop sending error messages to the logging socket

    The standard input, output and error file handles can be specified with `stdin`, `stdout` and `stderr` named
    parameters. Valid values are those accepted by :class:`subprocess.Popen` (e.g. :data:`sys.stdout`). Those parameters
    should be handled with care as deadlocks can arise from misconfiguration.
    """

    def __init__(
        self,
        kernel=None,
        consumer=None,
        initfile=None,
        kernel_loglevel=logging.NOTSET,
        stdin=PIPE,
        stdout=PIPE,
        stderr=PIPE,
        inputform_string_evaluation=True,
        wxf_bytes_evaluation=True,
        controller_class=WolframKernelController,
        **kwargs
    ):
        """Store the session configuration and build the kernel controller.

        The controller is instantiated from `controller_class` but not started;
        the session is flagged as stopped until :meth:`start_future` is called.
        Extra keyword arguments are forwarded to the controller and remembered
        in ``self.parameters`` so that :meth:`duplicate` can replay them.
        """
        super().__init__(inputform_string_evaluation=inputform_string_evaluation)
        # Keep the values actually supplied by the caller: `duplicate` replays
        # these attributes, so storing hard-coded defaults here would silently
        # drop the configuration of every cloned session.
        self.kernel = kernel
        # NOTE(review): `consumer` is stored but not forwarded to the controller
        # below -- confirm where it is expected to be consumed.
        self.consumer = consumer
        self.initfile = initfile
        self.kernel_loglevel = kernel_loglevel
        self._stdin = stdin
        self._stdout = stdout
        self._stderr = stderr
        self.wxf_bytes_evaluation = wxf_bytes_evaluation
        self.controller_class = controller_class
        self.kernel_controller = self.controller_class(
            kernel=kernel,
            initfile=initfile,
            kernel_loglevel=kernel_loglevel,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            **kwargs
        )
        self.parameters = kwargs
        self.stopped = True

    def duplicate(self):
        """Return a new, independent session built with this session's configuration."""
        return self.__class__(
            kernel=self.kernel,
            consumer=self.consumer,
            initfile=self.initfile,
            kernel_loglevel=self.kernel_loglevel,
            stdin=self._stdin,
            stdout=self._stdout,
            stderr=self._stderr,
            inputform_string_evaluation=self.inputform_string_evaluation,
            # Forwarded so a clone does not silently revert to the default.
            wxf_bytes_evaluation=self.wxf_bytes_evaluation,
            controller_class=self.controller_class,
            **self.parameters
        )

    @property
    def started(self):
        """The ``started`` state reported by the current kernel controller."""
        return self.kernel_controller.started

    def start(self, block=True, timeout=None):
        """ Start a kernel controller and eventually start a fresh one if the previous one was terminated.

        Set `block` to :data:`True` (the default) to wait for the kernel to be up and running
        before returning. Optionally, set a timeout in seconds. If the timeout is reached, a :data:`TimeoutError`
        will be raised and the kernel is terminated.

        Any exception raised while starting triggers :meth:`terminate` before
        the original exception is re-raised.
        """
        try:
            future = self.start_future()
            if future and block:
                future.result(timeout=timeout)
        except Exception as e:
            try:
                self.terminate()
            finally:
                # Always surface the start-up failure, even if terminate() fails too.
                raise e

    def start_future(self):
        """ Request the Wolfram kernel to start and return a future object.

        The result of the future object is :data:`True` when the kernel is ready to evaluate input."""
        self.stopped = False
        # A terminated controller is replaced by a fresh duplicate before starting.
        if self.kernel_controller.terminated:
            self.kernel_controller = self.kernel_controller.duplicate()
        if not self.started:
            return self.kernel_controller.request_kernel_start()
        # Already started: hand back a future that is already resolved.
        future = futures.Future()
        future.set_result(True)
        return future

    def stop(self):
        """ Request the Wolfram kernel to stop gracefully. """
        self._stop(gracefully=True)

    def terminate(self):
        """ Request the Wolfram kernel to stop immediately.

        Ongoing evaluations may be cancelled. """
        self._stop(gracefully=False)

    def _stop(self, gracefully=True):
        """Stop the kernel and wait for completion, unless the session is already flagged as stopped."""
        # if the kernel is terminated the queue no more accept new tasks. Stop would hang.
        if not self.stopped:
            future = self.stop_future(gracefully=gracefully)
            future.result()

    def stop_future(self, gracefully=True):
        """ Request the Wolfram kernel to stop and return a future object.

        The result of the future object is :data:`True` when the controller thread is no longer alive.
        Set `gracefully` to :data:`False` to request an immediate stop, eventually cancelling ongoing
        evaluations.
        """
        self.stopped = True
        if gracefully:
            return self.kernel_controller.stop()
        else:
            return self.kernel_controller.terminate()

    def ensure_started(self):
        """Start the session if its controller is not started, then restart it if it is flagged as stopped."""
        if not self.started:
            self.start(block=True, timeout=None)
        if self.stopped:
            self.restart()

    def restart(self, block=True, timeout=None):
        """ Restart a given evaluator by stopping it in cases where it is already started. """
        if self.started:
            self.stop()
        self.start(block=block, timeout=timeout)

    CALLBACK_GET_WXF = staticmethod(do_get_wxf)
    CALLBACK_GET = staticmethod(do_get_result)

    def do_evaluate_future(self, expr, result_update_callback=None, **kwargs):
        """Serialize `expr` to WXF, hand it to the kernel controller and return the pending future.

        `result_update_callback` and the extra keyword arguments are forwarded
        to the controller's ``evaluate_future``; the keyword arguments are also
        passed to the WXF export. This method does not start the session.
        """
        future = futures.Future()
        wxf = export(self.normalize_input(expr), target_format="wxf", **kwargs)
        self.kernel_controller.evaluate_future(
            wxf, future, result_update_callback=result_update_callback, **kwargs
        )
        return future

    def evaluate_future(self, expr, **kwargs):
        """ Evaluate an expression and return a future object.

        The future object result is the evaluated expression. See
        :func:`~wolframclient.evaluation.WolframLanguageSession.evaluate`.
        """
        self.ensure_started()
        return self.do_evaluate_future(
            expr, result_update_callback=self.CALLBACK_GET, **kwargs
        )

    def evaluate_wxf_future(self, expr, **kwargs):
        """ Evaluate an expression and return a future object.

        The future object result is the WXF serialization of the evaluated expression.
        See :func:`~wolframclient.evaluation.WolframLanguageSession.evaluate_wxf`.
        """
        self.ensure_started()
        return self.do_evaluate_future(
            expr, result_update_callback=self.CALLBACK_GET_WXF, **kwargs
        )

    def evaluate_wrap_future(self, expr, **kwargs):
        """ Evaluate an expression and return a future object.

        The future object result is the result object with the evaluated expression and meta information.
        See :func:`~wolframclient.evaluation.WolframLanguageSession.evaluate_wrap`.
        """
        self.ensure_started()
        return self.do_evaluate_future(expr, **kwargs)

    def evaluate_wrap(self, expr, **kwargs):
        """Evaluate `expr` synchronously and return the result object."""
        return self.evaluate_wrap_future(expr, **kwargs).result()

    def evaluate(self, expr, **kwargs):
        """Evaluate `expr` synchronously and return ``result.get()``.

        Messages attached to an unsuccessful result are logged as warnings.
        """
        result = self.evaluate_wrap(expr, **kwargs)
        self.log_message_from_result(result)
        return result.get()

    def evaluate_wxf(self, expr, **kwargs):
        """ Evaluate an expression and return the serialized expression.

        This method does not deserialize the Wolfram kernel output. """
        result = self.evaluate_wrap(expr, **kwargs)
        self.log_message_from_result(result)
        return result.wxf

    def log_message_from_result(self, result):
        """Log each message of `result` as a warning when the result is not successful."""
        if not result.success:
            for msg in result.messages:
                logger.warning(msg)

    def get_parameter(self, parameter_name):
        return self.kernel_controller.get_parameter(parameter_name)

    def set_parameter(self, parameter_name, parameter_value):
        self.kernel_controller.set_parameter(parameter_name, parameter_value)

    # Both accessors delegate to the controller, so they reuse its docstrings.
    get_parameter.__doc__ = WolframKernelController.get_parameter.__doc__
    set_parameter.__doc__ = WolframKernelController.set_parameter.__doc__

    def __repr__(self):
        """Show the kernel controller when started, a "not started" marker otherwise."""
        if self.started:
            return "<%s: kernel controller=%s>" % (
                self.__class__.__name__,
                self.kernel_controller,
            )
        else:
            return "<%s: not started>" % self.__class__.__name__