到目前為止,我還僅僅只是描述了一些把數(shù)據(jù)從一個(gè)地方拷貝到其他地方的多種的方法。然而,在存儲(chǔ)系統(tǒng)間挪動(dòng)字節(jié)并不是故事的結(jié)尾。實(shí)際上我們發(fā)現(xiàn),“日志”是“流”的另外一種說法,而日志(的處理)是流計(jì)算處理的核心。
但是先等一下,到底什么是流計(jì)算處理?
如果你是上世紀(jì)九十年代末和二十一世紀(jì)初的數(shù)據(jù)庫或者數(shù)據(jù)基礎(chǔ)設(shè)施產(chǎn)品的粉絲,你可能會(huì)把流計(jì)算處理和那些通過SQL引擎或者用“流程圖”界面來進(jìn)行數(shù)據(jù)驅(qū)動(dòng)的處理過程聯(lián)系起來。
而如果你是追隨著爆炸性增長的開源數(shù)據(jù)系統(tǒng)的人,你可能就會(huì)把流計(jì)算處理和諸如Storm、Akka、S4和Samza這樣的系統(tǒng)聯(lián)系起來。很多人會(huì)把這些系統(tǒng)看成是一個(gè)異步消息處理系統(tǒng),和那些基于集群的RPC層上的應(yīng)用沒什么區(qū)別(事實(shí)上有些系統(tǒng)確實(shí)是這樣)。我還曾經(jīng)聽有人過把流計(jì)算描述成一種模式,即立刻處理數(shù)據(jù),隨后就丟棄。
上述兩種觀點(diǎn)都有失偏頗。流計(jì)算處理與SQL毫無關(guān)系;同時(shí)也不局限于實(shí)時(shí)處理系統(tǒng)。沒有任何的理由來限制你去用多種語言來處理昨天或者一個(gè)月以前的數(shù)據(jù)流;也沒有說你必須(或者應(yīng)該)把獲得的原始數(shù)據(jù)丟棄掉。
我對(duì)流計(jì)算處理的看法則更加寬泛,即能做持續(xù)數(shù)據(jù)處理的基礎(chǔ)設(shè)施。我認(rèn)為流計(jì)算處理的計(jì)算模型可以是如同MapReduce那樣的分布式處理框架一樣的通用,只要它能提供低延遲的結(jié)果就可以。
而真正來驅(qū)動(dòng)(或決定)處理模型的則是數(shù)據(jù)收集的方法。通過批次收集的數(shù)據(jù)則自然由按批次處理。對(duì)于持續(xù)流入的數(shù)據(jù),就用持續(xù)的實(shí)時(shí)處理方式。
美國國家統(tǒng)計(jì)局的人口普查數(shù)據(jù)是一個(gè)按批次收集數(shù)據(jù)的好例子。統(tǒng)計(jì)局會(huì)定期啟動(dòng)人口普查,派專人上門去挨家挨戶的收集美國公民的人口數(shù)據(jù)。這種方式對(duì)于在第一次普查開始的1790年(參見圖1-1)來說是有道理的。那時(shí)候的數(shù)據(jù)收集在本質(zhì)上就是批次的,因?yàn)橐T馬去走訪,再在紙上填寫好統(tǒng)計(jì)記錄,再把這些記錄一批批地送到中心點(diǎn)去由人工來累加計(jì)算。而今時(shí)今日,在你給別人說這個(gè)人口普查的過程的時(shí)候,人們會(huì)立刻質(zhì)疑為什么我們不記錄一個(gè)出生和死亡的記錄,然后可以隨時(shí)隨地的用任何的粒度來計(jì)算人口總數(shù)。

圖1-1. 第一次美國人口普查是批次收集數(shù)據(jù)的,因?yàn)槭芟抻诋?dāng)時(shí)的技術(shù)條件。然而,在一個(gè)數(shù)字化、網(wǎng)絡(luò)化的世界里,批次數(shù)據(jù)收集早已不是必須的了
當(dāng)然,這(人口普查)是一個(gè)極端的例子。但是現(xiàn)在很多的數(shù)據(jù)傳輸過程依然依賴于定期的收集和批次化傳輸與集成。顯然批次收集數(shù)據(jù)的最自然的處理就是批次處理。隨著這些過程逐步被實(shí)時(shí)輸入收集所替代,我們也要相應(yīng)的開始進(jìn)行實(shí)時(shí)的數(shù)據(jù)處理,從而能平滑所需的處理資源,并降低延遲。
一個(gè)現(xiàn)在的互聯(lián)網(wǎng)公司并不需要有任何批次數(shù)據(jù)收集。網(wǎng)站產(chǎn)生的數(shù)據(jù)或是用戶行為數(shù)據(jù)或是數(shù)據(jù)庫的改變,而兩者都是持續(xù)發(fā)生的。事實(shí)上,如果你仔細(xì)想想,幾乎任何的業(yè)務(wù)的本質(zhì)的機(jī)制都幾乎是一個(gè)持續(xù)的過程。如杰克 鮑爾所說,事件總是隨時(shí)發(fā)生。出現(xiàn)是批次收集數(shù)據(jù)的情況,一般都是因?yàn)橐恍┦止さ倪^程或者是缺乏數(shù)字化,或者是因?yàn)闅v史原因造成的沒法自動(dòng)化或者數(shù)字化的材料。這時(shí)傳遞和對(duì)數(shù)據(jù)做出反饋一般都非常慢,如果整個(gè)過程需要運(yùn)送紙張并由人來做處理。剛剛實(shí)現(xiàn)自動(dòng)化后,一般還是會(huì)保留原來的處理流程,因此即便是媒介發(fā)生了改變,而(這種)流程還是會(huì)持續(xù)很長時(shí)間。
每天運(yùn)行的批次處理數(shù)據(jù)的產(chǎn)品經(jīng)常是有效地模擬了用一天為時(shí)間窗口的持續(xù)計(jì)算。而底層的數(shù)據(jù)當(dāng)然是總在改變。
上面的討論有助于厘清對(duì)于流計(jì)算處理的常見誤解。通常認(rèn)為某些種類的數(shù)據(jù)處理不適合用流計(jì)算系統(tǒng)來實(shí)現(xiàn),而必須用批處理系統(tǒng)。我聽過的一個(gè)典型的例子就是計(jì)算百分位、最大值、均值和其他類似的需要用所有的數(shù)據(jù)來做的聚合統(tǒng)計(jì)。但是這往往帶來了某種誤解。確實(shí),類似計(jì)算最大值這樣的塊操作需要用時(shí)間窗口內(nèi)的所有數(shù)據(jù)。然后這樣的計(jì)算絕對(duì)能夠通過流計(jì)算系統(tǒng)來實(shí)現(xiàn)。事實(shí)上,如果你查看早期的流計(jì)算的學(xué)術(shù)文章,一般它們完成的第一件事就是給出簡潔的語義來定義時(shí)間窗口,以便于針對(duì)于窗口的操作成為可能。
看到這里,很容易能統(tǒng)一我對(duì)于流計(jì)算處理的觀點(diǎn),即流計(jì)算是更寬泛。 它和是不是塊和非塊并沒有關(guān)系,僅僅只是一個(gè)底層數(shù)據(jù)里包含了時(shí)間定義的處理機(jī)制,并不要求對(duì)于處理的數(shù)據(jù)需要有一個(gè)靜態(tài)的快照。這意味著流計(jì)算處理系統(tǒng)以一個(gè)用戶控制的頻率來產(chǎn)出結(jié)果,而不是一直等到數(shù)據(jù)全部到達(dá)。從這個(gè)角度看,流計(jì)算是批次計(jì)算的一個(gè)更泛化的操作。考慮到現(xiàn)在實(shí)時(shí)數(shù)據(jù)的的普及,這應(yīng)該是一個(gè)更重要的泛化。
為什么這種傳統(tǒng)的對(duì)于流計(jì)算處理的觀點(diǎn)成為一個(gè)先進(jìn)的應(yīng)用。我認(rèn)為最大的原因是因?yàn)槿狈?shí)時(shí)數(shù)據(jù)收集的方法,從而讓持續(xù)處理成為某種理論上的想法。
我確實(shí)認(rèn)為缺乏實(shí)時(shí)數(shù)據(jù)收集的方法是商用流計(jì)算處理系統(tǒng)的夢魘。它們的客戶依然是在做面向文件的日復(fù)一日的ETL和數(shù)據(jù)集成。構(gòu)建流計(jì)算處理系統(tǒng)的公司一般專注于提供計(jì)算引擎來處理實(shí)時(shí)數(shù)據(jù),但卻發(fā)現(xiàn)現(xiàn)實(shí)中很少有客戶有實(shí)時(shí)數(shù)據(jù)流。事實(shí)上在我在領(lǐng)英的早期時(shí)光,有個(gè)公司試圖賣給我們一套非??岬牧饔?jì)算處理系統(tǒng),但因?yàn)楫?dāng)時(shí)我們所有的數(shù)據(jù)都是按小時(shí)收集的文件,所以我們所能想到的就是把這些小時(shí)文件在每小時(shí)結(jié)束的時(shí)候喂給這個(gè)系統(tǒng)。這個(gè)公司的工程師發(fā)現(xiàn)這是一個(gè)非常常見的問題。唯一真實(shí)的例外就是金融界。在這個(gè)領(lǐng)域里流計(jì)算處理有一些成功的案例,而恰恰是因?yàn)檫@個(gè)領(lǐng)域里實(shí)時(shí)流數(shù)據(jù)才是主流,而如何處理這些實(shí)時(shí)數(shù)據(jù)流才是主要關(guān)注點(diǎn)。
即使是在健康的批處理生態(tài)系統(tǒng)里,實(shí)際上流計(jì)算處理作為基礎(chǔ)架構(gòu)類型的適用性也是很強(qiáng)的。它涵蓋了實(shí)時(shí)處理/相應(yīng)業(yè)務(wù)和離線批處理業(yè)務(wù)的基礎(chǔ)架構(gòu)上的鴻溝。對(duì)于現(xiàn)代的互聯(lián)網(wǎng)企業(yè),我認(rèn)為大約25%的代碼是關(guān)于這種鴻溝的。
現(xiàn)在發(fā)現(xiàn)日志(log)解決了流計(jì)算處理里的一些非常關(guān)鍵的技術(shù)問題。后面我會(huì)陸續(xù)介紹這些問題,但其中最大的問題它解決的就是它讓數(shù)據(jù)成為了實(shí)時(shí)的多訂閱者的數(shù)據(jù)導(dǎo)入機(jī)制。
對(duì)那些希望能更多了解日志和流計(jì)算處理間的關(guān)系人,我們提供了開源的Samza,一個(gè)專門為這些想法構(gòu)建的實(shí)時(shí)流計(jì)算處理系統(tǒng)。在這個(gè)鏈接里面我們很詳細(xì)地介紹了這些想法的應(yīng)用。但這不是專門為了某個(gè)特定的流計(jì)算處理系統(tǒng)的,因?yàn)閹缀跛械闹饕饔?jì)算處理系統(tǒng)都和Kafka有某種程度的集成,讓Kafak來作為數(shù)據(jù)的日志來進(jìn)行處理。
數(shù)據(jù)流圖
關(guān)于流計(jì)算處理的最有趣的方面就是它和一個(gè)流計(jì)算處理系統(tǒng)的內(nèi)部機(jī)制沒有任何關(guān)系,想反的是,相關(guān)的是他擴(kuò)展了我們前面數(shù)據(jù)集成討論里的數(shù)據(jù)源的觀點(diǎn)。我們主要討論了主要數(shù)據(jù)源和主要數(shù)據(jù)的日志化。即事件和數(shù)據(jù)都是直接由各種應(yīng)用運(yùn)行中生成的。流計(jì)算處理讓我們可以也包含從其他數(shù)據(jù)源里計(jì)算出的數(shù)據(jù)源。 這些計(jì)算出來的數(shù)據(jù)源對(duì)消費(fèi)者而言與用來計(jì)算其的其他數(shù)據(jù)源沒什么區(qū)別(請(qǐng)參看圖1-2)。

圖1-2.來自多日志的多路流處理圖
這些計(jì)算出來數(shù)據(jù)源可能會(huì)包含相當(dāng)復(fù)雜和智力的成分在其處理過程里,因此也是極具價(jià)值的。例如,谷歌在這里描述了它是如何在一個(gè)流計(jì)算處理系統(tǒng)上重構(gòu)它的網(wǎng)頁爬取、處理和建索引的管道的過程。這可能是這個(gè)行星上最復(fù)雜、最大規(guī)模的數(shù)據(jù)處理系統(tǒng)之一了。
所以什么是流計(jì)算處理過程?對(duì)于我們的目的而言,一個(gè)流計(jì)算處理工作就是那些從日志中讀取并輸出到日志或其他系統(tǒng)的任務(wù)。那些作為輸入和輸出的日志把整個(gè)流程連接成了一個(gè)處理階段的圖。使用這種中心化日志的形式,你就能觀察所有機(jī)構(gòu)的數(shù)據(jù)的獲取、轉(zhuǎn)換和流動(dòng),其實(shí)就是一系列日志以及從他們中讀出和寫入他們的過程。
一個(gè)流計(jì)算處理過程并不必需要有一個(gè)時(shí)髦的框架。它可以是任何一個(gè)或多個(gè)讀取和寫入日志的過程。額外的基礎(chǔ)架構(gòu)和支持能夠幫助管理和擴(kuò)展這種近乎實(shí)時(shí)的處理過程程序,而這也就是流計(jì)算框架所做的。
日志和流計(jì)算處理
為什么你在所有的流計(jì)算處理里需要日志?為什么不是讓處理單元通過簡單的TCP或者其他輕量級(jí)的消息協(xié)議來更直接的通信?有多個(gè)理由來支持這一(日志)模式。
首先,這種模式可以讓每個(gè)數(shù)據(jù)集都能為多訂閱者所用。每個(gè)流處理過程的輸入對(duì)任何需要的處理器都可用;同時(shí)每個(gè)輸出也都對(duì)任何需要的都可用。這一點(diǎn)不僅對(duì)生產(chǎn)數(shù)據(jù)流很好用,而且也在復(fù)雜的數(shù)據(jù)處理管道里調(diào)試和監(jiān)控階段很有幫助。能快速的進(jìn)入一個(gè)輸出流并檢查它的有效性,同時(shí)計(jì)算一些監(jiān)控的統(tǒng)計(jì)數(shù)據(jù),或者僅僅只是看看數(shù)據(jù)長什么樣,這些都使得開發(fā)變的非常有可追蹤性。
其次,這樣使用日志能確保每個(gè)數(shù)據(jù)消費(fèi)者處理過程中順序可以被保留。某些事件數(shù)據(jù)可能被按時(shí)間戳松散地排序了,但是不是每種事件數(shù)據(jù)都這樣??紤]從來自數(shù)據(jù)庫的一個(gè)更新流,我們可能有一系列的流處理任務(wù)來處理這些數(shù)據(jù)并準(zhǔn)備為搜索索引來做索引。如果對(duì)同一個(gè)記錄同時(shí)做兩次更新,那么我們最后可能在索引的最終結(jié)果出錯(cuò)。
這樣使用日志的最后一個(gè)可能也是最重要(可探討)的原因是它提供了緩存和對(duì)每個(gè)處理過程的隔離。如果一個(gè)處理器產(chǎn)生結(jié)果的速度比它后續(xù)的消費(fèi)程序的處理能力快,我們可以有三種選擇:
- 我們可以先暫停上游的處理任務(wù),直到下游的任務(wù)可以處理。如果只用TCP而沒有使用日志,這種情況是最可能發(fā)生的。
- 我們就把數(shù)據(jù)丟棄掉。
- 我們可以在兩個(gè)處理任務(wù)間緩存數(shù)據(jù)。
丟棄數(shù)據(jù)在某些場合可能沒什么。但是基本都是不可接受的,也從來不被希望這樣做。
暫停(上游)處理聽起來似乎是一個(gè)可接受的選擇。但實(shí)際中這會(huì)成為一個(gè)很大的問題??紤]到我們需要的不僅僅是對(duì)單一的應(yīng)用流程建立模型,而是為整個(gè)機(jī)構(gòu)建立全套的數(shù)據(jù)流模型。這就將不可避免的形成一個(gè)復(fù)雜的數(shù)據(jù)處理流網(wǎng)絡(luò),由不同的部門團(tuán)隊(duì)的不同的數(shù)據(jù)處理器來組成,并支持不同的SLA。在這樣復(fù)雜的數(shù)據(jù)處理網(wǎng)絡(luò)里,如果因?yàn)楹罄m(xù)處理能力不足或者失敗而導(dǎo)致上游的數(shù)據(jù)產(chǎn)生器被暫停,這都會(huì)級(jí)聯(lián)地影響上游數(shù)據(jù)流程序,從而使得的很多處理器都被暫停。
這樣看來唯一可用的選擇:緩存。日志可以是非常非常大的緩存,可以讓處理程序被重啟,或者即使失效了也不會(huì)影響處理圖里的其他部分。這也意味著某個(gè)數(shù)據(jù)消費(fèi)程序可以停機(jī)很長時(shí)間,而不會(huì)影響上游的程序。只要在它重啟后能及時(shí)處理完緩存的數(shù)據(jù),大家都皆大歡喜。
在其他地方,這也不是一個(gè)不尋常的模式。巨大、復(fù)雜的MapReuce流就使用了文件作為檢查點(diǎn),并共享他們的中間處理結(jié)果。巨大、復(fù)雜的SQL處理管道也是創(chuàng)建了很多中間的臨時(shí)表。這里僅僅只是運(yùn)用了這種模式的抽象-日志-使得它適合于處理運(yùn)動(dòng)中的數(shù)據(jù)。
Storm和Samza是兩種基于這個(gè)模式構(gòu)建的流技術(shù)處理系統(tǒng),也能使用Kafka或者其他類似的系統(tǒng)作為他們的日志部分。
處理數(shù)據(jù):Lambda架構(gòu)和一個(gè)可替換的方案
一個(gè)基于這種日志數(shù)據(jù)模型的有趣的應(yīng)用就是Lambda架構(gòu),由內(nèi)森·馬茲提出。他寫了一個(gè)廣為傳播的博客(《如何打敗CAP定理》),其中介紹把流處理系統(tǒng)和批次處理相結(jié)合的方法。這個(gè)架構(gòu)被證明是一個(gè)非常流行的想法。已經(jīng)有專門的網(wǎng)站和書籍了。
什么是Lambda架構(gòu)?如何運(yùn)用?
Lambda架構(gòu)一般類似于圖1-3。

圖1-3. Lambda架構(gòu)
它工作的方式是不可變的一系列數(shù)據(jù)記錄被采集,并同時(shí)并行地送給批處理和流處理系統(tǒng)。數(shù)據(jù)轉(zhuǎn)換的邏輯被實(shí)現(xiàn)兩次,一次是在批處理系統(tǒng)里,一次是在流處理系統(tǒng)里。然后把二者處理的結(jié)果在查詢的時(shí)候合并,給出一個(gè)完整的答案。
這個(gè)方式有很多中變形,這里我是有意地簡化了許多。例如你可以使用多種類似的系統(tǒng),如Kafka、Storm和Hadoop。而大家也經(jīng)常會(huì)使用兩種不同的數(shù)據(jù)庫來存儲(chǔ)輸出結(jié)果,一種是專門為實(shí)時(shí)處理優(yōu)化的數(shù)據(jù)庫,而其他的則是為批處理所準(zhǔn)備的。
Lambda架構(gòu)的優(yōu)點(diǎn)?
Lambda架構(gòu)強(qiáng)調(diào)保留原始輸入數(shù)據(jù)不變。我認(rèn)為這是一個(gè)非常重要的特性。處理復(fù)雜數(shù)據(jù)流的能力得到了很大的加強(qiáng),因?yàn)槟軌蚩吹綌?shù)據(jù)是什么樣和輸出是什么樣。
我還喜歡這個(gè)架構(gòu)強(qiáng)調(diào)了重復(fù)處理數(shù)據(jù)的問題。能重復(fù)處理數(shù)據(jù)對(duì)流計(jì)算系統(tǒng)來說是一個(gè)重大挑戰(zhàn),但卻經(jīng)常被忽略。
對(duì)于重復(fù)處理,我的意思是再次處理輸入的數(shù)據(jù)從而再次計(jì)算出結(jié)果。這是一個(gè)太明顯,但也經(jīng)常被忽略的需求。代碼總是在變。所以如果你的代碼已經(jīng)從輸入流中計(jì)算出了結(jié)果,當(dāng)代碼改變后,你需要再次計(jì)算輸出來檢查代碼修改的效果。
為什么代碼會(huì)變?也許是因?yàn)槟愕膽?yīng)用在演進(jìn),你又想計(jì)算一些之前不需要的新的輸出項(xiàng)?;蛘呤悄惆l(fā)現(xiàn)了一個(gè)代碼缺陷并修好了。無論如何,當(dāng)這件事發(fā)生的時(shí)候,你就需要重新計(jì)算你的輸出結(jié)果。我發(fā)現(xiàn)很多的人試圖去構(gòu)建一個(gè)實(shí)時(shí)數(shù)據(jù)處理系統(tǒng),但根本不去仔細(xì)思考這個(gè)問題,并最終導(dǎo)致系統(tǒng)不能很快的演進(jìn),僅是因?yàn)闆]有一個(gè)好的方法來解決重復(fù)處理的需求。
它的缺點(diǎn)
Lambda架構(gòu)的一個(gè)問題就是需要維護(hù)兩套復(fù)雜的分布式系統(tǒng)的代碼,這看起來就很頭疼。但我不認(rèn)為這個(gè)問題不能解決。
如Storm和Hadoop這樣的分布式框架的編程是很復(fù)雜。不可避免的是代碼最后會(huì)專門為所使用的框架而特殊地構(gòu)建。由此導(dǎo)致的運(yùn)維的復(fù)雜性是被所有使用這個(gè)框架的人所一致同意的。
解決這一問題的一個(gè)方法是對(duì)實(shí)時(shí)和批次框架抽象出一種語言或框架。用這個(gè)高級(jí)框架來寫你的代碼,然后去“編譯”成或是流計(jì)算處理代碼,或是MapReduce的批處理代碼。Summingbird就是這樣做的一個(gè)框架。它確實(shí)讓事情好了一些,但我不認(rèn)為它真正解決了問題。最終,即便你能避免為應(yīng)用寫兩套代碼,運(yùn)維兩套系統(tǒng)的負(fù)擔(dān)還是很重的。新的抽象僅僅是提供了兩套系統(tǒng)的交集所支持的特征。更糟的是,使用這一統(tǒng)一的框架就把整個(gè)生態(tài)系統(tǒng)里那些使得Hadoop非常強(qiáng)大的工具和語言(如Hive、Pig、Crunch、Cascading、Oozie等)給排除在外了。
打個(gè)比方,想想那個(gè)使跨數(shù)據(jù)庫對(duì)象關(guān)系映射(ORM)透明化的臭名昭著的難題。而這也還是對(duì)非常相同的系統(tǒng)進(jìn)行抽象來用標(biāo)準(zhǔn)接口語言提供相同的能力哦。那么去抽象化兩個(gè)完全不同的構(gòu)建于剛剛穩(wěn)定化的分布式系統(tǒng)上的編程模式將會(huì)更加的難。
一個(gè)備選方案
作為一個(gè)基礎(chǔ)架構(gòu)的設(shè)計(jì)者,我認(rèn)為真正有意義的問題是:為什么不去改進(jìn)流計(jì)算系統(tǒng)來解決所有的問題集?為什么你需要再粘貼一個(gè)別的系統(tǒng)(批處理)?為什么你不去把實(shí)時(shí)處理和在代碼改變時(shí)需要的重復(fù)處理一起解決?流計(jì)算處理系統(tǒng)已經(jīng)有了并行的概念,為什么不是去通過增加并行性來解決重復(fù)處理的問題,并很快的再現(xiàn)歷史?答案是你可以這么做。我認(rèn)為這就是如果你需要構(gòu)建這樣一個(gè)系統(tǒng)的一個(gè)合理的備選方案。
當(dāng)我和別人討論這個(gè)思想的時(shí)候,有時(shí)他們會(huì)告訴我流計(jì)算處理對(duì)于高吞吐率處理歷史數(shù)據(jù)并不合適。但這是他們基于已經(jīng)使用的系統(tǒng)的缺陷的直覺反應(yīng)。這些系統(tǒng)要不就是很難擴(kuò)展,或者是根本就沒存歷史數(shù)據(jù)。但沒有理由認(rèn)為這就是對(duì)的。流計(jì)算處理系統(tǒng)的基本抽象就是數(shù)據(jù)流的有向無環(huán)圖(DAG)。這和傳統(tǒng)的數(shù)據(jù)倉庫(如Volcano)的基本抽象是一樣的,并和MapReduce的后繼Tez的基本底層抽象也是一樣。流計(jì)算處理僅僅是對(duì)這一數(shù)據(jù)流模型的泛化,即對(duì)中間結(jié)果提供檢查點(diǎn)(checkpointing)并持續(xù)地輸出到最終的用戶。
那么我們?cè)趺床拍軓奈覀兊牧魈幚砣蝿?wù)里完成重復(fù)處理?我最喜歡的方法其實(shí)非常的簡單。
1. 使用Kafka或者其他的一些能幫你保存你想重復(fù)處理的數(shù)據(jù)的全部日志的系統(tǒng),這些系統(tǒng)還要能支持多訂閱者功能。例如,如果你想重復(fù)處理之前30天的數(shù)據(jù),那就把Kafka的保存時(shí)間設(shè)成30天。
2. 當(dāng)你想重復(fù)處理數(shù)據(jù)時(shí),啟動(dòng)第二個(gè)你的流計(jì)算系統(tǒng)的實(shí)例,從保留數(shù)據(jù)的開始再次處理這些數(shù)據(jù),不過把結(jié)束輸出到新的表。
3. 當(dāng)?shù)诙€(gè)實(shí)例已經(jīng)可以趕上現(xiàn)有數(shù)據(jù)的進(jìn)度了,就把應(yīng)用定向到從新的輸出表里讀取數(shù)據(jù)。
4. 停止原有版本的任務(wù)實(shí)例,并刪除舊的輸出表。
這個(gè)架構(gòu)類似圖1-4里所示。

圖1-4. Lambda架構(gòu)的一個(gè)備選替換方案,移除了批處理系統(tǒng)。
與Lambda架構(gòu)不同,這個(gè)方法里你只是當(dāng)你代碼改變而確實(shí)需要再計(jì)算結(jié)果的時(shí)候你才需要重復(fù)處理。當(dāng)然這樣的重復(fù)計(jì)算只是用你代碼的改進(jìn)版本,使用相同的框架,并處理相同的數(shù)據(jù)。
很自然的,你希望能多給你的并行的重復(fù)處理任務(wù)更多的資源以便于讓它能非??斓赝瓿?。
當(dāng)然,你可以進(jìn)一步優(yōu)化這個(gè)方法。很多情況下你可以把兩個(gè)輸出表合并。然而我認(rèn)為讓兩個(gè)輸出表并存一段時(shí)間是有不少好處的。這可以及時(shí)回退回舊的邏輯,而你所需要做的僅僅只是把應(yīng)用再重定向到舊的表。另外,對(duì)于一些非常重要的場景里,你可以使用自動(dòng)的A/B測試或者多臂強(qiáng)盜算法來控制這個(gè)切換,保證新的代碼確實(shí)是比舊有邏輯有改進(jìn)而不是變的更差了。
需要注意的是,這個(gè)方法并不是說你的數(shù)據(jù)不能輸出到Hadoop里。它僅僅是說你不必要在Hadoop里面做重復(fù)處理。Kafka和Hadoop有非常好的集成,所以從中導(dǎo)出任何Kafka的topic都很簡單。通常把一個(gè)流計(jì)算處理任務(wù)的輸出甚至是中間結(jié)果鏡像到Hadoop中,從而讓一些例如Hive這樣的分析工具來處理,或者是作為其他人的輸入,或是為離線數(shù)據(jù)處理流服務(wù)。這都是很有用的。
我們已經(jīng)記錄了如何實(shí)現(xiàn)這個(gè)方法,包括使用Samza實(shí)現(xiàn)重復(fù)處理的框架的其他變形。
這兩種方法所對(duì)應(yīng)的效率和資源之間的權(quán)衡是值得討論的。Lambda架構(gòu)需要同時(shí)一直運(yùn)行重復(fù)處理和實(shí)時(shí)處理任務(wù)。而我提出的方法僅僅只是在需要重復(fù)處理的時(shí)候才運(yùn)行第二個(gè)任務(wù)的實(shí)例。然而我的提議要求有臨時(shí)的額外的輸出數(shù)據(jù)庫的存儲(chǔ)。同時(shí)數(shù)據(jù)庫也要能支持這樣大容量的寫操作。兩種情況下,重復(fù)處理所造成的額外任務(wù)都可以被平均分布出去。如果你有很多這樣的任務(wù),他們不需要一下都重復(fù)處理完。所以在一個(gè)共享的集群里,如果有很多個(gè)這樣的任務(wù),你需要預(yù)留一部分容量來處理這些隨時(shí)會(huì)發(fā)生的任務(wù)。
我提議方法的真正的優(yōu)勢不是效率,而是能讓大家在一個(gè)單一的處理框架里開發(fā)、測試、調(diào)試和運(yùn)維他們的系統(tǒng)。
所以在簡介是很重要的場景里,可以把我的方法作為Lambda架構(gòu)的一個(gè)備選方案。
有狀態(tài)的實(shí)時(shí)處理
日志和流計(jì)算處理之間的關(guān)系不僅僅限于重復(fù)處理。如果實(shí)際的流計(jì)算處理系統(tǒng)需要維護(hù)狀態(tài)信息,這時(shí)使用日志就可以有另一個(gè)新的用處了。
一些實(shí)時(shí)流處理系統(tǒng)僅僅是無狀態(tài)的一次性數(shù)據(jù)轉(zhuǎn)換。但是很多場景下都是比較復(fù)雜的計(jì)數(shù)、匯聚或窗口間的連接等操作。例如,你可能希望對(duì)事件流(比如點(diǎn)擊流)進(jìn)行增強(qiáng),比如通過連接點(diǎn)擊流和用戶帳號(hào)數(shù)據(jù)庫來給點(diǎn)擊加上用戶信息。不可避免的,類似這樣的處理最終都會(huì)需要保存某種程度的狀態(tài)信息。例如當(dāng)計(jì)數(shù)的時(shí)候,你需要保留此前的計(jì)數(shù)值。這樣的狀態(tài)信息怎么樣才能保留下來當(dāng)處理器自身會(huì)發(fā)生失效?
最簡單的方法就是把狀態(tài)信息放到內(nèi)存里。然而如果處理器崩潰了,這個(gè)信息就丟失了。如果狀態(tài)信息僅僅是在一個(gè)窗口里維護(hù)的,這個(gè)處理器就可以從這個(gè)窗口開始的點(diǎn)重新再來。但是如果計(jì)算一小時(shí)的計(jì)數(shù),這個(gè)方法可能就不行了。
另外一個(gè)方法就是把所有的狀態(tài)信息都存儲(chǔ)到一個(gè)異地系統(tǒng),并通過網(wǎng)絡(luò)獲取。這個(gè)方法的問題是沒有本地化的數(shù)據(jù),還需要很多的網(wǎng)絡(luò)傳輸。
我們?cè)趺茨苤С忠恍┤绨岩粡埍矸制教幚砝锏牟僮鳎?/p>
回顧之前對(duì)于表和日志的二元性的討論,這給我們提供了能把流轉(zhuǎn)換成表并和我們的處理并存的方法。這還提供了一個(gè)對(duì)于表失效的解決機(jī)制。
流處理器可以把它的狀態(tài)保持到一個(gè)本地的表或者索引里,如dbd和RocksDN,或者一些更不尋常的機(jī)制,比如Lucene或fastbit索引等。這些存儲(chǔ)的內(nèi)容是從輸入流里導(dǎo)入的(可能是首先做一個(gè)強(qiáng)制轉(zhuǎn)換后)。它對(duì)本地索引可以記錄下修改日志(changelog),并保存這些修改日志,從而在系統(tǒng)奔潰或重啟后可以恢復(fù)出狀態(tài)信息來。這就可以提供一個(gè)通用的機(jī)制來保存狀態(tài)信息在一個(gè)索引類型的本地內(nèi)。
當(dāng)(流)處理過程失效,它就從修改日志里恢復(fù)索引。日志在這里就變成了把本地的狀態(tài)信息轉(zhuǎn)換成一個(gè)增長的隨時(shí)記錄的備份。
這個(gè)狀態(tài)管理的方法有一個(gè)優(yōu)雅的特性,即處理器的狀態(tài)也被維護(hù)成了一個(gè)日志。我們可以把這個(gè)日志想成是數(shù)據(jù)庫表的修改日志。事實(shí)上,處理器有一些伴隨著它的非常類似于同分片表的東西。因?yàn)闋顟B(tài)本身就是日志,其他的處理器也可以訂閱它,這就可以非常有用。比如整個(gè)處理過程的目標(biāo)就是更新輸出的最終狀態(tài)的場景。
當(dāng)把從數(shù)據(jù)庫里輸出的日志結(jié)合起來看,日志/表二元性的威力就非常清晰了。修改日志(changlog)可以從數(shù)據(jù)庫里抽取,并被不同的流處理器用不同的方式檢索來和事件流所連接。
我們提供了在Samza里面使用這種類型的狀態(tài)管理的細(xì)節(jié),以及很多實(shí)際的例子。
日志壓縮
當(dāng)然,我們不能希望能保持所有時(shí)間的狀態(tài)改變的完整日志。除非你有無限的空間,不然日志總是要被清理的。我會(huì)介紹如何在Kafka里面實(shí)現(xiàn)這個(gè)功能的。
在Kafka里,清理有兩個(gè)選擇,取決于數(shù)據(jù)是僅包含純事件數(shù)據(jù)還是鍵值化的更新。對(duì)于事件數(shù)據(jù),我的意思是沒有相關(guān)的情形發(fā)生,比如網(wǎng)頁瀏覽、點(diǎn)擊或者其他你會(huì)在一個(gè)應(yīng)用的日志里發(fā)現(xiàn)的東西。對(duì)于鍵值化的更新,我的意思是事件有特別記錄的狀態(tài)改變,而這被用某些鍵值所識(shí)別。數(shù)據(jù)庫的修改就是一個(gè)典型的鍵值化更新的例子。
對(duì)于事件數(shù)據(jù),Kafka支持保存數(shù)據(jù)的窗口。這個(gè)窗口可以是用時(shí)間(以天)或者是空間(以GB)為單位。大部分人僅僅只是使用它默認(rèn)的一個(gè)星期為保存窗口。如果你希望有無限的保存期,就把這個(gè)窗口設(shè)成無限,你的數(shù)據(jù)就永遠(yuǎn)不會(huì)丟失。
然而,對(duì)于鍵值化的數(shù)據(jù),一個(gè)完整日志的非常好的特征就是你可以重現(xiàn)源系統(tǒng)的狀態(tài)。即,如果你有這個(gè)修改的日志,你可以在另外一個(gè)數(shù)據(jù)庫里再現(xiàn)這個(gè)表,并重建這個(gè)表的任意時(shí)間點(diǎn)的狀態(tài)。這對(duì)于不同系統(tǒng)也適用。你可以在另外一個(gè)數(shù)據(jù)庫里再現(xiàn)源數(shù)據(jù)庫里的更新,并維護(hù)數(shù)據(jù)的主鍵(一個(gè)搜索的索引、一個(gè)本地庫等等)。
然而,隨著時(shí)間的延長,保存完整的日志會(huì)消耗越來越多的空間,再現(xiàn)過程也會(huì)越來越久。因此在Kafka里,為了支持這種應(yīng)用場景,我們支持不同類型的保存。其中一個(gè)例子就展示在圖1-5里。不是簡單地完全丟棄舊的日志,我們收集了日志末尾的部分里過時(shí)的記錄作為垃圾。任何在日志末尾的記錄有最近的更新就會(huì)適合于清理(只保留最新的更新)。這么做就可以保證日志保存了一個(gè)源系統(tǒng)的完整的備份,但是我們現(xiàn)在不必在完全重建所有的之前的狀態(tài)了,而僅僅只是最近更新的狀態(tài)。我們把這個(gè)特性稱為日志壓縮。

圖1-5.日志壓縮可以確保日志里只保留每個(gè)鍵值的最新的更新。這對(duì)模型更新成可變數(shù)據(jù)的日志是很有用的
Jay Kreps
Jay Kreps是Confluent的聯(lián)合創(chuàng)始人和CEO。Confluent專注于Apache Kafka。在此之前,Jay是領(lǐng)英的主要架構(gòu)師之一,專注于數(shù)據(jù)基礎(chǔ)架構(gòu)和數(shù)據(jù)驅(qū)動(dòng)的產(chǎn)品。他是多個(gè)可擴(kuò)展的數(shù)據(jù)系統(tǒng)空間的開源項(xiàng)目的作者之一,包括Voldemort、Azkaban、Kafka和Samza。

