下述文章是參考知乎“枯芒草”寫的"ray框架學(xué)習(xí)筆記"所寫
原文網(wǎng)址:[Modern Parallel and Distributed Python: A Quick Tutorial on Ray | by Robert Nishihara | Towards Data Science]

(https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8) [需要科學(xué)上網(wǎng)瀏覽]
2019年2月11日,Robert Nishihara 發(fā)表于Towards Data Science。
[水平有限,翻譯的并不十分準(zhǔn)確,如果有條件還是看原文比較好]
Modern Parallel and Distributed Python: A Quick Tutorial on Ray
A fast, simple framework for distributed applications
Ray 是一個用于并行和分布式 Python 的開源項目。
平行和分布式計算是現(xiàn)代應(yīng)用的主要特征。我們需要利用多個核心或多臺機器來加速應(yīng)用程序或大規(guī)模運行它們。用于抓取網(wǎng)絡(luò)和響應(yīng)搜索查詢的基礎(chǔ)設(shè)施不是運行在某人筆記本電腦上的單線程程序,而是相互通信和交互的服務(wù)集合。
[pic.1]:云承諾在所有方向(內(nèi)存、計算、存儲等)都具有無限的可伸縮性。實現(xiàn)這一承諾需要新的工具來編寫云和構(gòu)建分布式應(yīng)用程序
這篇文章將描述如何使用 Ray 輕松地構(gòu)建可以從筆記本電腦擴展到大型集群的應(yīng)用程序。
Why Ray?
許多教程解釋了如何使用 Python 的多處理模塊。遺憾的是,多處理模塊在處理現(xiàn)代應(yīng)用程序的需求方面受到嚴(yán)重限制。這些要求包括:
? 在多臺計算機上運行相同的代碼
? 構(gòu)建具有狀態(tài)且可以通信的微服務(wù)和參與者
? 優(yōu)雅地處理機器故障
? 高效處理大型項目和海量數(shù)據(jù)
Ray 解決了所有這些問題,使簡單的事情變得簡單,使復(fù)雜的行為成為可能。

Necessary Concepts
傳統(tǒng)編程依賴于兩個核心概念: 函數(shù)和類。使用這些構(gòu)建塊,編程語言允許我們構(gòu)建無數(shù)的應(yīng)用程序。
但是,當(dāng)我們將應(yīng)用程序遷移到分布式設(shè)置時,概念通常會發(fā)生變化。
一方面,我們有 OpenMPI、 Python 多處理和 ZeroMQ 等工具,它們提供用于發(fā)送和接收消息的低級原語。這些工具非常強大,但它們提供了不同的抽象,因此必須從頭開始重寫單線程應(yīng)用程序才能使用它們。
另一方面,我們有專門針對領(lǐng)域的工具,比如用于模型培訓(xùn)的 TensorFlow、用于數(shù)據(jù)處理和 SQL 的 Spark 以及用于流處理的 Flink。這些工具提供更高層次的抽象,如神經(jīng)網(wǎng)絡(luò)、數(shù)據(jù)集和流。但是,由于它們與串行編程所使用的抽象不同,因此必須重新編寫應(yīng)用程序以利用它們。

Ray 占據(jù)了一個獨特的中間地帶。而不是引入新的概念。Ray 獲取函數(shù)和類的現(xiàn)有概念,并將它們作為任務(wù)和參與者轉(zhuǎn)換為分布式設(shè)置。這種 API 選擇允許串行應(yīng)用程序并行化,而不需要進行重大修改。
Starting Ray
ray.init()命令啟動所有相關(guān)的 Ray 進程。在集群上,這是唯一需要更改的行(我們需要傳遞集群地址)。這些程序包括:
? 許多用于并行執(zhí)行 Python 函數(shù)的工作進程(大約每個 CPU 內(nèi)核一個工作進程)
? 為 workers (和其他機器)分配“任務(wù)”的調(diào)度程序進程。任務(wù)是 Ray 調(diào)度的工作單元,對應(yīng)于一個函數(shù)調(diào)用或方法調(diào)用
? 共享內(nèi)存對象存儲,用于在工作線程之間高效共享對象(無需創(chuàng)建副本)
? 存儲在機器出現(xiàn)故障時重新運行任務(wù)所需的元數(shù)據(jù)的內(nèi)存數(shù)據(jù)庫
與線程相比,Ray workers 是獨立的進程,因為由于全局解釋器鎖 GIL 的限制,Python 對多線程的支持非常有限。
Parallelism with Tasks
為了將 Python 函數(shù) f 轉(zhuǎn)換為“遠程函數(shù)”(可以遠程異步執(zhí)行的函數(shù)) ,我們使用@ray.remote 聲明該函數(shù)。然后,通過 f.remote () 進行的函數(shù)調(diào)用將立即返回未來(future 是對最終輸出的引用) ,實際的函數(shù)執(zhí)行將在后臺進行(我們將此執(zhí)行稱為任務(wù))。
'''在 Python 中運行并行任務(wù)的代碼'''
import ray
import time
# Start Ray.
ray.init()
@ray.remote
def f(x):
time.sleep(1)
return x
# Start 4 tasks in parallel.
result_ids = []
for i in range(4):
result_ids.append(f.remote(i))
# Wait for the tasks to complete and retrieve the results.
# With at least 4 cores, this will take 1 second.
results = ray.get(result_ids) # [0, 1, 2, 3]
因為對 f.remote (i)的調(diào)用立即返回,所以只需要運行該行四次,就可以并行執(zhí)行 f 的四個副本。
Task Dependencies
任務(wù)還可以依賴于其他任務(wù)。接下來,multi_ Matrix 任務(wù)使用兩個 create _ Matrix 任務(wù)的輸出,因此在前兩個任務(wù)執(zhí)行完畢之前它不會開始執(zhí)行。前兩個任務(wù)的輸出將自動作為參數(shù)傳遞給第三個任務(wù),期貨將被替換為相應(yīng)的值)。通過這種方式,可以將任務(wù)與任意 DAG 依賴關(guān)系組合在一起。
'''說明三個任務(wù)的代碼,其中第三個任務(wù)取決于前兩個任務(wù)的輸出'''
import numpy as np
@ray.remote
def create_matrix(size):
return np.random.normal(size=size)
@ray.remote
def multiply_matrices(x, y):
return np.dot(x, y)
x_id = create_matrix.remote([1000, 1000])
y_id = create_matrix.remote([1000, 1000])
z_id = multiply_matrices.remote(x_id, y_id)
# Get the results.
z = ray.get(z_id)
Aggregating Values Efficiently
任務(wù)依賴關(guān)系可以以更復(fù)雜的方式使用。例如,假設(shè)我們希望將8個值聚合在一起。此示例使用整數(shù)相加,但在許多應(yīng)用程序中,跨多臺機器聚合大型向量可能是一個瓶頸。在這種情況下,更改單行代碼可以將聚合的運行時間從線性變?yōu)榫酆现禂?shù)量的對數(shù)。

如上所述,要將一個任務(wù)的輸出作為輸入提供給后續(xù)任務(wù),只需將第一個任務(wù)返回的將來作為參數(shù)傳遞給第二個任務(wù)。Ray 的調(diào)度程序?qū)⒆詣涌紤]這個任務(wù)依賴關(guān)系。第二個任務(wù)在第一個任務(wù)完成之前不會執(zhí)行,第一個任務(wù)的輸出將自動發(fā)送到正在執(zhí)行第二個任務(wù)的機器上。
'''以線性方式與以樹結(jié)構(gòu)方式聚合值的代碼'''
import time
@ray.remote
def add(x, y):
time.sleep(1)
return x + y
# Aggregate the values slowly. This approach takes O(n) where n is the
# number of values being aggregated. In this case, 7 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)
# Aggregate the values in a tree-structured pattern. This approach
# takes O(log(n)). In this case, 3 seconds.
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)
上面的代碼非常明確,但是請注意,可以使用 while 循環(huán)以更簡潔的方式實現(xiàn)這兩種方法。
'''
兩個聚合方案的更簡潔的實現(xiàn)。
這兩個代碼塊之間的唯一區(qū)別是“ add.remote”的輸出是放在列表的前面還是后面
'''
# Slow approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
values = [add.remote(values[0], values[1])] + values[2:]
result = ray.get(values[0])
# Fast approach.
values = [1, 2, 3, 4, 5, 6, 7, 8]
while len(values) > 1:
values = values[2:] + [add.remote(values[0], values[1])]
result = ray.get(values[0])
From Classes to Actors
在不使用類的情況下編寫有趣的應(yīng)用程序是具有挑戰(zhàn)性的,在分布式設(shè)置中,這一點與在單個核上一樣正確。
Ray 允許您使用 Python 類并使用@Ray.remote 聲明它。遠程裝潢師。每當(dāng)類被實例化時,Ray 就創(chuàng)建一個新的“ actor”,這是一個在集群中某處運行并保存對象副本的進程。對該參與者的方法調(diào)用轉(zhuǎn)換為在該參與者進程上運行的任務(wù),并且可以訪問和更改該參與者的狀態(tài)。通過這種方式,參與者允許在多個任務(wù)之間共享可變狀態(tài),而遠程函數(shù)則不允許。
各個參與者按順序執(zhí)行方法(每個方法都是原子的) ,因此不存在競態(tài)條件。并行可以通過創(chuàng)建多個參與者來實現(xiàn)。
'''將 Python 類實例化為參與者的代碼示例'''
@ray.remote
class Counter(object):
def __init__(self):
self.x = 0
def inc(self):
self.x += 1
def get_value(self):
return self.x
# Create an actor process.
c = Counter.remote()
# Check the actor's counter value.
print(ray.get(c.get_value.remote())) # 0
# Increment the counter twice and check the value again.
c.inc.remote()
c.inc.remote()
print(ray.get(c.get_value.remote())) # 2
上面的例子是參與者最簡單的可能用法。Counter.remote ()行創(chuàng)建一個新的 Actor 進程,該進程具有 Counter 對象的副本。對 c.get_value.remote() 和 c.inc.remote() 的調(diào)用在遠程執(zhí)行組件進程上執(zhí)行任務(wù)并改變執(zhí)行組件的狀態(tài)。
Actor Handles
在上面的示例中,我們只從主 Python 腳本調(diào)用了角色上的方法。角色最強大的一個方面是,我們可以將句柄傳遞給角色,這允許其他角色或其他任務(wù)調(diào)用同一個角色上的所有方法。
下面的示例創(chuàng)建一個存儲消息的參與者。幾個工作任務(wù)重復(fù)地將消息推送給參與者,主 Python 腳本定期讀取消息。
'''從多個并發(fā)任務(wù)調(diào)用參與者方法的代碼'''
import time
@ray.remote
class MessageActor(object):
def __init__(self):
self.messages = []
def add_message(self, message):
self.messages.append(message)
def get_and_clear_messages(self):
messages = self.messages
self.messages = []
return messages
# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
for i in range(100):
time.sleep(1)
message_actor.add_message.remote(
"Message {} from worker {}.".format(i, j))
# Create a message actor.
message_actor = MessageActor.remote()
# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]
# Periodically get the messages and print them.
for _ in range(100):
new_messages = ray.get(message_actor.get_and_clear_messages.remote())
print("New messages:", new_messages)
time.sleep(1)
# This script prints something like the following:
# New messages: []
# New messages: ['Message 0 from worker 1.', 'Message 0 from worker 0.']
# New messages: ['Message 0 from worker 2.', 'Message 1 from worker 1.', 'Message 1 from worker 0.', 'Message 1 from worker 2.']
# New messages: ['Message 2 from worker 1.', 'Message 2 from worker 0.', 'Message 2 from worker 2.']
# New messages: ['Message 3 from worker 2.', 'Message 3 from worker 1.', 'Message 3 from worker 0.']
# New messages: ['Message 4 from worker 2.', 'Message 4 from worker 0.', 'Message 4 from worker 1.']
# New messages: ['Message 5 from worker 2.', 'Message 5 from worker 0.', 'Message 5 from worker 1.']
Actor 是非常強大的。它們允許您獲取一個 Python 類,并將其實例化為一個微服務(wù),可以從其他角色和任務(wù)甚至其他應(yīng)用程序查詢這個微服務(wù)。
任務(wù)和參與者是 Ray 提供的核心抽象。這兩個概念非常普遍,可以用來實現(xiàn)復(fù)雜的應(yīng)用程序,包括Ray的內(nèi)置強化學(xué)習(xí)庫、超參數(shù)調(diào)優(yōu)、加速Pandas等等。
Learn More About Ray
- Check out the code on GitHub.
- View the Ray documentation.
- Ask and answer questions on the Ray forum.
- Check out libraries built with Ray for scaling reinforcement learning, scaling hyperparameter tuning, scaling model serving, and scaling data processing.