Source code for wolframclient.evaluation.kernel.kernelcontroller

# -*- coding: utf-8 -*-

from __future__ import absolute_import, print_function, unicode_literals

import logging
from concurrent import futures
from itertools import count as _count
from queue import Queue
from subprocess import PIPE, Popen
from threading import Event, RLock, Thread

from wolframclient.evaluation.kernel.path import find_default_kernel_path
from wolframclient.evaluation.kernel.zmqsocket import (
    Socket,
    SocketAborted,
    SocketOperationTimeout,
)
from wolframclient.evaluation.result import WolframKernelEvaluationResult
from wolframclient.exception import WolframKernelException
from wolframclient.utils import six
from wolframclient.utils.api import json, os, time, zmq

if six.WINDOWS:
    from subprocess import STARTUPINFO, STARTF_USESHOWWINDOW

__all__ = ["WolframKernelController"]

logger = logging.getLogger(__name__)

TO_PY_LOG_LEVEL = {1: logging.DEBUG, 2: logging.INFO, 3: logging.WARN, 4: logging.FATAL}
FROM_PY_LOG_LEVEL = dict((v, k) for k, v in TO_PY_LOG_LEVEL.items())

_thread_counter = _count().__next__
_thread_counter()


class KernelLogger(Thread):
    """ Asynchronous logger for kernel messages. 
    
    A consumer of messages read from a PUB/SUB socket that turn them into log messages as expected
    by the :mod:`logging` module.
    """

    MAX_MESSAGE_BEFORE_QUIT = 32

    def __init__(self, name=None, level=logging.WARN):
        super().__init__(name=name)
        self.socket = Socket(zmq_type=zmq.SUB)
        self.socket.bind()
        # Subscribe to all since we want all log messages.
        self.socket.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")
        if logger.isEnabledFor(logging.INFO):
            logger.info("Initializing Kernel logger on socket " + self.socket.uri)
        self.logger = logging.getLogger("WolframKernel-<%s>" % self.socket.uri)
        self.logger.setLevel(level)
        self.stopped = Event()

    def run(self):
        logger.debug("Start receiving kernel logger messages.")
        msg_after_quit = 0
        zmq_socket = self.socket.zmq_socket
        try:
            while msg_after_quit < KernelLogger.MAX_MESSAGE_BEFORE_QUIT:
                try:
                    msg = zmq_socket.recv_json(flags=zmq.NOBLOCK)
                    level = TO_PY_LOG_LEVEL.get(msg.get("level", 3))
                    msg_text = msg.get("msg", 'Malformed kernel message. Missing key "msg".')
                    self.logger.log(level, msg_text)
                    if self.stopped.is_set():
                        msg_after_quit += 1
                except zmq.Again:
                    if self.stopped.is_set():
                        break
                    else:
                        time.sleep(0.01)
                except json.JSONDecodeError as e:
                    logger.warning("Invalid message: %s", e.doc)
        # no matter what we try to close the socket:
        finally:
            logger.info("Terminating kernel logger thread.")
            if msg_after_quit == KernelLogger.MAX_MESSAGE_BEFORE_QUIT:
                logger.warning(
                    "The maximum number of messages to log after a session finishes has been reached. \
                Some messages may have been discarded."
                )
            try:
                self.socket.close()
            except:
                logger.fatal("Failed to close ZMQ logging socket.")


[docs]class WolframKernelController(Thread): """ Control a Wolfram kernel from a Python thread. A controller can start and stop a Wolfram kernel specified by its path `kernel`. It can evaluate expression, one at a time. Most methods from this class return instances of :class:`~concurrent.futures.Future`. This class is a low level component of the library which is used by local evaluators. ZMQ sockets are not thread safe, this class ensures encapsulation of them, while enabling asynchronous operations. """ def __init__( self, kernel=None, initfile=None, consumer=None, kernel_loglevel=logging.NOTSET, stdin=PIPE, stdout=PIPE, stderr=PIPE, **kwargs ): self.id = _thread_counter() super().__init__(name="wolfram-kernel-%i" % self.id) self.kernel = kernel or self.default_kernel_path() if self.kernel: if not os.isfile(self.kernel): raise WolframKernelException("Kernel not found at %s." % self.kernel) if not os.access(self.kernel, os.X_OK): raise WolframKernelException("Cannot execute kernel %s." % self.kernel) else: raise WolframKernelException( "Cannot locate a kernel automatically. Please provide an explicit kernel path." ) if initfile is None: self.initfile = os.path_join(os.dirname(__file__), "initkernel.m") else: self.initfile = initfile if not os.isfile(self.initfile): raise FileNotFoundError("Kernel initialization file %s not found." % self.initfile) if logger.isEnabledFor(logging.DEBUG): logger.debug( "Initializing kernel %s using script: %s" % (self.kernel, self.initfile) ) self.tasks_queue = Queue() self.kernel_socket_in = None self.kernel_socket_out = None self.kernel_proc = None self.consumer = consumer self.loglevel = kernel_loglevel self.kernel_logger = None self.evaluation_count = 0 self._stdin = stdin self._stdout = stdout self._stderr = stderr # some parameters may be passed as kwargs self.parameters = {} for k, v in kwargs.items(): try: self.set_parameter(k, v) # ignore kwargs unknowns key except KeyError: pass # this is a state: this event is set when the kernel will not serve any more evaluation. self._state_terminated = False # lock controlling concurrent access to the state above. self._state_lock = RLock() # this is a trigger that will abort most blocking operations. self.trigger_termination_requested = Event()
[docs] def duplicate(self): """ Build a new object using the same configuration as the current one. """ return self.__class__( kernel=self.kernel, initfile=self.initfile, kernel_loglevel=self.loglevel, consumer=self.consumer, stdin=self._stdin, stdout=self._stdout, stderr=self._stderr, **self.parameters )
_DEFAULT_PARAMETERS = { "STARTUP_TIMEOUT": 20, "TERMINATE_TIMEOUT": 3, "HIDE_SUBPROCESS_WINDOW": True, }
[docs] def get_parameter(self, parameter_name): """Return the value of a given session parameter. Session parameters are: * ``'STARTUP_TIMEOUT'``: time to wait, in seconds, after the kernel startup is requested. Default is 20 seconds. * ``'TERMINATE_TIMEOUT'``: time to wait, in seconds, after the ``Quit[]`` command is sent to the kernel. The kernel is killed after this duration. Default is 3 seconds. """ try: return self.parameters.get( parameter_name, self._DEFAULT_PARAMETERS[parameter_name] ) except KeyError: raise KeyError( "%s is not one of the valid parameters: %s" % (parameter_name, ", ".join(self._DEFAULT_PARAMETERS.keys())) )
[docs] def set_parameter(self, parameter_name, parameter_value): """Set a new value for a given parameter. The new value only applies for this session. Session parameters are: * ``'STARTUP_TIMEOUT'``: time to wait, in seconds, after the kernel startup is requested. Default is 20 seconds. * ``'TERMINATE_TIMEOUT'``: time to wait, in seconds, after the ``Quit[]`` command is sent to the kernel. The kernel is killed after this duration. Default is 3 seconds. """ if parameter_name not in self._DEFAULT_PARAMETERS: raise KeyError( "%s is not one of the valid parameters: %s" % (parameter_name, ", ".join(self._DEFAULT_PARAMETERS.keys())) ) self.parameters[parameter_name] = parameter_value
[docs] def default_kernel_path(self): return find_default_kernel_path()
def _kernel_terminate(self): self._kernel_stop(gracefully=False) def _kernel_stop(self, gracefully=True): """Stop the kernel process and close sockets. This function must be called when a given session is no longer useful to prevent orphan processes and sockets from being generated. .. note:: Licensing restrictions usually apply to Wolfram kernels and may prevent new instances from starting if too many kernels are running simultaneously. Make sure to always terminate sessions to avoid unexpected start-up errors. """ logger.info("Start termination on kernel %s", self) with self._state_lock: self._state_terminated = True self.trigger_termination_requested.set() if self.kernel_proc is not None: error = False if gracefully: # Graceful stop: first send a Quit command to the kernel. try: self.kernel_socket_out.send(b"8:f\x00s\x04Quit", flags=zmq.NOBLOCK) except: logger.info("Failed to send Quit[] command to the kernel.") error = True if not error: try: self.kernel_proc.wait(timeout=self.get_parameter("TERMINATE_TIMEOUT")) except: logger.info( "Kernel process failed to stop after %.02f seconds. Killing it." % self.get_parameter("TERMINATE_TIMEOUT") ) error = True # Kill process if not already terminated. # Wait for it to cleanly stop if the Quit command was successfully sent, # otherwise the kernel is likely in a bad state so we kill it immediately. if self._stdin == PIPE: try: self.kernel_proc.stdin.close() except: logger.warning("Failed to close kernel process stdin.") error = True if self._stdout == PIPE: try: self.kernel_proc.stdout.close() except: logger.warning("Failed to close kernel process stdout.") error = True if self._stderr == PIPE: try: self.kernel_proc.stderr.close() except: logger.warning("Failed to close kernel process stderr") error = True if error or not gracefully: logger.info("Killing kernel process: %i" % self.kernel_proc.pid) self.kernel_proc.kill() self.kernel_proc = None if self.kernel_socket_out is not None: try: self.kernel_socket_out.close() except Exception as e: logger.fatal(e) finally: self.kernel_socket_out = None if self.kernel_socket_in is not None: try: self.kernel_socket_in.close() except Exception as e: logger.fatal(e) finally: self.kernel_socket_in = None if self.kernel_logger is not None: try: self.kernel_logger.stopped.set() self.kernel_logger.join() except Exception as e: logger.fatal(e) finally: self.kernel_logger = None assert self.kernel_proc is None assert self.kernel_socket_in is None assert self.kernel_socket_out is None assert self.kernel_logger is None @property def started(self): """ Is the kernel starting or being started. """ with self._state_lock: return self.is_alive() and not self._state_terminated @property def terminated(self): """ Is the kernel terminated. Terminated kernel no more handle evaluations. """ with self._state_lock: return self._state_terminated
[docs] def is_kernel_alive(self): """ Return the status of the kernel process. """ try: # subprocess poll function is thread safe. return self.kernel_proc is not None and self.kernel_proc.poll() is None except AttributeError: # in case kernel_proc was set to None. May not even be possible. return False
[docs] def request_kernel_start(self): """ Start the thread and the associated kernel. Return a future object indicating the kernel status. The future object result is True once the kernel is successfully started. Exception raised in the process and passed to the future object. Calling this method twice is a no-op.""" with self._state_lock: future = futures.Future() if not self.started: self.enqueue_task(self.START, future, None) self.start() else: future.set_result(True) return future
[docs] def enqueue_task(self, payload, future, callback): if self.terminated or self.trigger_termination_requested.is_set(): logger.fatal("Cannot enqueue tasks on terminated controller.") raise RuntimeError("Thread is closing. Cannot queue task") self.tasks_queue.put((payload, future, callback))
def _safe_kernel_start(self): """ Start a kernel. If something went wrong, clean-up resources that may have been created. """ try: self._kernel_start() except Exception as e: logger.warning("Failed to start.") try: self._kernel_terminate() finally: raise e _KERNEL_OK = b"OK" _KERNEL_VERSION_NOT_SUPPORTED = 10 def _kernel_start(self): """Start a new kernel process and open sockets to communicate with it.""" # Socket to which we push new expressions for evaluation. if self.kernel_socket_out is None: self.kernel_socket_out = Socket(zmq_type=zmq.PUSH) if self.kernel_socket_in is None: self.kernel_socket_in = Socket(zmq_type=zmq.PULL) # start the evaluation zmq sockets self.kernel_socket_out.bind() self.kernel_socket_in.bind() if logger.isEnabledFor(logging.INFO): logger.info("Kernel writes commands to socket: %s", self.kernel_socket_out) logger.info( "Kernel receives evaluated expressions from socket: %s", self.kernel_socket_in ) # start the kernel process cmd = [self.kernel, "-noprompt", "-initfile", self.initfile] if self.loglevel != logging.NOTSET: self.kernel_logger = KernelLogger( name="wolfram-kernel-logger-%i" % self.id, level=self.loglevel ) self.kernel_logger.start() cmd.append("-run") cmd.append( 'ClientLibrary`Private`SlaveKernelPrivateStart["%s", "%s", "%s", %i];' % ( self.kernel_socket_out.uri, self.kernel_socket_in.uri, self.kernel_logger.socket.uri, FROM_PY_LOG_LEVEL[self.loglevel], ) ) else: cmd.append("-run") cmd.append( 'ClientLibrary`Private`SlaveKernelPrivateStart["%s", "%s"];' % (self.kernel_socket_out.uri, self.kernel_socket_in.uri) ) if logger.isEnabledFor(logging.DEBUG): logger.debug("Kernel called using command: %s." % " ".join(cmd)) # hide the WolframKernel window. if six.WINDOWS and self.get_parameter("HIDE_SUBPROCESS_WINDOW"): startupinfo = STARTUPINFO() startupinfo.dwFlags |= STARTF_USESHOWWINDOW else: startupinfo = None try: self.kernel_proc = Popen( cmd, stdin=self._stdin, stdout=self._stdout, stderr=self._stderr, startupinfo=startupinfo, ) if logger.isEnabledFor(logging.INFO): logger.info("Kernel process started with PID: %s" % self.kernel_proc.pid) t_start = time.perf_counter() except Exception as e: logger.exception(e) raise WolframKernelException("Failed to start kernel process.") try: # First message must be "OK", acknowledging everything is up and running # on the kernel side. response = self.kernel_socket_in.recv_abortable( timeout=self.get_parameter("STARTUP_TIMEOUT"), abort_event=_StartEvent(self.kernel_proc, self.trigger_termination_requested), ) if response == self._KERNEL_OK: if logger.isEnabledFor(logging.INFO): logger.info( "Kernel %s is ready. Startup took %.2f seconds." % (self.pid, time.perf_counter() - t_start) ) else: raise WolframKernelException( "Kernel %s failed to start properly." % self.kernel ) except (SocketAborted, SocketOperationTimeout) as se: if self.kernel_proc.returncode == self._KERNEL_VERSION_NOT_SUPPORTED: raise WolframKernelException( "Wolfram kernel version is not supported. Please consult library prerequisites." ) logger.warning("Socket exception: %s", se) raise WolframKernelException( "Failed to communicate with kernel: %s." % self.kernel ) @property def pid(self): """Return the PID of the Wolfram kernel process, if any, or None.""" try: return self.kernel_proc.pid except AttributeError: return None START = object() STOP = object()
[docs] def stop(self): future = futures.Future() with self._state_lock: if self.terminated: future.set_result(True) return future self.enqueue_task(self.STOP, future, None) self._state_terminated = True return future
[docs] def terminate(self): future = futures.Future() with self._state_lock: if not self.started: future.set_result(True) return future self.enqueue_task(self.STOP, future, None) self._state_terminated = True self.trigger_termination_requested.set() return future
[docs] def join(self, timeout=None): future = self.stop() future.result(timeout=timeout) return super().join(timeout=timeout)
[docs] def evaluate_future(self, wxf, future, result_update_callback=None, **kwargs): self.enqueue_task(wxf, future, result_update_callback)
def _do_evaluate(self, wxf, future, result_update_callback): start = time.perf_counter() self.kernel_socket_out.send(wxf) if logger.isEnabledFor(logging.DEBUG): logger.debug("Expression sent to kernel in %.06fsec", time.perf_counter() - start) start = time.perf_counter() wxf_eval_data = self.kernel_socket_in.recv_abortable(copy=False) if logger.isEnabledFor(logging.DEBUG): logger.debug( "Expression received from kernel after %.06fsec", time.perf_counter() - start ) self.evaluation_count += 1 result = WolframKernelEvaluationResult(wxf_eval_data.buffer, consumer=self.consumer) if logger.isEnabledFor(logging.WARNING): for msg in result.iter_messages(): logger.warning(msg) if result_update_callback: result = result_update_callback(result) future.set_result(result)
[docs] def run(self): future = None task = None try: task = self.tasks_queue.get() payload, future, result_update_callback = task # Kernel start requested. if payload is self.START: self._safe_kernel_start() future.set_result(True) future = None task = None self.tasks_queue.task_done() task = self.tasks_queue.get() payload, future, result_update_callback = task elif payload is self.STOP: future.set_result(True) future = None return # first evaluation. Ensure kernel is started. else: self._safe_kernel_start() while not self.trigger_termination_requested.is_set(): if payload is self.STOP: # lock controlling concurrent access to state above. with self._state_lock: self._state_terminated = True logger.info( "Termination requested for kernel controller. Associated kernel PID: %s", self.pid, ) task = None self.tasks_queue.task_done() break self._do_evaluate(payload, future, result_update_callback) future = None task = None self.tasks_queue.task_done() task = self.tasks_queue.get() payload, future, result_update_callback = task except (KeyboardInterrupt, RuntimeError, futures.CancelledError) as e: self.trigger_termination_requested.set() logger.error("Fatal error in kernel controller: %s", e) raise e except Exception as e: self.trigger_termination_requested.set() if future and not future.cancelled(): future.set_exception(e) future = None else: raise e finally: try: if task: self.tasks_queue.task_done() self._cancel_tasks() if self.trigger_termination_requested.is_set(): self._kernel_terminate() else: self._kernel_stop() except Exception as e: if future: future.set_exception(e) future = None finally: if future: future.set_result(True)
def _cancel_tasks(self): while not self.tasks_queue.empty(): task = self.tasks_queue.get() _, future, _ = task future.cancel() def __repr__(self): if self.started: return "<%s: pid:%i, kernel sockets: (in:%s, out:%s)>" % ( self.__class__.__name__, self.kernel_proc.pid, self.kernel_socket_in.uri, self.kernel_socket_out.uri, ) else: return "<%s: %s>" % (self.__class__.__name__, self.name)
class _StartEvent(object): def __init__(self, subprocess, abort_event): self.subprocess = subprocess self.abort_event = abort_event def is_set(self): return self.subprocess.poll() is not None or self.abort_event.is_set()