library(sparklyr)
<- spark_connect(
sc master = "yarn",
spark_home = "~/spark/spark-2.4.5-bin-hadoop2.7"
) # This is a contrived example to show reader tasks will be distributed across
# all Spark worker nodes
spark_read(
sc, rep("/dev/null", 10),
reader = function(path) system("hostname", intern = TRUE),
columns = c(hostname = "string")
%>% sdf_collect() )
Read file(s) into a Spark DataFrame using a custom reader
R/data_interface.R
spark_read
Description
Run a custom R function on Spark workers to ingest data from one or more files into a Spark DataFrame, assuming all files follow the same schema.
Usage
spark_read(sc, paths, reader, columns, packages = TRUE, ...)
Arguments
Arguments | Description |
---|---|
sc | A spark_connection . |
paths | A character vector of one or more file URIs (e.g., c(“hdfs://localhost:9000/file.txt”, “hdfs://localhost:9000/file2.txt”)) |
reader | A self-contained R function that takes a single file URI as argument and returns the data read from that file as a data frame. |
columns | a named list of column names and column types of the resulting data frame (e.g., list(column_1 = “integer”, column_2 = “character”)), or a list of column names only if column types should be inferred from the data (e.g., list(“column_1”, “column_2”), or NULL if column types should be inferred and resulting data frame can have arbitrary column names |
packages | A list of R packages to distribute to Spark workers |
… | Optional arguments; currently unused. |
Examples
See Also
Other Spark serialization routines: collect_from_rds()
, spark_insert_table()
, spark_load_table()
, spark_read_avro()
, spark_read_binary()
, spark_read_csv()
, spark_read_delta()
, spark_read_image()
, spark_read_jdbc()
, spark_read_json()
, spark_read_libsvm()
, spark_read_orc()
, spark_read_parquet()
, spark_read_source()
, spark_read_table()
, spark_read_text()
, spark_save_table()
, spark_write_avro()
, spark_write_csv()
, spark_write_delta()
, spark_write_jdbc()
, spark_write_json()
, spark_write_orc()
, spark_write_parquet()
, spark_write_source()
, spark_write_table()
, spark_write_text()