使用 Ray 用 15 行 Python 代碼實(shí)現(xiàn)一個(gè)參數(shù)服務(wù)器

使用 Ray 用 15 行 Python 代碼實(shí)現(xiàn)一個(gè)參數(shù)服務(wù)器

參數(shù)服務(wù)器是很多機(jī)器學(xué)習(xí)應(yīng)用的核心部分。其核心作用是存放機(jī)器學(xué)習(xí)模型的參數(shù)(如,神經(jīng)網(wǎng)絡(luò)的權(quán)重)和提供服務(wù)將參數(shù)傳給客戶端(客戶端通常是處理數(shù)據(jù)和計(jì)算參數(shù)更新的 workers)

參數(shù)服務(wù)器(如同數(shù)據(jù)庫)是正常構(gòu)建并 shipped 像一個(gè)單一系統(tǒng)。這個(gè)文章講解如何使用 Ray 來用幾行代碼實(shí)現(xiàn)參數(shù)服務(wù)器。

通過將參數(shù)服務(wù)器從一個(gè)“系統(tǒng)”調(diào)整為一個(gè)“應(yīng)用”,這個(gè)方法將量級的 orders 變得更加簡單來部署一個(gè)參數(shù)服務(wù)器應(yīng)用。類似地,通過讓應(yīng)用和庫實(shí)現(xiàn)自身的參數(shù)服務(wù)器,這個(gè)方法讓參數(shù)服務(wù)器的行為更加可配置和靈活(因?yàn)檫@個(gè)應(yīng)用可以輕松地修改實(shí)現(xiàn))

什么是 Ray? Ray 是一個(gè)用于并行和分布式的通用框架。Ray 提供了一個(gè)統(tǒng)一的任務(wù)并行和actor抽象,并且通過共享內(nèi)存、零復(fù)制序列化和分布式調(diào)度達(dá)到了高的性能。Ray 也包含了針對人工智能應(yīng)用(如超參數(shù)調(diào)優(yōu)和強(qiáng)化學(xué)習(xí))的高性能庫。

什么是一個(gè)參數(shù)服務(wù)器?

一個(gè)參數(shù)服務(wù)器是一個(gè)用來在集群上訓(xùn)練機(jī)器學(xué)習(xí)模型的鍵值對。其(values)是機(jī)器學(xué)習(xí)模型的參數(shù)(如一個(gè)神經(jīng)網(wǎng)絡(luò))。其(keys)索引了模型參數(shù)。

例如,在一個(gè)電影的推薦系統(tǒng)中,可能會(huì)針對每個(gè)用戶、每個(gè)電影都有相應(yīng)的鍵。對每個(gè)用戶和電影,有對應(yīng)的以用戶特屬和以電影特屬的參數(shù)。在語言建模的應(yīng)用中,詞可能會(huì)作為鍵而其嵌入則可能為值。在最簡單的形式中,參數(shù)服務(wù)器可能會(huì)隱式地有一個(gè)單個(gè)鍵,允許你所有的參數(shù)被獲取并一次性更新。我們展示了如何作為一個(gè) Ray 的 actor 實(shí)現(xiàn)一個(gè)參數(shù)服務(wù)器。

import numpy as np
import ray

@ray.remote
class ParameterServer(object):
  def __init__(self, dim):
    # params 可以是一個(gè)將鍵映射到數(shù)組的字典
    self.params = np.zeros(dim)

  def get_params(self):
    return self.params
    
  def update_params(self, grad):
    self.params += grad

@ray.remote 裝飾器定義了一個(gè)服務(wù)。以類 ParameterServer 為‘輸入’并使之作為一個(gè)遠(yuǎn)程服務(wù)或者 actor 被實(shí)例化。

這里,我們假設(shè)更新是一個(gè)梯度,這個(gè)被加到參數(shù)的向量上。這僅僅是最簡單可能例子,可以有很多不同的選擇。

參數(shù)服務(wù)器一般作為遠(yuǎn)程進(jìn)程或者服務(wù)存在 并通過遠(yuǎn)程過程調(diào)用來和客戶端交互。為了實(shí)例化參數(shù)服務(wù)器為一個(gè)遠(yuǎn)程 actor,我們可以這樣:

ray.init()

ps = ParameterServer.remote(10)

Actor 方法調(diào)用返回 futures。如果我們想要檢索實(shí)際值,我們可以使用一個(gè) blocking 的 ray.get 調(diào)用,如:

>>> ps = ParameterServer.remote(10)
>>> params_id = ps.get_params.remote()
>>> params_id
ObjectID(4e9c8ac9a6d3dbf20c625f8d36c93beb07ca45d0)

>>> ray.get(params_id)
array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

現(xiàn)在,假設(shè)我們想要啟動(dòng)某些 worker 任務(wù)連續(xù)地計(jì)算梯度和更新模型的參數(shù)。每個(gè) worker 將會(huì)循環(huán)地執(zhí)行下面任務(wù):

  1. 獲取最新的參數(shù)
  2. 計(jì)算對參數(shù)的一個(gè)更新
  3. 更新參數(shù)

作為一個(gè) Ray 遠(yuǎn)程函數(shù)(盡管 worker 也可以被看做一個(gè) actor),如下:

import time

# 注意 worker 函數(shù)獲取參數(shù)服務(wù)器作為參數(shù),使得 worker 任務(wù)激活參數(shù)服務(wù)器 actor 的方法

@ray.remote
def worker(ps):
  for _ in range(100):
    params_id = ps.get_params.remote() # 這個(gè)方法調(diào)用是非阻塞的,返回一個(gè) future
    params = ray.get(params_id) # 這是一個(gè)阻塞的調(diào)用,等待任務(wù)完成,并獲取結(jié)果
    
    # 計(jì)算梯度更新。這里我們僅做一個(gè)假的更新,但在實(shí)際環(huán)境中,這里會(huì)使用一個(gè)庫,如 tensorflow,也會(huì)獲取一個(gè)批量的數(shù)據(jù)為輸入
    grad = np.ones(10)
    time.sleep(0.2) # 這個(gè)是一個(gè)偽造的作為計(jì)算的占位符
    
    # 更新參數(shù)
    ps.update_params.remote(grad)

然后我們可以啟動(dòng)幾個(gè) worker 任務(wù):

for _ in range(2):
  worker.remote(ps)

接著我們可以從驅(qū)動(dòng)進(jìn)程中檢索到參數(shù),并看到他們由 workers 進(jìn)行更新

>>> ray.get(ps.get_params.remote())
array([164., 164., 164., 164., 164., 164., 164., 164., 164., 164.])
>>> ray.get(ps.get_params.remote())
array([198., 198., 198., 198., 198., 198., 198., 198., 198., 198.])
>>> 
>>> ray.get(ps.get_params.remote())
array([200., 200., 200., 200., 200., 200., 200., 200., 200., 200.])
>>> ray.get(ps.get_params.remote())
array([200., 200., 200., 200., 200., 200., 200., 200., 200., 200.])
>>> ray.get(ps.get_params.remote())
array([200., 200., 200., 200., 200., 200., 200., 200., 200., 200.])

Ray 這里加上的值一部分原因是 Ray 讓其變得簡單來啟動(dòng)一個(gè)遠(yuǎn)程服務(wù)或者 actor 因?yàn)檫@是定義了一個(gè) Python 類。actor 的 Handles 可以被傳遞給其他的 actors 和任務(wù),來保證可以進(jìn)行任意和直覺的消息傳遞和通信模式。目前的替代物更多。例如,考慮等價(jià)運(yùn)行時(shí)刻服務(wù)創(chuàng)建和用 GRPC 來進(jìn)行 handle 的傳遞。

擴(kuò)展

這里我們給出一些設(shè)計(jì)上的重要變化。我們描述了額外的自然擴(kuò)展。

多參數(shù)服務(wù)器的分片 sharding 當(dāng)你的參數(shù)很大和集群很大時(shí),單個(gè)參數(shù)武器可能不能滿足要求,因?yàn)閼?yīng)用會(huì)被網(wǎng)絡(luò)帶寬限制,進(jìn)入和流出參數(shù)服務(wù)器所在的機(jī)器(特別是有很多的 workers 時(shí)候)

一個(gè)自然的解決方法是對多參數(shù)服務(wù)器上的參數(shù)進(jìn)行分片。這個(gè)可以被簡單地開啟多個(gè)參數(shù)服務(wù)器 actors 達(dá)成。例如我們底下給出的代碼那樣。

控制 actor 放置 特定 actors 和任務(wù)在不同機(jī)器上的放置可以使用 Ray 對任意的資源需求支持指定。例如,如果 worker 需要一個(gè) GPU,那么它的遠(yuǎn)程裝飾器可以被聲明為 @ray.remote(num_gpus=1)。任意定制資源可以同樣定義。

統(tǒng)一任務(wù)和 actors

Ray 支持參數(shù)服務(wù)器應(yīng)用高效大部分原因是其統(tǒng)一的任務(wù)并行和 actor 抽象。

流行的數(shù)據(jù)處理系統(tǒng)如 Apache Spark,可以有無狀態(tài)的任務(wù)(沒有 side effects 的函數(shù))在不可變動(dòng)的數(shù)據(jù)上操作。這個(gè)假設(shè)簡化了整體系統(tǒng)的設(shè)計(jì),讓驗(yàn)證正確性變得簡單。

但是,可變狀態(tài)在很多的任務(wù)中存在,機(jī)器學(xué)習(xí)領(lǐng)域中反復(fù)出現(xiàn)。狀態(tài)可能是一個(gè)神經(jīng)網(wǎng)絡(luò)的權(quán)重,第三方模擬器的狀態(tài),或者物理世界的交互的封裝。

為了支持這些類型的應(yīng)用,Ray 引入了 actor 抽象。一個(gè) actor 會(huì)序列化地執(zhí)行方法(使得沒有并發(fā)的問題),每個(gè)任務(wù)可以任意地改變 actor 的內(nèi)部狀態(tài)。方法可以有其他的 actors 和任務(wù)激活(甚至由在同樣的集群上的其他應(yīng)用)

讓 Ray 變得很強(qiáng)大的一點(diǎn)是它統(tǒng)一了 actor 抽象和任務(wù)并行抽像,繼承了兩者的優(yōu)點(diǎn)。Ray 使用了底層的動(dòng)態(tài)任務(wù)圖在同樣的框架中來實(shí)現(xiàn) actors 和無狀態(tài)任務(wù)。所以,這兩個(gè)抽象其實(shí)完全整合在一起。任務(wù)和 actors 可以從其他任務(wù)和 actors 中進(jìn)行創(chuàng)建。兩者返回的future可以被傳遞給其他的任務(wù)或者 actor 方法來引入調(diào)度和數(shù)據(jù)依賴。所以,Ray 應(yīng)用進(jìn)程了這兩個(gè)的好的特性。

底層基礎(chǔ)

動(dòng)態(tài)任務(wù)圖 在底層,遠(yuǎn)程函數(shù)激活和 actor 方法激活創(chuàng)建了任務(wù)被加入到一個(gè)動(dòng)態(tài)增長的任務(wù)圖上。Ray 的后端管理調(diào)度和在集群上執(zhí)行這些任務(wù)(或者在一個(gè)單機(jī)多核機(jī)器上)。任務(wù)可以被 driver 應(yīng)用或者其他任務(wù)創(chuàng)建。

數(shù)據(jù) Ray 使用 Apache Arrow data layout 來高效地序列化數(shù)據(jù)。對象在 workers 和 actors 之間通過共享內(nèi)存在同樣的機(jī)器上進(jìn)行共享,這就避免了復(fù)制和去序列化的需要。這樣的優(yōu)化絕對是達(dá)到好的性能的關(guān)鍵。

調(diào)度 Ray 使用了一個(gè)分布式調(diào)度方法。每個(gè)機(jī)器有其自身的調(diào)度器,這個(gè)東西管理這臺機(jī)器上的 workers 和 actors。任務(wù)被應(yīng)用和 workers 提交給同一機(jī)器上的調(diào)度器。這讓 Ray 達(dá)成比一個(gè)中心化的調(diào)度器達(dá)到的更高的任務(wù)吞吐量,這對機(jī)器學(xué)習(xí)應(yīng)用非常重要。

總結(jié)

參數(shù)服務(wù)器通常是做一個(gè)單一系統(tǒng)實(shí)現(xiàn)和 shipped。讓這個(gè)方法很強(qiáng)大的是我們能夠用少量代碼實(shí)現(xiàn)參數(shù)服務(wù)器為一個(gè)應(yīng)用。這個(gè)方法讓部署使用參數(shù)服務(wù)器的應(yīng)用和修改參數(shù)服務(wù)器的行為更加簡單。例如,如果我們希望對參數(shù)服務(wù)器進(jìn)行分片,改變更新規(guī)則,在同步和異步更新之間切換,或略 straggler workers,或者任何其他的定制,我們可以用少量的代碼達(dá)成。

這個(gè)文章描述了如何使用 Ray 的 actors 來實(shí)現(xiàn)參數(shù)服務(wù)器。然而,actors 是更加通用的概念,可以用來進(jìn)行很多包含狀態(tài)計(jì)算的應(yīng)用。logging,streaming,simulation,model serving, graph processing,和其他應(yīng)用。

運(yùn)行代碼

為了運(yùn)行完整的應(yīng)用,首先安裝 Ray pip install ray。然后能運(yùn)行下面的代碼,這段代碼實(shí)現(xiàn)了一個(gè)共享的參數(shù)服務(wù)器。

import numpy as np
import ray
import time

# Start Ray.
ray.init()


@ray.remote
class ParameterServer(object):
    def __init__(self, dim):
        # Alternatively, params could be a dictionary mapping keys to arrays.
        self.params = np.zeros(dim)

    def get_params(self):
        return self.params

    def update_params(self, grad):
        self.params += grad


@ray.remote
def worker(*parameter_servers):
    for _ in range(100):
        # Get the latest parameters.
        parameter_shards = ray.get(
          [ps.get_params.remote() for ps in parameter_servers])
        params = np.concatenate(parameter_shards)

        # Compute a gradient update. Here we just make a fake
        # update, but in practice this would use a library like
        # TensorFlow and would also take in a batch of data.
        grad = np.ones(10)
        time.sleep(0.2)  # This is a fake placeholder for some computation.
        grad_shards = np.split(grad, len(parameter_servers))

        # Send the gradient updates to the parameter servers.
        for ps, grad in zip(parameter_servers, grad_shards):
            ps.update_params.remote(grad)


# Start two parameter servers, each with half of the parameters.
parameter_servers = [ParameterServer.remote(5) for _ in range(2)]

# Start 2 workers.
workers = [worker.remote(*parameter_servers) for _ in range(2)]

# Inspect the parameters at regular intervals.
for _ in range(5):
    time.sleep(1)
    print(ray.get([ps.get_params.remote() for ps in parameter_servers]))

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容