--- title: "Running an AnVIL workflow within R" author: - name: Kayla Interdonato affiliation: Roswell Park Comprehensive Cancer Center - name: Yubo Cheng affiliation: Roswell Park Comprehensive Cancer Center - name: Martin Morgan affiliation: Roswell Park Comprehensive Cancer Center email: Martin.Morgan@RoswellPark.org package: AnVIL output: BiocStyle::html_document abstract: | This vignette demonstrates how a user can edit, run, and stop a Terra / AnVIL workflow from within their R session. The configuration of the workflow can be retrieved and edited. Then this new configuration can be sent back to the Terra / AnVIL workspace for future use. With the new configuration defined by the user will then be able to run the workflow as well as stop any jobs from running. vignette: | %\VignetteIndexEntry{Running an AnVIL workflow within R} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{r, include = FALSE} has_gcloud <- AnVILBase::has_avworkspace( strict = TRUE, platform = AnVILGCP::gcp() ) knitr::opts_chunk$set( eval = has_gcloud, collapse = TRUE, cache = TRUE ) options(width=75) ``` # Installation Install the _AnVIL_ package with ```{r, eval = FALSE} if (!requireNamespace("BiocManager", quietly = TRUE)) install.packages("BiocManager", repos = "https://cran.r-project.org") BiocManager::install("AnVIL") ``` Once installed, load the package with ```{r, message = FALSE, eval = TRUE, cache = FALSE} library(AnVILGCP) library(AnVIL) ``` # Workflow setup: DESeq2 ## Setting up the workspace and choosing a workflow The first step will be to define the namespace (billing project) and name of the workspace to be used with the functions. In our case we will be using the Bioconductor AnVIL namespace and a DESeq2 workflow as the intended workspace. ```{r workspace, eval = has_gcloud} avworkspace("bioconductor-rpci-anvil/Bioconductor-Workflow-DESeq2") ``` Each workspace can have 0 or more workflows. The workflows have a `name` and `namespace`, just as workspaces. Discover the workflows available in a workspace ```{r workflows, eval = has_gcloud} avworkflows() ``` From the table returned by `avworkflows()`, record the namespace and name of the workflow of interest using `avworkflow()`. ```{r workflow, eval = has_gcloud} avworkflow("bioconductor-rpci-anvil/AnVILBulkRNASeq") ``` ## Retrieving the configuration Each workflow defines inputs, outputs and certain code execution. These workflow 'configurations' that can be retrieved with `avworkflow_configuration_get`. ```{r configuration, eval = has_gcloud} config <- avworkflow_configuration_get() config ``` This function is using the workspace namespace, workspace name, workflow namespace, and workflow name we recorded above with `avworkspace()` and `avworkflow()`. # Updating workflows ## Changing the inputs / outputs There is a lot of information contained in the configuration but the only variables of interest to the user would be the inputs and outputs. In our case the inputs and outputs are pre-defined so we don't have to do anything to them. But for some workflows these inputs / outputs may be blank and therefore would need to be defined by the user. We will change one of our inputs values to show how this would be done. There are two functions to help users easily see the content of the inputs and outputs, they are `avworkflow_configuration_inputs` and `avworkflow_configuration_outputs`. These functions display the information in a `tibble` structure which users are most likely familiar with. ```{r inputs_outputs, eval = has_gcloud} inputs <- avworkflow_configuration_inputs(config) inputs outputs <- avworkflow_configuration_outputs(config) outputs ``` Let's change the `salmon.transcriptome_index_name` field; this is an arbitrary string identifier in our workflow. ```{r change_input, eval = has_gcloud} inputs <- inputs |> mutate( attribute = ifelse( name == "salmon.transcriptome_index_name", '"new_index_name"', attribute ) ) inputs ``` ## Update configuration locally Since the inputs have been modified we need to put this information into the configuration of the workflow. We can do this with `avworkflow_configuration_update()`. By default this function will take the inputs and outputs of the original configuration, just in case there were no changes to one of them (like in our example our outputs weren't changed). ```{r update_config, eval = has_gcloud} new_config <- avworkflow_configuration_update(config, inputs) new_config ``` ## Set a workflow configuration for reuse in AnVIL Use `avworkflow_configuration_set()` to permanently update the workflow to new parameter values. ```{r set_config, eval = has_gcloud} avworkflow_configuration_set(new_config) ``` Actually, the previous command validates `new_config` only; to update the configuration in AnVIL (i.e., replacing the values in the workspace workflow graphical user interface), add the argument `dry = FALSE`. ```{r set_config_not_dry} ## avworkflow_configuration_set(new_config, dry = FALSE) ``` # Running and stopping workflows ## Running a workflow To finally run the new workflow we need to know the name of the data set to be used in the workflow. This can be discovered by looking at the table of interest and using the name of the data set. ```{r entityName, eval = has_gcloud} entityName <- avtable("participant_set") |> pull(participant_set_id) |> head(1) avworkflow_run(new_config, entityName) ``` Again, actually running the new configuration requires the argument `dry = FALSE`. ```{r run_not_dry} ## avworkflow_run(new_config, entityName, dry = FALSE) ``` `config` is used to set the `rootEntityType` and workflow method name and namespace; other components of `config` are ignored (the other components will be read by Terra / AnVIL from values updated with `avworkflow_configuration_set()`). ## Monitoring workflows We can see that the workflow is running by using the `avworkflow_jobs` function. The elements of the table are ordered chronologically, with the most recent submission (most likely the job we just started!) listed first. ```{r checking_workflow, eval = has_gcloud} avworkflow_jobs() ``` ## Stopping workflows Use `avworkflow_stop()` to stop a currently running workflow. This will change the status of the job, reported by `avworkflow_jobs()`, from 'Submitted' to 'Aborted'. ```{r stop_workflow, eval = has_gcloud} avworkflow_stop() # dry = FALSE to stop avworkflow_jobs() ``` # Managing workflow output ## Workflow files Workflows can generate a large number of intermediate files (including diagnostic logs), as well as final outputs for more interactive analysis. Use the `submissionId` from `avworkflow_jobs()` to discover files produced by a submission; the default behavior lists files produced by the most recent job. ```{r files, eval = has_gcloud} submissionId <- "fb8e35b7-df5d-49e6-affa-9893aaeebf37" avworkflow_files(submissionId) ``` Workflow files are stored in the workspace bucket. The files can be localized to the persistent disk of the current runtime using `avworkflow_localize()`; the default is again to localize files from the most recently submitted job; use `type=` to influence which files ('control' e.g., log files, 'output', or 'all') are localized. ```{r localize, eval = has_gcloud} avworkflow_localize( submissionId, type = "output" ## dry = FALSE to localize ) ``` ## Workflow information Information on workflows (status, start, and end times, and input and output parameters) is available with `avworkflow_info()`. The examples below are from workflows using the [Rcollectl][] package to measure time spent in different parts of a single-cell RNA-seq analysis. The workspace is not publicly available, so results from `avworkflow_info()` are read from files in this package. [Rcollectl]: https://bioconductor.org/packages/Rcollectl A single job submission can launch multiple workflows. This occurs, e.g., when a workflow uses several rows from a DATA table to perform independent analyses. In the example used here, the workflows were configured to use different numbers of cores (3 or 8) and different ways of storing single-cell expression data (an in-memory `dgCMatrix` or an on-disk representation). Thus a single job submission started four workflows. This example was retrieved with ```{r, eval = FALSE} avworkspace("bioconductor-rpci-yubo/Rcollectlworkflowh5ad") submissionId <- "9385fd75-4cb7-470f-9e07-1979e2c8f193" info_1 <- avworkflow_info(submissionId) ``` Read the saved version of this result in to *R*. ```{r, message=FALSE, warning=FALSE} info_file_1 <- system.file(package = "AnVIL", "extdata", "avworkflow_info_1.rds") info_1 <- readRDS(info_file_1) ## view result of avworkflow_info() info_1 ``` Three of the workflows were successful, one failed. ```{r} info_1 |> select(workflowId, status, inputs, outputs) ``` Inputs and outputs for each workflow are stored as list. Strategies for working with list-columns in tibbles are described in Chapter 24 of [R for Data Science][r4ds]. Use `tidyr::unnest_wider()` to expand the inputs. The failed workflow involved 8 `core` using the on-disk data representation `dgCMatrix = FALSE`. [r4ds]: https://r4ds.hadley.nz/rectangling ```{r} info_1 |> select(workflowId, status, inputs) |> tidyr::unnest_wider(inputs) ``` The outputs (files summarizing the single-cell analysis, and the timestamps associated with each step in the analysis) involve two levels of nesting; following the strategy outlined in [R for Data Science][r4ds], the outputs (google bucket locations) are [r4ds]: https://r4ds.hadley.nz/rectangling ```{r} info_1 |> select(workflowId, outputs) |> tidyr::unnest_wider(outputs) |> tidyr::unnest_longer(starts_with("Rcollectl"), keep_empty = TRUE) ``` In the example used so far, each workflow produces a single file. A different examples is a workflow that produces multiple output files. This corresponds to the following submissionId: ```{r, eval = FALSE, eval = has_gcloud} submissionId <- "35280de1-42d8-492b-aa8c-5feff984bffa" info_2 <- avworkflow_info(submissionId) ``` Reading the result from the stored version: ```{r, message=FALSE, warning=FALSE} info_file_2 <- system.file(package = "AnVIL", "extdata", "avworkflow_info_2.rds") info_2 <- readRDS(info_file_2) info_2 ``` Inputs and outputs are manipulated in the same way as before, but this time there are multiple output files. ```{r, message=FALSE, warning=FALSE} info_2 |> select(workflowId, outputs) |> tidyr::unnest_wider(outputs) ``` To see the output files, expand the outputs column using `unnest_longer()`. ```{r} output_files <- info_2 |> select(workflowId, outputs) |> tidyr::unnest_wider(outputs) |> select(RcollectlWorkflowDelayedArrayParameters.Rcollectl_result) |> tidyr::unnest_longer( "RcollectlWorkflowDelayedArrayParameters.Rcollectl_result" ) output_files ``` The full file paths are available using `pull()` or `as.vector()`. ```{r} output_files |> as.vector() ``` # Session information ```{r sessionInfo} sessionInfo() ```