Creating Extensions for sparklyr

library(sparklyr)
sc <- spark_connect(master = "local", version = "2.0.0")

Introduction

The sparklyr package provides a dplyr interface to Spark DataFrames as well as an R interface to Spark’s distributed machine learning pipelines. However, since Spark is a general-purpose cluster computing system there are many other R interfaces that could be built (e.g. interfaces to custom machine learning pipelines, interfaces to 3rd party Spark packages, etc.).

The facilities used internally by sparklyr for its dplyr and machine learning interfaces are available to extension packages. This guide describes how you can use these tools to create your own custom R interfaces to Spark.

Examples

Here’s an example of an extension function that calls the text file line counting function available via the SparkContext:

library(sparklyr)
count_lines <- function(sc, file) {
  spark_context(sc) %>% 
    invoke("textFile", file, 1L) %>% 
    invoke("count")
}

The count_lines function takes a spark_connection (sc) argument which enables it to obtain a reference to the SparkContext object, and in turn call the textFile().count() method.

You can use this function with an existing sparklyr connection as follows:

library(sparklyr)
sc <- spark_connect(master = "local")
count_lines(sc, "hdfs://path/data.csv")

Here are links to some additional examples of extension packages:

Package Description
spark.sas7bdat Read in SAS data in parallel into Apache Spark.
rsparkling Extension for using H2O machine learning algorithms against Spark Data Frames.
sparkhello Simple example of including a custom JAR file within an extension package.
rddlist Implements some methods of an R list as a Spark RDD (resilient distributed dataset).
sparkwarc Load WARC files into Apache Spark with sparklyr.
sparkavro Load Avro data into Spark with sparklyr. It is a wrapper of spark-avro.
crassy Connect to Cassandra with sparklyr using the Spark-Cassandra-Connector.
sparklygraphs R interface for GraphFrames which aims to provide the functionality of GraphX.
sparklyr.nested Extension for working with nested data.
sparklyudf Simple example registering a Scala UDF within an extension package.
mleap R Interface to MLeap.
sparkbq Sparklyr extension package to connect to Google BigQuery.
sparkgeo Sparklyr extension package providing geospatial analytics capabilities.
sparklytd Sparklyr plugin for td-spark to connect TD from R.
sparkts Extensions for the spark-timeseries framework.
sparkxgb R interface for XGBoost on Spark.
sparktf R interface to Spark TensorFlow Connector.
geospark R interface to GeoSpark to perform spatial analysis in Spark.
synapseml Simplifies the creation of scalable machine learning pipelines. SynapseML builds on Apache Spark and SparkML to enable new kinds of machine learning, analytics, and model deployment workflows.

Core Types

Three classes are defined for representing the fundamental types of the R to Java bridge:

Class Description
spark_connection Connection between R and the Spark shell process
spark_jobj Instance of a remote Spark object
spark_dataframe Instance of a remote Spark DataFrame object

S3 methods are defined for each of these classes so they can be easily converted to or from objects that contain or wrap them. Note that for any given spark_jobj it’s possible to discover the underlying spark_connection.
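As a rough sketch of how these conversions fit together, the following assumes a local Spark installation and uses the built-in mtcars data set under the table name mtcars_ext_demo (both are illustrative choices, not part of this guide). It copies a data frame to Spark, unwraps the resulting dplyr tbl to the underlying DataFrame reference, and then recovers the connection from that reference.

```r
library(sparklyr)
library(dplyr)

# Assumption: Spark is installed locally (for example via spark_install()).
sc <- spark_connect(master = "local")

# copy_to() returns a dplyr tbl that wraps a remote Spark DataFrame.
mtcars_tbl <- copy_to(sc, mtcars, "mtcars_ext_demo", overwrite = TRUE)

# Unwrap the tbl to the spark_jobj that references the DataFrame.
df <- spark_dataframe(mtcars_tbl)
inherits(df, "spark_jobj")

# Recover the connection from the jobj.
conn <- spark_connection(df)
inherits(conn, "spark_connection")

spark_disconnect(sc)
```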

Calling Spark from R

There are several functions available for calling the methods of Java objects and static methods of Java classes:

Function Description
invoke Call a method on an object
invoke_new Create a new object by invoking a constructor
invoke_static Call a static method of a class

For example, to create a new instance of the java.math.BigInteger class and then call the longValue() method on it you would use code like this:

billionBigInteger <- invoke_new(sc, "java.math.BigInteger", "1000000000")
billion <- invoke(billionBigInteger, "longValue")

Note the sc argument: that’s the spark_connection object which is provided by the front-end package (e.g. sparklyr).

The previous example can be re-written to be more compact and clear using magrittr pipes:

billion <- sc %>%
  invoke_new("java.math.BigInteger", "1000000000") %>%
  invoke("longValue")

This syntax is similar to the method-chaining syntax often used in Scala code, so it is generally preferred.

Calling a static method of a class is also straightforward. For example, to call the static method java.lang.Math.hypot() you would use this code:

hypot <- sc %>% 
  invoke_static("java.lang.Math", "hypot", 10, 20) 
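A static call like this is often hidden behind a small R function. The sketch below is one way to do that; the name spark_hypot is hypothetical, and the as.numeric() coercion is there because java.lang.Math.hypot takes two double arguments, so R integers are converted before the call. It assumes a local Spark installation.

```r
library(sparklyr)

# Hypothetical wrapper: coerce inputs to double because
# java.lang.Math.hypot takes two double arguments.
spark_hypot <- function(sc, x, y) {
  invoke_static(sc, "java.lang.Math", "hypot", as.numeric(x), as.numeric(y))
}

# Assumption: Spark is installed locally.
sc <- spark_connect(master = "local")
h <- spark_hypot(sc, 3L, 4L)
spark_disconnect(sc)
```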

Wrapper Functions

Creating an extension typically consists of writing R wrapper functions for a set of Spark services. In this section we’ll describe the typical form of these functions as well as how to handle special types like Spark DataFrames.

Here’s the wrapper function for textFile().count() which we defined earlier:

count_lines <- function(sc, file) {
  spark_context(sc) %>%
    invoke("textFile", file, 1L) %>%
    invoke("count")
}

The count_lines function takes a spark_connection (sc) argument which enables it to obtain a reference to the SparkContext object, and in turn call the textFile().count() method.

The following functions are useful for implementing wrapper functions of various kinds:

Function Description
spark_connection Get the Spark connection associated with an object (S3)
spark_jobj Get the Spark jobj associated with an object (S3)
spark_dataframe Get the Spark DataFrame associated with an object (S3)
spark_context Get the SparkContext for a spark_connection
hive_context Get the HiveContext for a spark_connection
spark_version Get the version of Spark (as a numeric_version) for a spark_connection

The use of these functions is illustrated in this simple example:

analyze <- function(x, features) {
  
  # normalize whatever we were passed (e.g. a dplyr tbl) into a DataFrame
  df <- spark_dataframe(x)
  
  # get the underlying connection so we can create new objects
  sc <- spark_connection(df)
  
  # create an object to do the analysis and call its `analyze` and `summary`
  # methods (note that the df and features are passed to the analyze function)
  summary <- sc %>%
    invoke_new("com.example.tools.Analyzer") %>%
    invoke("analyze", df, features) %>%
    invoke("summary")

  # return the results
  summary
}

The first argument is an object that can be accessed using the Spark DataFrame API (this might be an actual reference to a DataFrame or could rather be a dplyr tbl which has a DataFrame reference inside it).

After using the spark_dataframe function to normalize the reference, we extract the underlying Spark connection associated with the data frame using the spark_connection function. Finally, we create a new Analyzer object, call its analyze method with the DataFrame and list of features, and then call the summary method on the results of the analysis.

Accepting a spark_jobj or spark_dataframe as the first argument makes a function easy to use in magrittr pipelines, so this pattern is highly recommended where possible.
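Because the Analyzer class above is only a placeholder, here is a small sketch of the same pattern against a real method: the DataFrame count() method. The wrapper name count_rows and the table name mtcars_ext_demo are hypothetical, and a local Spark installation is assumed.

```r
library(sparklyr)
library(dplyr)

# Hypothetical wrapper: accepts anything spark_dataframe() can normalize
# (a dplyr tbl or a DataFrame jobj) and calls the DataFrame's count() method.
count_rows <- function(x) {
  spark_dataframe(x) %>%
    invoke("count")
}

# Assumption: Spark is installed locally.
sc <- spark_connect(master = "local")
mtcars_tbl <- copy_to(sc, mtcars, "mtcars_ext_demo", overwrite = TRUE)

# The wrapper slots into a pipeline, with or without dplyr verbs before it.
n_total <- mtcars_tbl %>% count_rows()
n_four_cyl <- mtcars_tbl %>%
  filter(cyl == 4) %>%
  count_rows()

spark_disconnect(sc)
```

Taking the data object as the first argument is what lets the wrapper be appended directly after filter() without any intermediate variables.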

Dependencies

When creating R packages which implement interfaces to Spark you may need to include additional dependencies. Your dependencies might be a set of Spark Packages or might be a custom JAR file. In either case, you’ll need a way to specify that these dependencies should be included during the initialization of a Spark session. A Spark dependency is defined using the spark_dependency function:

Function Description
spark_dependency Define a Spark dependency consisting of JAR files and Spark packages

Your extension package can specify its dependencies by implementing a function named spark_dependencies within the package (this function should not be publicly exported). For example, let’s say you were creating an extension package named sparkds that needs to include a custom JAR as well as the Redshift and Apache Avro packages:

spark_dependencies <- function(spark_version, scala_version, ...) {
  spark_dependency(
    jars = c(
      system.file(
        sprintf("java/sparkds-%s-%s.jar", spark_version, scala_version), 
        package = "sparkds"
      )
    ),
    packages = c(
      sprintf("com.databricks:spark-redshift_%s:0.6.0", scala_version),
      sprintf("com.databricks:spark-avro_%s:2.0.1", scala_version)
    )
  )
}

.onLoad <- function(libname, pkgname) {
  sparklyr::register_extension(pkgname)
}

The spark_version argument is provided so that a package can support multiple Spark versions for its JARs. Note that the argument will include just the major and minor versions (e.g. 1.6 or 2.0) and will not include the patch level (as JARs built for a given major/minor version are expected to work for all patch levels).

The scala_version argument is provided so that a single package can support multiple Scala compiler versions for its JARs and packages (currently, Spark 1.6 downloadable binaries are compiled with Scala 2.10 and Spark 2.0 downloadable binaries are compiled with Scala 2.11).

The ... argument is unused but nevertheless should be included to ensure compatibility if new arguments are added to spark_dependencies in the future.
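To make the JAR naming convention concrete, the following base R sketch builds the relative path used in the sparkds example and fails with a readable message when no JAR exists for a given combination. The helper names extension_jar_name and find_extension_jar are hypothetical and are not part of sparklyr; the second helper relies on system.file() returning an empty string when a file is not found.

```r
# Hypothetical helper: build the relative JAR path that the sparkds example
# expects under inst/java for a given Spark and Scala version.
extension_jar_name <- function(spark_version, scala_version, prefix = "sparkds") {
  sprintf("java/%s-%s-%s.jar", prefix, as.character(spark_version), scala_version)
}

# Hypothetical helper: resolve the JAR inside an installed package and stop
# early if no JAR was built for this Spark/Scala combination.
find_extension_jar <- function(spark_version, scala_version, package = "sparkds") {
  relative <- extension_jar_name(spark_version, scala_version, prefix = package)
  path <- system.file(relative, package = package)
  if (!nzchar(path)) {
    stop("No JAR found at '", relative, "' in package '", package, "'")
  }
  path
}

# The version may arrive as a string or as a numeric_version.
extension_jar_name("2.0", "2.11")
extension_jar_name(numeric_version("1.6"), "2.10")
```

A spark_dependencies implementation could call a helper like find_extension_jar in place of the bare system.file() call, so that a missing JAR is reported when the connection starts rather than later as a class-not-found error.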

The .onLoad function registers your extension package so that its spark_dependencies function will be automatically called when new connections to Spark are made via spark_connect:

library(sparklyr)
library(sparkds)
sc <- spark_connect(master = "local")

Compiling JARs

The sparklyr package includes a utility function (compile_package_jars) that will automatically compile a JAR file from your Scala source code for the required permutations of Spark and Scala compiler versions. To use the function just invoke it from the root directory of your R package as follows:

sparklyr::compile_package_jars()

Note that a prerequisite to calling compile_package_jars is the installation of the Scala 2.10 and 2.11 compilers to one of the following paths:

  • /opt/scala
  • /opt/local/scala
  • /usr/local/scala
  • ~/scala (Windows-only)

See the sparkhello repository for a complete example of including a custom JAR within an extension package.

CRAN

When including a JAR file within an R package distributed on CRAN, you should follow the guidelines provided in Writing R Extensions:

Java code is a special case: except for very small programs, .java files should be byte-compiled (to a .class file) and distributed as part of a .jar file: the conventional location for the .jar file(s) is inst/java. It is desirable (and required under an Open Source license) to make the Java source files available: this is best done in a top-level java directory in the package – the source files should not be installed.

Data Types

The ensure_* family of functions can be used to enforce specific data types that are passed to a Spark routine. For example, Spark routines that require an integer will not accept an R numeric element. Use these functions to ensure that certain parameters are scalar integers, scalar doubles, and so on.

  • ensure_scalar_integer
  • ensure_scalar_double
  • ensure_scalar_boolean
  • ensure_scalar_character

In order to match the correct data types while calling Scala code from R, or retrieving results from Scala back to R, consider the following types mapping:

From R        Scala             To R
NULL          void              NULL
integer       Int               integer
character     String            character
logical       Boolean           logical
double        Double            double
numeric       Double            double
              Float             double
              Decimal           double
              Long              double
raw           Array[Byte]       raw
Date          Date              Date
POSIXlt       Time
POSIXct       Time              POSIXct
list          Array[T]          list
environment   Map[String, T]
jobj          Object            jobj
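A few rows of the table above can be checked directly against java.math.BigInteger, which has methods returning a long, an int, and a String. This is only a sketch and assumes a local Spark installation.

```r
library(sparklyr)

# Assumption: Spark is installed locally.
sc <- spark_connect(master = "local")

big <- invoke_new(sc, "java.math.BigInteger", "1000000000")

# longValue() returns a Java long, which arrives in R as a double.
billion <- invoke(big, "longValue")
is.double(billion)

# bitLength() returns a Java int, which arrives in R as an integer.
n_bits <- invoke(big, "bitLength")
is.integer(n_bits)

# toString() returns a String, which arrives in R as a character vector.
as_text <- invoke(big, "toString")
is.character(as_text)

spark_disconnect(sc)
```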

Compiling

Most Spark extensions won’t need to define their own compilation specification and can instead rely on the default behavior of compile_package_jars. To take more control over where the scalac compilers are looked up, use the spark_compilation_spec function. The Spark compilation specification is used when compiling Spark extension Java Archives (JARs), and defines which versions of Spark and Scala should be used for compilation.
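As a minimal sketch of a custom specification, the code below builds a single JAR for Spark 2.0 with Scala 2.11. It assumes that Spark 2.0.0 is installed locally (for example via spark_install), that a Scala 2.11 compiler can be located by find_scalac, and that it is run from the root directory of the extension package. The JAR name follows the sparkds example used earlier and is illustrative only; check the argument names against the sparklyr version you have installed.

```r
library(sparklyr)

# One specification describes one JAR: which Spark to compile against,
# which scalac binary to use, and what to call the output file.
spec <- spark_compilation_spec(
  spark_version = "2.0.0",
  scalac_path = find_scalac("2.11"),
  jar_name = "sparkds-2.0-2.11.jar"
)

# Compile only what the custom specification describes.
compile_package_jars(spec = spec)
```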