Ray是一個(gè)用于在計(jì)算集群上編程的通用框架。 Ray使開發(fā)人員能夠輕松地并行化他們的Python應(yīng)用程序,構(gòu)建新的應(yīng)用,在任意大小規(guī)模的集群上(從筆記本電腦到大型集群)運(yùn)行。 Ray提供了一個(gè)高度靈活,極簡主義、易于使用的API。 表1 介紹了此API的核心功能。
在這篇博客中,我們描述了幾個(gè)小技巧,可以幫助首次使用Ray的用戶避免一些可能嚴(yán)重影響其程序性能的常見錯(cuò)誤。
| API | 描述 | 示例 |
| ray.init() | 初始化Ray執(zhí)行上下文。 | |
| @ ray.remote | 函數(shù)或類裝飾器,指定 函數(shù)將作為不同進(jìn)程中的任務(wù)執(zhí)行,或者類作為不同進(jìn)程中的actor定義。 | @ray.remote? ? ? [email protected]
def fun(x):? ? ? ? ? ?class Actor(object): …? ? ? ? ? ? ? ? ? ? ? ? ? ? def method(y) … |
| .remote | Postfix到每個(gè)遠(yuǎn)程函數(shù),遠(yuǎn)程聲明類,或遠(yuǎn)程進(jìn)行類方法的調(diào)用。 遠(yuǎn)程操作是異步。 | ret_id = fun.remote(x)
a = Actor.remote() ret_id = a.method.remote(y) |
| ray.put() | 將對象存儲(chǔ)在對象庫中,并返回其ID。 此ID可用于將對象作為參數(shù)傳遞給任何遠(yuǎn)程函數(shù)或方法調(diào)用。 這是一種同步的操作。 | x_id = ray.put(x) |
| ray.get() | 從對象ID或?qū)ο驣D列表返回對象或?qū)ο罅斜怼?這是一種 同步 (即阻塞)操作。 | x = ray.get(x_id)
… objects = ray.get(object_ids) |
| ray.wait() | 從對象ID列表返回(1)準(zhǔn)備好的對象的ID列表,以及(2)尚未準(zhǔn)備好的對象的ID列表。 默認(rèn)情況下,它一次返回一個(gè)已準(zhǔn)備好的對象ID。 | ready_ids, not_ready_ids = ???ray.wait(object_ids) |
表1 :我們在此博客中使用的核心Ray API。 完整的API可 在此處獲得
本博客中報(bào)告的所有結(jié)果都是在配備2.7 GHz Core i7 CPU和16GB RAM的13英寸MacBook Pro上獲得的。 雖然 ray.init() 在單個(gè)機(jī)器上運(yùn)行時(shí)會(huì)自動(dòng)檢測核心數(shù),但為了減少運(yùn)行下面代碼時(shí)在機(jī)器上觀察到的結(jié)果的變化性,這里我們指定num_cpus = 4,即一臺(tái)機(jī)器指定使用4個(gè)CPU。 由于每個(gè)任務(wù)默認(rèn)請求一個(gè)CPU,因此該設(shè)置允許我們并行執(zhí)行最多四個(gè)任務(wù)。 因此,我們的Ray系統(tǒng)由一個(gè)執(zhí)行程序的驅(qū)動(dòng)程序和最多四個(gè)運(yùn)行遠(yuǎn)程任務(wù)或actor的工作程序組成。
延遲執(zhí)行ray.get()
使用Ray,每個(gè)遠(yuǎn)程操作(例如,執(zhí)行任務(wù),定義actor方法)的調(diào)用都是異步的。 這意味著操作 立即 返回 promise / future,它實(shí)際上是操作結(jié)果的標(biāo)識(shí)符(ID)。 這是實(shí)現(xiàn)并行性的關(guān)鍵,因?yàn)樗试S驅(qū)動(dòng)程序并行啟動(dòng)多個(gè)操作。 要獲得實(shí)際結(jié)果,程序員需要 在結(jié)果的ID上 調(diào)用 ray.get() 。 此調(diào)用將阻塞,直到結(jié)果可用。 作為副作用,此操作還會(huì)阻止驅(qū)動(dòng)程序調(diào)用其他操作,這可能會(huì)損害并行性。
不幸的是,新的Ray用戶會(huì)很自然地?zé)o意中使用了 ray.get()。 為了說明這一點(diǎn),請考慮以下簡單的Python代碼,該代碼調(diào)用 do_some_work() 函數(shù)四次,每次調(diào)用大約需要1秒:
import time
def do_some_work(x):
? ? time.sleep(1) # 把這句替換為你想執(zhí)行的代碼.
? ? return x
start = time.time()
results = [do_some_work(x) for x in range(4)]
print(“duration =”, time.time() – start, “\nresults = “, results)
程序執(zhí)行的輸出如下。 正如預(yù)期的那樣,該計(jì)劃大約需要4秒:
duration = 4.0149290561676025?
results =? [0, 1, 2, 3]
現(xiàn)在,讓我們將上述程序在Ray中并行化。 一些初次使用的用戶只做了一個(gè)改動(dòng),就是把函數(shù)設(shè)置為遠(yuǎn)程執(zhí)行,即
import time
import ray
ray.init(num_cpus = 4) # 此處定義了系統(tǒng)有4個(gè)CPU核.
@ray.remote
def do_some_work(x):
? ? time.sleep(1) # 把這句.
? ? return x
start = time.time()
results = [do_some_work.remote(x) for x in range(4)]
print(“duration =”, time.time() – start, “\nresults = “, results)
但是,執(zhí)行上述程序時(shí),會(huì)得到:
duration = 0.0003619194030761719?
results =? [ObjectID(0100000000bdf683fc3e45db42685232b19d2a61), ObjectID(01000000da69c40e1c2f43b391443ce23de46cda), ObjectID(010000007fe0954ac2b3c0ab991538043e8f37e0), ObjectID(01000000cf47d5ecd1e26b42624454c795abe89b)]
在查看此輸出時(shí),顯示了兩件事情。 首先,程序立即完成,即在不到1毫秒內(nèi)完成。 其次,我們得到一堆標(biāo)識(shí)符,而不是預(yù)期的結(jié)果(即[0,1,2,3])。 當(dāng)然,這應(yīng)該不足為奇。 回想一下,遠(yuǎn)程操作是異步的,它們返回future(即對象ID)而不是結(jié)果本身。 這正是我們在這里看到的。 我們只測量調(diào)用啟動(dòng)任務(wù)所花費(fèi)的時(shí)間,而不是它們的運(yùn)行時(shí)間,并且我們得到與四個(gè)任務(wù)相對應(yīng)的結(jié)果的ID。
為了得到實(shí)際結(jié)果,我們需要使用 ray.get() ,這里的第一個(gè)靈感是 在遠(yuǎn)程操作調(diào)用上 調(diào)用 ray.get() ,即替換掉這一行“ results = [do_some_work .remote (x) for x in range(4)] “,把它用
?results = [ray.get(do_some_work.remote(x)) for x in range(4)]
來代替。( 注意:您必須只運(yùn)行一次ray.init() 。如果您在Python解釋器的同一實(shí)例中第二次運(yùn)行它, 您將收到錯(cuò)誤 。 )
通過在此更改后重新運(yùn)行程序,我們得到:
duration = 4.018050909042358?
results =? [0, 1, 2, 3]
所以現(xiàn)在結(jié)果是正確的,但它仍然需要4秒,所以沒有加速! 這是怎么回事? 細(xì)心的讀者已經(jīng)有了答案: ray.get() 是阻塞的,所以在每次遠(yuǎn)程操作后調(diào)用它意味著我們等待該操作完成,這實(shí)際上意味著我們一次執(zhí)行一個(gè)操作,因此沒有并行性!
要啟用并行性,我們需要 在 調(diào)用所有任務(wù) 后 調(diào)用 ray.get() 。 我們可以在我們的示例中通過使用以下代碼替換“ results = [do_some_work .remote (x)for x in range(4)] ”來輕松完成此操作:
? results = ray.get([do_some_work.remote(x)for x in range(4)])
通過在此更改后重新運(yùn)行程序,我們現(xiàn)在得到:
?持續(xù)時(shí)間= 1.0064549446105957?
結(jié)果= [0,1,2,3]
最后,成功! 我們的Ray程序現(xiàn)在只運(yùn)行1秒,這意味著 do_some_work()的 所有調(diào)用 都是并行運(yùn)行的。
總而言之,請記住, ray.get() 是一個(gè)阻塞操作,因此如果急切地調(diào)用它會(huì)損害并行性。 相反,您應(yīng)該嘗試編寫程序,以便盡可能晚地調(diào)用ray.get()。
| 提示1: 盡可能延遲調(diào)用ray.get()。 |
規(guī)避過于微小的任務(wù)
當(dāng)?shù)谝淮伍_發(fā)人員想要將他們的代碼與Ray并行化時(shí),自然的本能就是使每個(gè)函數(shù)或類都遠(yuǎn)程。 不幸的是,這會(huì)導(dǎo)致不良后果; 如果任務(wù)非常小,則Ray程序可能 比等效的Python程序 花費(fèi) 更長的時(shí)間 。
讓我們再次考慮上面的例子,但這次我們使任務(wù)更加短?。?,每個(gè)只需0.1ms),并將任務(wù)調(diào)用的數(shù)量急劇增加到100,000。
import time
def tiny_work(x):
? ? time.sleep(0.0001) # 把這行用你的代碼替換
? ? return x
start = time.time()
results = [tiny_work(x) for x in range(100000)]
print(“duration =”, time.time() – start)
通過運(yùn)行此程序,我們得到:
duration = 13.36544418334961
這個(gè)結(jié)果應(yīng)該是可預(yù)期的,因?yàn)閳?zhí)行100,000個(gè)任務(wù)的下限每個(gè)需要0.1毫秒,這里顯示是10秒,算上其他開銷,如函數(shù)調(diào)用等等結(jié)果是合理的。
現(xiàn)在讓我們使用Ray并行調(diào)用 do_some_work() 遠(yuǎn)程 調(diào)用此代碼 :
import time
import ray
ray.init(num_cpus = 4)
@ray.remote
def tiny_work(x):
? ? time.sleep(0.0001) # 把這行用你的代碼替換.
? ? return x
start = time.time()
result_ids = [tiny_work.remote(x) for x in range(100000)]
results = ray.get(result_ids)
print(“duration =”, time.time() – start)
運(yùn)行此代碼的結(jié)果是:
duration = 27.46447515487671
令人驚訝的是,不僅Ray沒有改善執(zhí)行時(shí)間,而且Ray程序?qū)嶋H上比順序程序慢! 這是怎么回事? 嗯,這里的問題是每個(gè)任務(wù)調(diào)用都有一個(gè)無法忽略的開銷(例如,調(diào)度,進(jìn)程間通信,更新系統(tǒng)狀態(tài)),這個(gè)開銷占據(jù)了執(zhí)行任務(wù)所需的實(shí)際時(shí)間。
加速此程序的一種方法是使遠(yuǎn)程任務(wù)更大,以便分?jǐn)傉{(diào)用開銷。 這是一個(gè)可能的解決方案,我們 在一個(gè)更大的遠(yuǎn)程函數(shù) mega_work()中 聚合1000個(gè) tiny_work() 函數(shù)調(diào)用 :
import time
import ray
ray.init(num_cpus = 4)
def tiny_work(x):
? ? time.sleep(0.0001) # 把這行用你的代碼替換
? ? return x
@ray.remote
def mega_work(start, end):
? ? return [tiny_work(x) for x in range(start, end)]
start = time.time()
result_ids = []
[result_ids.append(mega_work.remote(x*1000, (x+1)*1000)) for x in range(100)]
results = ray.get(result_ids)
print(“duration =”, time.time() – start)
現(xiàn)在,如果我們運(yùn)行上述程序,我們得到:
duration = 3.2539820671081543
這大約是順序執(zhí)行的四分之一,符合我們的預(yù)期(回想一下,我們可以并行執(zhí)行四個(gè)任務(wù))。 當(dāng)然,一個(gè)很自然的疑問是,對于任務(wù)來說,分?jǐn)傔h(yuǎn)程調(diào)用開銷的大小是多大。 找到這個(gè)的一種方法是運(yùn)行以下簡單程序來估計(jì)每個(gè)任務(wù)的調(diào)用開銷:
?@ray.remote
def no_work(x):
? ? return x?
start = time.time()
num_calls = 1000
[ray.get(no_work.remote(x)) for x in range(num_calls)]
print(“per task overhead (ms) =”, (time.time() – start)*1000/num_calls)
運(yùn)行上述程序顯示:
per task overhead (ms) = 0.4739549160003662?
換句話說,執(zhí)行空任務(wù)需要將近半毫秒。 這表明我們需要確保任務(wù)至少花費(fèi)幾毫秒來分?jǐn)傉{(diào)用開銷。 需要注意的是,每個(gè)任務(wù)的開銷因機(jī)器而異,以及在同一臺(tái)機(jī)器上運(yùn)行的任務(wù)與遠(yuǎn)程運(yùn)行的任務(wù)之間會(huì)有所不同。 這就是說,在開發(fā)Ray程序時(shí),確保任務(wù)至少需要幾毫秒是一個(gè)很好的經(jīng)驗(yàn)法則。
| 提示2: 為了高效利用Ray的并行性,遠(yuǎn)程任務(wù)應(yīng)該至少需要幾毫秒。 |
避免將同一對象重復(fù)傳遞給遠(yuǎn)程任務(wù)
當(dāng)我們將一個(gè)大對象作為參數(shù)傳遞給遠(yuǎn)程函數(shù)時(shí),Ray調(diào)用引擎 下的 ray.put() 將該對象存儲(chǔ)在本地對象庫中。 當(dāng)遠(yuǎn)程任務(wù)在本地執(zhí)行時(shí),這可以顯著提高遠(yuǎn)程任務(wù)調(diào)用的性能,因?yàn)樗斜镜厝蝿?wù)都共享對象存儲(chǔ)。 但是,有時(shí)在任務(wù)調(diào)用時(shí)自動(dòng)調(diào)用ray.put()會(huì)導(dǎo)致性能問題。 例如, 重復(fù) 傳遞相同的大對象作為參數(shù) ,如下面的程序所示:
import time?
import numpy as np?
import ray?
ray.init(num_cpus = 4)?
@ray.remote?
def no_work(a):?
? ? return?
start = time.time()?
a = np.zeros((10000, 2000))?
result_ids = [no_work.remote(a) for x in range(10)]?
results = ray.get(result_ids)?
print(“duration =”, time.time() – start)?
該程序輸出:
duration = 1.0699057579040527?
( 注意:如果此程序花費(fèi)的時(shí)間超過1秒,可能是因?yàn)槟臋C(jī)器沒有足夠的內(nèi)存。如果是這種情況,請停止此程序并運(yùn)行下一個(gè)程序。 )
對于只調(diào)用10個(gè)不執(zhí)行任何操作的遠(yuǎn)程任務(wù)的程序,此運(yùn)行時(shí)間非常大。 這種出人意料的高運(yùn)行時(shí)間的原因是每次調(diào)用 no_work(a)時(shí), Ray都會(huì)調(diào)用 ray.put(a) ,這會(huì)導(dǎo)致將數(shù)組 a 復(fù)制 到對象存儲(chǔ)庫。 由于數(shù)組a 有2000萬行,復(fù)制它需要花費(fèi)很多時(shí)間。 為了避免每次調(diào)用 no_work() 時(shí) 復(fù)制數(shù)組 ,一個(gè)簡單的解決方案是顯式調(diào)用 ray.put(a) ,然后將 一個(gè) ID 傳遞 給 no_work() ,如下所示:
import time?
import numpy as np?
import ray?
ray.init(num_cpus = 4)?
@ray.remote?
def no_work(a):?
? ? return?
start = time.time()?
a_id = ray.put(np.zeros((10000, 2000)))?
result_ids = [no_work.remote(a_id) for x in range(10)]?
results = ray.get(result_ids)?
print(“duration =”, time.time() – start)?
運(yùn)行此程序僅需:
?duration = 0.12425804138183594
這比預(yù)期的原始程序快8倍,因?yàn)檎{(diào)用 no_work(a) 的主要開銷 是將數(shù)組a復(fù)制到對象存儲(chǔ),現(xiàn)在只發(fā)生一次。
可以說,避免同一對象的多個(gè)副本到對象存儲(chǔ)器的一個(gè)更重要的優(yōu)點(diǎn)是它排除了對象存儲(chǔ)器過早填滿并導(dǎo)致對象回收的成本。
| 技巧3: 將同一個(gè)對象作為參數(shù)重復(fù)傳遞給遠(yuǎn)程操作時(shí),使用ray.put()將其存儲(chǔ)在對象庫中一次,然后傳遞其ID。 |
將數(shù)據(jù)處理形成管線
如果我們對多個(gè)任務(wù)的結(jié)果使用ray.get() ,我們將不得不等到 這些任務(wù)中 的 最后 一個(gè)完成。 如果任務(wù)花費(fèi)的時(shí)間差異很大,這可能會(huì)成為一個(gè)問題。 為了說明這個(gè)問題,請考慮以下示例,其中我們 并行 運(yùn)行四個(gè) do_some_work() 任務(wù),每個(gè)任務(wù)在0到4秒之間均勻分布。 接下來,假設(shè)這些任務(wù)的結(jié)果由 process_results()處理 ,每個(gè)結(jié)果需要1秒。 然后,預(yù)期的運(yùn)行時(shí)間是(1)執(zhí)行最慢的 do_some_work() 任務(wù)所花費(fèi)的時(shí)間加上(2)4秒,這是執(zhí)行 process_results()所 花費(fèi)的時(shí)間 。
import time?
import random?
import ray?
ray.init(num_cpus = 4)?
@ray.remote?
def do_some_work(x):?
? ? time.sleep(random.uniform(0, 4)) # 將此行替換為你要執(zhí)行的代碼.?
? ? return x?
def process_results(results):?
? ? sum = 0?
? ? for x in results:?
? ? ? ? time.sleep(1) # 將此行替換為你要執(zhí)行的代碼.?
? ? ? ? sum += x?
? ? return sum?
start = time.time()?
data_list = ray.get([do_some_work.remote(x) for x in range(4)])?
sum = process_results(data_list)?
print(“duration =”, time.time() – start, “\nresult = “, sum)?
程序的輸出顯示運(yùn)行需要接近8秒:
duration = 7.82636022567749
result = 6
當(dāng)其他任務(wù)可能早已完成時(shí),等待最后一個(gè)任務(wù)完成不必要地增加了程序運(yùn)行時(shí)間。 更好的解決方案是在數(shù)據(jù) 可用后立即處理 。 幸運(yùn)的是,Ray允許您通過 在對象ID列表上 調(diào)用 ray.wait() 來完成此操作 。 在不指定任何其他參數(shù)的情況下,只要參數(shù)列表中的對象準(zhǔn)備就緒,此函數(shù)就會(huì)返回。 此調(diào)用有兩個(gè)返回:(1)就緒對象的ID,以及(2)包含尚未準(zhǔn)備好的對象的ID的列表。 修改后的程序如下。 請注意,我們需要做的一件事就是將 process_results() 替換 為一次處理一個(gè)結(jié)果的 process_incremental() 。
進(jìn)口時(shí)間?
隨機(jī)導(dǎo)入?
導(dǎo)入光線?
import time?
import random?
import ray?
ray.init(num_cpus = 4)?
@ray.remote?
def do_some_work(x):?
? ? time.sleep(random.uniform(0, 4)) # Replace this with work you need to do.?
? ? return x?
def process_incremental(sum, result):?
? ? time.sleep(1) # Replace this with some processing code.?
? ? return sum + result?
start = time.time()?
result_ids = [do_some_work.remote(x) for x in range(4)]?
sum = 0?
while len(result_ids):?
? ? done_id, result_ids = ray.wait(result_ids)?
? ? sum = process_incremental(sum, ray.get(done_id[0]))?
print(“duration =”, time.time() – start, “\nresult = “, sum)?
這個(gè)程序現(xiàn)在只需要4.8秒了,是一個(gè)顯著改進(jìn):
duration = 4.852453231811523?
result = 6?
( 注意:不同的運(yùn)行可能需要不同的時(shí)間,但“duration”仍應(yīng)顯著小于8秒。 )
為了幫助直覺理解, 圖1 顯示了兩種情況下的執(zhí)行時(shí)間表:使用 ray.get() 等待所有結(jié)果變?yōu)榭捎煤笤偬幚恚蛘呤褂?ray.wait() 在所有結(jié)果全部可用前就開始處理。

圖1 :(a) 在調(diào)用 process_results() 之前 使用 ray.get() 等待 do_some_work() 任務(wù)的所有結(jié)果完成后再處理的執(zhí)行時(shí)間表 。 (b)使用 ray.wait() 在部分結(jié)果可用時(shí)立即處理的 行時(shí)間表
| 技巧4: 使用ray.wait()可以在結(jié)果可用時(shí)立即處理。 |
其他資源
Ray教程是了解有關(guān)Ray編程的更多信息的絕佳資源。
ION STOICA
Ion Stoica是加州大學(xué)伯克利分校電子工程和計(jì)算機(jī)科學(xué)(EECS)系的教授,他在那里研究云計(jì)算和網(wǎng)絡(luò)計(jì)算機(jī)系統(tǒng)。之前他從事動(dòng)態(tài)數(shù)據(jù)包狀態(tài)、弦DHT、網(wǎng)間迂回基礎(chǔ)設(shè)施(i3)、聲明性網(wǎng)絡(luò)及大型系統(tǒng),包括Apache Spark、Apache Mesos和Alluxio。 他是Databricks及Conviva的聯(lián)合創(chuàng)始人,Databricks是一家將Apache Spark商業(yè)化的初創(chuàng)公司,后者是一家將大規(guī)模視頻分發(fā)技術(shù)商業(yè)化的初創(chuàng)公司。Ion是一位ACM研究員,并獲得了眾多獎(jiǎng)項(xiàng),包括SIGOPS名人堂(2015年)、SIGCOMM時(shí)間測試獎(jiǎng)(2011年)和ACM博士論文獎(jiǎng)(2001年)。

