什么是 Spark?
Apache Spark™ is a unified analytics engine for large-scale data processing.
为什么要有 Spark?
分布式
具备经济、快速、可靠、易扩充、数据共享、设备共享、通讯方便、灵活等分布式所具备的特性
高层次抽象
RDD(Resilient Distributed Datasets)提供 一个可以被并行计算的 不变、分区的数据集 抽象
快速计算能力
内存计算 基于内存的迭代计算框架,能够避免计算结果落地,磁盘 I/O
所带来的瓶颈
Machine Learning、Data Mining 等都需要递归地计算,因此非常适合实现这些算法
高效性能
DAG(Directed Acyclic Grap)利用有向无环图,构建优化任务中 父 RDD 和 子 RDD 的依赖关系
其中,依赖分为两种,一个为窄依赖(Narrow Dependencies),如 map
/ filter
/ union
等;另一种为宽依赖(Wide Dependencies),如 groupByKey
等
在划分依赖时,join
需要额外考虑 co-partitione
:
- 如果 RDD 和
cogroup
有相同的 数据结构,将会确定一个OneToOneDependency
反之,则说明
join
的时候,需要shuffle
(ShuffleDependency
)因为,宽依赖只有等到所有 父 partiton 计算完,并传递结束,才能继续进行下一步运算,所以应该尽量减少宽依赖,避免失败后 recompute 的成本
容错性
Lineage 血统,能在计算失败的时候,将会找寻 最小重新计算损耗的 结点,避免全部重新计算
Spark 核心组件
主要概念
物理层面
Master
Master 负责分配资源
Worker
Worker 负责监控自己节点的内存和 CPU 等状况
Driver
在集群启动时,Driver 向 Master 申请资源
运行时 Driver 能获得 Executor 的具体运行资源,Driver 与 Executor 之间直接进行通信,Driver 把 Job 划为 Task 传送给 Executor,Task 就是 Spark 程序的业务逻辑代码
Executor
Executor 接收任务,进行反序列化,得到数据的输入和输出,在分布式集群的相同数据分片上,数据的业务逻辑一样,只是数据不一样罢了。然后由 Executor 进程中的线程池负责执行,执行的结果汇报再返回汇报给 Driver 进程
Task
Task 就是 Spark 程序的业务逻辑代码
逻辑层面
Row
Row 表示关系运算中一行输出,本质上来说就是一个数组
Dataset
DataSet 和 RDD、DataFrame 一样,都是分布式数据结构的概念。区别在于 DataSet 可以面向特定类型,也就是其无需将输入数据类型限制为 Row(还可以使用 Seq、Array、Product、Int 等类型)
DataFrame
DataFrame 相当于是 Dataset[Row] 的别名
Encoder
Encoder 是 Dataset 中的关键组件,用来将外部类型转化为 Dataset 的内部类型 InternalRow
Spark SQL
同时支持 HiveQL
/ UDFs
/ SerDes
等多样性的数据源,并采用 JDBC
/ ODBC
等标准化连接驱动,保证其通用性(整个流程的入口是 org.apache.spark.sql.SparkSession#sql
方法)
Spark GraphX
支持在 graph
或 collection
中查看数据,并提供丰富的 图形处理 API
Spark Streaming
将数据流 按 时间间隔 Duration 划分为一组连续的 RDD,再将这些 RDD 抽象为 DStream
随后,通过对 DStream 这个 high-level 抽象的操作,实现对底层 标记了时间间隙
的 RDD 组的操控
Spark MLbase
提供了对 Machine Learning 的易用、高效的实现
总体的结构,基于 Spark,自底向上分别是,MLlib
/ MLI
/ ML Optimizer
- MLlib 这一层,设计了 本地 / 分布式 的矩阵,对稀疏数据的支持,多模型的训练,提供了 计算模型 和 逻辑的 API
- MLI 主要任务则是 提供 表针采集器 和 逻辑规则,并进一步对 高层次 ML 编程抽象成接口
- ML Optimizer 则是通过自动构建 ML 的 pipe 管道路径实现 ML 优化器的作用。同时,该优化器还解决了一个在
MLI
和MLlib
中 表征采集器 和 ML 逻辑规则的搜索问题
Spark 实时机器学习
什么是机器学习?
Wikipedia 给出的定义是,一个计算机科学的子领域,由 模式识别 和 人工智能 中的计算机学习理论 演变而来
探索 结构化的、可学习的规则引擎,如何用来对数据 进行训练 和 预测
什么又是 Real-time 机器学习呢?
一般性的 机器学习 的对象 是一堆 offline 的训练集,通过对这些数据的学习,来确立模型
如果数据是快速变化的,这时就需要将 新数据 分配好权重,加入到目标训练集中;之后,将预测出来的结果,再次反馈到 数据模型中去
Real-time 和 No Real-time 的本质区别在哪儿?
因为 实时模型 是动态更新的,实现算法上,比 非实时的 ML 需要考虑,如何避免依赖 将 新数据 和 旧数据 整合在一块再计算所带来的性能问题
更多时候,长期积累的数据,是很难再做到全量计算(比如,多项式贝叶斯 Multinomial naive bayes,用于处理 dataset 过大,而内存不足的情况)
利用 Spark 实现 Real-time ML
源数据流
- 利用
java.util.Random
产生满足高斯分布的随机数据,再通过 breeze 放入到vector
中,作为特征值 在
generateNoisyData
中,将这个vector
做inner product
, 并加入一点噪声数据,作为 label 标签1
2
3
4
5
6
7
8
9
10
11
12
13
14
15val MaxEvents = 100
val NumFeatures = 100
val random = new Random()
def generateRandomArray(n: Int) = Array.tabulate(n)(_ => random.nextGaussian())
val w = new DenseVector(generateRandomArray(NumFeatures))
val intercept = random.nextGaussian() * 10
def generateNoisyData(n: Int) = {
(1 to n).map { i =>
val x = new DenseVector(generateRandomArray(NumFeatures))
// inner product
val y: Double = w.dot(x)
val noisy = y + intercept
(noisy, x)
}
}通过
socket
将数据 发送到指定端口1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27if (args.length != 2) {
System.err.println("Usage: <port> <millisecond>")
System.exit(1)
}
val listener = new ServerSocket(args(0).toInt)
while (true) {
val socket = listener.accept()
new Thread() {
override def run = {
println("Got client connected from: " + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
while (true) {
Thread.sleep(args(1).toLong)
val num = random.nextInt(MaxEvents)
val data = generateNoisyData(num)
data.foreach { case (y, x) =>
val xStr = x.data.mkString(",")
val content = s"$y\t$xStr"
println(content)
out.write(content)
out.write("\n")
}
}
socket.close()
}
}.start()
}
实时 Machine Learning 模型
指定
spark-master
/interval
等参数,创建 StreamingContext(此处可以利用 local[n] 快速开发)1
2
3
4
5
6if (args.length < 4) {
System.err.println("Usage: WindowCounter <master> <hostname> <port> <interval> \n" +
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
val ssc = new StreamingContext(args(0), "ML Analysis", Seconds(args(3).toInt))获取到发送过来的 源数据
1
val stream = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
利用
DenseVector.zeros[Double]
创建全零的初始矩阵使用
StreamingLinearRegressionWithSGD
创建 流式随机递归下降的线性回归 模型目前 MLlib 只支持 Streaming(
KMeans
/LinearRegression
/LinearRegressionWithSGD
)in Spark 1.4.1
Streaming MLlib 和普通的 MLlib 没有本质上的区别,只是输入的训练集是 DStream,需要使用foreachRDD
/map
进行 训练 / 预测1
2
3
4
5
6
7
8
9
10
11
12val NumFeatures = 100
val zeroVector = DenseVector.zeros[Double](NumFeatures)
val model = new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.dense(zeroVector.data))
.setNumIterations(1)
.setStepSize(0.01)
val labeledStream = stream.map { event =>
val split = event.split("\t")
val y = split(0).toDouble
val features = split(1).split(",").map(_.toDouble)
LabeledPoint(label = y, features = Vectors.dense(features))
}利用 模型 进行
train
/predict
1
2
3
4
5
6
7
8model.trainOn(labeledStream)
val predictAndTrue = labeledStream.transform { rdd =>
val latest = model.latestModel()
rdd.map { point =>
val predict = latest.predict(point.features)
(predict - point.label)
}
}通过 MSE(Mean Squared Error)均方差 和 RMSE(Root Mean Squared Error)均方根误差 对模型的性能进行评估(这里也可以使用 RegressionMetrics 来实现)
1
2
3
4
5
6
7
8
9
10
11
12predictAndTrue.foreachRDD { (rdd, time) =>
val mse = rdd.map { case (err) => err * err }.mean()
val rmse = math.sqrt(mse)
println( s"""
|-------------------------------------------
|Time: $time
|-------------------------------------------
""".stripMargin)
println(s"MSE current batch: Model : $mse")
println(s"RMSE current batch: Model : $rmse")
println("...\n")
}启动 Spark 上下文
1
2ssc.start()
ssc.awaitTermination()
一劳永逸了?Not at all!
一个优秀的 Machine Learning 模型,是要结合具体业务,从对数据流入的清洗,特征值维度的考量,模型类型的选择,到最终的性能的评估、监控、持续优化,都需要仔细地考究,最终才能打造出高效、稳定、精准的数据模型
数据
对目标数据集进行处理之前,首先就是对数据的类型进行归类,是数值型、类别型、文本型,还是其他一些多媒体、地理信息等
针对不同的数据,分别采取不同的处理手段,对于类别型常用 1-of-k encoding
对每个类别进行编码
对于文本型,则会采用 分词、移除 stop words
(的、这、地;the
/and
/but
…)、向量化、标准化(避免度量单位的影响)
1 | import numpy as np |
还有还多常用的数据处理方式,如 平均值、中位数、总和、方差、差值、最大值、最小值
针对时间的处理,还可以加上 “时间戳” 字段
1 | def assign_tod(hr): |
特征维度
常见的一个影响模型的因素,便是没有对特征 进行标准化
1 | (element-wise - the preceding mean vector from the feature vector) / the vector of feature standard deviations |
利用 StandarScaler 完成标准化工作
1 | import org.apache.spark.mllib.feature.StandardScaler |
调整模型
首先需要在众多的模型 和 对应的算法 中找到最为适用的选择
模型的类别主要有,推荐引擎、分类模型、回归模型、聚类模型 等
相应的实现算法,又有(线性 / 逻辑 / 多元)回归、(随机森林)决策树、(朴素 / 高斯 / 多项式 / 伯努利 / 信念网络)贝叶斯 等
在选择的时候,更多会考虑 特征值是否多维(可以尝试降维),目标类别是 multiclass,binary,还是 probability(连续值)
根据 数据集的 稀疏程度 对正则化(Regularizer)进行调整
1
2
3
4
5
6
7
8zero: 没有任何正规化操作
L1: SGD(Stochastic gradient descent,随机梯度下降)
L2: LBFGS(Limited-memory BFGS,受限的 BFGS)
L2 相比 L1 更为平滑(同样,L1 可以让 稀疏的数据集 得到更 直观的模型)
还有其它 求最优解 的方法,如 求全局最优解的 BGD(Batch gradient descent,批量梯度下降)
但是,由于每次迭代都需要依据训练集中所有的数据,所以速度很慢;
以及 CG(Conjugate gradient,共轭梯度法),但还没有被 Spark MLlib 所支持,可以在 Breeze 中找到它可以通过
setUpdater
将模型的 规则化算法 设置为L1
(默认为L2
)1
2
3
4
5
6
7
8import org.apache.spark.mllib.optimization.L1Updater
val svmAlg = new SVMWithSGD()
svmAlg.optimizer
.setNumIterations(200)
.setRegParam(0.1)
.setUpdater(new L1Updater)
val modelL1 = svmAlg.run(training)当然,还有举不胜举的优化方式,详见文章最后的 Spark 思维导图 :-)
性能评估指标
针对不同的业务,对性能评测的手段,也需要相应取舍,毕竟有些 “宁可错杀一千” 的变态 防护系统,就需要对
recall
有较高的要求,而precision
则相对宽松些- 这时便可采用 ROC(Receiver Operating Characteristic)curve 评测引擎:
1
2
3
4
5
6
7
8// binary classification
val metrics = Seq(lrModel, svmModel).map { model =>
val scoreAndLabels = data.map { point =>
(model.predict(point.features), point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
(model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
- 这时便可采用 ROC(Receiver Operating Characteristic)curve 评测引擎:
如果是 贝叶斯 / 决策树 的数据模型,则可以用
0.5
对其进行划分,转换为binary
分类问题1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// naive bayes
val nbMetrics = Seq(nbModel).map { model =>
val scoreAndLabels = nbData.map { point =>
val score = model.predict(point.features)
(if (score > 0.5) 1.0 else 0.0, point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
(model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
// decision tree
val dtMetrics = Seq(dtModel).map { model =>
val scoreAndLabels = data.map { point =>
val score = model.predict(point.features)
(if (score > 0.5) 1.0 else 0.0, point.label)
}
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
(model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
}
val allMetrics = metrics ++ nbMetrics ++ dtMetrics
allMetrics.foreach { case (m, pr, roc) =>
println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
}然而,如果是一些推荐系统,更多的希望能够了解到 大体的预测精度,则可以采用 MAP(Mean Average Precision)平均精度均值 进行评估
1
MAP 同时还解决了 precision,recall,F-measure 中存在的单点值局限性问题
踩过的坑
spark 任务使用 UGI 之后,实际 task 在生成中间文件的时候,没有感知到外层设置的 UGI 信息
描述
1 | $ hdfs dfs -ls /user | grep yuzhouwan |
分析
1 | $ cd $HADOOP_HOME/logs |
解决
首先,需要保证用户 user_A 不需要写入的数据,只能由它自己才能查看。一方面,如果权限控制不在 spark 任务中,而是交给上层应用来控制,则完全可以使用 yuzhouwan 用户权限写入到 HDFS 中;另一方面,也可以先使用 yuzhouwan 用户权限写入,然后再通过 chown 赋权到 user_A 用户下
saveAsTextFile 不传入压缩类型,也对输出文件进行了压缩
描述
1 | $ spark-shell --master local |
解决
1 | # 发现依赖的 Hadoop 配置文件中,已经开启了压缩 |
补充
1 | // saveAsTextFile 方法的第二个参数可以指定压缩类型,可以覆盖全局配置中,已经存在的压缩配置 |
无法判断 spark session 中临时函数是否存在
描述
1 | # 获取 session state 中判断临时函数 FunctionIdentifier 是否存在,但是仍然不行 |
解决
1 | if (!sparkSession.catalog.functionExists("yuzhouwan")) { |
补充
目前,Spark SQL 中 IF NOT EXISTS
的语法只支持创建 DATABASE、TABLE(包括 临时表 和 外部表)、VIEW(不包括 临时视图)、SCHEMA 的时候使用,尚不支持临时函数
1 | $ spark-shell --master local |
控制 spark 结果文件数量,并返回唯一的文件路径
解决
1 | import java.net.URI |
KafkaUtils.createDirectStream 报错 OffsetOutOfRangeException
描述
1 | <scala.short.version>2.11</scala.short.version> |
1 | Caused by: kafka.common.OffsetOutOfRangeException |
分析
该问题是因为 Kafka 的 topic 中,数据因为长时间未消费,超出了 log.retention.[hours|minutes|ms]
时间(默认 168 小时,也就是一周)。此时,broker 会将这部分数据清除掉,并更新 offset 信息(offset 变小了)。但是,程序中仍然用的是之前的 offset 信息,所以就会报错超出了现有 offset 的范围
解决
1 | // 使用不指定 offset 的 createDirectStream 重载方法,并重启 spark streaming 程序 |
整体知识树
至此,相信你已经对 Spark 这个生态圈有了大致了解了,下面就是一步一步地 在 实践 和 深入学习中,体验大数据的乐趣啦 O(∩_∩)O~~
资料
Blog
- Spark 源码分析
- Spark SQL 源码分析系列文章
- Deep Dive into Spark SQL’s Catalyst Optimizer
- SparkSQL – 从 0 到 1 认识 Catalyst
Book
- PySpark Recipes
- Advanced Analytics with Spark
- High Performance Spark
- Fast Data Processing with Spark 2, 3rd Edition
- Apache Spark 2 for Beginners
- Spark for Data Science
- Spark GraphX in Action
- Pro Spark Streaming
- Machine Learning with Spark, 2nd Edition
- Apache Spark 2.x Cookbook
- Learning Spark
- Big Data Analytics with Spark
- Spark for Python Developers
- Mastering Apache Spark 2.x, 2nd Edition
- Apache Spark Graph Processing