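Although Spark ML pipelines come with a wide variety of algorithms, you may still want additional functionality without leaving the pipeline model. In Spark MLlib this is not much of a problem: you can implement your own algorithm with RDD transformations and carry on. The same approach works with Spark ML pipelines, but you lose some of the nice properties pipelines provide, including the ability to automatically run meta-algorithms such as cross-validated parameter search. In this article you will learn how to extend the Spark ML pipeline model, starting from the standard wordcount example (in big data, you can never really escape the wordcount example).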
To add your own algorithm to a Spark pipeline, you need to implement either an Estimator or a Transformer. Both are defined in org.apache.spark.ml and both extend the PipelineStage base class. Algorithms that don't require training implement Transformer; algorithms that do require training implement Estimator. Note that training isn't limited to complex machine learning models: even a min-max scaler needs a training step to determine the range of the data. If your algorithm requires training, it must be built as an Estimator rather than a Transformer.
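To make the min-max point concrete, here is a plain-Scala sketch (no Spark involved; MinMaxSketch and MinMaxModel are hypothetical names, not Spark's MinMaxScaler) in which the range has to be learned from the data in a fit step before any value can be transformed:

```scala
// Plain-Scala sketch (no Spark): min-max scaling must learn the data's range
// in a "fit" step before it can transform anything.
object MinMaxSketch {
  // The learned "model": the minimum and maximum seen during fitting.
  final case class MinMaxModel(min: Double, max: Double) {
    // Rescale x so that the training range maps onto [0, 1];
    // constant training data maps everything to the midpoint.
    def transform(x: Double): Double =
      if (max == min) 0.5 else (x - min) / (max - min)
  }

  // "Training": one pass over the data to find its range.
  def fit(data: Seq[Double]): MinMaxModel = {
    require(data.nonEmpty, "cannot fit on empty data")
    MinMaxModel(data.min, data.max)
  }
}
```

transform only needs the two learned numbers, while fit has to see the whole dataset; that split is exactly what an Estimator and the Model it produces capture.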
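Note: Using PipelineStage directly won't work, because the pipeline internally checks each stage's runtime type and assumes every pipeline stage is either an Estimator or a Transformer.
Besides the obvious transform or fit method, every pipeline stage needs to provide transformSchema and a copy method (or extend a class that provides these for you). copy makes a copy of the current stage with any newly specified params merged in, and can usually just call defaultCopy, unless your class has special constructor considerations: defaultCopy reflectively calls a constructor that takes only the uid string, so classes without such a constructor (for example, models that carry learned state) need their own copy. The beginning of a pipeline stage, with copy delegated to defaultCopy, looks like this: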
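import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
class HardCodedWordCountStage(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("hardcodedwordcount"))
  override def copy(extra: ParamMap): HardCodedWordCountStage = defaultCopy(extra)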
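transformSchema must produce the expected output schema of your pipeline stage, based on any params and the input schema. Because existing fields may still be used later on, most pipeline stages only add new fields; very few remove existing ones. This can leave the output carrying more data than downstream stages need, which can hurt performance. If you find this is a problem in your pipeline, you can write your own stage to drop the unneeded fields.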
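A minimal sketch of such a stage, assuming a hypothetical DropColumns transformer with a colsToDrop param (neither comes from the original article). transformSchema checks that the requested columns exist and returns the reduced schema, and transform relies on Dataset.drop. For simple projections, Spark's built-in SQLTransformer can also do the job.

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{ParamMap, StringArrayParam}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// Hypothetical stage that removes columns downstream stages no longer need.
class DropColumns(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("dropcolumns"))

  final val colsToDrop = new StringArrayParam(this, "colsToDrop", "Columns to remove")
  setDefault(colsToDrop -> Array.empty[String])
  def setColsToDrop(value: Array[String]): this.type = set(colsToDrop, value)

  override def transformSchema(schema: StructType): StructType = {
    // Fail early if asked to drop a column that isn't there
    val missing = $(colsToDrop).filterNot(name => schema.fieldNames.contains(name))
    require(missing.isEmpty, s"Columns not found: ${missing.mkString(", ")}")
    // Output schema: the input schema minus the dropped fields
    StructType(schema.fields.filterNot(f => $(colsToDrop).contains(f.name)))
  }

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema) // validate before doing any work
    dataset.drop($(colsToDrop): _*)
  }

  override def copy(extra: ParamMap): DropColumns = defaultCopy(extra)
}
```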
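Besides producing the output schema, transformSchema should also validate that the input schema is suitable for the stage (for example, that the input column has the expected type).
This is also where you should validate the stage's params. A simple transformSchema for a string input and an integer output, with hard-coded input and output columns, looks like this: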
  override def transformSchema(schema: StructType): StructType = {
    // Check that the input column exists (fieldIndex throws otherwise) and is a string
    val idx = schema.fieldIndex("happy_pandas")
    val field = schema.fields(idx)
    if (field.dataType != StringType) {
      throw new IllegalArgumentException(
        s"Input type ${field.dataType} did not match input type StringType")
    }
    // Add the return field
    schema.add(StructField("happy_panda_counts", IntegerType, false))
  }
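Algorithms that don't need training are very easy to implement with the Transformer interface. Since this is the simplest kind of pipeline stage, you can start by implementing a simple transformer that counts the number of words in the input column: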
  override def transform(df: Dataset[_]): DataFrame = {
    // Count words by splitting on single spaces
    val wordcount = udf { in: String => in.split(" ").size }
    df.select(col("*"),
      wordcount(df.col("happy_pandas")).as("happy_panda_counts"))
  }
}
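The transform above splits on a single space, so runs of spaces produce empty tokens that get counted, and a null input makes the UDF throw a NullPointerException. A plain-Scala sketch of a more forgiving counting function (WordCountSketch is a hypothetical name) that could serve as the UDF body:

```scala
object WordCountSketch {
  // Counts whitespace-separated words; null or blank input counts as 0 words.
  def countWords(in: String): Int =
    Option(in).map(_.trim).filter(_.nonEmpty).map(_.split("\\s+").length).getOrElse(0)

  // For comparison: the naive split(" ") used in the transformer above.
  def naiveCount(in: String): Int = in.split(" ").size
}
```

Inside the transformer you would wrap it as udf { in: String => WordCountSketch.countWords(in) }.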
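To get the most out of the pipeline interface, you will probably want to make your pipeline stage configurable through the params interface.
Although the params interface is public, the default params commonly used inside Spark are unfortunately private, so you end up writing a fair amount of boilerplate. Besides holding user-specified values, params can also carry basic validation logic (for example, a regularization parameter must be non-negative). The two most common params, the input column and the output column, are simple to add to your model.
Params aren't limited to strings: other types work too, such as a list of strings for stop words, or a double for a numeric setting like a regularization parameter.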
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
class ConfigurableWordCount(override val uid: String) extends Transformer {
  final val inputCol = new Param[String](this, "inputCol", "The input column")
  final val outputCol = new Param[String](this, "outputCol", "The output column")
  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)
  def this() = this(Identifiable.randomUID("configurablewordcount"))
  override def copy(extra: ParamMap): ConfigurableWordCount = defaultCopy(extra)
  override def transformSchema(schema: StructType): StructType = {
    // Check that the input type is a string
    val idx = schema.fieldIndex($(inputCol))
    val field = schema.fields(idx)
    if (field.dataType != StringType) {
      throw new IllegalArgumentException(
        s"Input type ${field.dataType} did not match input type StringType")
    }
    // Add the return field
    schema.add(StructField($(outputCol), IntegerType, false))
  }
  override def transform(df: Dataset[_]): DataFrame = {
    val wordcount = udf { in: String => in.split(" ").size }
    df.select(col("*"), wordcount(df.col($(inputCol))).as($(outputCol)))
  }
}
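For the other param types mentioned earlier, Spark ships typed Param subclasses such as StringArrayParam and DoubleParam, and ParamValidators provides ready-made validation functions. A sketch, assuming a hypothetical WordFilterParams class (the stopWords and regParam names are illustrative, not from the original article), of a stop-word list plus a non-negative double with defaults:

```scala
import org.apache.spark.ml.param.{DoubleParam, ParamMap, ParamValidators, Params, StringArrayParam}
import org.apache.spark.ml.util.Identifiable

// Hypothetical params holder: a stop-word list plus a validated, non-negative double.
class WordFilterParams(override val uid: String) extends Params {
  def this() = this(Identifiable.randomUID("wordfilterparams"))

  // A list of strings, e.g. stop words to ignore when counting
  final val stopWords = new StringArrayParam(this, "stopWords", "Words to ignore")
  // A double with built-in validation: setting a negative value throws
  final val regParam = new DoubleParam(this, "regParam",
    "Regularization parameter (>= 0)", ParamValidators.gtEq(0))

  // Defaults are used whenever the user doesn't set a value
  setDefault(stopWords -> Array.empty[String], regParam -> 0.0)

  def setStopWords(value: Array[String]): this.type = set(stopWords, value)
  def setRegParam(value: Double): this.type = set(regParam, value)

  override def copy(extra: ParamMap): WordFilterParams = defaultCopy(extra)
}
```

Validation runs when a value is set, so a bad value fails at configuration time rather than in the middle of a job.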
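Algorithms that require training are implemented with the Estimator interface, although for many algorithms the helper classes org.apache.spark.ml.Predictor or org.apache.spark.ml.classification.Classifier are easier to work with. The main difference between an Estimator and a Transformer is that, instead of applying your transformation directly to the input, an Estimator first runs a training step (fit, or train when extending Predictor) that produces a Model, and that model, itself a Transformer, performs the transformation. A string indexer is one of the simplest estimators you can implement. Although Spark already provides one (StringIndexer), it is still a good example of how to use the Estimator interface.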
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.param.{Param, ParamMap, Params}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
trait SimpleIndexerParams extends Params {
  final val inputCol = new Param[String](this, "inputCol", "The input column")
  final val outputCol = new Param[String](this, "outputCol", "The output column")
}
class SimpleIndexer(override val uid: String)
    extends Estimator[SimpleIndexerModel] with SimpleIndexerParams {
  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)
  def this() = this(Identifiable.randomUID("simpleindexer"))
  override def copy(extra: ParamMap): SimpleIndexer = defaultCopy(extra)
  override def transformSchema(schema: StructType): StructType = {
    // Check that the input type is a string
    val idx = schema.fieldIndex($(inputCol))
    val field = schema.fields(idx)
    if (field.dataType != StringType) {
      throw new IllegalArgumentException(
        s"Input type ${field.dataType} did not match input type StringType")
    }
    // Add the return field; indices are Doubles, matching what the model produces
    schema.add(StructField($(outputCol), DoubleType, false))
  }
  override def fit(dataset: Dataset[_]): SimpleIndexerModel = {
    import dataset.sparkSession.implicits._
    // "Training": collect the distinct labels to the driver
    val words = dataset.select(dataset($(inputCol)).as[String]).distinct.collect()
    // Copy this estimator's params (inputCol, outputCol) onto the new model
    copyValues(new SimpleIndexerModel(uid, words).setParent(this))
  }
}
class SimpleIndexerModel(override val uid: String, words: Array[String])
    extends Model[SimpleIndexerModel] with SimpleIndexerParams {
  // defaultCopy needs a (uid: String) constructor, so rebuild the model explicitly
  override def copy(extra: ParamMap): SimpleIndexerModel =
    copyValues(new SimpleIndexerModel(uid, words), extra).setParent(parent)
  private val labelToIndex: Map[String, Double] =
    words.zipWithIndex.map { case (x, y) => (x, y.toDouble) }.toMap
  override def transformSchema(schema: StructType): StructType = {
    // Check that the input type is a string
    val idx = schema.fieldIndex($(inputCol))
    val field = schema.fields(idx)
    if (field.dataType != StringType) {
      throw new IllegalArgumentException(
        s"Input type ${field.dataType} did not match input type StringType")
    }
    // Add the return field
    schema.add(StructField($(outputCol), DoubleType, false))
  }
  override def transform(dataset: Dataset[_]): DataFrame = {
    val mapping = labelToIndex // capture only the map in the UDF closure
    val indexer = udf { label: String => mapping(label) }
    dataset.select(col("*"),
      indexer(dataset($(inputCol)).cast(StringType)).as($(outputCol)))
  }
}
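SimpleIndexer keeps the example short, but two behaviors are worth knowing: distinct().collect() gives no ordering guarantee, so indices can differ between runs, and the model's Map lookup throws a NoSuchElementException for labels it never saw during fit. A plain-Scala sketch (no Spark; IndexerSketch and the reserved "unseen" index are assumptions for illustration, similar in spirit to StringIndexer's handleInvalid = "keep") of a fit/transform pair that sorts labels and reserves one extra index for unseen values:

```scala
// Plain-Scala model of the fit/transform split behind a string indexer (no Spark).
object IndexerSketch {
  final case class IndexerModel(labelToIndex: Map[String, Double], unseenIndex: Double) {
    // transform: look up each label, mapping labels unseen during fit to a reserved index
    def transform(labels: Seq[String]): Seq[Double] =
      labels.map(l => labelToIndex.getOrElse(l, unseenIndex))
  }

  // fit: learn the vocabulary; sorting makes the indices deterministic across runs
  def fit(training: Seq[String]): IndexerModel = {
    val words = training.distinct.sorted
    val mapping = words.zipWithIndex.map { case (w, i) => w -> i.toDouble }.toMap
    IndexerModel(mapping, unseenIndex = words.size.toDouble)
  }
}
```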
If you are implementing an iterative algorithm, you may want to cache the input data if it isn't already cached, or let the user specify a persistence level.
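A minimal sketch of that pattern, assuming a hypothetical helper named withPersistence: it checks Dataset.storageLevel (available since Spark 2.1), the same check Spark's own iterative algorithms such as LogisticRegression use, and only persists (and later unpersists) data the caller hasn't already cached. The storage level is a parameter so callers can choose it.

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.storage.StorageLevel

object PersistenceSketch {
  // Hypothetical helper: run `body` with `ds` persisted, but only manage caching
  // ourselves if the caller hasn't already persisted the input.
  def withPersistence[T](ds: Dataset[_],
      level: StorageLevel = StorageLevel.MEMORY_AND_DISK)(body: => T): T = {
    val handlePersistence = ds.storageLevel == StorageLevel.NONE
    if (handlePersistence) ds.persist(level)
    try body
    finally if (handlePersistence) ds.unpersist()
  }
}
```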
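The Predictor interface adds the two most commonly used params (input and output columns) in the form of label, features, and prediction columns, and handles the schema transformation for us automatically.
The Classifier interface is largely the same, except that it also adds a rawPredictionCol param and provides helpers to determine the number of classes (getNumClasses) and to convert the input DataFrame into an RDD of LabeledPoints (extractLabeledPoints), which makes it easier to wrap legacy MLlib classification algorithms.
If you are implementing a regression or clustering algorithm, at the time of writing there is no public base interface to build on, so you need to use the general Estimator interface.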
// Simple Bernoulli naive Bayes classifier; no sanity checks, for brevity.
// Example only, not for production use.
import org.apache.spark.ml.classification.{ClassificationModel, Classifier}
import org.apache.spark.ml.linalg.{DenseMatrix, Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{col, count}
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.storage.StorageLevel
// (label, feature index) pair for one non-zero feature
case class LabeledToken(label: Double, index: Int)
class SimpleNaiveBayes(override val uid: String)
    extends Classifier[Vector, SimpleNaiveBayes, SimpleNaiveBayesModel] {
  def this() = this(Identifiable.randomUID("simple-naive-bayes"))
  override protected def train(ds: Dataset[_]): SimpleNaiveBayesModel = {
    import ds.sparkSession.implicits._
    // Only persist (and later unpersist) if the caller hasn't cached the input already
    val handlePersistence = ds.storageLevel == StorageLevel.NONE
    if (handlePersistence) ds.persist(StorageLevel.MEMORY_AND_DISK)
    // Note: you can use getNumClasses and extractLabeledPoints to get an RDD instead.
    // Using the RDD approach is common when integrating with legacy machine learning code
    // or iterative algorithms which can create large query plans.
    // Here we use Datasets since neither of those apply.
    // Compute the number of documents
    val numDocs = ds.count
    // Get the number of classes.
    // Note this estimator assumes labels run from 0 to numClasses - 1
    val numClasses = getNumClasses(ds)
    // Get the number of features by peeking at the first row
    val numFeatures: Int = ds.select(col($(featuresCol))).head
      .get(0).asInstanceOf[Vector].size
    // Determine the number of records for each class
    val groupedByLabel = ds.select(col($(labelCol)).as[Double]).groupByKey(x => x)
    val classCounts: Map[Double, Long] = groupedByLabel.agg(count("*").as[Long])
      .collect().toMap
    // Select the labels and features so we can more easily map over them.
    // Note: we do this as a DataFrame using the untyped API because the Vector
    // UDT is no longer public.
    val df = ds.select(col($(labelCol)).cast(DoubleType), col($(featuresCol)))
    // Figure out the non-zero frequency of each feature for each label and
    // output label index pairs using a case class to make it easier to work with.
    val labelCounts: Dataset[LabeledToken] = df.flatMap {
      case Row(label: Double, features: Vector) =>
        features.toArray.zipWithIndex
          .filter { case (v, _) => v != 0.0 }
          .map { case (_, idx) => LabeledToken(label, idx) }
          .toSeq
    }
    // Use the typed Dataset aggregation API to count the number of non-zero
    // features for each label-feature index.
    val aggregatedCounts: Array[((Double, Int), Long)] = labelCounts
      .groupByKey(x => (x.label, x.index))
      .agg(count("*").as[Long]).collect()
    // Compute the denominator for the general priors
    val piLogDenom = math.log(numDocs + numClasses)
    // Compute the add-one smoothed log prior for each class, indexed by label
    val pi = Array.tabulate(numClasses) { c =>
      math.log(classCounts.getOrElse(c.toDouble, 0L) + 1.0) - piLogDenom
    }
    // log of number of documents for each label + 2.0 (smoothing)
    val thetaLogDenom = Array.tabulate(numClasses) { c =>
      math.log(classCounts.getOrElse(c.toDouble, 0L) + 2.0)
    }
    // Features never seen with a label still get the smoothed log(0 + 1) - denominator
    val theta = Array.tabulate(numClasses, numFeatures)((c, _) => -thetaLogDenom(c))
    // For each label/feature update the probabilities
    aggregatedCounts.foreach { case ((label, featureIndex), cnt) =>
      theta(label.toInt)(featureIndex) = math.log(cnt + 1.0) - thetaLogDenom(label.toInt)
    }
    // Unpersist now that we are done computing everything
    if (handlePersistence) ds.unpersist()
    // Construct a model (theta is stored row-major, hence isTransposed = true)
    new SimpleNaiveBayesModel(uid, numClasses, numFeatures, Vectors.dense(pi),
      new DenseMatrix(numClasses, numFeatures, theta.flatten, true))
  }
  override def copy(extra: ParamMap): SimpleNaiveBayes = defaultCopy(extra)
}
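To see the estimates train computes without a cluster, here is a plain-Scala sketch (no Spark; BernoulliNBTrainSketch is a hypothetical name) that fits log priors and log feature probabilities on in-memory binary data, with add-one smoothing applied to both: log P(c) = log(n_c + 1) - log(N + K) and log P(x_j = 1 | c) = log(count_cj + 1) - log(n_c + 2).

```scala
// Plain-Scala sketch (no Spark) of Bernoulli naive Bayes training with add-one smoothing.
object BernoulliNBTrainSketch {
  // labels are in 0 until numClasses; features(i) is the binary feature vector of document i.
  // Returns (log priors by class, log feature probabilities by [class][feature]).
  def fit(labels: Array[Int], features: Array[Array[Double]],
      numClasses: Int): (Array[Double], Array[Array[Double]]) = {
    val numDocs = labels.length
    val numFeatures = features.head.length
    val classCounts = Array.tabulate(numClasses)(c => labels.count(_ == c))
    // log P(c) = log(n_c + 1) - log(N + K), so the priors sum to 1
    val piLogDenom = math.log(numDocs + numClasses.toDouble)
    val logPi = classCounts.map(n => math.log(n + 1.0) - piLogDenom)
    // log P(x_j = 1 | c) = log(count_cj + 1) - log(n_c + 2)
    val logTheta = Array.tabulate(numClasses, numFeatures) { (c, j) =>
      val nonZero = labels.indices.count(i => labels(i) == c && features(i)(j) != 0.0)
      math.log(nonZero + 1.0) - math.log(classCounts(c) + 2.0)
    }
    (logPi, logTheta)
  }
}
```

With labels (0, 0, 1), class 0 gets prior (2 + 1) / (3 + 2) = 0.6, and a feature present in both class-0 documents gets (2 + 1) / (2 + 2) = 0.75.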
// Simplified naive Bayes model
case class SimpleNaiveBayesModel(
    override val uid: String,
    override val numClasses: Int,
    override val numFeatures: Int,
    pi: Vector,
    theta: DenseMatrix)
  extends ClassificationModel[Vector, SimpleNaiveBayesModel] {
  // defaultCopy needs a (uid: String) constructor, so rebuild the model explicitly
  override def copy(extra: ParamMap): SimpleNaiveBayesModel =
    copyValues(new SimpleNaiveBayesModel(uid, numClasses, numFeatures, pi, theta), extra)
      .setParent(parent)
  // We have to do some tricks here because we are using Spark's
  // Vector/DenseMatrix calculations, but for your own model don't feel
  // limited to Spark's native ones.
  // log(1 - theta) for each class/feature, stored row-major like theta
  val negThetaArray = theta.values.map(v => math.log(1.0 - math.exp(v)))
  val negTheta = new DenseMatrix(numClasses, numFeatures, negThetaArray, true)
  val thetaMinusNegThetaArray = theta.values.zip(negThetaArray)
    .map { case (v, nv) => v - nv }
  val thetaMinusNegTheta = new DenseMatrix(
    numClasses, numFeatures, thetaMinusNegThetaArray, true)
  val onesVec = Vectors.dense(Array.fill(theta.numCols)(1.0))
  // Per-class sum of log(1 - theta)
  val negThetaSum: Array[Double] = negTheta.multiply(onesVec).toArray
  // Here is the prediction functionality you need to implement for ClassificationModels;
  // transform automatically wraps this, but if you might benefit from broadcasting your
  // model or other optimizations you can also override transform.
  override def predictRaw(features: Vector): Vector = {
    // Toy implementation: ideally the three vectors would be summed with BLAS,
    // but that functionality isn't exposed publicly.
    Vectors.dense(thetaMinusNegTheta.multiply(features).toArray.zip(pi.toArray)
      .map { case (x, y) => x + y }.zip(negThetaSum).map { case (x, y) => x + y }
    )
  }
}
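predictRaw rearranges the Bernoulli naive Bayes log-likelihood so it can be computed with one matrix-vector product: log π_c + Σ_j [x_j log θ_cj + (1 - x_j) log(1 - θ_cj)] equals log π_c + Σ_j log(1 - θ_cj) + Σ_j x_j (log θ_cj - log(1 - θ_cj)). The plain-Scala sketch below (no Spark; BernoulliNBSketch is a hypothetical name) computes both forms so they can be compared on binary features:

```scala
// Plain-Scala check of the Bernoulli naive Bayes scoring rule behind predictRaw (no Spark).
object BernoulliNBSketch {
  // Direct form: log P(c) + sum_j [x_j log θ_cj + (1 - x_j) log(1 - θ_cj)]
  def direct(logPi: Array[Double], logTheta: Array[Array[Double]], x: Array[Double]): Array[Double] =
    logPi.indices.map { c =>
      logPi(c) + x.indices.map { j =>
        val lt = logTheta(c)(j)
        x(j) * lt + (1.0 - x(j)) * math.log(1.0 - math.exp(lt))
      }.sum
    }.toArray

  // Rearranged form: log P(c) + sum_j log(1 - θ_cj) + sum_j x_j (log θ_cj - log(1 - θ_cj))
  def rearranged(logPi: Array[Double], logTheta: Array[Array[Double]], x: Array[Double]): Array[Double] =
    logPi.indices.map { c =>
      val logNeg = logTheta(c).map(lt => math.log(1.0 - math.exp(lt)))
      val dot = x.indices.map(j => x(j) * (logTheta(c)(j) - logNeg(j))).sum
      logPi(c) + logNeg.sum + dot
    }.toArray
}
```

The rearranged form moves everything that doesn't depend on x into precomputable per-class constants, which is why the model builds negThetaSum and thetaMinusNegTheta once in its constructor.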
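Note: If you only need to modify an existing algorithm, you can extend it by pretending to be part of the org.apache.spark project, that is, by declaring your class in a package under org.apache.spark so that Spark's package-private (private[spark]) members become visible to it.
You now know how to extend Spark's ML pipeline API with your own pipeline stages. If you get stuck, a good reference is Spark's own internal algorithms: although they sometimes use internal APIs, for the most part they implement the public interfaces in the same way you would.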
Holden Karau
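Holden Karau is a transgender Canadian and an active open source contributor. When not working as a software engineer at IBM's Spark Technology Center in San Francisco, Holden speaks about Spark around the world and holds Spark office hours in coffee shops. She is a frequent contributor to Spark, focusing on PySpark and machine learning. Before joining IBM, she worked on a variety of distributed, search, and classification problems at Alpine, Databricks, Google, Foursquare, and Amazon. She graduated from the University of Waterloo with a Bachelor of Mathematics in Computer Science. Outside of software, she enjoys playing with fire, welding, scooters, poutine, and dancing.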


For more, see the related sessions at Strata Beijing 2017.