Run R inside Databricks Connect

Last updated: Wed Feb 28 13:29:24 2024

Intro

Support for spark_apply() is currently available in the development versions of sparklyr and pysparklyr. To install them, run the following:

remotes::install_github("sparklyr/sparklyr")
remotes::install_github("mlverse/pysparklyr")

Databricks Connect can now run regular Python code inside Spark. sparklyr takes advantage of this capability by having Python transport the R code to the cluster and run it there. It does this via the rpy2 Python library. Using this library also guarantees Arrow support.

flowchart LR
  subgraph mm[My machine]
    sp[R <br> **********  <br>sparklyr]
    rp[Python<br> **************** <br>rpy2 'packages'<br> the R code]
  end
  subgraph db[Databricks]
    subgraph sr[Spark]
      pt[Python<br> ********************* <br>rpy2 runs the R code]
    end
  end

sp --> rp
rp --> sr

style mm   fill:#fff,stroke:#666,color:#000
style sp   fill:#fff,stroke:#666,color:#000
style rp   fill:#fff,stroke:#666,color:#000
style db   fill:#fff,stroke:#666,color:#000
style sr   fill:#fff,stroke:#666,color:#000
style pt   fill:#fff,stroke:#666,color:#000

Figure 1: How sparklyr uses rpy2 to run R code in Databricks Connect
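The diagram shows the R code being packaged on your machine and rebuilt on the cluster. This article does not describe how sparklyr serializes that code, so the following is only a local illustration of the general idea, not sparklyr's implementation. An R function is turned into a plain string (something a Python process could carry), then parsed back into a working function. The names f, payload, and f_rebuilt are hypothetical.

```r
# Illustration only: this is NOT sparklyr's transport code.
# A hypothetical function we want to run remotely
f <- function(df) data.frame(x = nrow(df))

# "Package" the function as a single character string
payload <- paste(deparse(f), collapse = "\n")

# On the receiving side, rebuild the function from the string
f_rebuilt <- eval(parse(text = payload))

# Both versions return the same result on the same data
identical(f(mtcars), f_rebuilt(mtcars))
# [1] TRUE
```

In this text-based sketch only the function's code travels. Any object it refers to by name is looked up wherever the rebuilt function runs, which is one reason the packages your code uses must exist on the cluster.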

Getting started

If you have already been using sparklyr with Databricks Connect v2, you will be prompted to install rpy2 in your Python environment after upgrading the packages. The prompt appears the first time you use spark_apply() in an interactive R session. If this is your first time using sparklyr with Databricks Connect v2, please refer to our intro article “Databricks Connect v2” to learn how to set up your environment.

As shown in the diagram in the previous section, rpy2 also needs to be available on the Databricks cluster you plan to use. This means that you will need to “manually” install the library in the cluster. This is a simple operation done via your Databricks web portal; for instructions, see Databricks - Cluster Libraries.

What is supported in spark_apply() - At a glance

Argument | Supported? | Notes
--- | --- | ---
x | Yes |
f | Yes |
columns | Yes | Requires a string that contains the name of each column and its Spark variable type. Accepted values are: long, decimal, string, datetime and bool. Example: columns = "x long, y string". If not provided, sparklyr will automatically create one by examining the first 10 records of x, and it will output a columns spec you can use when running spark_apply() again. See Providing a schema.
memory | Yes |
group_by | Yes |
packages | No | You will need to pre-install the needed R packages in your cluster via the Databricks web portal. See R packages.
context | No |
name | Yes |
barrier | Yes | Supported only on ungrouped data. In other words, it is valid only when the group_by argument is not used.
fetch_result_as_sdf | Yes | At this time, spark_apply() inside Databricks Connect only supports rectangular data, so setting it to FALSE will always return a data frame.
partition_index_param | No |
arrow_max_records_per_batch | Yes | Supported only on ungrouped data. In other words, it is valid only when the group_by argument is not used.
auto_deps | No |
... | |

R packages

If your spark_apply() call uses specific R packages, you will need to pre-install those packages in your target cluster. This is a simple operation that you can do via your Databricks web portal; please see Databricks - Cluster Libraries to learn how.
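To build the list of packages to install, it can help to scan the function you pass as f for explicit pkg::fun calls. The helper below, find_namespaced_pkgs(), is hypothetical and not part of sparklyr. It is a rough, text-based sketch: it only detects pkg:: and pkg::: references, so packages loaded with library() or require(), or needed only as indirect dependencies, are not reported.

```r
# Hypothetical helper (not part of sparklyr): list packages referenced
# as pkg::fun or pkg:::fun inside a function's source text.
find_namespaced_pkgs <- function(f) {
  code <- paste(deparse(f), collapse = "\n")
  # Match a package-like name only when it is followed by "::" or ":::"
  hits <- regmatches(
    code,
    gregexpr("[A-Za-z][A-Za-z0-9.]*(?=:::?)", code, perl = TRUE)
  )[[1]]
  sort(unique(hits))
}

# Example function that would be passed to spark_apply()
f <- function(df) {
  df$z <- stringr::str_length(rownames(df))
  data.frame(n = nrow(dplyr::filter(df, am == 1)))
}

find_namespaced_pkgs(f)
# returns c("dplyr", "stringr")
```

Each package reported this way still has to be installed on the cluster through the Databricks web portal, subject to the CRAN-only restriction described next.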

Only CRAN packages supported

The Databricks cluster library interface can source packages from CRAN only. This means that packages installed from GitHub, or from other alternative sources, will not be available.

Additional background

In previous implementations, spark_apply() was able to easily copy the locally installed R packages to ensure that your code would run in the cluster. This was possible because R, and RStudio, were running on one of the servers in the Spark cluster. Now that sparklyr runs on a remote machine, most likely a laptop, this is no longer an option. In the vast majority of cases, the remote machine will run a different operating system than the cluster. Additionally, transmitting the unpacked, compiled R packages would take a long time over a broadband Internet connection.

Providing a schema

Passing a schema in columns will make spark_apply() run faster. If a schema is not provided, sparklyr has to collect the first 10 rows and run the R code on them in order to determine the names and types of your resulting data set. As a convenience, sparklyr will output a message with the schema it used. If you are going to rerun your spark_apply() command, you can copy and paste the schema from that message into your code.

spark_apply(
  tbl_mtcars,
  nrow,
  group_by = "am"
)
#> To increase performance, use the following schema:
#> columns = "am double, x long"
#> # Source:   table<sparklyr_tmp_table_c6c2c676_ed3f_445d_b98a_af1e9c16bb17> [2 x 2]
#> # Database: spark_connection
#>      am     x
#>   <dbl> <dbl>
#> 1     0    19
#> 2     1    13

Passing the columns argument silences the message:

spark_apply(
  tbl_mtcars,
  nrow,
  group_by = "am", 
  columns = "am double, x long"
)
#> # Source:   table<sparklyr_tmp_table_0dad7e8f_edce_4456_9608_df59e158d0d7> [2 x 2]
#> # Database: spark_connection
#>      am     x
#>   <dbl> <dbl>
#> 1     0    19
#> 2     1    13
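To make the schema-guessing step more concrete, here is a local sketch that runs f on the first 10 rows of a regular R data frame and builds a columns string. guess_columns() is a hypothetical helper, not sparklyr's internal code. Two details are assumptions chosen to mirror the output above: non-data-frame results (such as the count from nrow) go into a column named x, and R integer, double and character columns map to the Spark types long, double and string. sparklyr's actual rules may differ.

```r
# Hypothetical helper (not sparklyr's internals): guess a `columns`
# spec by running `f` on the first `n` rows of a local data frame.
guess_columns <- function(df, f, group_by = NULL, n = 10) {
  sample_rows <- head(df, n)
  # Assumption: wrap non-data-frame results in a column named `x`
  as_result_df <- function(res) {
    if (is.data.frame(res)) res else data.frame(x = res)
  }
  if (is.null(group_by)) {
    result <- as_result_df(f(sample_rows))
  } else {
    # Run `f` per group and prepend the grouping column
    groups <- split(sample_rows, sample_rows[[group_by]])
    pieces <- lapply(groups, function(g) {
      cbind(g[1, group_by, drop = FALSE], as_result_df(f(g)))
    })
    result <- do.call(rbind, pieces)
  }
  # Assumed mapping from R storage types to Spark type names
  spark_types <- c(integer = "long", double = "double", character = "string")
  types <- vapply(result, function(col) {
    type <- spark_types[typeof(col)]
    if (is.na(type)) stop("type not covered by this sketch: ", typeof(col))
    unname(type)
  }, character(1))
  paste(names(types), types, collapse = ", ")
}

guess_columns(mtcars, nrow, group_by = "am")
# [1] "am double, x long"
```

Even in this local form, the cost is visible: the function has to execute once just to learn the result's shape, which is the extra work that supplying columns avoids.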

Partition data

Typically, with ungrouped data, the number of parallel jobs corresponds to the number of partitions of the data. For Databricks connections, sparklyr will, by default, attempt to use Apache Arrow; Databricks Connect clusters come with Arrow installed. This approach also changes how Spark partitions your data. Instead of the number of partitions, Spark will use the value of the “Arrow Max Records per Batch” option (spark.sql.execution.arrow.maxRecordsPerBatch). This option can be controlled directly in the spark_apply() call by setting the arrow_max_records_per_batch argument.

spark_apply(tbl_mtcars, nrow, arrow_max_records_per_batch = 4, columns = "x long")
#> Changing spark.sql.execution.arrow.maxRecordsPerBatch to: 4
#> # Source:   table<sparklyr_tmp_table_836e3dc0_7b34_44f3_8d34_3baf575644ed> [8 x 1]
#> # Database: spark_connection
#>       x
#>   <dbl>
#> 1     4
#> 2     4
#> 3     4
#> 4     4
#> 5     4
#> 6     4
#> 7     4
#> 8     4

If you pass an Arrow batch size that differs from the option's current value, sparklyr will change the value of that option and notify you:

spark_apply(tbl_mtcars, nrow, arrow_max_records_per_batch = 2, columns = "x long")
#> Changing spark.sql.execution.arrow.maxRecordsPerBatch to: 2
#> # Source:   table<sparklyr_tmp_table_d213594e_ab82_4fad_8415_e9a59cbd7d22> [?? x 1]
#> # Database: spark_connection
#>        x
#>    <dbl>
#>  1     2
#>  2     2
#>  3     2
#>  4     2
#>  5     2
#>  6     2
#>  7     2
#>  8     2
#>  9     2
#> 10     2
#> # ℹ more rows
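The results above follow from the batching: mtcars has 32 rows, so batches of 4 give 8 results of 4, and batches of 2 give 16 results of 2. The sketch below reproduces that arithmetic locally with base R. simulate_arrow_batches() is a hypothetical helper that does not touch Spark, and it assumes all rows sit in a single partition with each batch holding at most max_records_per_batch rows.

```r
# Local simulation only (no Spark involved). Assumes one partition and
# batches of at most `max_records_per_batch` consecutive rows.
simulate_arrow_batches <- function(df, f, max_records_per_batch) {
  # Assign row i to batch ceiling(i / batch size)
  batch_id <- ceiling(seq_len(nrow(df)) / max_records_per_batch)
  batches <- split(df, batch_id)
  # Apply `f` to each batch, giving one result row per batch
  data.frame(x = unname(vapply(batches, f, integer(1))))
}

simulate_arrow_batches(mtcars, nrow, 4)$x
# [1] 4 4 4 4 4 4 4 4
```

With a batch size that does not divide the row count, such as 5, this simulation ends with a smaller final batch (six batches of 5 and one of 2). On a real cluster, batches are formed within each partition, so, as we understand it, the exact counts also depend on how the rows are spread across partitions.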

Limitations

spark_apply() will only work in Databricks “Single Access” mode. “Shared Access” mode does not currently support mapInPandas() and applyInPandas() (see Databricks - Access mode limitations). These are the Python functions that sparklyr uses to run the Python code, which in turn runs the R code via rpy2.