Spark Connect

Last updated: Sat Jun 22 18:53:55 2024

Intro

Spark Connect introduced a decoupled client-server architecture that allows remote connectivity to Spark clusters using the DataFrame API. This separation between client and server allows Spark to be leveraged from everywhere, and it lets R users interact with a cluster from the comfort of their preferred environment, laptop or otherwise.

The Solution

The Spark Connect API is very different from “legacy” Spark, and using the Spark shell is no longer an option. We decided to use Python as the new interface; in turn, Python uses gRPC to communicate with Spark.

We are using reticulate to interact with the Python API. sparklyr extends the functionality, and the user experience, by providing the dplyr back-end, the DBI back-end, and integration with RStudio’s Connections pane.

flowchart LR
  subgraph lp[test]
    subgraph r[R]
      sr[sparklyr]
      rt[reticulate]
    end
    subgraph ps[Python]
      dc[Spark Connect]
      g1[gRPC]
    end
  end   
  subgraph db[Compute Cluster]
    sp[Spark]   
  end
  sr <--> rt
  rt <--> dc
  g1 <-- Internet<br>Connection --> sp
  dc <--> g1
  
  style r   fill:#fff,stroke:#666,color:#000
  style sr  fill:#fff,stroke:#666,color:#000
  style rt  fill:#fff,stroke:#666,color:#000
  style ps  fill:#fff,stroke:#666,color:#000
  style lp  fill:#fff,stroke:#666,color:#fff
  style db  fill:#fff,stroke:#666,color:#000
  style sp  fill:#fff,stroke:#666,color:#000
  style g1  fill:#fff,stroke:#666,color:#000
  style dc  fill:#fff,stroke:#666,color:#000

Figure 1: How sparklyr communicates with Spark Connect

Package Installation

To access Spark Connect, you will need the following two packages:

  • sparklyr - 1.8.4
  • pysparklyr - 0.1.3

install.packages("sparklyr")
install.packages("pysparklyr")

Initial setup

sparklyr needs specific Python libraries in order to connect to, and interact with, Spark Connect. We provide a convenience function that will automatically do the following:

  • Create, or re-create, a Python environment. Based on your OS, it will choose between a Virtual Environment or Conda.

  • Install the needed Python libraries

To install the latest versions of all the libraries, use:

pysparklyr::install_pyspark()

sparklyr queries PyPI.org for the latest version of PySpark and installs it. It is recommended that the version of the PySpark library match the Spark version of your cluster. To do this, pass the Spark version in the version argument, for example:

pysparklyr::install_pyspark("3.5")

We have seen Spark sessions crash when the versions of PySpark and Spark do not match, specifically when a newer version of PySpark is used against an older version of Spark. If you are having issues with your connection, consider running install_pyspark() with the cluster’s specific Spark version.
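
For example, if your cluster runs Spark 3.4 (the version here is only illustrative), installing a matching PySpark would look like this:

pysparklyr::install_pyspark("3.4")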

Connecting

To start a session with an open-source Spark cluster via Spark Connect, you will need to set the master and method values. For master, pass the cluster’s IP address and, optionally, its port, prefixed with the “sc://” protocol to build the proper connection URL. For method, use “spark_connect”. Here is an example:

library(sparklyr)

sc <- spark_connect(
  master = "sc://[Host IP(:Host Port - optional)]", 
  method = "spark_connect"
  version = "[Version that matches your cluster]"
  )

If version is not passed, sparklyr will automatically choose the installed Python environment with the highest PySpark version, and it will let you know, via a console message, which environment it is using.
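
For illustration, here is what a connection to a hypothetical cluster at 192.168.1.5, listening on the default Spark Connect port 15002, could look like (the address is only a placeholder):

library(sparklyr)

# Hypothetical host and port -- replace with your cluster's address
sc <- spark_connect(
  master = "sc://192.168.1.5:15002", 
  method = "spark_connect", 
  version = "3.5"
  )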

Run locally

It is possible to run Spark Connect on your machine. We provide helper functions that let you set up, start, and stop the service locally.

If you wish to try this out, first install Spark 3.4 or above:

spark_install("3.5")

After installing, start Spark Connect using:

pysparklyr::spark_connect_service_start("3.5")
#> Starting Spark Connect locally ...
#>   starting org.apache.spark.sql.connect.service.SparkConnectServer, logging to
#>   /Users/edgar/spark/spark-3.5.1-bin-hadoop3/logs/spark-edgar-org.apache.spark.sql.connect.service.SparkConnectServer-1-edgarruiz-WL57.out

To connect to your local Spark cluster using Spark Connect, use “localhost” as the address for master:

sc <- spark_connect(
  master = "sc://localhost", 
  method = "spark_connect", 
  version = "3.5"
  )
#> ℹ Attempting to load 'r-sparklyr-pyspark-3.5'
#> ✔ Python environment: 'r-sparklyr-pyspark-3.5' [393ms]
#> 

Now, you are able to interact with your local Spark session:

library(dplyr)

tbl_mtcars <- copy_to(sc, mtcars)

tbl_mtcars %>% 
  group_by(am) %>% 
  summarise(mpg = mean(mpg, na.rm = TRUE))
#> # Source:   SQL [2 x 2]
#> # Database: spark_connection
#>      am   mpg
#>   <dbl> <dbl>
#> 1     0  17.1
#> 2     1  24.4
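
Because sparklyr also provides the DBI back-end mentioned earlier, the same connection accepts SQL. A minimal sketch, assuming the mtcars table registered by the copy_to() call above:

# Query the registered table through the DBI back-end
DBI::dbGetQuery(sc, "SELECT am, AVG(mpg) AS mpg FROM mtcars GROUP BY am")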

When done, you can disconnect from Spark Connect:

spark_disconnect(sc)

A regular local Spark session terminates the local cluster when you call spark_disconnect(). With Spark Connect, the local cluster needs to be stopped independently:

pysparklyr::spark_connect_service_stop()
#> 
#> ── Stopping Spark Connect
#>   - Shutdown command sent

Additional setup details

If you wish to use your own Python environment, make sure to load it before calling spark_connect(). If a Python environment is already loaded when you connect to your Spark cluster, sparklyr will use that environment instead. Your environment will need the following libraries installed (see the sketch after the lists below):

  • pyspark
  • pandas
  • PyArrow
  • grpcio
  • google-api-python-client
  • grpcio_status
  • torch (Spark 3.5+)
  • torcheval (Spark 3.5+)

ML libraries (Optional):

  • torch
  • torcheval
  • scikit-learn
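
As a sketch of the workflow described above, you could activate a custom environment with reticulate before connecting. The environment name “my-spark-env” is only a placeholder, and the environment must already contain the libraries listed above:

library(sparklyr)

# Load a pre-existing Python environment before connecting;
# "my-spark-env" is a hypothetical environment name
reticulate::use_virtualenv("my-spark-env")

sc <- spark_connect(
  master = "sc://localhost", 
  method = "spark_connect", 
  version = "3.5"
  )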