91精品国产综合久久四虎久久_国产成人午夜高潮毛片_99er视频精品免费观看_2020亚洲熟女在线观看_日本女优人体写真_国内黄色毛片_年轻的老师中文版在线_丰满女邻居做爰_久久久久久精品成人免费图片

使用Apache Kafka和Apache Pulsar創(chuàng)建任務(wù)隊(duì)列
編者注:敬請(qǐng)關(guān)注2019年6月18日至21日在北京舉行的人工智能大會(huì)上的相關(guān)議題。

使用Kafka和Pulsar的一個(gè)常見用例是創(chuàng)建任務(wù)隊(duì)列。這兩種技術(shù)為實(shí)現(xiàn)此用例提供了不同的實(shí)現(xiàn)。我將討論用Kafka和Pulsar實(shí)現(xiàn)任務(wù)隊(duì)列的方法,以及每個(gè)任務(wù)隊(duì)列各自的相對(duì)優(yōu)勢(shì)。

什么是任務(wù)隊(duì)列?

任務(wù)隊(duì)列正在使用消息傳遞技術(shù)通過發(fā)布消息來添加工作單位。此消息將由另一個(gè)進(jìn)程(最好是一組進(jìn)程)使用,然后對(duì)其進(jìn)行某種處理。

任務(wù)隊(duì)列與其他處理的時(shí)間量不同。大多數(shù)常規(guī)處理(如ETL或簡(jiǎn)單處理)的數(shù)據(jù)應(yīng)用毫秒度量,最長(zhǎng)不超過1秒。任務(wù)隊(duì)列將處理更長(zhǎng)的時(shí)間,以秒、分鐘、小時(shí)度量。

work_queue

這也稱為分布式任務(wù)隊(duì)列。這是因?yàn)閱蝹€(gè)機(jī)器或流程不足以滿足需求。我們必須在許多不同的進(jìn)程和計(jì)算機(jī)上分發(fā)處理流程。隨著對(duì)分布式技術(shù)的這種需求,任務(wù)的復(fù)雜性增加了10-15倍。

任務(wù)隊(duì)列的例子

為了幫助您理解任務(wù)隊(duì)列,讓我舉幾個(gè)我在現(xiàn)實(shí)世界中看到的簡(jiǎn)單例子。所有例子的共同點(diǎn)是需要長(zhǎng)時(shí)間處理,并盡快返回結(jié)果。

視頻轉(zhuǎn)碼

某些用例要求用戶上傳視頻。此視頻將保存到存儲(chǔ)桶中。上傳視頻后,Web服務(wù)將發(fā)布將由消費(fèi)者群集使用的消息。該消息將包含要轉(zhuǎn)碼的視頻的存儲(chǔ)區(qū)URL。這些消費(fèi)者集群進(jìn)程將對(duì)視頻進(jìn)行轉(zhuǎn)碼或預(yù)處理成網(wǎng)絡(luò)友好的格式準(zhǔn)備播放。轉(zhuǎn)碼需要幾分鐘到幾小時(shí)才能完成。視頻完成后,轉(zhuǎn)碼過程應(yīng)發(fā)布視頻已準(zhǔn)備好的消息。

語(yǔ)音識(shí)別與情感分析

在一些例子中,處理呼叫中心的電話呼叫數(shù)據(jù)需要被處理。呼叫完成后,需要進(jìn)行多個(gè)處理流程。首先,呼叫的語(yǔ)音對(duì)話需要進(jìn)行語(yǔ)音識(shí)別才能將音頻更改為文本。接下來,文本將對(duì)其進(jìn)行各種NLP或情緒分析。整個(gè)處理過程需要1-60分鐘。處理完成后,需要發(fā)布一條消息來標(biāo)記對(duì)話、或?qū)?duì)話進(jìn)行打分。

任務(wù)隊(duì)列有什么困難?

最開始碰到的困難是任務(wù)負(fù)載平衡。您需要確保一個(gè)有一個(gè)長(zhǎng)時(shí)間運(yùn)行的進(jìn)程不備份隊(duì)列的其余部分。其余的任務(wù)需要繼續(xù)有增無減。您還需要能夠隨著流量上下浮動(dòng)自動(dòng)擴(kuò)展群集。

任務(wù)隊(duì)列的難點(diǎn)問題是容錯(cuò):

  • 你怎么知道一個(gè)進(jìn)程是否掛掉,如果掛掉了,你如何知道它何時(shí)掛掉?
  • 你如何重啟任務(wù)處理?
  • 進(jìn)程死亡時(shí),你如何檢測(cè)?

這些問題的答案是針對(duì)特定技術(shù)的。當(dāng)您處理任務(wù)隊(duì)列時(shí),這些問題的答案對(duì)您的技術(shù)選擇至關(guān)重要。

為什么不批處理?

一個(gè)常見的問題是為什么要使用實(shí)時(shí)系統(tǒng)而不是批處理系統(tǒng)?批處理系統(tǒng)將具有固有的周轉(zhuǎn)時(shí)間。對(duì)于30秒的處理時(shí)間,您可以花費(fèi)5-10秒等待分布式系統(tǒng)分配和啟動(dòng)資源。實(shí)時(shí)任務(wù)隊(duì)列的關(guān)鍵之一是結(jié)果的速度。批處理系統(tǒng)處理此數(shù)據(jù)的效率太低。

使用卡夫卡的任務(wù)隊(duì)列

現(xiàn)在你了解了任務(wù)隊(duì)列以及與它們相關(guān)的難點(diǎn),讓我們專門用Apache Kafka創(chuàng)建任務(wù)隊(duì)列。

高水位和任務(wù)隊(duì)列

在了解如何在Kafka中創(chuàng)建任務(wù)隊(duì)列之前,您需要了解Kafka消費(fèi)者如何標(biāo)記他們已經(jīng)消費(fèi)了消息??ǚ蚩ㄏM(fèi)者通過提交偏移量來執(zhí)行此任務(wù)。 Kafka消費(fèi)者進(jìn)程使用commitSync或commitAsync方法。這些方法使用關(guān)于主題、數(shù)據(jù)分區(qū)的Map函數(shù),用偏移量(offset)作為參數(shù)。

卡夫卡消費(fèi)者使用所謂的高水位來記錄消費(fèi)。這意味著消費(fèi)者只能說,“我已經(jīng)處理到了這一點(diǎn)”,而不是“我已經(jīng)處理完了這個(gè)消息。” 這是卡夫卡和其他工具的一個(gè)重要區(qū)別。 Kafka沒有內(nèi)置的方式確認(rèn)單個(gè)消息是否處理。

消費(fèi)者偏移量使用的這種高水位方法,意味著無法找到單個(gè)的錯(cuò)誤。例如,如果消費(fèi)者正在處理來自同一分區(qū)的兩個(gè)任務(wù),而其中一個(gè)失敗了,那么Kafka缺乏內(nèi)置的能力來說哪一個(gè)失敗了,哪一個(gè)成功了??ǚ蚩蛻舳丝梢哉f我已經(jīng)處理到了這一點(diǎn),而無關(guān)工作成功與否。

要解決這種局限性,你需要將每個(gè)分區(qū)視為它自己的工作“線程”。每個(gè)分區(qū)將限制為一次處理一件事。當(dāng)消費(fèi)者完成這項(xiàng)工作時(shí),它會(huì)調(diào)用commitSync將處理標(biāo)記為已完成(提高水位)。

由于您要在分區(qū)中保持長(zhǎng)時(shí)間運(yùn)行的工作,因此您必須創(chuàng)建更多分區(qū)來有效地處理數(shù)據(jù)。雖然您可能已經(jīng)開始使用20-30個(gè)分區(qū),但您可能會(huì)使用100個(gè)分區(qū)。這個(gè)分區(qū)的數(shù)量是為Kafka準(zhǔn)備的,因此消費(fèi)者組將有足夠的分區(qū)來有效地分配負(fù)載。

不言而喻的是,您需要根據(jù)您將要做的工作量來擴(kuò)大您的消費(fèi)者集群。

管理自己的提交

你會(huì)注意到我多次使用“內(nèi)置”這個(gè)詞。這是因?yàn)檫€有另一種選擇不是Kafka內(nèi)置的。你必須編寫所有這些代碼,處理方式是你自定義的。

如您所見,Kafka消費(fèi)者的問題是他們的高水位限制。您可以通過編程方式開始處理消費(fèi)者的偏移量。最簡(jiǎn)單的方法是使用數(shù)據(jù)庫(kù)。你會(huì)關(guān)閉Kafka的自動(dòng)偏移提交。你可以在數(shù)據(jù)庫(kù)而不是Kafka中進(jìn)行偏移的更新插入操作。這些更新插入將基于每個(gè)主題、分區(qū)、偏移量以及當(dāng)前狀態(tài)來完成。

當(dāng)消費(fèi)者進(jìn)程重新啟動(dòng)時(shí),消費(fèi)者需要知道其分區(qū)是如何指派的。它會(huì)進(jìn)行數(shù)據(jù)庫(kù)查找以查找最后一個(gè)偏移量及其狀態(tài)。如果最后一個(gè)偏移量有錯(cuò)誤,消費(fèi)者進(jìn)程將開始處理該消息。

雖然這會(huì)增加更多的編程開銷,但這是我與團(tuán)隊(duì)合作時(shí)推薦的方法。

Pulsar的任務(wù)隊(duì)列

既然您已經(jīng)了解了如何使用Kafka創(chuàng)建任務(wù)隊(duì)列,那么讓我們看看它和Apache Pulsar比較起來是怎樣的。

Pulsar中的選擇性確認(rèn)(Selective Acking,SACK)

我們之前了解過卡夫卡的高水位特性。 Pulsar支持這種類型的確認(rèn),另一種類型稱為選擇性確認(rèn)。選擇性確認(rèn)允許消費(fèi)者僅確認(rèn)單個(gè)消息。您可以在此了解有關(guān)選擇性確認(rèn)的更多信息。

當(dāng)談到任務(wù)隊(duì)列時(shí),選擇性確認(rèn)確實(shí)改變了游戲規(guī)則。通過任務(wù)隊(duì)列,我們??能夠確認(rèn)我們已經(jīng)處理了該消息。為此,我們將使用acknowledgement方法。

要獲取失敗的消息,可以調(diào)用redeliverUnacknowledgedMessages(重新投遞未確認(rèn)消息)方法。這將使Pulsar重新獲得所有未經(jīng)確認(rèn)的消息。另一個(gè)名為ackTimeout(確認(rèn)超時(shí))的設(shè)置將自動(dòng)重新發(fā)送超過超時(shí)閾值的所有消息。

任務(wù)隊(duì)列有另一個(gè)好處,我以前沒有談過。即使有許多不同的分區(qū),一些分區(qū)仍然可能是熱點(diǎn),或接收大量的任務(wù)。 Pulsar使用共享訂閱更好地解決了這個(gè)問題。共享訂閱允許跨消費(fèi)者進(jìn)行循環(huán)分發(fā)。這樣可以比Kafka更均勻地分配工作。

對(duì)于Pulsar中的任務(wù)隊(duì)列,您將發(fā)布消息。此消息將由許多不同的消費(fèi)者進(jìn)程中的一個(gè)上的共享訂閱使用。消費(fèi)者將開始實(shí)際處理數(shù)據(jù)。一旦完成該處理,消費(fèi)者將選擇性地確認(rèn)該消息。它會(huì)產(chǎn)生一條消息,表明處理已經(jīng)完成。

注意:Pulsar旨在將任務(wù)隊(duì)列作為它的一種特定用例。它在雅虎是用于此目的的。這是我們看到它們差異如此巨大的重要原因。

創(chuàng)建分布式任務(wù)隊(duì)列

您選擇的消息傳遞技術(shù)確實(shí)會(huì)改變您實(shí)現(xiàn)分布式任務(wù)隊(duì)列的方式。雖然可以使用任一解決方案創(chuàng)建任務(wù)隊(duì)列,但Kafka和Pulsar有不同的創(chuàng)建方法。使用Pulsar創(chuàng)建分布式任務(wù)隊(duì)列要容易得多。

如果您有任務(wù)隊(duì)列用例,請(qǐng)確保正確使用工具。這些用例很難自己編碼去處理實(shí)際操作時(shí)與它們相關(guān)的問題。

Jesse Anderson

Jesse Anderson是Big Data Institute(大數(shù)據(jù)學(xué)院)的數(shù)據(jù)工程師,創(chuàng)意工程師和常務(wù)董事。 Jesse為員工提供大數(shù)據(jù)培訓(xùn),培訓(xùn)內(nèi)容包括Apache Kafka,Apache Hadoop和Apache Spark等尖端技術(shù)。 他教過成千上萬的學(xué)生,這些學(xué)生遍布從初創(chuàng)到財(cái)富100強(qiáng)的各種公司,從他這里獲得了數(shù)據(jù)工程師的技能。 他被廣泛認(rèn)為是該領(lǐng)域的專家,并因其新穎的教學(xué)實(shí)踐而受到廣泛認(rèn)可。 Jesse受到O’Reilly及Pragmatic Programmers的宣傳,并且吸引了類似Wall Street Journal, CNN, BBC, NPR, Engadget, and Wired這種主流媒體的報(bào)道。你可以在Jesse-Anderson.com 了解關(guān)于他的更多信息。

Cours de philosophie à Paris Grandes chroniques de France (source: Wikimedia Commons)