https://spark.posit.co/

Apache Spark 是用于大规模数据处理的统一分析引擎。

Spark 提供了一组超出 MapReduce 的更丰富的动词,以方便优化在多台计算机中运行的代码。Spark 还将数据加载到内存中,使操作速度比 Hadoop 的磁盘存储快得多。

16.1 安装

16.1.1 Java 8

https://java.com/download

Show the code
# Verify the local Java installation (Spark requires Java 8).
# intern = TRUE captures the command's output as a character vector so it
# can be printed with cat(); TRUE is spelled out (T is a reassignable alias).
# NOTE(review): the original piped with %>% before any package providing it
# was loaded; calling cat() directly removes that hidden dependency.
# Adjust the path to your local Java installation as needed.
java_version <- system(command = "E:/java/bin/java.exe -version", intern = TRUE)
cat(java_version, sep = "\n")


# Temporarily set the JAVA_HOME environment variable for this R session only.
# Sys.setenv(JAVA_HOME = "E:/java")

16.1.2 sparklyr

Show the code
# Install sparklyr from CRAN (uncomment on first use).
#install.packages("sparklyr")
# Report the installed sparklyr version to confirm the package is available.
packageVersion("sparklyr")

16.1.3 spark

Show the code
library(sparklyr)
# Default install location on this machine would be:
# C:\\Users\\DELL\\AppData\\Local/spark
# Redirect the Spark installation directory to a custom path.
options(spark.install.dir = "E:/spark/")
# Show the directory sparklyr will install Spark into.
spark_install_dir()
# List the Spark versions available for download (uncomment to run).
# spark_available_versions()

# Install a specific Spark version locally (uncomment on first use).
#spark_install(version = "3.3")
# List the Spark versions already installed under the install dir.
spark_installed_versions()


# Remove an old local Spark installation if no longer needed.
# spark_uninstall(version = "1.6.3", hadoop = "2.6")

16.2 连接

Show the code
library(sparklyr)
# Connect to a local single-node Spark instance; `sc` is the connection
# handle used by all subsequent copy_to/ml_*/stream_* calls.
sc <- spark_connect(master = "local")

16.3 使用

Show the code
# Copy the local mtcars data frame into Spark; `cars` is a remote
# Spark table reference (tbl), not an in-memory data frame.
cars <- copy_to(sc, mtcars)

# Printing a Spark tbl shows only a preview fetched from the cluster.
cars
Show the code
library(dplyr)
# dplyr verbs on a Spark tbl are translated to Spark SQL and run remotely;
# collect() pulls the result into local R memory for plotting.
# NOTE(review): sample_n(100) requests more rows than mtcars' 32 —
# presumably Spark-side sampling caps at the available rows; verify.
select(cars, hp, mpg) %>%
  sample_n(100) %>%
  collect() %>%
  plot()
Show the code
# Fit a linear regression (mpg ~ hp) on the Spark side via Spark MLlib.
model <- ml_linear_regression(cars, mpg ~ hp)
model

# Score the model on a small grid of new hp values (260..350), rename the
# prediction column to mpg, join with the observed data, and plot both
# observed and predicted points locally.
# NOTE(review): full_join() without an explicit `by` relies on the
# natural-join message defaulting to the shared `hp` column.
model %>%
  ml_predict(copy_to(sc, data.frame(hp = 250 + 10 * 1:10))) %>%
  transmute(hp = hp, mpg = prediction) %>%
  full_join(select(cars, hp, mpg)) %>%
  collect() %>%
  plot()
Show the code
# Write the Spark table to CSV; Spark creates a *directory* of part files
# at this path, not a single .csv file.
spark_write_csv(cars, "data/spark/cars.csv")

# Read the CSV data back as a new Spark table reference.
cars <- spark_read_csv(sc, "data/spark/cars.csv")

16.3.1 分布式

Show the code
# Run arbitrary R code on each partition across the cluster; here the
# formula shorthand rounds every column of each partition's data frame.
cars %>% spark_apply(~round(.x))

16.3.2 流式处理

Show the code
# Create input/output directories for the structured-streaming demo.
# showWarnings = FALSE keeps re-runs quiet if the folders already exist.
dir.create("data/spark/input", recursive = TRUE, showWarnings = FALSE)
dir.create("data/spark/output", recursive = TRUE, showWarnings = FALSE)
# Seed the watched input folder with a first CSV file.
write.csv(mtcars, "data/spark/input/cars_1.csv", row.names = FALSE)


# Start a stream: watch the input folder, keep three columns, and
# continuously write the transformed rows to the output folder.
stream <- stream_read_csv(sc, "data/spark/input/") %>%
    select(mpg, cyl, disp) %>%
    stream_write_csv("data/spark/output/")

# "\\.csv$" anchors the extension; a bare ".csv" would treat '.' as a
# regex wildcard and match names like "Xcsv" too.
dir("data/spark/output", pattern = "\\.csv$")


# Drop a second file into the watched folder; the running stream picks
# it up automatically.
write.csv(mtcars, "data/spark/input/cars_2.csv", row.names = FALSE)

# A few seconds later the new output part files appear.
dir("data/spark/output", pattern = "\\.csv$")


# Stop the streaming query before cleaning up.
stream_stop(stream)

# file.remove() cannot delete directories (and fails on non-empty ones);
# unlink() with recursive = TRUE removes the folders and their contents.
unlink("data/spark/input", recursive = TRUE)
unlink("data/spark/output", recursive = TRUE)

16.4 Web 界面

Show the code
# Open the Spark web UI for this connection in the default browser.
spark_web(sc)

16.5 断开连接

Show the code
# Close this specific Spark connection.
spark_disconnect(sc)
# Close every open Spark connection in the session (belt and braces).
spark_disconnect_all()