Legacy Documentation

Parallel Computing Toolkit (2000)

This is documentation for an obsolete product.
Current products and services
Previous section-----Next section

Concurrency: Managing Parallel Processes

Processes and Processors

A process is simply a Mathematica expression being evaluated. A processor is a remote kernel that performs such evaluations.
The command RemoteEvaluate discussed in the chapter Parallel Evaluation will send an evaluation to an explicitly given processor, requiring you to keep track of available processors and processes yourself. The scheduling functions discussed in this chapter perform these functions for you. You can create any number of processes, many more than the number of available processors. If more processes are created than there are processors, the remaining processes will be queued and serviced when a processor becomes available.

Starting and Waiting for Processes

The two basic commands are Queue[expr] to line up an expression for evaluation on any available processor, and Wait[pid] to wait until a given process has finished.
Each process in the queue is identified by its unique process ID, its pid.
Queue[cmd]submits cmd for evaluation on a remote kernel and returns the queued job's pid
Queue[{vars...}, cmd]    builds a closure for the local values of the given variables before sending cmd to a remote kernel
QueueRun[]checks all remote kernels for available results and submits waiting jobs to available remote kernels
DoneQ[pid]returns True if the given process has finished
Wait[pid]waits for the given process to finish and returns its result
Wait[{pid1, pid2, ...}]waits for all given processes and returns the list of results
Wait[expr]waits for all process IDs contained in expr to finish and replaces them by the respective process' result
WaitOne[{pid1, pid2, ...}]waits for one of the given processes to finish. It returns {res, id, ids}, where id is the pid of the finished process, res is its result, and ids is the list of remaining pids

Queuing processes.

WaitOne is nondeterministic. It returns an arbitrary process that has finished. If no process has finished, it calls QueueRun until a result is available. The third element of the resulting list is suitable as an argument of another call to WaitOne.
The functions Queue and Wait implement concurrency. You can start arbitrarily many processes, and they will all be evaluated eventually on any available remote processors. When you need a particular result, you can wait for any particular pid, or you can wait for all results using repeated calls to WaitOne.
QueueRun[] returns True if at least one job was submitted to a remote kernel or one result received from a kernel, False otherwise. Normally, you should not have to run QueueRun yourself. It is called at appropriate places inside Wait and other functions. You need it only if you implement your own main loop for a concurrent program.
Load[] gives the sum of the lengths of the input queues of all remote kernels
Load[kernel] gives the length of the input queue of a remote kernel
$QueueLength gives the length of the input queue of commands submitted with Queue but not yet assigned to an available remote kernel
ResetQueues[]waits for all running processes and abandons any queued processes

System load and input queue size.

Basic Usage

To try the examples here, load Parallel Computing Toolkit and then start a few local or remote kernels as described in the Introduction.
In[1]:=
Queue the evaluation 1+1 for processing on a remote kernel. Note that Queue has the attribute HoldAll to prevent evaluation of the expression before queueing. The value returned by Queue is the process ID (pid) of the queued process.
In[2]:=
Out[2]=
After queuing a process, you can perform other calculations and eventually collect the result. If the result is not yet available, Wait will wait for it.
In[3]:=
Out[3]=
You can queue several processes. Here the expression 12, 22,..., 52 is queued for evaluation.
In[4]:=
Out[4]=
Next, wait for any one process to finish.
In[5]:=
Out[5]=
Note the reassignment of pids to the list of remaining process IDs. Repeating the previous evaluation until the pids list becomes empty allows you to drain the queue.
In[6]:=
Out[6]=
You can also wait for all of the remaining processes to finish.
In[7]:=
Out[7]=

A Note on the Use of Variables

If an expression e in
involves variables with assigned values, care must be taken to ensure that the remote kernels have the same variable values defined. Unless you use ExportEnvironment or shared variables, locally defined variables will not be available to remote kernels. See the section Values of Variables in the chapter Parallel Evaluation for more information.
Here are a few common cases where there may be problems.
This assigns the value 2 to the variable a in the local master kernel.
In[8]:=
You want to evaluate the expression Head[a] on a remote kernel. The result is not Integer, as it is on the local kernel, because on the remote kernel the symbol a does not have a value.
In[9]:=
Out[9]=
In[10]:=
Out[10]=
You can use a local constant, and even name it a, to textually insert the value of a into the argument of Queue.
In[11]:=
Out[11]=
To make this frequent case simpler, you can use an optional first argument in Queue to declare the variables. The variables will then be inserted into the expression in the second argument.
In[12]:=
Out[12]=
The syntax Queue[{vars...},expr] is by design similar to Function[{vars...},expr]. Both form closures where the variables are bound to their values.
Iterator variables behave in the same way. In the following two outputs, the parallel evaluation does not give the correct result.
In[13]:=
Out[13]=
In[14]:=
Out[14]=
Insert the iterator variable as a constant or declare a closure to ensure that you are getting correct results, as is done with the following command.
In[15]:=
Out[15]=
Note that ParallelTable[] treats the iterator variable correctly.
In[16]:=
Out[16]=

Working with Process IDs

Wait[{pid1,pid2,...}] is merely a simple form of a general mechanism to parallelize computations. Wait can take any expression containing pids in its arguments and will wait for all associated processes to finish. The pids will then be replaced by the results of their processes.
You can view Wait as the inverse of Queue; that is, Wait[Queue[expr]] gives expr, evaluated on a remote kernel, just as expr itself is evaluated locally. Further, Wait[... Queue[e1] ... Queue[en] ...] is equivalent to ...e1...en..., where each ei is evaluated in parallel. Here the ellipses represent an arbitrary surrounding computation.
The pids generated by an instance of Queue should be left intact and should neither be destroyed nor duplicated before Wait performs its task. The reason is that each of them represents an ongoing parallel computation whose result should be collected exactly once.
Examples of expressions that leave pids intact follow.
  • pids in a list are safe, because the list operation does not do anything to its arguments; it merely keeps them together. Nested lists are also safe for the same reason.
  • pids are symbolic objects that are not affected by Plus. They may be reordered, which is irrelevant. Most arithmetic operations are safe.
  • Mapping a function involving Queue onto a list is safe because the result will contain the list of the pids.
  • Table returns lists of pids and is, therefore, safe.
Examples of expressions that are not safe include the following.
  • The Head operation will destroy the symbolic pid, as will other structural operations such as Length, ByteCount, and so on.
  • Multiplying a pid by 0 will destroy it.
  • Do does not return anything, so all pids are lost. A similar case is Scan.
To recover from a computation where pids were destroyed or duplicated, use the command ResetQueues[] or ResetSlaves[].
ProcessID[pid] a unique integer identifying the process
Process[pid] the expression representing the process
Scheduling[pid] the priority assigned to the process
ProcessState[pid]the state of the process: queued, running, finished

Properties of Process IDs.

Here we queue several processes.
In[1]:=
Out[1]=
Because the scheduler has not yet been running, all of them are still queued for evaluation.
In[2]:=
Out[2]//TableForm=
To demonstrate how it works, we invoke the scheduler by hand.
In[3]:=
Out[3]=
Now some processes are running on the available processors; some may already have finished.
In[4]:=
Out[4]//TableForm=
Wait[] invokes the scheduler until all processes are finished and returns their results. Note that the priorities are not used with the default queue type, see the section The Scheduler.
In[5]:=
Out[5]=

Latency Hiding

The latency is the communication overhead, the time period between the completion of one request and the start of servicing a new one.
MathLink is a buffered stream. You can send additional requests before receiving all outstanding results. If you can keep the buffers full at all times, there will be no latency; each remote processor will always be busy.
To turn on latency hiding, set $LoadFactor to a value larger than 1, such as 2 through 5. $LoadFactor determines the maximum number of computations sent to a slave kernel before at least one of the results is read. Here are two example commands that set latency hiding.
To benefit from latency hiding, you should plan to create many more processes than there are remote kernels. You can create more processes using Queue or an appropriate parallel mapping or table construct. If the number of processes is larger than $LoadFactor*Length[$Slaves] there is an added benefit of this scheme, automatic load balancing. The faster processors will automatically service more processes.
Do not use latency hiding if you plan to create exactly one process for each remote kernel.
You should develop your program with the default $LoadFactor=1 for easier debugging. Once your program runs correctly, you can try to increase $LoadFactor and measure the effect on the elapsed time of your computation. The smaller the remote computations are, the greater the benefit from latency hiding.
Latency hiding is incompatible with virtual shared memory. When you load Parallel`VirtualShared`, the value of $LoadFactor will be permanently set to 1.

Examples

Before evaluating these examples, load the toolkit and start several remote kernels.

An Infinite Calculation

If you want to verify that the polynomials i xi-1 for n=1,2,... are all irreducible (an open conjecture), factor the polynomials and then check that the length of the list of factors is 2, one factor being the overall numerical factor.
This computation will continue forever. To stop it, abort the local evaluation by pressing . or choosing Kernel Abort Evaluation. After the abort, collect any waiting results, as follows.
Here is the definition of the polynomial in x with degree n.
In[2]:=
You then make this definition known to all remote kernels.
In[3]:=
For better performance, you turn on latency hiding by setting $LoadFactor to a value larger than 1.
In[4]:=
Now you can start the computation, requiring that it print each result as it goes on. To stop the computation, abort it by choosing Kernel Abort Evaluation or pressing .. The explicit call of QueueRun[] is necessary in such examples where you program your own scheduling details.
In[5]:=
Out[5]=
Do not forget to collect the orphaned processes after an interrupt.
In[6]:=
Out[6]=

Automatic Process Generation

A general way to parallelize many kinds of computation is to replace a function g occurring in a functional operation by Composition[Queue,g]. This new operation will cause all instances of calls of the function g to be queued for parallel evaluation. The result of this composition is a process ID (pid) that will appear inside the structure constructed by the outer computation where g occurred. To put back the results of the computation of g, wrap the whole expression in Wait. Wait will replace any pid inside its expression by the result returned by the corresponding process.
Here are a few examples of such functional compositions.

Parallel mapping

A parallel version of Map is easy to develop. The sequential Map wraps a function f around all elements in a list.
In[1]:=
Out[1]=
Simply use Composition[Queue,f] instead of f to schedule all mappings for parallel execution. The result is a list of pids.
In[2]:=
Out[2]=
Finally, simply wait for the processes to finish. Every pid will be replaced with the result of its associated process.
In[3]:=
Out[3]=

Parallel inner products

To see how this works for symbolic inner products, assume you want a generalized inner product where d is the common last dimension of a and first dimension of b. Think of p as Plus and t as Times.
Here is an example with d=2.
In[4]:=
Out[4]=
You can use Composition[Queue,p] in place of p in the previous expression to queue all calculations of p for concurrent execution. The result is a tensor of process IDs.
In[5]:=
Out[5]=
Now, simply wait for all processes in this expression.
In[6]:=
Out[6]=

Parallel tables and sums

This code generates a 55 matrix of random numbers, where each row is computed in parallel.
In[7]:=
Out[7]=
Here is a sum whose elements are computed in parallel. Each element of the sum is a numerical integration.
In[8]:=
Out[8]=
Here is the corresponding table of parallel exact results. The value of is k.
In[9]:=
Out[9]//TableForm=

Comparison With ParallelEvaluate

Parallel Mapping, tables, and inner products were already introduced in chapter Parallel Evaluation. Those functions perform a single dispatch of part of the computation on each remote processors. The functions in this chapter generate one process for each subproblem.
If all subproblems take the same amount of time, the functions such as ParallelMap[], ParallelTable[] are faster. However, if the computation times of the subproblems are different, and not easy to estimate in advance, it can be better to use Wait[... Queue[] ...] as described in this section. If the number of processes generated is larger than the number of remote kernels, this method performs automatic load balancing, because jobs are assigned to a kernel as soon as the previous job is done, and all kernels are kept busy all the time.

Tracing

To observe how processes are scheduled, you can use tracing. To use these features, you have to load the debugging package before loading the toolkit and starting several kernels.
In[1]:=
In[2]:=
Now you can enable Queueing tracing.
In[3]:=
Out[3]=
Here several processes are queued showing how the queue grows in size.
In[4]:=
Out[4]=
Wait[] invokes the scheduler which sends queued jobs to idle processors, collects results, and hands them back to the application.
In[5]:=
Out[5]=
To turn off tracing when you are done, use
In[6]:=
Out[6]=

The Scheduler

Whenever Queue[] is called, a process is entered into a queue on the master kernel. The scheduler QueueRun[] selects the first process in the queue as soon as a remote kernel is available. By choosing different queue implementations, you can specify which process should be considered the first one. Parallel Computing Toolkit provides a number of queues that you can use, or you can write your own. Queues are implemented in packages that you can load as needed.
Parallel`Queue`FIFO FIFOQueue: first in, first out
Parallel`Queue`Priority priorityQueue: user-definable priorities
Parallel`Queue`LIFO LIFOQueue: last in, first out (reverse)
Parallel`Queue`Interfaceinterface definitions for all queues

Packages and queue types.

$Queue the queue of waiting processes
$QueueLength the number of processes in the queue
$QueueType gives the current queue type
SetQueueType[queue]change the queue type to queue

Manipulating the queue.

This is the current queue type (the default).
In[1]:=
Out[1]=
To use priority queues, for example, load the corresponding package,
In[2]:=
In[3]:=
then change the queue type.
In[4]:=
Out[4]=
Now you can generate processes with optional priorities.
Queue[expr, Scheduling->p]queue expr with priority p

Specifying priorities.

In[5]:=
Out[5]=
Each queue type impelements the method Normal[] to give a list of the queue's contents.
In[6]:=
Out[6]=
The scheduler will schedule jobs with higher priority first. The order of the results is not affected.
In[7]:=
Out[7]=
This resets the queue type to the default.
In[8]:=
Out[8]=