91精品国产综合久久四虎久久_国产成人午夜高潮毛片_99er视频精品免费观看_2020亚洲熟女在线观看_日本女优人体写真_国内黄色毛片_年轻的老师中文版在线_丰满女邻居做爰_久久久久久精品成人免费图片

為Spark ML擴(kuò)展結(jié)構(gòu)化流計(jì)算
集成樸素貝葉斯機(jī)器學(xué)習(xí)方法和定制化導(dǎo)出(sink)的實(shí)驗(yàn)方法

Spark的新的ALPHA結(jié)構(gòu)化流計(jì)算API已經(jīng)引起了廣泛的興趣。因?yàn)樗袲ataset、DataFrame和SQL的API都引入了流計(jì)算上下文。然而在這個(gè)初始版本的結(jié)構(gòu)化流計(jì)算里面,機(jī)器學(xué)習(xí)的API并沒有被集成進(jìn)來。但這并沒有阻止我們愉快地去探索集成這兩個(gè)部分(請注意,這里介紹的工作都是探索性的,未來的版本里有可能會變化)。

為了集成結(jié)構(gòu)化流計(jì)算和機(jī)器學(xué)習(xí),我們啟動(dòng)了一個(gè)初步的概念驗(yàn)證項(xiàng)目。項(xiàng)目結(jié)果可以在spark-structured-streaming-ml庫里找到。如果你對支持結(jié)構(gòu)化流計(jì)算的Spark ML管道感興趣,我希望你關(guān)注SPARK-16424改進(jìn)項(xiàng)目,并對我們的早期設(shè)計(jì)版本提出你的意見和想法。

能夠依賴于結(jié)構(gòu)化流計(jì)算實(shí)現(xiàn)的最簡單的流計(jì)算機(jī)器學(xué)習(xí)算法之一就是樸素貝葉斯算法。因?yàn)檫@個(gè)算法的很多計(jì)算都可以被簡化為分組與聚合。實(shí)現(xiàn)的主要挑戰(zhàn)來源于采用什么樣的方法來收集聚合好的數(shù)據(jù)以便做預(yù)測?,F(xiàn)有的流計(jì)算樸素貝葉斯算法所采用的方法并不能直接用。因?yàn)樵赟park結(jié)構(gòu)化流計(jì)算的ForeachSink方法是在Worker上執(zhí)行的,所以無法用最新的計(jì)數(shù)值來更新本地的數(shù)據(jù)結(jié)構(gòu)。

為此,需要使用(如下所示的)Spark結(jié)構(gòu)化流計(jì)算提供的一個(gè)內(nèi)存表的輸出格式來存儲聚合值。

// 使用Dataset的變換操作來計(jì)算計(jì)數(shù)值

val counts = ds.flatMap{

case LabeledPoint(label, vec) =>

vec.toArray.zip(Stream from 1).map(value => LabeledToken(label, value))

}.groupBy($”label”, $”value”).agg(count($”value”).alias(“count”))

.as[LabeledTokenCounts]

// 創(chuàng)建一個(gè)表名來存儲輸出

val tblName = “qbsnb” + java.util.UUID.randomUUID.toString.filter(_ != ‘-‘).toString

// 把聚合結(jié)果以完整的形式寫入內(nèi)存表

val query = counts.writeStream.outputMode(OutputMode.Complete())

.format(“memory”).queryName(tblName).start()

val tbl = ds.sparkSession.table(tblName).as[LabeledTokenCounts]

而這個(gè)實(shí)現(xiàn)樸素貝葉斯算法的方法并不容易被推廣到其他的算法,因?yàn)椴皇撬械乃惴ǘ际峭ㄟ^聚合Dataset里的數(shù)據(jù)來實(shí)現(xiàn)的。不過通過回顧早期基于DStream的Spark流計(jì)算API,我們能夠獲得一個(gè)可能的解決方法的一些思路。如果說你可以采用某種update的機(jī)制來把新數(shù)據(jù)并入到已有的模型里,DStream的foreachRDD方法就可以讓你接觸到底層的微批次的數(shù)據(jù)。不幸的是,在結(jié)構(gòu)化流計(jì)算里面,并沒有foreachRDD的對等方法。不過使用一個(gè)定制化的導(dǎo)出(sink),你是可以在結(jié)構(gòu)化流計(jì)算里面得到類似的行為。

如下所示的導(dǎo)出API是由StreamSinkProvider和Sink類定義的。StreamSinkProvider主要基于指定的SQLContext和導(dǎo)出設(shè)置來創(chuàng)建一個(gè)Sink的實(shí)例;而繼承自Sink類則是提供了方法來批次處理實(shí)際的數(shù)據(jù)。

abstract class ForeachDatasetSinkProvider extends StreamSinkProvider {

def func(df: DataFrame): Unit

def createSink(

sqlContext: SQLContext,

parameters: Map[String, String],

partitionColumns: Seq[String],

outputMode: OutputMode): ForeachDatasetSink = {

new ForeachDatasetSink(func)

}

}

case class ForeachDatasetSink(func: DataFrame => Unit)

extends Sink {

override def addBatch(batchId: Long, data: DataFrame): Unit = {

func(data)

}

}

與把DataFrame寫出到定制化的格式類似,為了使用一個(gè)第三方的導(dǎo)出,你可以指定導(dǎo)出的全類名。因?yàn)槟阈枰付ǜ袷降娜惷?,你必須保證任意SinkProvider的實(shí)例都能更新這個(gè)模型,因?yàn)槟銦o法直接操作這個(gè)構(gòu)造的導(dǎo)出對象。而且你必須要讓模型處于導(dǎo)出類的定義的外部。

object SimpleStreamingNaiveBayes {

val model = new StreamingNaiveBayes()

}

class StreamingNaiveBayesSinkProvider extends ForeachDatasetSinkProvider {

override def func(df: DataFrame) {

val spark = df.sparkSession

SimpleStreamingNaiveBayes.model.update(df)

}

}

在等待Spark ML來更新結(jié)構(gòu)化流計(jì)算的API期間,你就可以用上面所示的定制化導(dǎo)出來集成機(jī)器學(xué)習(xí)算法到結(jié)構(gòu)化流計(jì)算里。

// 使用SimpleStreamNaiveBayes對象內(nèi)的模型來訓(xùn)練

// 如果同時(shí)多個(gè)流都使用這個(gè)對象,則大家都會去更新這個(gè)模型

// 或者使用寫死的查詢名字來防止多個(gè)流的同時(shí)更新。

def train(ds: Dataset[_]) = {

ds.writeStream.format(

“com.highperformancespark.examples.structuredstreaming.” +

“StreamingNaiveBayesSinkProvider”)

.queryName(“trainingnaiveBayes”)

.start()

}

如果你對此還有疑慮,你可以查看這里的Spark內(nèi)部版本是如何構(gòu)建一個(gè)sink來讓它的行為更像原始的foreachRDD的操作的。如果你對定制化的導(dǎo)出支持有興趣,你可以關(guān)注SPARK-16407改進(jìn)項(xiàng)目或者這個(gè)PR(Pull Request,拉取請求)。

很酷的是,無論你是否想使用Spark內(nèi)部版本的API,你現(xiàn)在都可以用Spark早期流計(jì)算機(jī)器學(xué)習(xí)的實(shí)現(xiàn)方法來處理批次更新了。

盡管這個(gè)增強(qiáng)項(xiàng)目肯定還沒有完備到為生產(chǎn)系統(tǒng)所使用,你還是可以發(fā)現(xiàn)結(jié)構(gòu)化流計(jì)算API已經(jīng)提供了多種不同的方法來擴(kuò)展并支持機(jī)器學(xué)習(xí)算法了。

你可以從《高性能Spark:擴(kuò)展和優(yōu)化阿帕奇Spark的最佳實(shí)踐》里了解到更多內(nèi)容。

Holden Karau

Holden Karau是一個(gè)加拿大籍跨性別者和一位積極的開源軟件貢獻(xiàn)者。當(dāng)不在舊金山的IBM Spark技術(shù)中心作為一位軟件工程師工作期間,Holden會在全球宣講Spark,并在家里和咖啡館里提供Spark的技術(shù)指導(dǎo)時(shí)間。她經(jīng)常為Spark貢獻(xiàn)代碼,專注于PySpark和機(jī)器學(xué)習(xí)部分。在加入IBM之前,她在Alpine、Databrick、谷歌、Foursquare和亞馬遜等公司參與了許多與分布式查詢和分類相關(guān)問題的工作。她于滑鐵盧大學(xué)獲得計(jì)算機(jī)專業(yè)的數(shù)學(xué)學(xué)士學(xué)位。除了軟件開發(fā)之外,她還喜歡玩火、焊接、玩踏板車、肉汁乳酪薯?xiàng)l和跳舞。

Spain, Cordoba, Mosque-Cathedral. (source: Berthold Werner on Wikimedia Commons).