\documentclass[10pt]{seminar}

\usepackage{mathptmx}
\usepackage{amsmath}
\usepackage{graphicx}
\usepackage{float}
\usepackage{epsfig}
\usepackage{semcolor}
\usepackage[pdftex,colorlinks=false,pdfborder={0 0 0},pageanchor=false,bookmarks=false,urlcolor=black]{hyperref}

\pdfpagewidth=11truein
\pdfpageheight=8.5truein
\pdfhorigin=1truein % default value(?), but doesn't work without
\pdfvorigin=1truein % default value(?), but doesn't work without

\newcommand{\fs}{\fromSlide}
\newcommand{\bs}{\begin{slide}}
\newcommand{\es}{\end{slide}}
\slideframe{none}
\setcounter{secnumdepth}{0}

\newcommand{\Rfunction}[1]{{\texttt{#1}}}
\newcommand{\Robject}[1]{{\texttt{#1}}}
\newcommand{\Rpackage}[1]{{\textsf{#1}}}
\newcommand{\Rmethod}[1]{{\texttt{#1}}}
\newcommand{\Rfunarg}[1]{{\texttt{#1}}}
\newcommand{\Rclass}[1]{{\textit{#1}}}
\newcommand{\R}{\textsf{R}}
\newcommand{\term}[1]{\emph{#1}}
\newcommand{\mref}[2]{\href{#1}{#2}\footnote{\url{#1}}}

\begin{document}

\bs{}
\begin{center}
  {\Large Parallel R} \\[\bigskipamount]
  M. T. Morgan (\url{mtmorgan@fhcrc.org}) \\[\bigskipamount]
  Fred Hutchinson Cancer Research Center \\
  Seattle, WA \\[\bigskipamount]
  \includegraphics[height=2cm]{../Rlogo} $\qquad$
  \includegraphics[height=2cm]{../bioclogo} \\[\bigskipamount]
  \url{http://bioconductor.org} \\
  4 August, 2006
\end{center}
\es

\centerslidesfalse

\bs
<<>>=
options(width=60)
@

\section{Introduction}

Why parallel?
\begin{itemize}
\item Long computations, or big data.
\item Goal: divide the computational burden among processors.
\end{itemize}
Solutions with \R
\begin{itemize}
\item Usually no `magic bullet'.
  \begin{itemize}
  \item \R{} is not \emph{thread safe}, and all data is \emph{in memory}.
  \item Algorithms are written for serial processing.
  \item (Not quite true: e.g., tuned
    \mref{http://cran.fhcrc.org/doc/manuals/R-admin.html}{BLAS/LAPACK}
    libraries can use several CPUs.)
  \end{itemize}
\item Instead: \emph{ad hoc} solutions with packages such as \Rpackage{Rmpi}.
\end{itemize}
\es

\bs
\subsection{Problems we will touch on}
\begin{itemize}
\item Random number generation. Useful to introduce methods and suggest
  challenges.
\item Interactively exploring \Robject{ExpressionSet} data.
\item Cross-validation. \term{Embarrassingly parallel}: each
  cross-validation is independent of the other iterations, so readily
  parallelized.
\item (Advanced) Bootstrapping. Also embarrassingly parallel, but more
  work is required to parallelize.
\item (Advanced) Interfacing C code.
\end{itemize}
\es

\bs
\subsection{Solutions and limitations}
\begin{itemize}
\item Write an \R{} script for sequential processing; identify bottlenecks
  in code execution.
\item Modify it to allow parallelization.
  \begin{itemize}
  \item Load packages such as \Rpackage{Rmpi}.
  \item Redefine or modify functions to distribute calculations.
  \end{itemize}
\item Often: develop the algorithm interactively, evaluate it in
  \verb|BATCH| mode.
\end{itemize}
Limitations.
\begin{itemize}
\item Quite a bit of programming required.
\item Maximum speedup is limited by the fraction of parallelizable code
  (Amdahl's law).
\item Communication costs favor \term{coarse-grained} parallelization.
\item Diminishing proportional benefit from each additional processor.
\end{itemize}
\es

\bs
\section{A first session}
<<>>=
library(Rmpi)
mpi.spawn.Rslaves(nslaves=2)
@
\begin{itemize}
\item One \term{node} (\term{master} or \term{manager}) coordinates tasks;
  the other nodes (\term{slaves} or \term{workers}) perform computations.
\item \Rfunarg{nslaves} can be more or less than the number of processing
  units (e.g., CPUs) available.
\item Each spawned \R{} is a separate process, sharing only a mechanism of
  communication with the other \R{} processes (a quick sanity check
  appears below).
\end{itemize}
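To confirm that the cluster is up, one might query it as follows (a
sketch, not evaluated here; it assumes the \Rfunction{mpi.spawn.Rslaves}
call above succeeded):
\begin{verbatim}
mpi.comm.size()                   # 3: one manager plus two workers
mpi.remote.exec(mpi.comm.rank())  # each worker reports its rank: 1, 2
## ...and, at the end of a session:
mpi.close.Rslaves()               # shut the workers down
\end{verbatim}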
\es

\bs
\subsection{Sending data and commands}
<<>>=
x <- 1:5
mpi.bcast.Robj2slave(x)
@
\begin{itemize}
\item Efficiently send (\term{broadcast}) any \R{} object (including
  functions) to the workers.
  \begin{itemize}
  \item Internally: uses \Rfunction{serialize}.
  \end{itemize}
\end{itemize}
<<>>=
mpi.remote.exec(search())[1]
@
\begin{itemize}
\item Evaluate any \R{} function on the workers and receive the results.
%% \item Try \verb|library(help=Rmpi)| for more information.
\end{itemize}
\es

\bs
\subsection{Calculations on \Robject{ExpressionSet}}
<<>>=
library(golubEsets)
data(golubMerge)
exprSet <- exprs(golubMerge)
res <- apply(exprSet, 1, mad)
res <- mpi.parApply(exprSet, 1, mad)
@
\begin{itemize}
\item \Rfunction{mpi.parApply} is like \Rfunction{apply}, but its first
  argument is divided between the workers.
\item Communication is expensive, because portions of \Robject{exprSet}
  are sent `over the wire' to each worker.
\end{itemize}
\es

\bs
\subsection{Another way\ldots}
<<>>=
ff <- function(i) mad(exprSet[i,])
res <- sapply(1:nrow(exprSet), ff)
@
Parallel code:
<<>>=
mpi.bcast.cmd(library(golubEsets))
mpi.bcast.cmd(data(golubMerge))
mpi.bcast.cmd(exprSet <- exprs(golubMerge))
res <- mpi.parSapply(1:nrow(exprSet), ff)
@
\begin{itemize}
\item \Rfunction{mpi.bcast.cmd} is like \Rfunction{mpi.remote.exec}, but
  has no return value.
\item Data are loaded from local disk on each worker.
\item \Rfunction{mpi.parSapply} is like \Rfunction{sapply};
  \Robject{FUN} is sent to the workers.
\item Only a vector of length \Sexpr{nrow(exprs(golubMerge))} is sent and
  received.
\end{itemize}
\es

\bs
\subsection{Random numbers}
<<>>=
mpi.remote.exec(runif(4))
@
\begin{itemize}
\item Not very random!
\item Each node starts with the same random number seed, so creates the
  same random number sequence.
\item Parallel computation, but not very useful: the same \term{program}
  applied to the same \term{data}.
\end{itemize}
\es

\bs
<<>>=
mpi.setup.rngstream()
mpi.remote.exec(runif(4))
@
Repeatable research can be \emph{very} problematic:
\begin{itemize}
\item Identical results require a repeatable random number sequence
  \emph{and} an identical order of evaluation.
\item The order of evaluation depends on, e.g., cluster size, but also on
  the vagaries of process timing.
\end{itemize}
\es

\bs
\section{Lab, part 1}
\section{Next: toward useful work}
\es

\bs
\subsection{Cross-validation and machine learning}
Can gene expression patterns help identify phenotype?
\begin{itemize}
\item Divide samples with known phenotypes into a `training' and a `test'
  set.
\item Train a machine learning algorithm with the training set.
\item Test the trained algorithm (comparing predicted and known
  phenotypes) with the `test' set.
\item Cross-validation: repeat with other training and test sets.
\item Select the `best' machine learning algorithm.
\end{itemize}
Cross-validation
\begin{itemize}
\item Statistically assesses a machine learning algorithm.
\item Each cross-validation is (almost) independent of the others
  (sketched below).
\end{itemize}
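In outline, leave-one-out cross-validation is just a loop over the $n$
samples; a sketch only, where \Rfunction{train} and \Rfunction{classify}
are hypothetical stand-ins for any machine learning method:
\begin{verbatim}
pred <- character(n)                      # n: number of samples
for (i in 1:n) {
    fit <- train(data[-i, ], pheno[-i])   # all samples but the i-th
    pred[i] <- classify(fit, data[i, ])   # predict the held-out sample
}
sum(pred == pheno)                        # correct classifications
\end{verbatim}
Each pass through the loop depends only on \texttt{i}, which is what makes
the computation embarrassingly parallel.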
\es

\bs
\subsection{An example: the \Robject{golubMerge} data set}
<<>>=
library(MLInterfaces)
library(golubEsets)
data(golubMerge)
@
\begin{itemize}
\item A data set of \Sexpr{dim(exprs(golubMerge))[[1]]} gene expression
  values measured on \Sexpr{dim(exprs(golubMerge))[[2]]} samples.
\item \Sexpr{ncol(pData(golubMerge))} phenotypic measures on each sample,
  including leukemia status (\verb|ALL| or \verb|AML|).
\end{itemize}
We will look at a subset of the data:
<<>>=
smallG <- golubMerge[200:250, ]
@
\es

\bs
\subsection{Cross-validation}
<<>>=
lk1 <- xval(smallG, "ALL.AML", knnB, xvalMethod = "LOO",
            group = as.integer(0))
table(lk1, smallG$ALL.AML)
@
\begin{itemize}
\item Classify patient leukemia status using the \Rfunarg{knnB} algorithm
  ($k$ nearest neighbors).
\item \Rfunarg{xvalMethod}: \verb|leave-one-out| -- the training set is
  all but one sample, the test set the remaining sample.
\item Cross-validate with all possible training and test sets.
\item Interpretation: \Sexpr{length(lk1)} cross-validations,
  \Sexpr{sum(lk1==smallG$ALL.AML)} correct classifications.%$
\end{itemize}
\es

\bs
\subsection{Cross-validation in parallel}
<<rmpiXval>>=
setClass("RmpiXval", representation("list"))
setMethod("xvalLoop", signature(cluster="RmpiXval"),
          function(cluster, ...) mpi.parLapply)
cluster <- new("RmpiXval")
@
<<>>=
mpi.bcast.cmd(library(MLInterfaces))
lk1 <- xval(smallG, "ALL.AML", knnB, xvalMethod = "LOO",
            group = as.integer(0), cluster=cluster)
table(lk1, smallG$ALL.AML) #%$
@
\begin{itemize}
\item Same results as before (good!)
\item The \Rpackage{MLInterfaces} package developers modified
  \Rfunction{xval} for easy parallelization.
\item The implementation presented here has high communication costs, so
  it does not scale well.
\end{itemize}
\es

\bs
\subsection{Under the hood\ldots}
<<>>=
<<rmpiXval>>
@
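The only requirement on the function returned by \Rfunction{xvalLoop} is
that it behave like \Rfunction{lapply}. For instance, a trivial `serial
cluster' can be handy for debugging (a sketch; the class name
\Rclass{SerialXval} is made up here):
\begin{verbatim}
setClass("SerialXval", representation("list"))
setMethod("xvalLoop", signature(cluster = "SerialXval"),
          function(cluster, ...) lapply)   # ordinary serial lapply
lk0 <- xval(smallG, "ALL.AML", knnB, xvalMethod = "LOO",
            group = as.integer(0), cluster = new("SerialXval"))
\end{verbatim}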
\es

\bs
\subsection{So far\ldots}
\begin{itemize}
\item Start a single process, spawn several workers, distribute data, do
  the analysis, return the result.
\end{itemize}
Room for improvement.
\begin{itemize}
\item Interactive.
\item Confusing mix of standard and parallel code.
\item High communication costs to distribute data.
\item Cluster configuration inside \R{}.
\item The `manager' never does any real work.
\end{itemize}
\es

\bs
\subsection{Batch programming}
Write a script file \verb|xval-batch.R|\ldots
\begin{verbatim}
# file xval-batch.R
# Load Rmpi, setup RmpiXval, xvalLoop (details above)
# broadcast and analyze
mpi.bcast.cmd(library(MLInterfaces))
library(MLInterfaces)
library(golubEsets)
data(golubMerge)
smallG <- golubMerge[200:250, ]
lk1 <- xval(smallG, "ALL.AML", knnB, xvalMethod = "LOO",
            group = as.integer(0), cluster=new("RmpiXval"))
table(lk1, smallG$ALL.AML)
\end{verbatim}
\es

\bs
\subsection{}
\ldots and execute it (from the command line) in `batch' mode.
\begin{verbatim}
% R CMD BATCH xval-batch.R
\end{verbatim}
Output appears in \verb|xval-batch.Rout|.
\begin{itemize}
\item Original issues:
  \begin{itemize}
  \item Interactive. (SOLVED)
  \item Confusing mix of standard and parallel code. (SOLVED?)
  \item High communication costs to distribute data.
  \item Cluster configuration inside \R{}.
  \item The `manager' never does any real work.
  \end{itemize}
\end{itemize}
\es

\bs
\subsection{A different parallel style}
\begin{itemize}
\item Often each node has its own hard drive, and the cluster hardware
  efficiently moves large data to each drive. So\ldots
\item Each node determines the data to analyze, performs the analysis,
  and coordinates the (usually much smaller) results with the other
  nodes.
\end{itemize}
\begin{verbatim}
# file xval-batch-2.R
# Load Rmpi, setup RmpiXval2, xvalLoop (details elsewhere)
# analyze data
library(MLInterfaces)
library(golubEsets)
data(golubMerge)
smallG <- golubMerge[200:250, ]
lk1 <- xval(smallG, "ALL.AML", knnB, xvalMethod = "LOO",
            group = as.integer(0), cluster=new("RmpiXval2"))
# output results, but only once!
if (mpi.comm.rank() == 0)
    table(lk1, smallG$ALL.AML)
\end{verbatim}
Control the cluster (e.g., using 3 nodes) from the command line:
\begin{verbatim}
% mpiexec -n 3 R CMD BATCH xval-batch-2.R
\end{verbatim}
Original issues:
\begin{itemize}
\item Interactive. (SOLVED)
\item Confusing mix of standard and parallel code. (SOLVED)
\item High communication costs to distribute data. (SOLVED)
\item Cluster configuration inside \R{}. (SOLVED)
\item The `manager' never does any real work. (SOLVED)
\end{itemize}
\es

\bs
\subsection{Under the hood\ldots}
<<>>=
setClass("RmpiXval2", representation(size="numeric"))
setMethod("xvalLoop", signature(cluster="RmpiXval2"),
          function(cluster, ...) lapplys)
cluster <- new("RmpiXval2")
@
\begin{itemize}
\item All nodes start \Rfunction{xval} and locally prepare the data for
  analysis.
\item The \Rfunction{xvalLoop} method partitions the work among the
  nodes, allows the computation to occur in parallel, and collates the
  results.
\item All nodes massage and return the result.
\end{itemize}
\es

\bs
\subsubsection{\emph{Really} under the hood: \Rfunction{allgather.Robj}}
Efficiently collate \Robject{obj} from all nodes.
<<>>=
allgather.Robj <- function(obj=NULL, comm=1) {
    ## serialize the object to a vector of integers
    obj <- as.integer(charToRaw(serialize(obj, NULL)))
    ## every node learns the size of every node's object...
    sz <- mpi.allgather(length(obj), 1,
                        integer(mpi.comm.size(comm)), comm)
    ## ...then receives all objects, concatenated
    objs <- mpi.allgatherv(obj, 1, integer(sum(sz)), sz, comm)
    ## split the buffer back into objects; unserialize each
    as.list(unlist(lapply(split(as.raw(objs),
                                rep(1:length(sz)-1, sz)),
                          unserialize),
                   recursive=FALSE, use.names=FALSE))
}
@
\es

\bs
\subsubsection{\emph{Really} under the hood: \Rfunction{lapplys}}
An \Rfunction{lapply}-like loop: obtain work, do calculations, gather all
results (see the sketch below for how the work is divided).
<<>>=
lapplys <- function(X, FUN, ..., comm=1) {
    rank <- mpi.comm.rank(comm) + 1     # ranks are 0-based
    n <- mpi.comm.size(comm)
    tasks <- 1:length(X)
    ## this node's share: the rank-th of n chunks
    mywork <- X[split(tasks, cut(tasks, n))[[rank]]]
    result <- lapply(mywork, FUN, ...)
    allgather.Robj(result, comm)        # every node gets everything
}
@
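For intuition, how the division works out (a sketch; assumes a cluster of
three nodes, each running this same code):
\begin{verbatim}
## with mpi.comm.size() == 3 and 9 tasks, cut() assigns
## tasks 1:3 to rank 0, 4:6 to rank 1, and 7:9 to rank 2
res <- lapplys(1:9, function(i) i^2)
length(res)   # 9 on every node, after the allgather
\end{verbatim}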
\es

\bs
\section{Lab, part 2}
\section{Next: advanced topics, tools, and opportunities}
\es

\bs
\subsection{The bootstrap}
<<>>=
library(boot)
args(boot)
@
\begin{itemize}
\item The bootstrap is often embarrassingly parallel, but\ldots
\item \Rfunction{boot} is not readily accessible to parallelization.
\end{itemize}
One solution.
\begin{itemize}
\item Wrap \Rfunction{statistic} to distribute the computation.
\item WARNING: this is a `hack', and will not work for parametric
  bootstraps.
\end{itemize}
\es

\bs
\section{Bootstrap first attempts}
<<>>=
ratio <- function(d, w) sum(d$x * w)/sum(d$u * w)
boot(city, ratio, R = 999, stype = "w")
@
Parallel processing ideas that don't work.
<<>>=
mpi.bcast.cmd(library(boot))
mpi.bcast.Robj2slave(ratio)
res1 <- mpi.remote.exec(boot(city, ratio, R=999, stype="w"))
sz <- mpi.comm.size()
res2 <- mpi.parLapply(1:sz, function(i)
    boot(city, ratio, R=999/sz, stype="w"))
@
\begin{itemize}
\item \Robject{res1}: identical bootstraps on each node!
\item \Robject{res2}: 999 (ish) bootstraps, but they still need to be
  collated!
\end{itemize}
\es

\bs
\section{How \Rfunction{boot} works, and how to parallelize it}
How \Rfunction{boot} works.
\begin{itemize}
\item A \verb|for| loop, calling \Rfunction{statistic} in a
  pre-determined order.
\item For most uses of \Rfunction{boot}, each call to
  \Rfunction{statistic} has no influence on subsequent program execution.
\end{itemize}
How to parallelize it.
\begin{itemize}
\item Arrange for the calls to \Rfunction{statistic} to be distributed,
  and\ldots
\item The manager always receives the results.
\item Each worker calculates results (expensive) and forwards them to the
  manager, but only for its own share of the calls.
\item \Rfunction{wrap}: see the lab for details.
\end{itemize}
\es

\bs
\section{A wrapped \Rfunction{ratio}}
<<>>=
wrap <- function(func, pseudo, comm=1) {
    iter <- 0
    sz <- mpi.comm.size(comm) - 1
    rank <- mpi.comm.rank(comm)
    tag <- list(result=1)
    if (mpi.comm.rank() == 0)
        ## manager: each call receives one result from any worker
        function(...)
            mpi.recv.Robj(mpi.any.source(), tag$result, comm)
    else
        ## worker: compute and send only on this worker's turn
        function(...) {
            if (iter %% sz == rank - 1) {
                result <- func(...)
                mpi.send.Robj(result, 0, tag$result, comm)
            } else result <- pseudo
            iter <<- iter + 1
            result
        }
}
@
Manager \Rfunction{ratiow}
<<>>=
ratiow <- wrap(ratio, pseudo = 1)
ratiow
@
Worker \Rfunction{ratiow}
<<>>=
mpi.bcast.Robj2slave(wrap)
mpi.bcast.cmd(ratiow <- wrap(ratio, pseudo=1))
mpi.remote.exec(ratiow)[1]
mpi.bcast.cmd(boot(city, ratiow, R=999, stype="w"))
bootp <- boot(city, ratiow, R=999, stype="w")
hist(bootp$t)
@
\es

\bs
\subsection{Interfacing C code}
\R{} packages.
\begin{itemize}
\item Allow a user to define collections of useful functions.
\item Can include C source code, callable from \R{}.
\item Package-writing details are complex, but available
  \mref{http://cran.fhcrc.org/doc/manuals/R-exts.html}{online}.
\end{itemize}
Strategy.
\begin{itemize}
\item Call a C function.
\item Evaluate arbitrary parallel code, including spawning new processes.
\item Return the result.
\end{itemize}
\es

\bs
\subsection{A brief outline: spawning parallel processes}
\R{} code
<<>>=
parallelPi <- function(nodes) {
    nodes <- as.integer(nodes)
    if (length(nodes) != 1 || nodes < 1)
        stop("nodes should be a single positive integer value")
    prog <- system.file(file.path("examples", "cpi"), package="Rpi")
    .Call("parallelPi", prog, nodes, PACKAGE = "Rpi")
}
@
C function (to calculate $\pi$).
\begin{verbatim}
SEXP parallelPi(SEXP program, SEXP nodes)
{
    MPI_Comm intercomm;
    int *slaverrcode;
    SEXP ans;
    /* type checking & set-up */
    if (!isInteger(nodes) || (length(nodes) != 1))
        error("expected a single integer value for number of nodes");
    PROTECT(ans = allocVector(REALSXP, 1));
    /* parallel */
    slaverrcode = (int *) Calloc(INTEGER(nodes)[0], int);
    MPI_Comm_spawn(CHAR(STRING_ELT(program, 0)), MPI_ARGV_NULL,
                   INTEGER(nodes)[0], MPI_INFO_NULL, 0, MPI_COMM_SELF,
                   &intercomm, slaverrcode);
    [...]
    UNPROTECT(1);
    return(ans);
}
\end{verbatim}
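From \R{}, the wrapper is then called like any ordinary function (a
sketch; it assumes the \Rpackage{Rpi} example package outlined above is
installed and an MPI runtime is available):
\begin{verbatim}
library(Rpi)
pi.est <- parallelPi(4)   # spawn 4 workers to estimate pi
pi.est - pi               # how close did we get?
\end{verbatim}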
\es

\bs
\section{Other opportunities: \Rpackage{Rmpi}-like packages}
\begin{itemize}
\item \Rpackage{rpvm} provides an interface to the PVM library (similar
  to MPI); the additional functionality is not as developed as in
  \Rpackage{Rmpi}.
\item \Rpackage{snow} provides a common interface to point-to-point
  commands (like \Rfunction{mpi.send.Robj}) and functions (like
  \Rfunction{mpi.parLapply}), built on top of \Rpackage{Rmpi},
  \Rpackage{rpvm}, or native `sockets'.
\item \Rpackage{papply} provides simple \Rfunction{mpi.parLapply}
  functionality, used transparently in serial or parallel mode.
\end{itemize}
\es

\bs
\subsection{NetWorkSpaces}
\begin{itemize}
\item \mref{http://nws-r.sourceforge.net}{NetWorkSpaces} allows variables
  to be stored on a centralized server and accessed by multiple instances
  of \R{} -- the illusion of shared memory.
\item Multi-language (\R{}, Python, MATLAB) support, i.e., it is possible
  to store a variable from MATLAB, access and manipulate it in \R{}, and
  forward the result to Python.
\end{itemize}
\es

\bs
\subsection{Tools used today}
Software infrastructure
\begin{itemize}
\item MPI (e.g., \mref{http://www.lam-mpi.org/}{LAM/MPI}) for Linux,
  \mref{http://www-unix.mcs.anl.gov/mpi/mpich/}{MPICH2} for Windows.
\item Intentionally clustered computers likely already have MPI
  installed.
\end{itemize}
\Rpackage{Rmpi}
\begin{itemize}
\item Linux: usually \verb|biocLite("Rmpi")|.
\item Windows: \mref{http://www.stats.uwo.ca/faculty/yu/Rmpi/}{binary}
  download and instructions.
\end{itemize}
\mref{http://dirk.eddelbuettel.com/quantian.html}{Quantian}
\begin{itemize}
\item A Linux distribution available on a single bootable CD.
\item Contains MPI, \R{}, and most CRAN and Bioconductor packages!
\end{itemize}
\es

\bs
\subsection{Ideas for development}
\begin{itemize}
\item Exposing existing functionality for parallelization, e.g.,
  \Rfunction{xval} in \Rpackage{MLInterfaces}.
\item Building high-level abstractions over MPI, e.g., automatically
  partitioning work in batch jobs, creating the illusion of shared
  variables.
\item Providing creative solutions to interactive parallel programming.
\end{itemize}
\es

\end{document}