T. Why parallel?
- Long computations, or big data.
- Goal is to divide computation burden among processors.

Solutions with R
- Usually, no 'magic bullet'
  - R is not thread safe, all data is in memory.
  - Algorithms are written for serial processing.
  - Not quite true, e.g., BLAS/LAPACK libraries.
- Instead: ad hoc solutions with packages such as Rmpi.

Problems we will touch on
- Random number generation. Useful to introduce methods and suggest challenges.
- Interactively exploring ExpressionSet data.
- Cross-validation. Embarassingly parallel: each cross-validation independent of other iterations. Readily parallelized.
- (Advanced) Bootstrapping. Also embarassingly parallel, but more work required to parallelize.
- (Advanced) Interfacing C code.

Solutions and limitations
- Write an R script for sequential processing; identify bottlenecks in code execution.
- Modify to allow parallelization.
  - Load packages such as Rmpi.
  - Redefine or modify functions to distribute calculations.
- Often: develop algorithm interactively, evaluate in BATCH mode.

Limitations.
- Quite a bit of programming required.
- Maximum speedup limited by fraction of parallelizable code.
- Communication costs suggest coarse-grained parallelization.
- Diminishing proportional benefits of additional processors.

A first session

library(Rmpi)
mpi.spawn.Rslaves(nslaves=2)

- One node (master or manager) coordinates tasks, other nodes (slaves or workers) perform computations.
- nslaves can be more or less than the number of processing units (e.g., CPUs) available.
- Each spawned R is a separate process, sharing only a mechanism of communication with the other R processes.

Sending data and commands

x <- 1:5
mpi.bcast.Robj2slave(x)

- Efficiently send (broadcast) any R object (including functions).
  - Internally: using serialize.

mpi.remote.exec(search())[1]

- Evaluate and receive results of any R function.

Calculations on ExpressionSet

library(golubEsets)
data(golubMerge)
exprSet <- exprs(golubMerge)
res <- apply(exprSet, 1, mad)
res <- mpi.parApply(exprSet, 1, mad)

- mpi.parApply like apply, but first argument divided between workers.
- Expensive communication, because portions of exprSet sent 'over the wire' to each worker.

Another way...

ff <- function(i) mad(exprSet[i,])
res <- sapply(1:nrow(exprSet), ff)

Parallel code:

mpi.bcast.cmd(library(golubEsets))
mpi.bcast.cmd(data(golubMerge))
mpi.bcast.cmd(exprSet <- exprs(golubMerge))
res <- mpi.parSapply(1:nrow(exprSet), ff)

- mpi.bcast.cmd like mpi.remote.exec, but no return value.
- Data loaded from local disk.
- mpi.parSapply like sapply; FUN sent to workers.
- Only a vector of length sent and received.

Random numbers

mpi.remote.exec(runif(4))

- Not very random!
- Each node has the same random number seed, so creates the same random number sequence.
- Parallel computaton, but not very useful: apply the same program to the same data.

mpi.setup.rngstream()
mpi.remote.exec(runif(4))

Repeatable research can be very problematic
- Identical results require repeatable random number sequence and identical order of evaluation.
- Order of evaluation depends on, e.g., cluster size, but also vaguaries of processs timing.

Lab, part 1

Next: toward useful work

Cross-validation and machine learning

Can gene expression patterns help identify phenotype? - Divide known phenotypes into a 'training' and 'test' set.
- Train a machine learning algorithm with the training set.
- Test the trained alogrithm (comparing predicted and known phenotypes) with the 'test' set.
- Cross-validiation: repeat with other training and test sets.
- Select 'best' machine learning algorithm.

Cross-validiation
- Statistically assess machine learning algorithm.
- Each cross-validiation (almost) independent.

An example: golubMerge data set

library(MLInterfaces)
library(golubEsets)
data(golubMerge)

- Data set of gene expression values measured on samples.
- phenotypic measures on each sample, including leukemia status (ALL or AML).

We will look at a subset of the data:

smallG <- golubMerge[200:250, ]

Cross-validation

lk1 <- xval(smallG, "ALL.AML", knnB, xvalMethod = "LOO", group = as.integer(0))
table(lk1, smallG$ALL.AML)

- Classify patient leukemia status using knnB algorithm (k nearest neighbors).
- xvalMethod: leave-one-out -- training set is all but one sample, testing set the remaining sample.
- Cross-validate with all possible training and test sets.
- Interpretation: cross-validations, correct classifications.

Cross-validation in parallel

setClass("RmpiXval",representation("list"))
setMethod("xvalLoop", signature(cluster="RmpiXval"),
  function(cluster, ...) mpi.parLapply)
cluster = new("RmpiXval")

mpi.bcast.cmd(library(MLInterfaces))
lk1 <- xval(smallG, "ALL.AML", knnB, xvalMethod = "LOO",
  group = as.integer(0), cluster=cluster)
table(lk1, smallG$ALL.AML)

- Same results as before (good!)
- MLInterfaces package developers modified xval for easy parallelization.
- Implementation presented here has high communication costs, so does not scale too well.

Under the hood...

So far...
- Start a single process, spawn several workers, distribute data, do analysis, return result.

Room for improvement.
- Interactive.
- Confusing mix of standard and parallel code.
- High communication costs to distribute data.
- Cluster configuration inside R.
- 'Manager' never does any real work.

Batch programing

Write a script file xval-batch.R...

# file xval-batch.R
# Load Rmpi, setup RmpiXval, xvalLoop (details above)
# broadcast and analyze
mpi.bcast.cmd(library(MLInterfaces))
library(MLInterfaces)
library(golubEsets)
data(golubMerge)
smallG <- golubMerge[200:250, ]
lk1 <- xval(smallG, "ALL.AML", knnB, xvalMethod = "LOO",
  group = as.integer(0), cluster=new("RmpiXval"))
table(lk1, smallG$ALL.AML)

...and execute (from the command line) in 'batch' mode.

% R CMD BATCH xval-batch.R

Output presented in xval-batch.Rout.

Original issues:
- Interactive. (SOLVED)
- Confusing mix of standard and parallel code. (SOLVED?)
- High communication costs to distribute data.
- Cluster configuration inside R.
- 'Manager' never does any real work.

A different parallel style
- Often, each node has its own hard drive, and cluster hardware efficiently moves large data to each drive. So...
- Each node determines data to analyze, performs the analysis, and coordinates the (usually much smaller) results with other nodes.

# file xval-batch-2.R
# Load Rmpi, setup RmpiXval2, xvalLoop (details elsewhere)
# analyze data
library(MLInterfaces)
library(golubEsets)
data(golubMerge)
smallG <- golubMerge[200:250, ]
lk1 <- xval(smallG, "ALL.AML", knnB, xvalMethod = "LOO",
  group = as.integer(0), cluster=new("RmpiXval2"))
# output results, but only once!
if (mpi.comm.rank() == 0)
  table(lk1, smallG$ALL.AML)

Control cluster (e.g., using 3 nodes) from the command line:

% mpiexec -n 3 R CMD BATCH xval-batch-2.R

Original issues:
- Interactive. (SOLVED)
- Confusing mix of standard and parallel code. (SOLVED)
- High communication costs to distribute data. (SOLVED)
- Cluster configuration inside R. (SOLVED)
- 'Manager' never does any real work. (SOLVED)

Under the hood...

setClass("RmpiXval2", representation=list(size="numeric"))
setMethod("xvalLoop", signature(cluster="RmpiXval2"),
  function(cluster, ...) lapplys)
cluster <- new("RmpiXval2")

- All nodes start xval, locally prepare data for analysis.
- xvalLoop method partitions work for each node, allows computation to occur in parallel, collates results.
- All nodes massage and return result.

Really under the hood: allgather.Robj

Efficiently collate obj from all nodes.

allgather.Robj <- function(obj=NULL, comm=1) {
  obj <- as.integer(charToRaw(serialize(obj, NULL)))
  sz <- mpi.allgather(length(obj), 1, integer(mpi.comm.size(comm)), comm)
  objs <-mpi.allgatherv(obj, 1, integer(sum(sz)), sz, comm)
  as.list(unlist(lapply(split(as.raw(objs),rep(1:length(sz)-1, sz)),
    unserialize), recursive=FALSE, use.names=FALSE))
}

Really under the hood: lapplys

lapply-like loop: obtain work, do calculations, gather all results.

lapplys <- function(X, FUN, ..., comm=1) {
  rank <- mpi.comm.rank(comm) + 1
  n <- mpi.comm.size(comm)
  tasks <- 1:length(X)
  mywork <- X[split(tasks, cut(tasks, n))[[rank]]]
  result <- lapply(mywork, FUN, ...)
  allgather.Robj(result, comm)
}

Lab, part 2

Next: advanced topics, tools, and opportunities

The bootstrap

library(boot)
args(boot)

- Bootstrap often embarassingly parallel, but...
- boot not readily accessible to parallelization.

One solution.
- Wrap statistic to distribute computing.
- WARNING: this is a 'hack', and will not work for parametric bootstraps.

Bootstrap first attempts

ratio <- function(d, w) sum(d$x * w)/sum(d$u * w)
boot(city, ratio, R = 999, stype = "w")

Parallel processing ideas that don't work.

mpi.bcast.cmd(library(boot))
mpi.bcast.Robj2slave(ratio)
res1 <- mpi.remote.exec(boot(city, ratio, R=999, stype="w"))
sz <- mpi.comm.size()
res2 <- mpi.parLapply(1:sz, function(i)
  boot(city, ratio, R=999/sz, stype="w"))

- res1: identical bootstraps on each node! \item \Robject{res2}: 999 (ish) bootstraps, but need to be collated! \end{itemize} \es \bs \section{How \Rfunction{boot} works, and how to parallelize it} How \Rfunction{boot} works. \begin{itemize} \item \verb|for| loop, calling \Rfunction{statistic} in a pre-determined order. \item For most versions of \Rfunction{boot}, each call to \Rfunction{statistic} has no influence on subsequent program execution. \end{itemize} How to parallelize it. \begin{itemize} \item Arrange for calls to \Rfunction{statistic} to be distributed, and\ldots \item Manager always recieves results. \item Workers calculate result (expensive) and forwards to manager, but only occasionally. \item \Rfunction{wrap}: see the lab for details. \end{itemize} \es \bs \section{A wrapped \Rfunction{ratio}} <>= wrap <- function(func, pseudo, comm=1) { iter <- 0 sz <- mpi.comm.size(comm)-1 rank <- mpi.comm.rank(comm) tag <- list(result=1) if (mpi.comm.rank()==0) function(...) mpi.recv.Robj(mpi.any.source(), tag$result, comm) else function(...) { if (iter %% sz == rank - 1) { result <- func(...) mpi.send.Robj(result, 0, tag$result, comm) } else result <- pseudo iter <<- iter + 1 result } } @ Manager \Rfunction{ratiow} <>= ratiow <- wrap(ratio, pseudo = 1.) ratiow @ Worker \Rfunction{ratiow} <>= mpi.bcast.Robj2slave(wrap) mpi.bcast.cmd(ratiow <- wrap(ratio, pseudo=1)) mpi.remote.exec(ratiow)[1] mpi.bcast.cmd(boot(city, ratiow, R=999, stype="v")) bootp <- boot(city, ratiow, R=999, stype="v") hist(bootp$t) @ \es \bs \subsection{Interfacing C code} \R{} packages. \begin{itemize} \item Allow user to define collections of useful functions. \item Can include C source code, callable from \R{} \item Package-writing details complex, but available \mref{http://cran.fhcrc.org/doc/manuals/R-exts.html}{online}. \end{itemize} Strategy. \begin{itemize} \item Call a C function. \item Evaluate arbitrary parallel code, including spawning new processes. \item Return result. \end{itemize} \es \bs \subsection{A brief outline: spawning parallel processes} \R{} code <>= parallelPi <- function( nodes ) { nodes <- as.integer(nodes) if (nodes < 1 || length(nodes) > 1) stop("nodes should be a single integer value") prog <- system.file(file.path("examples","cpi"), package="Rpi") .Call("parallelPi", prog, nodes, PACKAGE = "Rpi") } @ C function (to calculate $\pi$). \begin{verbatim} SEXP parallelPi(SEXP program, SEXP nodes) { MPI_Comm intercomm; int *slaverrcode; SEXP ans; /* type checking & set-up */ if(!isInteger(nodes) || (length(nodes) != 1)) error("expected a numeric value for number of nodes"); PROTECT(ans = allocVector(REALSXP, 1)); /* parallel */ slaverrcode = ( int* ) Calloc( nodes, int ); MPI_Comm_spawn(CHAR(STRING_ELT(program, 0)), MPI_ARGV_NULL, INTEGER(nodes)[0], MPI_INFO_NULL, 0, MPI_COMM_SELF, &intercomm, slaverrcode); [...] UNPROTECT(1); return(ans); } \end{verbatim} \es \bs \section{Other opportunities: \Rpackage{Rmpi}-like packages} \begin{itemize} \item \Rpackage{rpvm} provides an interface to the PVM library (similar to MPI); additional functionality not as developed as \Rpackage{Rmpi}. \item \Rpackage{snow} provides a common interface to point-to-point commands (like \Rfunction{mpi.send.Robj}) and functions (like \Rfunction{mpi.parLapply}) built on top of \Rpackage{Rmpi}, \Rpackage{rpvm}, or native `sockets'. \item \Rpackage{papply} simple \Rfunction{mpi.parLapply} functionality, used transparently in serial or parallel modes. \end{itemize} \es \bs \subsection{NetWorkSpaces} \begin{itemize} \item \mref{http://nws-r.sourceforge.net}{NetWorkSpaces} allows variables to be stored on a centralized server, and accessed by multiple instances of \R{} -- the illusion of shared memory. \item Multi-language (\R{}, Python, matlab) support, i.e., possible to store a variable in matlab, access and manipulate it in \R{}, and forward the result to Python. \end{itemize} \es \bs \subsection{Tools used today} Software infrastructure \begin{itemize} \item MPI (e.g., \mref{http://www.lam-mpi.org/}{LAM/MPI}) for linux, \mref{http://www-unix.mcs.anl.gov/mpi/mpich/}{MPICH2} for Windows. \item Intentionally clustered computers likely already have MPI. \end{itemize} \Rpackage{Rmpi} \begin{itemize} \item Linux: usually \verb|biocLite(Rmpi)|. \item Windows: \mref{http://www.stats.uwo.ca/faculty/yu/Rmpi/}{binary} download and instructions. \end{itemize} \mref{http://dirk.eddelbuettel.com/quantian.html}{Quantian} \begin{itemize} \item Linux distribution available on a single bootable CD. \item Contains MPI, \R{}, and most CRAN and Bioconductor packages! \end{itemize} \es \bs \subsection{Ideas for development} \begin{itemize} \item Exposing existing functionality for parallelization, e.g., \Rfunction{xval} in \Rpackage{MLInterfaces}. \item Building high-level abstractions to MPI, e.g., automatically partitioning work in batch jobs, creatign the illusion of shared variables. \item Providing creative solutions to interactive parallel programming. \end{itemize} \es \end{document}