Real-time ML with Spark

什么是 Spark?

 Apache Spark™ is a fast and general engine for large-scale data processing – Official website

为什么要有 Spark?

分布式

 具备经济、快速、可靠、易扩充、数据共享、设备共享、通讯方便、灵活等分布式所具备的特性

高层次抽象

 RDD (Resilient Distributed Datasets) 提供 一个可以被并行计算的 不变、分区的数据集 抽象

快速计算能力

 内存计算 基于内存的迭代计算框架,能够避免计算结果落地,磁盘 I/O 所带来的瓶颈
 Machine Learning、Data Mining 等都需要递归地计算,因此非常适合实现这些算法

高效性能

 DAG (Directed Acyclic Grap) 利用有向无环图,构建优化任务中 父 RDD 和 子 RDD 的依赖关系 (Like Ooize)

 依赖分为两种,一个为窄依赖,如 map / filter / union 等;另一种为宽依赖,如 groupByKey
 在划分依赖时,join 需要额外考虑 co-partitione

  • 如果 RDD 和 cogroup 有相同的 数据结构,将会确定一个 OneToOneDependency
  • 反之,则说明 join 的时候,需要 shuffle (ShuffleDependency)

    [建议]: “wide dependencies 只有等到所有 父 partiton 计算完,并传递结束,才能继续进行下一步运算,所以应尽量减少宽依赖,避免失败后 recompute 的成本”

容错性

 Lineage血统,能在计算失败的时候,将会找寻 最小重新计算损耗的 结点,避免全部重新计算

Spark 核心组件

Spark SQL

 同时支持 HiveQL/UDFs/SerDes 等多样性的数据源,并采用 JDBC/ODBC 等标准化连接驱动,保证其通用性

Spark GraphX

 支持在 graphcollection 中查看数据,并提供丰富的 图形处理 API

Spark Streaming

 将数据流 按 时间间隔 Duration划分为一组连续的 RDD,再将这些 RDD抽象为 DStream
 随后,通过对 DStream这个 high-level抽象的操作,实现对底层 标记了时间间隙的 RDD 组的操控

Spark MLbase

 提供了对 Machine Learning的易用、高效的实现
 总体的结构,基于 Spark,自底向上分别是,MLlib / MLI / ML Optimizer

  • MLlib 这一层,设计了 本地/分布式 的矩阵,对稀疏数据的支持,多模型的训练,提供了 计算模型 和 逻辑的 API
  • MLI 主要任务则是 提供 表针采集器 和 逻辑规则,并进一步对 高层次 ML 编程抽象成接口
  • ML Optimizer 则是通过自动构建 ML 的 pipe 管道路径实现 ML 优化器的作用。同时,该优化器还解决了一个在 MLIMLlib 中 表征采集器 和 ML 逻辑规则的搜索问题

Spark 实时机器学习

什么是机器学习

 Wikipedia 给出的定义是,一个计算机科学的子领域,由 模式识别 和 人工智能中的计算机学习理论 演变而来
 探索 结构化的、可学习的规则引擎,如何用来对数据 进行训练 和 预测

什么又是 Real-time 机器学习呢?

 一般性的 机器学习 的对象 是一堆 offline 的训练集,通过对这些数据的学习,来确立模型
 如果数据是快速变化的,这时就需要将 新数据 分配好权重,加入到目标训练集中;之后,将预测出来的结果,再次反馈到 数据模型中去

Real-time 和 No Real-time 的本质区别在哪儿?

 因为 实时模型 是动态更新的,实现算法上,比 非实时的 ML 需要考虑,如何避免依赖 将 新数据 和 旧数据 整合在一块再计算所带来的性能问题
 更多时候,长期积累的数据,是很难再做到全量计算 (比如,多项式贝叶斯 Multinomial naive bayes, 用于处理 dataset 过大,而内存不足的情况)

利用 Spark 实现 Real-time ML

源数据流

  • 利用 java.util.Random产生满足高斯分布的随机数据,再通过 breeze放入到 vector中,作为特征值
  • generateNoisyData中,将这个 vectorinner product, 并加入一点噪声数据,作为 label标签

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    val MaxEvents = 100
    val NumFeatures = 100
    val random = new Random()
    def generateRandomArray(n: Int) = Array.tabulate(n)(_ => random.nextGaussian())
    val w = new DenseVector(generateRandomArray(NumFeatures))
    val intercept = random.nextGaussian() * 10
    def generateNoisyData(n: Int) = {
    (1 to n).map { i =>
    val x = new DenseVector(generateRandomArray(NumFeatures))
    // inner product
    val y: Double = w.dot(x)
    val noisy = y + intercept
    (noisy, x)
    }
    }
  • 通过 socket将数据 发送到指定端口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    if (args.length != 2) {
    System.err.println("Usage: <port> <millisecond>")
    System.exit(1)
    }
    val listener = new ServerSocket(args(0).toInt)
    while (true) {
    val socket = listener.accept()
    new Thread() {
    override def run = {
    println("Got client connected from: " + socket.getInetAddress)
    val out = new PrintWriter(socket.getOutputStream(), true)
    while (true) {
    Thread.sleep(args(1).toLong)
    val num = random.nextInt(MaxEvents)
    val data = generateNoisyData(num)
    data.foreach { case (y, x) =>
    val xStr = x.data.mkString(",")
    val content = s"$y\t$xStr"
    println(content)
    out.write(content)
    out.write("\n")
    }
    }
    socket.close()
    }
    }.start()
    }

实时 Machine Learning模型

  • 指定 spark-master / interval 等参数,创建 StreamingContext (此处可以利用 local[n] 快速开发)

    1
    2
    3
    4
    5
    6
    if (args.length < 4) {
    System.err.println("Usage: WindowCounter <master> <hostname> <port> <interval> \n" +
    "In local mode, <master> should be 'local[n]' with n > 1")
    System.exit(1)
    }
    val ssc = new StreamingContext(args(0), "ML Analysis", Seconds(args(3).toInt))
  • 获取到发送过来的 源数据

    1
    val stream = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
  • 利用 DenseVector.zeros[Double] 创建全零的初始矩阵

  • 使用 StreamingLinearRegressionWithSGD 创建 流式随机递归下降的线性回归 模型

    目前 MLlib 只支持 Streaming ( KMeans / LinearRegression / LinearRegressionWithSGD ) in Spark 1.4.1
    Streaming MLlib 和普通的 MLlib 没有本质上的区别,只是输入的训练集是 DStream,需要使用 foreachRDD/map 进行 训练 / 预测

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    val NumFeatures = 100
    val zeroVector = DenseVector.zeros[Double](NumFeatures)
    val model = new StreamingLinearRegressionWithSGD()
    .setInitialWeights(Vectors.dense(zeroVector.data))
    .setNumIterations(1)
    .setStepSize(0.01)
    val labeledStream = stream.map { event =>
    val split = event.split("\t")
    val y = split(0).toDouble
    val features = split(1).split(",").map(_.toDouble)
    LabeledPoint(label = y, features = Vectors.dense(features))
    }
  • 利用 模型 进行 train / predict

    1
    2
    3
    4
    5
    6
    7
    8
    model.trainOn(labeledStream)
    val predictAndTrue = labeledStream.transform { rdd =>
    val latest = model.latestModel()
    rdd.map { point =>
    val predict = latest.predict(point.features)
    (predict - point.label)
    }
    }
  • 通过 MSE (Mean Squared Error) 均方差 和 RMSE (Root Mean Squared Error) 均方根误差 对模型的性能进行评估 (这里也可以使用 RegressionMetrics 来实现)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    predictAndTrue.foreachRDD { (rdd, time) =>
    val mse = rdd.map { case (err) => err * err }.mean()
    val rmse = math.sqrt(mse)
    println( s"""
    |-------------------------------------------
    |Time: $time
    |-------------------------------------------
    """.stripMargin)
    println(s"MSE current batch: Model : $mse")
    println(s"RMSE current batch: Model : $rmse")
    println("...\n")
    }
  • 启动 Spark 上下文

    1
    2
    ssc.start()
    ssc.awaitTermination()

一劳永逸了?Not at all!

 一个优秀的 ML 模型,是要结合具体业务,从对数据流入的清洗,特征值维度的考量,模型类型的选择,到最终的性能的评估、监控、持续优化,都需要仔细地考究,最终才能打造出高效、稳定、精准的数据模型

数据

 对目标数据集进行处理之前,首先就是对数据的类型进行归类,是数值型、类别型、文本型,还是其他一些多媒体、地理信息等
 针对不同的数据,分别采取不同的处理手段,对于类别型常用 1-of-k encoding 对每个类别进行编码
 对于文本型,则会采用 分词、移除 stop words (的、这、地; the/and/but …)、向量化、标准化 (避免度量单位的影响)

1
2
3
4
5
6
7
8
9
import numpy as np
np.random.seed(42)
x = np.random.randn(10)
norm_x_2 = np.linalg.norm(x)
normalized_x = x / norm_x_2
print "x:\n%s" % x
print "2-Norm of x: %2.4f" % norm_x_2
print "Normalized x:\n%s" % normalized_x
print "2-Norm of normalized_x: %2.4f" % np.linalg.norm(normalized_x)

 还有还多常用的数据处理方式,如 平均值、中位数、总和、方差、差值、最大值、最小值
 对 time的处理的方式,还有可以加上 “时间戳”

1
2
3
4
5
6
7
8
9
10
11
12
def assign_tod(hr):
times_of_day = {
'morning' : range(7, 12),
'lunch' : range(12, 14),
'afternoon' : range(14, 18),
'evening' : range(18, 23),
'night' : range(23, 7)
}
for k, v in times_of_day.iteritems():
if hr in v:
return k
time_of_day = hour_of_day.map(lambda hr: assign_tod(hr))

特征维度

 常见的一个影响模型的因素,便是没有对特征 进行标准化

1
(element-wise - the preceding mean vector from the feature vector) / the vector of feature standard deviations

 利用 StandarScaler 完成标准化工作

1
2
3
4
import org.apache.spark.mllib.feature.StandardScaler
val scaler = new StandardScaler(withMean = true, withStd = true).fit(vectors)
val scaledData = data.map(lp => LabeledPoint(lp.label, scaler.transform(lp.features)))

调整模型

 首先需要在众多的模型 和 对应的算法 中找到最为适用的选择
 模型的类别主要有,推荐引擎分类模型回归模型聚类模型
 相应的实现算法,又有 (线性/逻辑/多元) 回归、(随机森林) 决策树、(朴素/高斯/多项式/伯努利/信念网络) 贝叶斯
 在选择的时候,更多会考虑 特征值是否多维 (可以尝试降维),目标类别是 multiclassbinary,还是 probability (连续值)

  • 根据 数据集的 稀疏程度 对正则化 (Regularizer) 进行调整

    1
    2
    3
    4
    5
    6
    7
    zero: 没有任何正规化操作
    L1: SGDStochastic gradient descent,随机梯度下降)
    L2: LBFGSLimited-memory BFGS,受限的 BFGS
    L2 相比 L1 更为平滑(同样,L1 可以让 稀疏的数据集 得到更 直观的模型)
    还有其它 求最优解 的方法,如 求全局最优解的 BGDBatch gradient descent,批量梯度下降)
    但是,由于每次迭代都需要依据训练集中所有的数据,所以速度很慢;
    以及 CGConjugate gradient,共轭梯度法),但还没有被 Spark MLlib 所支持,可以在 Breeze 中找到它
  • 可以通过 setUpdater 将模型的 规则化算法 设置为 L1 (默认为 L2)

    1
    2
    3
    4
    5
    6
    7
    8
    import org.apache.spark.mllib.optimization.L1Updater
    val svmAlg = new SVMWithSGD()
    svmAlg.optimizer.
    setNumIterations(200).
    setRegParam(0.1).
    setUpdater(new L1Updater)
    val modelL1 = svmAlg.run(training)
  • 当然,还有举不胜举的优化方式 sorry for the limit of article’s lenght :-)

性能评估指标

  • 针对不同的业务,对性能评测的手段,也需要相应取舍,毕竟有些 “宁可错杀一千” 的变态 防护系统,就需要对 recall 有较高的要求,而 precision 则相对宽松些

    • 这时便可采用 ROC (Receiver Operating Characteristic) curve 评测引擎:
      1
      2
      3
      4
      5
      6
      7
      8
      // binary classification
      val metrics = Seq(lrModel, svmModel).map { model =>
      val scoreAndLabels = data.map { point =>
      (model.predict(point.features), point.label)
      }
      val metrics = new BinaryClassificationMetrics(scoreAndLabels)
      (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
      }
  • 如果是 贝叶斯 / 决策树 的数据模型,则可以用 0.5对其进行划分,转换为 binary

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // naive bayes
    val nbMetrics = Seq(nbModel).map { model =>
    val scoreAndLabels = nbData.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
    }
    // decision tree
    val dtMetrics = Seq(dtModel).map { model =>
    val scoreAndLabels = data.map { point =>
    val score = model.predict(point.features)
    (if (score > 0.5) 1.0 else 0.0, point.label)
    }
    val metrics = new BinaryClassificationMetrics(scoreAndLabels)
    (model.getClass.getSimpleName, metrics.areaUnderPR, metrics.areaUnderROC)
    }
    val allMetrics = metrics ++ nbMetrics ++ dtMetrics
    allMetrics.foreach { case (m, pr, roc) =>
    println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
    }
  • 然而,如果是一些推荐系统,更多的希望能够了解到 大体的预测精度,则可以采用 MAP (Mean Average Precision) 平均精度均值 进行评估

    1
    MAP 同时还解决了 precision,recall,F-measure 的单点值局限性


_至此,相信你已经对 Spark 这个生态圈有了大致了解了,下面就是一步一步地 在 实践 和 深入学习中,体验大数据的乐趣啦 O(∩_∩)O~~_

更多资源,欢迎加入,一起交流学习

QQ group: (人工智能 1020982 (高级) & 1217710 (进阶) | BigData 1670647)