Real-time ML with Spark

什么是 Spark?

 Apache Spark™ is a unified analytics engine for large-scale data processing. – Official website

为什么要有 Spark?

分布式

 具备经济、快速、可靠、易扩充、数据共享、设备共享、通讯方便、灵活等分布式所具备的特性

高层次抽象

 RDDResilient Distributed Datasets)提供 一个可以被并行计算的 不变、分区的数据集 抽象

快速计算能力

 内存计算 基于内存的迭代计算框架,能够避免计算结果落地,磁盘 I/O 所带来的瓶颈
 Machine Learning、Data Mining 等都需要递归地计算,因此非常适合实现这些算法

高效性能

 DAGDirected Acyclic Grap)利用有向无环图,构建优化任务中 父 RDD 和 子 RDD 的依赖关系

Spark DAG

(图片来源:spark.apache.org

 其中,依赖分为两种,一个为窄依赖(Narrow Dependencies),如 map / filter / union 等;另一种为宽依赖(Wide Dependencies),如 groupByKey

 在划分依赖时,join 需要额外考虑 co-partitione

  • 如果 RDD 和 cogroup 有相同的 数据结构,将会确定一个 OneToOneDependency
  • 反之,则说明 join 的时候,需要 shuffleShuffleDependency

    因为,宽依赖只有等到所有 父 partiton 计算完,并传递结束,才能继续进行下一步运算,所以应该尽量减少宽依赖,避免失败后 recompute 的成本

Narrow Dependencies and Wide Dependencies

(图片来源:spark.apache.org

容错性

 Lineage 血统,能在计算失败的时候,将会找寻 最小重新计算损耗的 结点,避免全部重新计算

Spark 核心组件

主要概念

物理层面

Master

 Master 负责分配资源

Worker

 Worker 负责监控自己节点的内存和 CPU 等状况

Driver

 在集群启动时,Driver 向 Master 申请资源

 运行时 Driver 能获得 Executor 的具体运行资源,Driver 与 Executor 之间直接进行通信,Driver 把 Job 划为 Task 传送给 Executor,Task 就是 Spark 程序的业务逻辑代码

Executor

 Executor 接收任务,进行反序列化,得到数据的输入和输出,在分布式集群的相同数据分片上,数据的业务逻辑一样,只是数据不一样罢了。然后由 Executor 进程中的线程池负责执行,执行的结果汇报再返回汇报给 Driver 进程

Task

 Task 就是 Spark 程序的业务逻辑代码

逻辑层面

Row

 Row 表示关系运算中一行输出,本质上来说就是一个数组

Dataset

 DataSet 和 RDD、DataFrame 一样,都是分布式数据结构的概念。区别在于 DataSet 可以面向特定类型,也就是其无需将输入数据类型限制为 Row(还可以使用 Seq、Array、Product、Int 等类型)

DataFrame

 DataFrame 相当于是 Dataset[Row] 的别名

Encoder

 Encoder 是 Dataset 中的关键组件,用来将外部类型转化为 Dataset 的内部类型 InternalRow

Spark SQL

介绍

 同时支持 HiveQL / UDFs / SerDes 等多样性的数据源,并采用 JDBC / ODBC 等标准化连接驱动,保证其通用性

核心流程

(图片来源:databricks.com

 整个流程的入口是 org.apache.spark.sql.SparkSession#sql 方法

Spark GraphX

 支持在 graphcollection 中查看数据,并提供丰富的 图形处理 API

Spark Streaming

 将数据流 按 时间间隔 Duration 划分为一组连续的 RDD,再将这些 RDD 抽象为 DStream
 随后,通过对 DStream 这个 high-level 抽象的操作,实现对底层 标记了时间间隙的 RDD 组的操控

Spark MLbase

 提供了对 Machine Learning 的易用、高效的实现
 总体的结构,基于 Spark,自底向上分别是,MLlib / MLI / ML Optimizer

  • MLlib 这一层,设计了 本地 / 分布式 的矩阵,对稀疏数据的支持,多模型的训练,提供了 计算模型 和 逻辑的 API
  • MLI 主要任务则是 提供 表针采集器 和 逻辑规则,并进一步对 高层次 ML 编程抽象成接口
  • ML Optimizer 则是通过自动构建 ML 的 pipe 管道路径实现 ML 优化器的作用。同时,该优化器还解决了一个在 MLIMLlib 中 表征采集器 和 ML 逻辑规则的搜索问题

Spark 实时机器学习

什么是机器学习

 Wikipedia 给出的定义是,一个计算机科学的子领域,由 模式识别 和 人工智能 中的计算机学习理论 演变而来
 探索 结构化的、可学习的规则引擎,如何用来对数据 进行训练 和 预测

什么又是 Real-time 机器学习呢?

 一般性的 机器学习 的对象 是一堆 offline 的训练集,通过对这些数据的学习,来确立模型
 如果数据是快速变化的,这时就需要将 新数据 分配好权重,加入到目标训练集中;之后,将预测出来的结果,再次反馈到 数据模型中去

Real-time 和 No Real-time 的本质区别在哪儿?

 因为 实时模型 是动态更新的,实现算法上,比 非实时的 ML 需要考虑,如何避免依赖 将 新数据 和 旧数据 整合在一块再计算所带来的性能问题
 更多时候,长期积累的数据,是很难再做到全量计算(比如,多项式贝叶斯 Multinomial naive bayes,用于处理 dataset 过大,而内存不足的情况)

利用 Spark 实现 Real-time ML

源数据流

  • 利用 java.util.Random 产生满足高斯分布的随机数据,再通过 breeze 放入到 vector 中,作为特征值
  • generateNoisyData 中,将这个 vectorinner product, 并加入一点噪声数据,作为 label 标签

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    val MaxEvents = 100
    val NumFeatures = 100
    val random = new Random()
    def generateRandomArray(n: Int) = Array.tabulate(n)(_ => random.nextGaussian())
    val w = new DenseVector(generateRandomArray(NumFeatures))
    val intercept = random.nextGaussian() * 10
    def generateNoisyData(n: Int) = {
    (1 to n).map { i =>
    val x = new DenseVector(generateRandomArray(NumFeatures))
    // inner product
    val y: Double = w.dot(x)
    val noisy = y + intercept
    (noisy, x)
    }
    }
  • 通过 socket 将数据 发送到指定端口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    if (args.length != 2) {
    System.err.println("Usage: <port> <millisecond>")
    System.exit(1)
    }
    val listener = new ServerSocket(args(0).toInt)
    while (true) {
    val socket = listener.accept()
    new Thread() {
    override def run = {
    println("Got client connected from: " + socket.getInetAddress)
    val out = new PrintWriter(socket.getOutputStream(), true)
    while (true) {
    Thread.sleep(args(1).toLong)
    val num = random.nextInt(MaxEvents)
    val data = generateNoisyData(num)
    data.foreach { case (y, x) =>
    val xStr = x.data.mkString(",")
    val content = s"$y\t$xStr"
    println(content)
    out.write(content)
    out.write("\n")
    }
    }
    socket.close()
    }
    }.start()
    }

实时 Machine Learning 模型

  • 指定 spark-master / interval 等参数,创建 StreamingContext(此处可以利用 local[n] 快速开发)

    1
    2
    3
    4
    5
    6
    if (args.length < 4) {
    System.err.println("Usage: WindowCounter <master> <hostname> <port> <interval> \n" +
    "In local mode, <master> should be 'local[n]' with n > 1")
    System.exit(1)
    }
    val ssc = new StreamingContext(args(0), "ML Analysis", Seconds(args(3).toInt))
  • 获取到发送过来的 源数据

    1
    val stream = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
  • 利用 DenseVector.zeros[Double] 创建全零的初始矩阵

  • 使用 StreamingLinearRegressionWithSGD 创建 流式随机递归下降的线性回归 模型

    目前 MLlib 只支持 Streaming(KMeans / LinearRegression / LinearRegressionWithSGD)in Spark 1.4.1
    Streaming MLlib 和普通的 MLlib 没有本质上的区别,只是输入的训练集是 DStream,需要使用 foreachRDD / map 进行 训练 / 预测

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    val NumFeatures = 100
    val zeroVector = DenseVector.zeros[Double](NumFeatures)
    val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.dense(zeroVector.data))
    .setNumIterations(1)
    .setStepSize(0.01)
    val labeledStream = stream.map { event =>
    val split = event.split("\t")
    val y = split(0).toDouble
    val features = split(1).split(",").map(_.toDouble)
    LabeledPoint(label = y, features = Vectors.dense(features))
    }
  • 利用 模型 进行 train / predict

    1
    2
    3
    4
    5
    6
    7
    8
    model.trainOn(labeledStream)
    val predictAndTrue = labeledStream.transform { rdd =>
    val latest = model.latestModel()
    rdd.map { point =>
    val predict = latest.predict(point.features)
    (predict - point.label)
    }
    }
  • 通过 MSEMean Squared Error)均方差 和 RMSERoot Mean Squared Error)均方根误差 对模型的性能进行评估(这里也可以使用 RegressionMetrics 来实现)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    predictAndTrue.foreachRDD { (rdd, time) =>
    val mse = rdd.map { case (err) => err * err }.mean()
    val rmse = math.sqrt(mse)
    println( s"""
    |-------------------------------------------
    |Time: $time
    |-------------------------------------------
    """.stripMargin)
    println(s"MSE current batch: Model : $mse")
    println(s"RMSE current batch: Model : $rmse")
    println("...\n")
    }
  • 启动 Spark 上下文

    1
    2
    ssc.start()
    ssc.awaitTermination()

一劳永逸了?Not at all!

 一个优秀的 Machine Learning 模型,是要结合具体业务,从对数据流入的清洗,特征值维度的考量,模型类型的选择,到最终的性能的评估、监控、持续优化,都需要仔细地考究,最终才能打造出高效、稳定、精准的数据模型

数据

 对目标数据集进行处理之前,首先就是对数据的类型进行归类,是数值型、类别型、文本型,还是其他一些多媒体、地理信息等
 针对不同的数据,分别采取不同的处理手段,对于类别型常用 1-of-k encoding 对每个类别进行编码
 对于文本型,则会采用 分词、移除 stop words(的、这、地;the/and/but …)、向量化、标准化(避免度量单位的影响)

1
2
3
4
5
6
7
8
9
import numpy as np
np.random.seed(42)
x = np.random.randn(10)
norm_x_2 = np.linalg.norm(x)
normalized_x = x / norm_x_2
print "x:\n%s" % x
print "2-Norm of x: %2.4f" % norm_x_2
print "Normalized x:\n%s" % normalized_x
print "2-Norm of normalized_x: %2.4f" % np.linalg.norm(normalized_x)

 还有还多常用的数据处理方式,如 平均值、中位数、总和、方差、差值、最大值、最小值
 针对时间的处理,还可以加上 “时间戳” 字段

1
2
3
4
5
6
7
8
9
10
11
12
def assign_tod(hr):
times_of_day = {
'morning' : range(7, 12),
'lunch' : range(12, 14),
'afternoon' : range(14, 18),
'evening' : range(18, 23),
'night' : range(23, 7)
}
for k, v in times_of_day.iteritems():
if hr in v:
return k
time_of_day = hour_of_day.map(lambda hr: assign_tod(hr))

特征维度

 常见的一个影响模型的因素,便是没有对特征 进行标准化

1
(element-wise - the preceding mean vector from the feature vector) / the vector of feature standard deviations

 利用 StandarScaler 完成标准化工作

1
2
3
4
import org.apache.spark.mllib.feature.StandardScaler
val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)
val scaledData = data.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))

调整模型

 首先需要在众多的模型 和 对应的算法 中找到最为适用的选择
 模型的类别主要有,推荐引擎分类模型回归模型聚类模型
 相应的实现算法,又有(线性 / 逻辑 / 多元)回归、(随机森林)决策树、(朴素 / 高斯 / 多项式 / 伯努利 / 信念网络)贝叶斯
 在选择的时候,更多会考虑 特征值是否多维(可以尝试降维),目标类别是 multiclassbinary,还是 probability(连续值)

  • 根据 数据集的 稀疏程度 对正则化(Regularizer)进行调整

    1
    2
    3
    4
    5
    6
    7
    8
    zero: 没有任何正规化操作
    L1: SGDStochastic gradient descent,随机梯度下降)
    L2: LBFGSLimited-memory BFGS,受限的 BFGS
    L2 相比 L1 更为平滑(同样,L1 可以让 稀疏的数据集 得到更 直观的模型)
    还有其它 求最优解 的方法,如 求全局最优解的 BGDBatch gradient descent,批量梯度下降)
    但是,由于每次迭代都需要依据训练集中所有的数据,所以速度很慢;
    以及 CGConjugate gradient,共轭梯度法),但还没有被 Spark MLlib 所支持,可以在 Breeze 中找到它
  • 可以通过 setUpdater 将模型的 规则化算法 设置为 L1(默认为 L2

    1
    2
    3
    4
    5
    6
    7
    8
    import org.apache.spark.mllib.optimization.L1Updater
    val svmAlg = new SVMWithSGD()
    svmAlg.optimizer
    .setNumIterations(200)
    .setRegParam(0.1)
    .setUpdater(new L1Updater)
    val modelL1 = svmAlg.run(training)
  • 当然,还有举不胜举的优化方式,详见文章最后的 Spark 思维导图 :-)

性能评估指标

  • 针对不同的业务,对性能评测的手段,也需要相应取舍,毕竟有些 “宁可错杀一千” 的变态 防护系统,就需要对 recall 有较高的要求,而 precision 则相对宽松些

    • 这时便可采用 ROCReceiver Operating Characteristic)curve 评测引擎:
      1
      2
      3
      4
      5
      6
      7
      8
      // binary classification
      val metrics = Seq(lrModel, svmModel).map { model =>
      val scoreAndLabels = data.map { point =>
      (model.predict(point.features), point.label)
      }
      val metrics = new BinaryClassificationMetrics(scoreAndLabels)
      (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
      }
  • 如果是 贝叶斯 / 决策树 的数据模型,则可以用 0.5 对其进行划分,转换为 binary 分类问题

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // naive bayes
    val nbMetrics = Seq(nbModel).map { model =>
    val scoreAndLabels = nbData.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
    }
    // decision tree
    val dtMetrics = Seq(dtModel).map { model =>
    val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
    }
    val allMetrics = metrics ++ nbMetrics ++ dtMetrics
    allMetrics.foreach { case (m, pr, roc) =>
    println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
    }
  • 然而,如果是一些推荐系统,更多的希望能够了解到 大体的预测精度,则可以采用 MAPMean Average Precision)平均精度均值 进行评估

    1
     MAP 同时还解决了 precision,recall,F-measure 中存在的单点值局限性问题

踩过的坑

spark 任务使用 UGI 之后,实际 task 在生成中间文件的时候,没有感知到外层设置的 UGI 信息

描述

1
2
3
4
5
6
7
8
9
10
11
$ hdfs dfs -ls /user | grep yuzhouwan
drwxrwxrwx - yuzhouwan yuzhouwan 0 2018-04-24 14:27 /user/yuzhouwan
$ hdfs dfs -ls /user/yuzhouwan
drwxrwxrwx - user_A user_A 0 2018-04-24 14:27 /user/yuzhouwan/user_A
# 代码中,需要将数据,写入到 /user/yuzhouwan/user_A 中,但是报错权限不足
UGICache.doAs("user_A", () => {
sparkContext.parallelize(Seq(1 to 10), 1).saveAsTextFile("/user/yuzhouwan/user_A/seq")
})
org.apache.hadoop.security.AccessControlException: Permission denied: user=user_A, access=WRITE, inode="/user/yuzhouwan/user_A/seq":yuzhouwan:user_A:drwxr-xr-x

分析

1
2
3
4
5
6
7
8
9
10
$ cd $HADOOP_HOME/logs
$ grep "/user/yuzhouwan/user_A/" hdfs-audit.log | grep -v "getfileinfo"
2018-04-23 19:04:34,764 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=user_A (auth:SIMPLE) ip=/10.10.10.1 cmd=mkdirs src=/user/yuzhouwan/user_A/2018-04-23/940_2018-04-23_user_A_68673976_0/_temporary/0 dst=null perm=user_A:yuzhouwan:rwxr-xr-x proto=rpc
2018-04-23 19:04:35,353 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=yuzhouwan (auth:SIMPLE) ip=/10.10.10.1 cmd=create src=/user/yuzhouwan/user_A/2018-04-23/940_2018-04-23_user_A_68673976_0/_temporary/0/_temporary/attempt_20180423190434_0723_m_000000_737/part-00000.deflate dst=null perm=yuzhouwan:yuzhouwan:rw-r--r-- proto=rpc
2018-04-23 19:04:36,017 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=yuzhouwan (auth:SIMPLE) ip=/10.10.10.1 cmd=rename src=/user/yuzhouwan/user_A/2018-04-23/940_2018-04-23_user_A_68673976_0/_temporary/0/_temporary/attempt_20180423190434_0723_m_000000_737 dst=/user/yuzhouwan/user_A/2018-04-23/940_2018-04-23_user_A_68673976_0/_temporary/0/task_20180423190434_0723_m_000000 perm=yuzhouwan:yuzhouwan:rwxr-xr-x proto=rpc
2018-04-23 19:04:36,040 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=user_A (auth:SIMPLE) ip=/10.10.10.1 cmd=listStatus src=/user/yuzhouwan/user_A/2018-04-23/940_2018-04-23_user_A_68673976_0/_temporary/0 dst=null perm=null proto=rpc
2018-04-23 19:04:36,060 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=true ugi=user_A (auth:SIMPLE) ip=/10.10.10.1 cmd=listStatus src=/user/yuzhouwan/user_A/2018-04-23/940_2018-04-23_user_A_68673976_0/_temporary/0/task_20180423190434_0723_m_000000 dst=null perm=null proto=rpc
2018-04-23 19:04:36,083 INFO org.apache.hadoop.hdfs.server.namenode.FSNamesystem.audit: allowed=false ugi=user_A (auth:SIMPLE) ip=/10.10.10.1 cmd=rename src=/user/yuzhouwan/user_A/2018-04-23/940_2018-04-23_user_A_68673976_0/_temporary/0/task_20180423190434_0723_m_000000/part-00000.deflate dst=/user/yuzhouwan/user_A/2018-04-23/940_2018-04-23_user_A_68673976_0/part-00000.deflate perm=null proto=rpc
# 通过查看 HDFS 的 audit 审计日志,可以看到创建 deflate 中间文件的时候,使用的 UGI 是 yuzhouwan,而不是 user_A,导致到最后一步,使用 user_A 用户 rename 的时候,报错权限不足

解决

 首先,需要保证用户 user_A 不需要写入的数据,只能由它自己才能查看。一方面,如果权限控制不在 spark 任务中,而是交给上层应用来控制,则完全可以使用 yuzhouwan 用户权限写入到 HDFS 中;另一方面,也可以先使用 yuzhouwan 用户权限写入,然后再通过 chown 赋权到 user_A 用户下

saveAsTextFile 不传入压缩类型,也对输出文件进行了压缩

描述

1
2
3
4
5
6
7
8
9
10
11
$ spark-shell --master local
scala> sc.parallelize(Seq(1 to 10), 1).collect.foreach(print(_))
Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> sc.parallelize(Seq(1 to 10), 1).saveAsTextFile("/user/yuzhouwan")
18/04/24 11:22:01 WARN DefaultCodec: DefaultCodec.createOutputStream() may leak memory. Create a compressor first.
$ hdfs dfs -ls /user/yuzhouwan
Found 2 items
-rw-r--r-- 3 bigdata yuzhouwan 0 2018-04-24 11:22 /user/yuzhouwan/_SUCCESS
-rw-r--r-- 3 bigdata yuzhouwan 45 2018-04-24 11:22 /user/yuzhouwan/part-00000.deflate

解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 发现依赖的 Hadoop 配置文件中,已经开启了压缩
$ vim $HADOOP_HOME/etc/hadoop/mapred-site.xml
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>
# 需要设置 spark.hadoop.mapreduce.output.fileoutputformat.compress=false
# 或者 spark.hadoop.mapred.output.compress=false
# 前者,是负责往 hadoop 和 hive 里面写的时候,会设置压缩;后者,是负责往 json / txt / csv 里面写的时候,设置压缩
# 一个是面向 目标系统类型,另一个是面向 目标文件类型
$ spark-shell --master local --conf spark.hadoop.mapreduce.output.fileoutputformat.compress=false
scala> sc.parallelize(Seq(1 to 10), 1).saveAsTextFile("/user/yuzhouwan0")
$ hdfs dfs -ls /user/yuzhouwan0
Found 2 items
-rw-r--r-- 3 bigdata yuzhouwan 0 2018-04-24 11:41 /user/yuzhouwan0/_SUCCESS
-rw-r--r-- 3 bigdata yuzhouwan 37 2018-04-24 11:41 /user/yuzhouwan0/part-00000
$ hdfs dfs -cat /user/yuzhouwan0/part-00000
Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

补充

1
2
3
// saveAsTextFile 方法的第二个参数可以指定压缩类型,可以覆盖全局配置中,已经存在的压缩配置
// 需要注意的是,压缩算法只支持 CompressionCodec 接口的实现子类,包括 DefaultCodec、BZip2Codec、GzipCodec、HadoopSnappyCodec、Lz4Codec、SnappyCodec etc.
sc.parallelize(Seq(1 to 10), 1).coalesce(1, shuffle = true).saveAsTextFile("/user/yuzhouwan1", classOf[org.apache.hadoop.io.compress.GzipCodec])

无法判断 spark session 中临时函数是否存在

描述

1
2
3
4
# 获取 session state 中判断临时函数 FunctionIdentifier 是否存在,但是仍然不行
if (!sparkSession.sessionState.catalog.functionExists(yuzhouwanIdentifier)) {
sparkSession.sql("CREATE TEMPORARY FUNCTION yuzhouwan as 'org.apache.spark.sql.hive.udf.YuZhouWan'")
}

解决

1
2
3
if (!sparkSession.catalog.functionExists("yuzhouwan")) {
sparkSession.sql("CREATE TEMPORARY FUNCTION yuzhouwan as 'org.apache.spark.sql.hive.udf.YuZhouWan'")
}

补充

 目前,Spark SQL 中 IF NOT EXISTS 的语法只支持创建 DATABASE、TABLE(包括 临时表 和 外部表)、VIEW(不包括 临时视图)、SCHEMA 的时候使用,尚不支持临时函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
$ spark-shell --master local
scala> org.apache.spark.sql.SparkSession.builder().enableHiveSupport.getOrCreate.sql("CREATE TEMPORARY FUNCTION IF NOT EXISTS yuzhouwan as 'org.apache.spark.sql.hive.udf.YuZhouWan'")
org.apache.spark.sql.catalyst.parser.ParseException: mismatched input 'NOT' expecting {'.', 'AS'}(line 1, pos 29)
== SQL ==
CREATE TEMPORARY FUNCTION IF NOT EXISTS yuzhouwan as 'org.apache.spark.sql.hive.udf.YuZhouWan'
-----------------------------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:592)
... 48 elided

控制 spark 结果文件数量,并返回唯一的文件路径

解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
// 将数据写入到某一个文件下,会产生两个文件 _SUCCESS 和 part-00000-126ddb4d-22a8-4c3d-88cb-52a59da4c66a.json
val exportDir = "/user/yuzhouwan/export"
sparkSession.sql(subTaskSql).limit(1000).coalesce(1).write.json(exportDir)
// 排除掉 _SUCCESS 文件,拿到 json 文件的全路径
def getSingleExportFile(exportDir: String): String = {
val sourceFS = FileSystem.get(new URI(exportDir), new Configuration())
if (sourceFS == null || !sourceFS.exists(new Path(exportDir))) return ""
val filesStatus = sourceFS.listStatus(new Path(exportDir))
if (filesStatus == null || filesStatus.length <= 0) return ""
for (fileStatus <- filesStatus) {
if (fileStatus.isFile) {
val path = fileStatus.getPath.getName
if (!path.contains("SUCCESS")) {
return if (exportDir.endsWith("/")) exportDir.concat(path) else exportDir.concat("/").concat(path)
}
}
}
""
}
val singleFilePath = getSingleExportFile(exportDir)

参考

KafkaUtils.createDirectStream 报错 OffsetOutOfRangeException

描述

1
2
3
4
5
6
7
8
9
10
11
12
13
<scala.short.version>2.11</scala.short.version>
<scala.version>${scala.short.version}.8</scala.version>
<spark.version>2.1.0.5</spark.version>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.short.version}</artifactId>
<version>${spark.version}</version>
</dependency>
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messageHandler = (mam: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mam.key, mam.message)
KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, (Array[Byte], Array[Byte])](ssc, kafkaParams, fromOffsets, messageHandler)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Caused by: kafka.common.OffsetOutOfRangeException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:184)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

分析

 该问题是因为 Kafka 的 topic 中,数据因为长时间未消费,超出了 log.retention.[hours|minutes|ms] 时间(默认 168 小时,也就是一周)。此时,broker 会将这部分数据清除掉,并更新 offset 信息(offset 变小了)。但是,程序中仍然用的是之前的 offset 信息,所以就会报错超出了现有 offset 的范围

解决

1
2
// 使用不指定 offset 的 createDirectStream 重载方法,并重启 spark streaming 程序
KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicSet)

参考


整体知识树

至此,相信你已经对 Spark 这个生态圈有了大致了解了,下面就是一步一步地 在 实践 和 深入学习中,体验大数据的乐趣啦 O(∩_∩)O~~

Spark EcoSystem

(利用 XMind™ 绘制而成)

资料

Blog

Book

欢迎加入我们的技术群,一起交流学习

人工智能 1020982(高级)& 1217710(进阶)| BigData 1670647

  • 本文作者: Benedict Jin
  • 本文链接: https://yuzhouwan.com/posts/4735/
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明出处!
显示 Gitment 评论