Run R inside Databricks Connect
Last updated: Thu Apr 25 12:38:27 2024
Intro
Support for spark_apply()
is available starting with the following package versions:
sparklyr
- 1.8.5pysparklyr
- 0.1.4
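If you are on older versions of either package, a standard CRAN install should get you the required releases:

install.packages(c("sparklyr", "pysparklyr"))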
Databricks Connect is now able to run regular Python code inside Spark. sparklyr takes advantage of this capability by having Python transport and run the R code. It does this via the rpy2 Python library. Using this library also guarantees Arrow support.
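To put these pieces together, a minimal end-to-end call looks roughly like the sketch below; the cluster ID is a placeholder, and tbl_mtcars is assumed to be a copy of mtcars created with copy_to():

library(sparklyr)

# Connect to the Databricks cluster; the "databricks_connect" method is provided by pysparklyr
sc <- spark_connect(
  cluster_id = "1234-567890-abcde123",  # placeholder, use your own cluster ID
  method = "databricks_connect"
)

# Copy a small local data set so there is something to apply the R function to
tbl_mtcars <- copy_to(sc, mtcars)

# The R function (here, simply nrow) is shipped to the cluster and run via rpy2
spark_apply(tbl_mtcars, nrow, group_by = "am")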
Getting started
If you have been using sparklyr with Databricks Connect v2 already, then after upgrading the packages you will be prompted to install rpy2 in your Python environment. The prompt will occur the first time you use spark_apply() in an interactive R session. If this is the first time you are using sparklyr with Databricks Connect v2, please refer to our intro article, "Databricks Connect v2", to learn how to set up your environment.
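If you would rather prepare the local Python environment ahead of the interactive prompt, a sketch along these lines should work; install_databricks() is pysparklyr's environment installer, and the runtime version shown is an assumption that should be matched to your cluster:

# Creates the local Python environment used by Databricks Connect;
# the version should match your cluster's Databricks Runtime (assumed here to be 14.1)
pysparklyr::install_databricks(version = "14.1")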
As shown in the diagram in the previous section, rpy2 is needed on the Databricks cluster you plan to use. This means that you will need to manually install the library on the cluster. This is a simple operation that is done via your Databricks web portal. Here are the instructions that show you how to do that: Databricks - Cluster Libraries.
What is supported in spark_apply() - At a glance

| Argument | Supported? | Notes |
|---|---|---|
| x | Yes | |
| f | Yes | |
| columns | Yes | Requires a string entry that contains the name of each column and its Spark variable type. Accepted values are: long, decimal, string, datetime and bool. Example: columns = "x long, y string". If not provided, sparklyr will automatically create one by examining the first 10 records of x, and it will provide a columns spec you can use when running spark_apply() again. See: Providing a schema, and the sketch after this table. |
| memory | Yes | |
| group_by | Yes | |
| packages | No | You will need to pre-install the needed R packages in your cluster via the Databricks web portal, see R packages. |
| context | No | |
| name | Yes | |
| barrier | Yes | Supported only on ungrouped data, that is, when the group_by argument is not used. |
| fetch_result_as_sdf | Yes | At this time, spark_apply() inside Databricks Connect only supports rectangular data, so setting it to FALSE will always return a data frame. |
| partition_index_param | No | |
| arrow_max_records_per_batch | Yes | Supported only on ungrouped data, that is, when the group_by argument is not used. |
| auto_deps | No | |
| ... | | |
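As an illustration of the columns specification, the sketch below returns several of the accepted column types at once; the summarizing function and the column names are made up for this example:

spark_apply(
  tbl_mtcars,
  function(df) {
    data.frame(
      cyl_label  = as.character(df$cyl[1]),  # string
      n          = nrow(df),                 # long
      avg_mpg    = mean(df$mpg),             # double
      all_manual = all(df$am == 1)           # bool
    )
  },
  group_by = "cyl",
  columns = "cyl double, cyl_label string, n long, avg_mpg double, all_manual bool"
)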
R packages
If your spark_apply() call uses specific R packages, you will need to pre-install those packages in your target cluster. This is a simple operation, because you can do it via your Databricks web portal; please see Databricks - Cluster Libraries to learn how.
The Databricks cluster library interface is able to source packages from CRAN only. This means that packages installed from GitHub, or other alternative sources, will not be available.
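For example, the sketch below assumes the slider package has already been installed on the cluster through the Databricks web portal; the rolling-average logic is only illustrative:

spark_apply(
  tbl_mtcars,
  function(df) {
    # slider must be pre-installed on the cluster (CRAN, via Cluster Libraries)
    data.frame(roll_mpg = slider::slide_dbl(df$mpg, mean, .before = 2))
  },
  group_by = "am",
  columns = "am double, roll_mpg double"
)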
Additional background
In the previous implementation, spark_apply() was able to easily copy the locally installed R packages in order to ensure that your code would run in the cluster. This was possible because R, and RStudio, was running on one of the matching servers in the Spark cluster. Because sparklyr is now running on a remote machine, most likely a laptop, this is no longer an option. In the vast majority of cases, the remote machine will be on a different operating system than the cluster. Additionally, transmitting the unpacked, compiled R packages would take a long time over a broadband Internet connection.
Providing a schema
Passing a schema in columns will make spark_apply() run faster, because if one is not provided, sparklyr has to collect the first 10 rows and run the R code in order to determine the names and types of your resulting data set. As a convenience, sparklyr will output a message with the schema it used. If you are going to rerun your spark_apply() command, you can copy and paste the output of the message into your code:

spark_apply(
  tbl_mtcars,
  nrow,
  group_by = "am"
)
#> To increase performance, use the following schema:
#> columns = "am double, x long"
#> # Source: table<`sparklyr_tmp_table_fa5389aa_0761_4a6a_abf5_1da699868ffc`> [2 x 2]
#> # Database: spark_connection
#> am x
#> <dbl> <dbl>
#> 1 0 19
#> 2 1 13

Passing the columns argument silences the message:
spark_apply(
  tbl_mtcars,
  nrow,
  group_by = "am",
  columns = "am double, x long"
)
#> # Source: table<`sparklyr_tmp_table_87ef961e_1009_42f8_abdb_2a04c2a5ad38`> [2 x 2]
#> # Database: spark_connection
#> am x
#> <dbl> <dbl>
#> 1 0 19
#> 2 1 13
Partition data
Typically, with ungrouped data, the number of parallel jobs will correspond with the number of partitions of the data. For Databricks connections, sparklyr will, by default, attempt to use Apache Arrow; the Databricks Connect clusters come with Arrow installed. This approach also changes how Spark will partition your data. Instead of the number of partitions, Spark will use the value in the "Arrow Max Records per Batch" option. This option can be controlled directly in the spark_apply() call by setting the arrow_max_records_per_batch argument.
spark_apply(tbl_mtcars, nrow, arrow_max_records_per_batch = 4, columns = "x long")
#> Changing spark.sql.execution.arrow.maxRecordsPerBatch to: 4
#> # Source: table<`sparklyr_tmp_table_c2866f69_faf7_49e5_a343_707862ec3dcc`> [8 x 1]
#> # Database: spark_connection
#> x
#> <dbl>
#> 1 4
#> 2 4
#> 3 4
#> 4 4
#> 5 4
#> 6 4
#> 7 4
#> 8 4
If you pass a different Arrow batch size than the one the option is currently set to, sparklyr will change the value of that option and notify you:
spark_apply(tbl_mtcars, nrow, arrow_max_records_per_batch = 2, columns = "x long")
#> Changing spark.sql.execution.arrow.maxRecordsPerBatch to: 2
#> # Source: table<`sparklyr_tmp_table_a09a5277_7841_4746_bde1_c583aeee4baf`> [?? x 1]
#> # Database: spark_connection
#> x
#> <dbl>
#> 1 2
#> 2 2
#> 3 2
#> 4 2
#> 5 2
#> 6 2
#> 7 2
#> 8 2
#> 9 2
#> 10 2
#> # ℹ more rows
Limitations
spark_apply() will only work on Databricks "Single Access" mode. "Shared Access" mode does not currently support mapInPandas() and applyInPandas() (see Databricks - Access mode limitations). These are the Python functions that sparklyr uses to run the Python code, which in turn runs the R code via rpy2.