91精品国产综合久久四虎久久_国产成人午夜高潮毛片_99er视频精品免费观看_2020亚洲熟女在线观看_日本女优人体写真_国内黄色毛片_年轻的老师中文版在线_丰满女邻居做爰_久久久久久精品成人免费图片

Ray編程:給新用戶的小指南
編者注:本文作者將在北京人工智能大會(huì)上帶來教學(xué)輔導(dǎo)課Building reinforcement learning models and AI applications with Ray。

Ray是一個(gè)用于在計(jì)算集群上編程的通用框架。 Ray使開發(fā)人員能夠輕松地并行化他們的Python應(yīng)用程序,構(gòu)建新的應(yīng)用,在任意大小規(guī)模的集群上(從筆記本電腦到大型集群)運(yùn)行。 Ray提供了一個(gè)高度靈活,極簡主義、易于使用的API。 表1 介紹了此API的核心功能。

在這篇博客中,我們描述了幾個(gè)小技巧,可以幫助首次使用Ray的用戶避免一些可能嚴(yán)重影響其程序性能的常見錯(cuò)誤。

API 描述 示例
ray.init() 初始化Ray執(zhí)行上下文。
@ ray.remote 函數(shù)或類裝飾器,指定 函數(shù)將作為不同進(jìn)程中的任務(wù)執(zhí)行,或者類作為不同進(jìn)程中的actor定義。 @ray.remote? ? ? [email protected]

def fun(x):? ? ? ? ? ?class Actor(object):

…? ? ? ? ? ? ? ? ? ? ? ? ? ? def method(y)

.remote Postfix到每個(gè)遠(yuǎn)程函數(shù),遠(yuǎn)程聲明類,或遠(yuǎn)程進(jìn)行類方法的調(diào)用。 遠(yuǎn)程操作是異步。 ret_id = fun.remote(x)

a = Actor.remote()

ret_id = a.method.remote(y)

ray.put() 將對象存儲(chǔ)在對象庫中,并返回其ID。 此ID可用于將對象作為參數(shù)傳遞給任何遠(yuǎn)程函數(shù)或方法調(diào)用。 這是一種同步的操作。 x_id = ray.put(x)
ray.get() 從對象ID或?qū)ο驣D列表返回對象或?qū)ο罅斜怼?這是一種 同步 (即阻塞)操作。 x = ray.get(x_id)

objects = ray.get(object_ids)

ray.wait() 從對象ID列表返回(1)準(zhǔn)備好的對象的ID列表,以及(2)尚未準(zhǔn)備好的對象的ID列表。 默認(rèn)情況下,它一次返回一個(gè)已準(zhǔn)備好的對象ID。 ready_ids, not_ready_ids = ???ray.wait(object_ids)
1 :我們在此博客中使用的核心Ray API。 完整的API可 在此處獲得

本博客中報(bào)告的所有結(jié)果都是在配備2.7 GHz Core i7 CPU和16GB RAM的13英寸MacBook Pro上獲得的。 雖然 ray.init() 在單個(gè)機(jī)器上運(yùn)行時(shí)會(huì)自動(dòng)檢測核心數(shù),但為了減少運(yùn)行下面代碼時(shí)在機(jī)器上觀察到的結(jié)果的變化性,這里我們指定num_cpus = 4,即一臺(tái)機(jī)器指定使用4個(gè)CPU。 由于每個(gè)任務(wù)默認(rèn)請求一個(gè)CPU,因此該設(shè)置允許我們并行執(zhí)行最多四個(gè)任務(wù)。 因此,我們的Ray系統(tǒng)由一個(gè)執(zhí)行程序的驅(qū)動(dòng)程序和最多四個(gè)運(yùn)行遠(yuǎn)程任務(wù)或actor的工作程序組成。

延遲執(zhí)行ray.get()

使用Ray,每個(gè)遠(yuǎn)程操作(例如,執(zhí)行任務(wù),定義actor方法)的調(diào)用都是異步的。 這意味著操作 立即 返回 promise / future,它實(shí)際上是操作結(jié)果的標(biāo)識(shí)符(ID)。 這是實(shí)現(xiàn)并行性的關(guān)鍵,因?yàn)樗试S驅(qū)動(dòng)程序并行啟動(dòng)多個(gè)操作。 要獲得實(shí)際結(jié)果,程序員需要 在結(jié)果的ID上 調(diào)用 ray.get() 。 此調(diào)用將阻塞,直到結(jié)果可用。 作為副作用,此操作還會(huì)阻止驅(qū)動(dòng)程序調(diào)用其他操作,這可能會(huì)損害并行性。

不幸的是,新的Ray用戶會(huì)很自然地?zé)o意中使用了 ray.get()。 為了說明這一點(diǎn),請考慮以下簡單的Python代碼,該代碼調(diào)用 do_some_work() 函數(shù)四次,每次調(diào)用大約需要1秒:

import time

def do_some_work(x):

? ? time.sleep(1) # 把這句替換為你想執(zhí)行的代碼.

? ? return x

start = time.time()

results = [do_some_work(x) for x in range(4)]

print(“duration =”, time.time() – start, “\nresults = “, results)

程序執(zhí)行的輸出如下。 正如預(yù)期的那樣,該計(jì)劃大約需要4秒:

duration = 4.0149290561676025?

results =? [0, 1, 2, 3]

現(xiàn)在,讓我們將上述程序在Ray中并行化。 一些初次使用的用戶只做了一個(gè)改動(dòng),就是把函數(shù)設(shè)置為遠(yuǎn)程執(zhí)行,即

import time

import ray

ray.init(num_cpus = 4) # 此處定義了系統(tǒng)有4個(gè)CPU核.

@ray.remote

def do_some_work(x):

? ? time.sleep(1) # 把這句.

? ? return x

start = time.time()

results = [do_some_work.remote(x) for x in range(4)]

print(“duration =”, time.time() – start, “\nresults = “, results)

但是,執(zhí)行上述程序時(shí),會(huì)得到:

duration = 0.0003619194030761719?

results =? [ObjectID(0100000000bdf683fc3e45db42685232b19d2a61), ObjectID(01000000da69c40e1c2f43b391443ce23de46cda), ObjectID(010000007fe0954ac2b3c0ab991538043e8f37e0), ObjectID(01000000cf47d5ecd1e26b42624454c795abe89b)]

在查看此輸出時(shí),顯示了兩件事情。 首先,程序立即完成,即在不到1毫秒內(nèi)完成。 其次,我們得到一堆標(biāo)識(shí)符,而不是預(yù)期的結(jié)果(即[0,1,2,3])。 當(dāng)然,這應(yīng)該不足為奇。 回想一下,遠(yuǎn)程操作是異步的,它們返回future(即對象ID)而不是結(jié)果本身。 這正是我們在這里看到的。 我們只測量調(diào)用啟動(dòng)任務(wù)所花費(fèi)的時(shí)間,而不是它們的運(yùn)行時(shí)間,并且我們得到與四個(gè)任務(wù)相對應(yīng)的結(jié)果的ID。

為了得到實(shí)際結(jié)果,我們需要使用 ray.get() ,這里的第一個(gè)靈感是 在遠(yuǎn)程操作調(diào)用上 調(diào)用 ray.get() ,即替換掉這一行“ results = [do_some_work .remote (x) for x in range(4)] “,把它用

?results = [ray.get(do_some_work.remote(x)) for x in range(4)]

來代替。( 注意:您必須只運(yùn)行一次ray.init() 。如果您在Python解釋器的同一實(shí)例中第二次運(yùn)行它, 您將收到錯(cuò)誤 。 )

通過在此更改后重新運(yùn)行程序,我們得到:

duration = 4.018050909042358?

results =? [0, 1, 2, 3]

所以現(xiàn)在結(jié)果是正確的,但它仍然需要4秒,所以沒有加速! 這是怎么回事? 細(xì)心的讀者已經(jīng)有了答案: ray.get() 是阻塞的,所以在每次遠(yuǎn)程操作后調(diào)用它意味著我們等待該操作完成,這實(shí)際上意味著我們一次執(zhí)行一個(gè)操作,因此沒有并行性!

要啟用并行性,我們需要 在 調(diào)用所有任務(wù) 后 調(diào)用 ray.get() 。 我們可以在我們的示例中通過使用以下代碼替換“ results = [do_some_work .remote (x)for x in range(4)] ”來輕松完成此操作:

? results = ray.get([do_some_work.remote(x)for x in range(4)])

通過在此更改后重新運(yùn)行程序,我們現(xiàn)在得到:

?持續(xù)時(shí)間= 1.0064549446105957?

結(jié)果= [0,1,2,3]

最后,成功! 我們的Ray程序現(xiàn)在只運(yùn)行1秒,這意味著 do_some_work()的 所有調(diào)用 都是并行運(yùn)行的。

總而言之,請記住, ray.get() 是一個(gè)阻塞操作,因此如果急切地調(diào)用它會(huì)損害并行性。 相反,您應(yīng)該嘗試編寫程序,以便盡可能晚地調(diào)用ray.get()。

提示1: 盡可能延遲調(diào)用ray.get()。

規(guī)避過于微小的任務(wù)

當(dāng)?shù)谝淮伍_發(fā)人員想要將他們的代碼與Ray并行化時(shí),自然的本能就是使每個(gè)函數(shù)或類都遠(yuǎn)程。 不幸的是,這會(huì)導(dǎo)致不良后果; 如果任務(wù)非常小,則Ray程序可能 比等效的Python程序 花費(fèi) 更長的時(shí)間 。

讓我們再次考慮上面的例子,但這次我們使任務(wù)更加短?。?,每個(gè)只需0.1ms),并將任務(wù)調(diào)用的數(shù)量急劇增加到100,000。

import time

def tiny_work(x):

? ? time.sleep(0.0001) # 把這行用你的代碼替換

? ? return x

start = time.time()

results = [tiny_work(x) for x in range(100000)]

print(“duration =”, time.time() – start)

通過運(yùn)行此程序,我們得到:

duration = 13.36544418334961

這個(gè)結(jié)果應(yīng)該是可預(yù)期的,因?yàn)閳?zhí)行100,000個(gè)任務(wù)的下限每個(gè)需要0.1毫秒,這里顯示是10秒,算上其他開銷,如函數(shù)調(diào)用等等結(jié)果是合理的。

現(xiàn)在讓我們使用Ray并行調(diào)用 do_some_work() 遠(yuǎn)程 調(diào)用此代碼 :

import time

import ray

ray.init(num_cpus = 4)

@ray.remote

def tiny_work(x):

? ? time.sleep(0.0001) # 把這行用你的代碼替換.

? ? return x

start = time.time()

result_ids = [tiny_work.remote(x) for x in range(100000)]

results = ray.get(result_ids)

print(“duration =”, time.time() – start)

運(yùn)行此代碼的結(jié)果是:

duration = 27.46447515487671

令人驚訝的是,不僅Ray沒有改善執(zhí)行時(shí)間,而且Ray程序?qū)嶋H上比順序程序慢! 這是怎么回事? 嗯,這里的問題是每個(gè)任務(wù)調(diào)用都有一個(gè)無法忽略的開銷(例如,調(diào)度,進(jìn)程間通信,更新系統(tǒng)狀態(tài)),這個(gè)開銷占據(jù)了執(zhí)行任務(wù)所需的實(shí)際時(shí)間。

加速此程序的一種方法是使遠(yuǎn)程任務(wù)更大,以便分?jǐn)傉{(diào)用開銷。 這是一個(gè)可能的解決方案,我們 在一個(gè)更大的遠(yuǎn)程函數(shù) mega_work()中 聚合1000個(gè) tiny_work() 函數(shù)調(diào)用 :

import time

import ray

ray.init(num_cpus = 4)

def tiny_work(x):

? ? time.sleep(0.0001) # 把這行用你的代碼替換

? ? return x

@ray.remote

def mega_work(start, end):

? ? return [tiny_work(x) for x in range(start, end)]

start = time.time()

result_ids = []

[result_ids.append(mega_work.remote(x*1000, (x+1)*1000)) for x in range(100)]

results = ray.get(result_ids)

print(“duration =”, time.time() – start)

現(xiàn)在,如果我們運(yùn)行上述程序,我們得到:

duration = 3.2539820671081543

這大約是順序執(zhí)行的四分之一,符合我們的預(yù)期(回想一下,我們可以并行執(zhí)行四個(gè)任務(wù))。 當(dāng)然,一個(gè)很自然的疑問是,對于任務(wù)來說,分?jǐn)傔h(yuǎn)程調(diào)用開銷的大小是多大。 找到這個(gè)的一種方法是運(yùn)行以下簡單程序來估計(jì)每個(gè)任務(wù)的調(diào)用開銷:

?@ray.remote

def no_work(x):

? ? return x?

start = time.time()

num_calls = 1000

[ray.get(no_work.remote(x)) for x in range(num_calls)]

print(“per task overhead (ms) =”, (time.time() – start)*1000/num_calls)

運(yùn)行上述程序顯示:

per task overhead (ms) = 0.4739549160003662?

換句話說,執(zhí)行空任務(wù)需要將近半毫秒。 這表明我們需要確保任務(wù)至少花費(fèi)幾毫秒來分?jǐn)傉{(diào)用開銷。 需要注意的是,每個(gè)任務(wù)的開銷因機(jī)器而異,以及在同一臺(tái)機(jī)器上運(yùn)行的任務(wù)與遠(yuǎn)程運(yùn)行的任務(wù)之間會(huì)有所不同。 這就是說,在開發(fā)Ray程序時(shí),確保任務(wù)至少需要幾毫秒是一個(gè)很好的經(jīng)驗(yàn)法則。

提示2: 為了高效利用Ray的并行性,遠(yuǎn)程任務(wù)應(yīng)該至少需要幾毫秒。

避免將同一對象重復(fù)傳遞給遠(yuǎn)程任務(wù)

當(dāng)我們將一個(gè)大對象作為參數(shù)傳遞給遠(yuǎn)程函數(shù)時(shí),Ray調(diào)用引擎 下的 ray.put() 將該對象存儲(chǔ)在本地對象庫中。 當(dāng)遠(yuǎn)程任務(wù)在本地執(zhí)行時(shí),這可以顯著提高遠(yuǎn)程任務(wù)調(diào)用的性能,因?yàn)樗斜镜厝蝿?wù)都共享對象存儲(chǔ)。 但是,有時(shí)在任務(wù)調(diào)用時(shí)自動(dòng)調(diào)用ray.put()會(huì)導(dǎo)致性能問題。 例如, 重復(fù) 傳遞相同的大對象作為參數(shù) ,如下面的程序所示:

import time?

import numpy as np?

import ray?

ray.init(num_cpus = 4)?

@ray.remote?

def no_work(a):?

? ? return?

start = time.time()?

a = np.zeros((10000, 2000))?

result_ids = [no_work.remote(a) for x in range(10)]?

results = ray.get(result_ids)?

print(“duration =”, time.time() – start)?

該程序輸出:

duration = 1.0699057579040527?

( 注意:如果此程序花費(fèi)的時(shí)間超過1秒,可能是因?yàn)槟臋C(jī)器沒有足夠的內(nèi)存。如果是這種情況,請停止此程序并運(yùn)行下一個(gè)程序。 )

對于只調(diào)用10個(gè)不執(zhí)行任何操作的遠(yuǎn)程任務(wù)的程序,此運(yùn)行時(shí)間非常大。 這種出人意料的高運(yùn)行時(shí)間的原因是每次調(diào)用 no_work(a)時(shí), Ray都會(huì)調(diào)用 ray.put(a) ,這會(huì)導(dǎo)致將數(shù)組 a 復(fù)制 到對象存儲(chǔ)庫。 由于數(shù)組a 有2000萬行,復(fù)制它需要花費(fèi)很多時(shí)間。 為了避免每次調(diào)用 no_work() 時(shí) 復(fù)制數(shù)組 ,一個(gè)簡單的解決方案是顯式調(diào)用 ray.put(a) ,然后將 一個(gè) ID 傳遞 給 no_work() ,如下所示:

import time?

import numpy as np?

import ray?

ray.init(num_cpus = 4)?

@ray.remote?

def no_work(a):?

? ? return?

start = time.time()?

a_id = ray.put(np.zeros((10000, 2000)))?

result_ids = [no_work.remote(a_id) for x in range(10)]?

results = ray.get(result_ids)?

print(“duration =”, time.time() – start)?

運(yùn)行此程序僅需:

?duration = 0.12425804138183594

這比預(yù)期的原始程序快8倍,因?yàn)檎{(diào)用 no_work(a) 的主要開銷 是將數(shù)組a復(fù)制到對象存儲(chǔ),現(xiàn)在只發(fā)生一次。

可以說,避免同一對象的多個(gè)副本到對象存儲(chǔ)器的一個(gè)更重要的優(yōu)點(diǎn)是它排除了對象存儲(chǔ)器過早填滿并導(dǎo)致對象回收的成本。

技巧3: 將同一個(gè)對象作為參數(shù)重復(fù)傳遞給遠(yuǎn)程操作時(shí),使用ray.put()將其存儲(chǔ)在對象庫中一次,然后傳遞其ID。

將數(shù)據(jù)處理形成管線

如果我們對多個(gè)任務(wù)的結(jié)果使用ray.get() ,我們將不得不等到 這些任務(wù)中 的 最后 一個(gè)完成。 如果任務(wù)花費(fèi)的時(shí)間差異很大,這可能會(huì)成為一個(gè)問題。 為了說明這個(gè)問題,請考慮以下示例,其中我們 并行 運(yùn)行四個(gè) do_some_work() 任務(wù),每個(gè)任務(wù)在0到4秒之間均勻分布。 接下來,假設(shè)這些任務(wù)的結(jié)果由 process_results()處理 ,每個(gè)結(jié)果需要1秒。 然后,預(yù)期的運(yùn)行時(shí)間是(1)執(zhí)行最慢的 do_some_work() 任務(wù)所花費(fèi)的時(shí)間加上(2)4秒,這是執(zhí)行 process_results()所 花費(fèi)的時(shí)間 。

import time?

import random?

import ray?

ray.init(num_cpus = 4)?

@ray.remote?

def do_some_work(x):?

? ? time.sleep(random.uniform(0, 4)) # 將此行替換為你要執(zhí)行的代碼.?

? ? return x?

def process_results(results):?

? ? sum = 0?

? ? for x in results:?

? ? ? ? time.sleep(1) # 將此行替換為你要執(zhí)行的代碼.?

? ? ? ? sum += x?

? ? return sum?

start = time.time()?

data_list = ray.get([do_some_work.remote(x) for x in range(4)])?

sum = process_results(data_list)?

print(“duration =”, time.time() – start, “\nresult = “, sum)?

程序的輸出顯示運(yùn)行需要接近8秒:

duration = 7.82636022567749

result = 6

當(dāng)其他任務(wù)可能早已完成時(shí),等待最后一個(gè)任務(wù)完成不必要地增加了程序運(yùn)行時(shí)間。 更好的解決方案是在數(shù)據(jù) 可用后立即處理 。 幸運(yùn)的是,Ray允許您通過 在對象ID列表上 調(diào)用 ray.wait() 來完成此操作 。 在不指定任何其他參數(shù)的情況下,只要參數(shù)列表中的對象準(zhǔn)備就緒,此函數(shù)就會(huì)返回。 此調(diào)用有兩個(gè)返回:(1)就緒對象的ID,以及(2)包含尚未準(zhǔn)備好的對象的ID的列表。 修改后的程序如下。 請注意,我們需要做的一件事就是將 process_results() 替換 為一次處理一個(gè)結(jié)果的 process_incremental() 。

進(jìn)口時(shí)間?

隨機(jī)導(dǎo)入?

導(dǎo)入光線?

import time?

import random?

import ray?

ray.init(num_cpus = 4)?

@ray.remote?

def do_some_work(x):?

? ? time.sleep(random.uniform(0, 4)) # Replace this with work you need to do.?

? ? return x?

def process_incremental(sum, result):?

? ? time.sleep(1) # Replace this with some processing code.?

? ? return sum + result?

start = time.time()?

result_ids = [do_some_work.remote(x) for x in range(4)]?

sum = 0?

while len(result_ids):?

? ? done_id, result_ids = ray.wait(result_ids)?

? ? sum = process_incremental(sum, ray.get(done_id[0]))?

print(“duration =”, time.time() – start, “\nresult = “, sum)?

這個(gè)程序現(xiàn)在只需要4.8秒了,是一個(gè)顯著改進(jìn):

duration = 4.852453231811523?

result = 6?

( 注意:不同的運(yùn)行可能需要不同的時(shí)間,但“duration”仍應(yīng)顯著小于8秒。 )

為了幫助直覺理解, 圖1 顯示了兩種情況下的執(zhí)行時(shí)間表:使用 ray.get() 等待所有結(jié)果變?yōu)榭捎煤笤偬幚恚蛘呤褂?ray.wait() 在所有結(jié)果全部可用前就開始處理。

sI6-tbAQQ032tvCU7vqQisw

1 :(a) 在調(diào)用 process_results() 之前 使用 ray.get() 等待 do_some_work() 任務(wù)的所有結(jié)果完成后再處理的執(zhí)行時(shí)間表 。 (b)使用 ray.wait() 在部分結(jié)果可用時(shí)立即處理的 行時(shí)間表
技巧4: 使用ray.wait()可以在結(jié)果可用時(shí)立即處理。

其他資源

Ray教程是了解有關(guān)Ray編程的更多信息的絕佳資源。

ION STOICA

Ion Stoica是加州大學(xué)伯克利分校電子工程和計(jì)算機(jī)科學(xué)(EECS)系的教授,他在那里研究云計(jì)算和網(wǎng)絡(luò)計(jì)算機(jī)系統(tǒng)。之前他從事動(dòng)態(tài)數(shù)據(jù)包狀態(tài)、弦DHT、網(wǎng)間迂回基礎(chǔ)設(shè)施(i3)、聲明性網(wǎng)絡(luò)及大型系統(tǒng),包括Apache Spark、Apache Mesos和Alluxio。 他是Databricks及Conviva的聯(lián)合創(chuàng)始人,Databricks是一家將Apache Spark商業(yè)化的初創(chuàng)公司,后者是一家將大規(guī)模視頻分發(fā)技術(shù)商業(yè)化的初創(chuàng)公司。Ion是一位ACM研究員,并獲得了眾多獎(jiǎng)項(xiàng),包括SIGOPS名人堂(2015年)、SIGCOMM時(shí)間測試獎(jiǎng)(2011年)和ACM博士論文獎(jiǎng)(2001年)。

A cluster of center pivot irrigation fields. (source: Soil Science)