---
title: "mirai - Serialization (Arrow, ADBC, polars, torch)"
vignette: >
  %\VignetteIndexEntry{mirai - Serialization (Arrow, ADBC, polars, torch)}
  %\VignetteEngine{litedown::vignette}
  %\VignetteEncoding{UTF-8}
---
### 1. Serialization: Arrow, polars and beyond
Native R serialization transfers data between host and daemons.
Objects accessed via external pointers cannot be serialized and normally error in mirai operations.
Using [`arrow`](https://arrow.apache.org/docs/r/) as an example:
``` r
library(mirai)
library(arrow, warn.conflicts = FALSE)

# no custom serialization is configured for this daemon
daemons(1)
everywhere(library(arrow))

x <- as_arrow_table(iris)

# the Arrow table is backed by an external pointer, so sending it fails
m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> 'miraiError' chr Error: Invalid <Table>, external pointer to null

daemons(0)
```
`serial_config()` creates custom serialization configurations with functions that hook into R's native serialization mechanism for reference objects ('refhooks').
Pass this configuration to the 'serial' argument of `daemons()`:
``` r
# class: the class of object to intercept
# sfunc: serializes such an object to a raw vector
# ufunc: rebuilds the object (kept as an Arrow Table, not a data frame)
cfg <- serial_config(
  class = "ArrowTabular",
  sfunc = arrow::write_to_raw,
  ufunc = function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
)

daemons(1, serial = cfg)
everywhere(library(arrow))

m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> $a
#> Table
#> 6 rows x 5 columns
#> $Sepal.Length <double>
#> $Sepal.Width <double>
#> $Petal.Length <double>
#> $Petal.Width <double>
#> $Species <dictionary<values=string, indices=int8>>
#>
#> See $metadata for additional Schema metadata
#>
#> $b
#> [1] "some text"

daemons(0)
```
The Arrow table is now handled seamlessly, even when deeply nested in lists or other structures.
Register multiple serialization functions for different object classes.
This example combines Arrow with [`polars`](https://pola-rs.github.io/r-polars/), a Rust dataframe library (requires polars >= 1.0.0):
``` r
# one entry per class: the i-th 'sfunc' / 'ufunc' pair handles the i-th class
daemons(
  n = 1,
  serial = serial_config(
    class = c("ArrowTabular", "polars_data_frame"),
    sfunc = list(
      arrow::write_to_raw,
      \(x) x$serialize()
    ),
    ufunc = list(
      function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE),
      polars::pl$deserialize_df
    )
  )
)

x <- polars::as_polars_df(iris)

m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> $a
#> shape: (6, 5)
#> ┌──────────────┬─────────────┬──────────────┬─────────────┬─────────┐
#> │ Sepal.Length ┆ Sepal.Width ┆ Petal.Length ┆ Petal.Width ┆ Species │
#> │ ---          ┆ ---         ┆ ---          ┆ ---         ┆ ---     │
#> │ f64          ┆ f64         ┆ f64          ┆ f64         ┆ cat     │
#> ╞══════════════╪═════════════╪══════════════╪═════════════╪═════════╡
#> │ 5.1          ┆ 3.5         ┆ 1.4          ┆ 0.2         ┆ setosa  │
#> │ 4.9          ┆ 3.0         ┆ 1.4          ┆ 0.2         ┆ setosa  │
#> │ 4.7          ┆ 3.2         ┆ 1.3          ┆ 0.2         ┆ setosa  │
#> │ 4.6          ┆ 3.1         ┆ 1.5          ┆ 0.2         ┆ setosa  │
#> │ 5.0          ┆ 3.6         ┆ 1.4          ┆ 0.2         ┆ setosa  │
#> │ 5.4          ┆ 3.9         ┆ 1.7          ┆ 0.4         ┆ setosa  │
#> └──────────────┴─────────────┴──────────────┴─────────────┴─────────┘
#>
#> $b
#> [1] "some text"

daemons(0)
```
### 2. Serialization: Torch
[`torch`](https://torch.mlverse.org/) tensors work seamlessly in mirai computations.
**Setup:**
1. Create serialization configuration with 'class' as 'torch_tensor'
2. Set up daemons, supplying configuration to 'serial'
3. (Optional) Use `everywhere()` to load `torch` on all daemons
``` r
library(mirai)
library(torch)
# tensors are serialized and restored with torch's own functions
cfg <- serial_config(
  class = "torch_tensor",
  sfunc = torch::torch_serialize,
  ufunc = torch::torch_load
)

# start one daemon with this configuration, then attach torch on it
daemons(1, serial = cfg)
everywhere(library(torch))
```
**Example Usage:**
This creates a convolutional neural network with `torch::nn_module()`, specifies its parameters, then initializes the model in a parallel process:
``` r
# Two-layer convolutional network.
# conv2 must accept the `out_size` channels that conv1 produces, otherwise
# forward() fails with a channel mismatch.
model <- nn_module(
  initialize = function(in_size, out_size) {
    self$conv1 <- nn_conv2d(in_size, out_size, 5)
    self$conv2 <- nn_conv2d(out_size, out_size, 5)
  },
  forward = function(x) {
    x <- self$conv1(x)
    x <- nnf_relu(x)
    x <- self$conv2(x)
    x <- nnf_relu(x)
    x
  }
)

params <- list(in_size = 1, out_size = 20)

# the module generator and its arguments are passed to the daemon, which
# constructs the model there
m <- mirai(do.call(model, params), model = model, params = params)
m[]
#> An `nn_module` containing 10,540 parameters.
#>
#> ── Modules ─────────────────────────────────────────────────────────────────────────────────
#> • conv1: <nn_conv2d> #520 parameters
#> • conv2: <nn_conv2d> #10,020 parameters
```
The returned model contains many tensor elements:
``` r
# weights of the first convolutional layer of the returned model
m$data$parameters[["conv1.weight"]]
#> torch_tensor
#> (1,1,.,.) =
#> -0.1218 0.1835 -0.1114 -0.1365 -0.1824
#> 0.1107 -0.0498 -0.1219 -0.0938 -0.1570
#> -0.1944 0.0355 0.1750 -0.1612 -0.1590
#> -0.0806 -0.1906 -0.0272 -0.1732 -0.0491
#> -0.0079 -0.0874 -0.1256 0.1276 0.0664
#>
#> (2,1,.,.) =
#> -0.1450 -0.0371 0.0601 -0.1578 0.0918
#> 0.1118 -0.0800 0.0359 0.0452 0.1182
#> 0.0516 0.0109 0.0186 0.1399 -0.1431
#> 0.1720 -0.0919 0.0616 0.0937 0.1511
#> -0.0270 0.0936 0.1510 0.1995 0.1934
#>
#> (3,1,.,.) =
#> 0.1055 0.0056 0.0491 -0.0096 0.0655
#> 0.1950 0.0676 0.0254 0.0834 -0.0401
#> 0.1658 0.1767 -0.0338 0.1644 0.1806
#> -0.0346 -0.1521 0.0490 0.1153 0.0755
#> -0.0832 0.0074 -0.0607 0.1704 0.1454
#>
#> (4,1,.,.) =
#> 0.1091 0.1982 0.1185 0.1655 0.1716
#> 0.1987 -0.0517 -0.0115 -0.0641 0.0294
#> 0.0078 -0.0942 -0.1629 -0.1114 -0.0833
#> 0.0395 -0.0101 0.0837 0.1523 0.0673
#> -0.0984 0.0988 -0.1154 0.0453 -0.1577
#>
#> (5,1,.,.) =
#> 0.0630 -0.0820 -0.1399 0.0528 -0.0896
#> ... [the output was truncated (use n=-1 to disable)]
#> [ CPUFloatType{20,1,5,5} ][ requires_grad = TRUE ]
```
Pass model parameters to an optimizer, also initialized in a parallel process:
``` r
# the model's parameters (tensors) are sent to the daemon, where the
# optimizer is constructed and then returned to the host
optim <- mirai(optim_rmsprop(params = params), params = m$data$parameters)
optim[]
#> <optim_rmsprop>
#>   Inherits from: <torch_optimizer>
#>   Public:
#>     add_param_group: function (param_group)
#>     clone: function (deep = FALSE)
#>     defaults: list
#>     initialize: function (params, lr = 0.01, alpha = 0.99, eps = 1e-08, weight_decay = 0,
#>     load_state_dict: function (state_dict, ..., .refer_to_state_dict = FALSE)
#>     param_groups: list
#>     state: State, R6
#>     state_dict: function ()
#>     step: function (closure = NULL)
#>     zero_grad: function (set_to_none = FALSE)
#>   Private:
#>     deep_clone: function (name, value)
#>     step_helper: function (closure, loop_fun)

daemons(0)
```
Tensors and complex objects containing tensors pass seamlessly between host and daemons like any R object.
Custom serialization leverages R's native 'refhook' mechanism for transparent usage.
Fast and efficient, it minimizes data copies and uses official `torch` serialization methods directly.
### 3. Database Hosting using Arrow Database Connectivity
Use `DBI` to access and manipulate Apache Arrow data efficiently through ADBC (Arrow Database Connectivity).
This creates an in-memory SQLite connection using the `adbcsqlite` backend.
Serialization uses `arrow` functions in the `daemons()` call.
The class is 'nanoarrow_array_stream' since `nanoarrow` backs all DBI `db*Arrow()` queries:
``` r
library(mirai)
# serialization for the 'nanoarrow_array_stream' objects returned by DBI's
# Arrow queries
cfg <- serial_config(
  class = "nanoarrow_array_stream",
  sfunc = arrow::write_to_raw,
  ufunc = function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
)

daemons(1, serial = cfg)

# on each daemon: attach DBI and open an in-memory SQLite connection;
# `<<-` is used so that `con` remains available to later mirai() calls
everywhere({
  library(DBI) # `adbi` and `adbcsqlite` packages must also be installed
  con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = ":memory:")
})
```
Use `mirai()` to write or query the database in Arrow format:
``` r
# write the 'iris' dataset to a table named "iris" through the daemon's
# connection `con`
m <- mirai(dbWriteTableArrow(con, "iris", iris))
m[]
#> [1] TRUE
# read the whole table back; the result arrives on the host as an Arrow Table
m <- mirai(dbReadTableArrow(con, "iris"))
m[]
#> Table
#> 150 rows x 5 columns
#> $Sepal.Length
#> $Sepal.Width
#> $Petal.Length
#> $Petal.Width
#> $Species
# run a SQL query on the daemon; column names containing '.' are double-quoted
m <- mirai(dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6'))
m[]
#> Table
#> 5 rows x 5 columns
#> $Sepal.Length
#> $Sepal.Width
#> $Petal.Length
#> $Petal.Width
#> $Species
```
Tight integration with R's 'refhook' system allows returning complex nested objects with multiple Arrow queries:
``` r
m <- mirai({
  # four separate Arrow queries, grouped by flower part and measurement
  sepal <- list(
    length = dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Length" < 4.6'),
    width = dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Sepal.Width" < 2.6')
  )
  petal <- list(
    length = dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Length" < 1.5'),
    width = dbGetQueryArrow(con, 'SELECT * FROM iris WHERE "Petal.Width" < 0.2')
  )
  list(sepal = sepal, petal = petal)
})
m[]
#> $sepal
#> $sepal$length
#> Table
#> 5 rows x 5 columns
#> $Sepal.Length
#> $Sepal.Width
#> $Petal.Length
#> $Petal.Width
#> $Species
#>
#> $sepal$width
#> Table
#> 19 rows x 5 columns
#> $Sepal.Length
#> $Sepal.Width
#> $Petal.Length
#> $Petal.Width
#> $Species
#>
#>
#> $petal
#> $petal$length
#> Table
#> 24 rows x 5 columns
#> $Sepal.Length
#> $Sepal.Width
#> $Petal.Length
#> $Petal.Width
#> $Species
#>
#> $petal$width
#> Table
#> 5 rows x 5 columns
#> $Sepal.Length
#> $Sepal.Width
#> $Petal.Length
#> $Petal.Width
#> $Species
```
Use `everywhere()` to cleanly tear down databases before resetting daemons:
``` r
# close the database connection `con` held on each daemon
everywhere(dbDisconnect(con))
# then reset daemons
daemons(0)
```
### 4. Shiny / mirai / DBI / ADBC Integrated Example
This demonstrates database connections hosted in mirai daemons powering a Shiny app.
A one-time `serial_config()` setup, performed in the global environment outside `server()`, ensures seamless transport of Arrow data.
Each Shiny session creates a new database connection in a new daemon process, freeing resources when the session ends.
This logic lives in `server()`.
A unique ID identifies each session and specifies the daemons 'compute profile'.
Non-dispatcher daemons work since scheduling isn't needed (all queries take a similar time, each session uses one daemon).
Shiny ExtendedTask performs queries via `mirai()` using the session-specific compute profile:
``` r
library(mirai)
library(secretbase)
library(shiny)
library(bslib)
# Arrow serialization configuration, passed to daemons() in each session
cfg <- serial_config(
  class = "nanoarrow_array_stream",
  sfunc = arrow::write_to_raw,
  ufunc = nanoarrow::read_nanoarrow
)

# for this demonstration: write the 'iris' dataset to a temp database file
file <- tempfile()
con <- DBI::dbConnect(drv = adbi::adbi("adbcsqlite"), uri = file)
DBI::dbWriteTableArrow(conn = con, name = "iris", value = iris)
DBI::dbDisconnect(conn = con)

# common input parameters: bounds for the Sepal Length slider
slmin <- min(iris$Sepal.Length)
slmax <- max(iris$Sepal.Length)
# page layout: live clock, range slider, task button and result table
ui <- page_fluid(
  p("The time is ", textOutput(outputId = "current_time", inline = TRUE)),
  hr(),
  h3("Shiny / mirai / DBI / ADBC demonstration"),
  p("New daemon-hosted database connection is created for every Shiny session"),
  sliderInput(
    inputId = "sl",
    label = "Query iris dataset based on Sepal Length",
    min = slmin,
    max = slmax,
    value = c(slmin, slmax),
    width = "75%"
  ),
  input_task_button(id = "btn", label = "Return query"),
  tableOutput(outputId = "table")
)
# uses Shiny ExtendedTask with mirai
#
# Per-session server logic: starts one daemon under a session-specific
# compute profile, opens a database connection on it, and runs queries on
# that daemon through an ExtendedTask bound to the "btn" task button.
server <- function(input, output, session) {

  # create unique session id by hashing current time with a random key
  id <- secretbase::siphash13(Sys.time(), key = nanonext::random(4L))

  # create new daemon for each session; Arrow serialization is set up here
  # through the 'serial' argument
  daemons(1L, serial = cfg, .compute = id)

  # tear down daemon when session ends
  session$onEnded(function() daemons(0L, .compute = id))

  # everywhere() loads DBI and creates ADBC connection in each daemon
  everywhere(
    {
      library(DBI) # `adbi` and `adbcsqlite` packages must also be installed
      con <<- dbConnect(adbi::adbi("adbcsqlite"), uri = file)
    },
    file = file,
    .compute = id
  )

  output$current_time <- renderText({
    invalidateLater(1000)
    # 12-hour clock (%I) to agree with the AM/PM marker (%p)
    format(Sys.time(), "%I:%M:%S %p")
  })

  # each invocation runs the query as a mirai under this session's compute
  # profile; `sl` reaches the mirai through `...` from task$invoke()
  task <- ExtendedTask$new(
    function(...) mirai(
      dbGetQueryArrow(
        con,
        sprintf(
          "SELECT * FROM iris WHERE \"Sepal.Length\" BETWEEN %.2f AND %.2f",
          sl[1L],
          sl[2L]
        )
      ),
      ...,
      .compute = id
    )
  ) |> bind_task_button("btn")

  observeEvent(input$btn, task$invoke(sl = input$sl))

  output$table <- renderTable(task$result())

}
# run Shiny app
# the temp database file (for this demonstration) is deleted when the app
# stops: the cleanup is registered via onStart / onStop so that the app object
# stays the last expression and the file is never removed before the app runs
shinyApp(
  ui = ui,
  server = server,
  onStart = function() onStop(function() unlink(file))
)
```