--- title: "mirai - Serialization (Arrow, ADBC, polars, torch)" vignette: > %\VignetteIndexEntry{mirai - Serialization (Arrow, ADBC, polars, torch)} %\VignetteEngine{litedown::vignette} %\VignetteEncoding{UTF-8} --- ### 1. Serialization: Arrow, polars and beyond Native R serialization transfers data between host and daemons. Objects accessed via external pointers cannot be serialized and normally error in mirai operations. Using [`arrow`](https://arrow.apache.org/docs/r/) as an example: ``` r library(mirai) library(arrow, warn.conflicts = FALSE) daemons(1) everywhere(library(arrow)) x <- as_arrow_table(iris) m <- mirai(list(a = head(x), b = "some text"), x = x) m[] #> 'miraiError' chr Error: Invalid , external pointer to null daemons(0) ``` `serial_config()` creates custom serialization configurations with functions that hook into R's native serialization mechanism for reference objects ('refhooks'). Pass this configuration to the 'serial' argument of `daemons()`: ``` r cfg <- serial_config( "ArrowTabular", arrow::write_to_raw, function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE) ) daemons(1, serial = cfg) everywhere(library(arrow)) m <- mirai(list(a = head(x), b = "some text"), x = x) m[] #> $a #> Table #> 6 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species > #> #> See $metadata for additional Schema metadata #> #> $b #> [1] "some text" daemons(0) ``` The arrow table now handles seamlessly, even when deeply nested in lists or other structures. Register multiple serialization functions for different object classes. This example combines Arrow with [`polars`](https://pola-rs.github.io/r-polars/), a Rust dataframe library (requires polars >= 1.0.0): ``` r daemons( n = 1, serial = serial_config( c("ArrowTabular", "polars_data_frame"), list(arrow::write_to_raw, \(x) x$serialize()), list(function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE), polars::pl$deserialize_df) ) ) x <- polars::as_polars_df(iris) m <- mirai(list(a = head(x), b = "some text"), x = x) m[] #> $a #> shape: (6, 5) #> ┌──────────────┬─────────────┬──────────────┬─────────────┬─────────┐ #> │ Sepal.Length ┆ Sepal.Width ┆ Petal.Length ┆ Petal.Width ┆ Species │ #> │ --- ┆ --- ┆ --- ┆ --- ┆ --- │ #> │ f64 ┆ f64 ┆ f64 ┆ f64 ┆ cat │ #> ╞══════════════╪═════════════╪══════════════╪═════════════╪═════════╡ #> │ 5.1 ┆ 3.5 ┆ 1.4 ┆ 0.2 ┆ setosa │ #> │ 4.9 ┆ 3.0 ┆ 1.4 ┆ 0.2 ┆ setosa │ #> │ 4.7 ┆ 3.2 ┆ 1.3 ┆ 0.2 ┆ setosa │ #> │ 4.6 ┆ 3.1 ┆ 1.5 ┆ 0.2 ┆ setosa │ #> │ 5.0 ┆ 3.6 ┆ 1.4 ┆ 0.2 ┆ setosa │ #> │ 5.4 ┆ 3.9 ┆ 1.7 ┆ 0.4 ┆ setosa │ #> └──────────────┴─────────────┴──────────────┴─────────────┴─────────┘ #> #> $b #> [1] "some text" daemons(0) ``` ### 2. Serialization: Torch [`torch`](https://torch.mlverse.org/) tensors work seamlessly in mirai computations. **Setup:** 1. Create serialization configuration with 'class' as 'torch_tensor' 2. Set up daemons, supplying configuration to 'serial' 3. (Optional) Use `everywhere()` to load `torch` on all daemons ``` r library(mirai) library(torch) cfg <- serial_config( class = "torch_tensor", sfunc = torch::torch_serialize, ufunc = torch::torch_load ) daemons(1, serial = cfg) everywhere(library(torch)) ``` **Example Usage:** This creates a convolutional neural network with `torch::nn_module()`, specifies parameters, then initializes them in a parallel process: ``` r model <- nn_module( initialize = function(in_size, out_size) { self$conv1 <- nn_conv2d(in_size, out_size, 5) self$conv2 <- nn_conv2d(in_size, out_size, 5) }, forward = function(x) { x <- self$conv1(x) x <- nnf_relu(x) x <- self$conv2(x) x <- nnf_relu(x) x } ) params <- list(in_size = 1, out_size = 20) m <- mirai(do.call(model, params), model = model, params = params) m[] #> An `nn_module` containing 1,040 parameters. #> #> ── Modules ───────────────────────────────────────────────────────────────────────────────── #> • conv1: #520 parameters #> • conv2: #520 parameters ``` The returned model contains many tensor elements: ``` r m$data$parameters$conv1.weight #> torch_tensor #> (1,1,.,.) = #> -0.1218 0.1835 -0.1114 -0.1365 -0.1824 #> 0.1107 -0.0498 -0.1219 -0.0938 -0.1570 #> -0.1944 0.0355 0.1750 -0.1612 -0.1590 #> -0.0806 -0.1906 -0.0272 -0.1732 -0.0491 #> -0.0079 -0.0874 -0.1256 0.1276 0.0664 #> #> (2,1,.,.) = #> -0.1450 -0.0371 0.0601 -0.1578 0.0918 #> 0.1118 -0.0800 0.0359 0.0452 0.1182 #> 0.0516 0.0109 0.0186 0.1399 -0.1431 #> 0.1720 -0.0919 0.0616 0.0937 0.1511 #> -0.0270 0.0936 0.1510 0.1995 0.1934 #> #> (3,1,.,.) = #> 0.1055 0.0056 0.0491 -0.0096 0.0655 #> 0.1950 0.0676 0.0254 0.0834 -0.0401 #> 0.1658 0.1767 -0.0338 0.1644 0.1806 #> -0.0346 -0.1521 0.0490 0.1153 0.0755 #> -0.0832 0.0074 -0.0607 0.1704 0.1454 #> #> (4,1,.,.) = #> 0.1091 0.1982 0.1185 0.1655 0.1716 #> 0.1987 -0.0517 -0.0115 -0.0641 0.0294 #> 0.0078 -0.0942 -0.1629 -0.1114 -0.0833 #> 0.0395 -0.0101 0.0837 0.1523 0.0673 #> -0.0984 0.0988 -0.1154 0.0453 -0.1577 #> #> (5,1,.,.) = #> 0.0630 -0.0820 -0.1399 0.0528 -0.0896 #> ... [the output was truncated (use n=-1 to disable)] #> [ CPUFloatType{20,1,5,5} ][ requires_grad = TRUE ] ``` Pass model parameters to an optimizer, also initialized in a parallel process: ``` r optim <- mirai(optim_rmsprop(params = params), params = m$data$parameters) optim[] #> #> Inherits from: #> Public: #> add_param_group: function (param_group) #> clone: function (deep = FALSE) #> defaults: list #> initialize: function (params, lr = 0.01, alpha = 0.99, eps = 1e-08, weight_decay = 0, #> load_state_dict: function (state_dict, ..., .refer_to_state_dict = FALSE) #> param_groups: list #> state: State, R6 #> state_dict: function () #> step: function (closure = NULL) #> zero_grad: function (set_to_none = FALSE) #> Private: #> deep_clone: function (name, value) #> step_helper: function (closure, loop_fun) daemons(0) ``` Tensors and complex objects containing tensors pass seamlessly between host and daemons like any R object. Custom serialization leverages R's native 'refhook' mechanism for transparent usage. Fast and efficient, it minimizes data copies and uses official `torch` serialization methods directly. ### 3. Database Hosting using Arrow Database Connectivity Use `DBI` to access and manipulate Apache Arrow data efficiently through ADBC (Arrow Database Connectivity). This creates an in-memory SQLite connection using the `adbcsqlite` backend. Serialization uses `arrow` functions in the `daemons()` call. The class is 'nanoarrow_array_stream' since `nanoarrow` backs all DBI `db*Arrow()` queries: ``` r library(mirai) cfg <- serial_config( class = "nanoarrow_array_stream", sfunc = arrow::write_to_raw, ufunc = function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE) ) daemons(1, serial = cfg) everywhere( { library(DBI) # `adbi` and `adbcsqlite` packages must also be installed con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = ":memory:") } ) ``` Use `mirai()` to write or query the database in Arrow format: ``` r m <- mirai(dbWriteTableArrow(con, "iris", iris)) m[] #> [1] TRUE m <- mirai(dbReadTableArrow(con, "iris")) m[] #> Table #> 150 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species m <- mirai(dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6')) m[] #> Table #> 5 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species ``` Tight integration with R's 'refhook' system allows returning complex nested objects with multiple Arrow queries: ``` r m <- mirai({ a <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6') b <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Width" < 2.6') x <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Length" < 1.5') y <- dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Width" < 0.2') list(sepal = list(length = a, width = b), petal = list(length = x, width = y)) }) m[] #> $sepal #> $sepal$length #> Table #> 5 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species #> #> $sepal$width #> Table #> 19 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species #> #> #> $petal #> $petal$length #> Table #> 24 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species #> #> $petal$width #> Table #> 5 rows x 5 columns #> $Sepal.Length #> $Sepal.Width #> $Petal.Length #> $Petal.Width #> $Species ``` Use `everywhere()` to cleanly tear down databases before resetting daemons: ``` r everywhere(dbDisconnect(con)) daemons(0) ``` ### 4. Shiny / mirai / DBI / ADBC Integrated Example This demonstrates database connections hosted in mirai daemons powering a Shiny app. One-time `serialization()` setup ensures seamless Arrow data transport in the global environment outside `server()`. Each Shiny session creates a new database connection in a new daemon process, freeing resources when the session ends. This logic lives in `server()`. A unique ID identifies each session and specifies the daemons 'compute profile'. Non-dispatcher daemons work since scheduling isn't needed (all queries take a similar time, each session uses one daemon). Shiny ExtendedTask performs queries via `mirai()` using the session-specific compute profile: ``` r library(mirai) library(secretbase) library(shiny) library(bslib) # create an Arrow serialization configuration cfg <- serial_config( class = "nanoarrow_array_stream", sfunc = arrow::write_to_raw, ufunc = nanoarrow::read_nanoarrow ) # write 'iris' dataset to temp database file (for this demonstration) file <- tempfile() con <- DBI::dbConnect(adbi::adbi("adbcsqlite"), uri = file) DBI::dbWriteTableArrow(con, "iris", iris) DBI::dbDisconnect(con) # common input parameters slmin <- min(iris$Sepal.Length) slmax <- max(iris$Sepal.Length) ui <- page_fluid( p("The time is ", textOutput("current_time", inline = TRUE)), hr(), h3("Shiny / mirai / DBI / ADBC demonstration"), p("New daemon-hosted database connection is created for every Shiny session"), sliderInput( "sl", "Query iris dataset based on Sepal Length", min = slmin, max = slmax, value = c(slmin, slmax), width = "75%" ), input_task_button("btn", "Return query"), tableOutput("table") ) # uses Shiny ExtendedTask with mirai server <- function(input, output, session) { # create unique session id by hashing current time with a random key id <- secretbase::siphash13(Sys.time(), key = nanonext::random(4L)) # create new daemon for each session daemons(1L, serial = cfg, .compute = id) # tear down daemon when session ends session$onEnded(function() daemons(0L, .compute = id)) # everywhere() loads DBI and creates ADBC connection in each daemon # and sets up serialization everywhere( { library(DBI) # `adbi` and `adbcsqlite` packages must also be installed con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = file) }, file = file, .compute = id ) output$current_time <- renderText({ invalidateLater(1000) format(Sys.time(), "%H:%M:%S %p") }) task <- ExtendedTask$new( function(...) mirai( dbGetQueryArrow( con, sprintf( "SELECT * FROM iris WHERE \"Sepal.Length\" BETWEEN %.2f AND %.2f", sl[1L], sl[2L] ) ), ..., .compute = id ) ) |> bind_task_button("btn") observeEvent(input$btn, task$invoke(sl = input$sl)) output$table <- renderTable(task$result()) } # run Shiny app shinyApp(ui = ui, server = server) # deletes temp database file (for this demonstration) unlink(file) ```