Scale a CPU-intensive Python task using Ray

This article walks through converting a long-running, CPU-heavy algorithm job from a single-machine program into a distributed one using Ray, with two goals: lowering the load on any single machine, and speeding up the overall run.

1. Set up the environment

1.1 Miniconda

Python already ships with pip as its package manager, so why do we need conda at all, and what is the relationship between miniconda and conda? See:

http://blog.sina.com.cn/s/blog_8a122dcf0102x9vn.html

To install miniconda itself, follow:

https://developers.google.com/earth-engine/python_install-conda

To speed up conda downloads from within China, add a domestic mirror:

conda config --add channels https://mirrors.ustc.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.ustc.edu.cn/anaconda/pkgs/main/
conda config --set show_channel_urls yes

Create a conda environment dedicated to Ray:

conda create -n ray python=3.7
conda activate ray

# If the installed Python version is not what you want, swap it out
conda uninstall python
conda install python=3.7

# Make the ray environment the default on login
printf '\n# add path to conda\nexport PATH="$HOME/miniconda3/bin:$PATH"\n' >> ~/.bashrc
echo 'source activate ray' >> ~/.bashrc

1.2 Ray

pip install -U ray

1.3 Pandas

pip install -U pandas
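The script in section 2 loads the query file with pandas as a list of dicts; this snippet sketches that pattern on an in-memory stand-in for the real tab-separated file (the column names match the script, the sample rows are made up):

```python
import io
import pandas as pd

# Stand-in for the real query file: one "query \t embedding" row per line,
# with no header row.
tsv = io.StringIO("shoes\t0.1,0.2\nhat\t0.3,0.4\n")
records = pd.read_csv(tsv, sep='\t', header=None,
                      names=['query', 'embed']).to_dict('records')
print(len(records))         # 2
print(records[0]['query'])  # shoes
```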

1.4 Facebook Faiss

# CPU version only
conda install faiss-cpu -c pytorch

# GPU version
conda install faiss-gpu cudatoolkit=8.0 -c pytorch # For CUDA8
conda install faiss-gpu cudatoolkit=9.0 -c pytorch # For CUDA9
conda install faiss-gpu cudatoolkit=10.0 -c pytorch # For CUDA10

1.5 Ray cluster

# start head
ray start --head --redis-port=6379
# add worker
ray start --address='xxxxxxxx:6379' --redis-password='5241590000000000'
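For a script to actually use the cluster, its driver process must attach to the running head node instead of starting a private Ray instance; a sketch, reusing the placeholder address and default password from the commands above (this only runs against a live cluster):

```python
import ray

# Attach to the existing cluster rather than starting a local runtime.
ray.init(address='xxxxxxxx:6379', redis_password='5241590000000000')
print(ray.nodes())  # one entry per connected machine
```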

2. Rewrite the Python script

faiss_query_actor_pool.py

import ray
import load
from ray.util import ActorPool
import faiss_index
import sys
import pandas as pd

ray.init(include_webui=False)

@ray.remote(memory=1500 * 1024 * 1024)
class FaissQuery(object):
    def __init__(self, goods_embed):
        self.TOPN = 1000
        self.index = faiss_index.FaissIndex(goods_embed)

    def search(self, rows):
        # Slow operation: batch nearest-neighbour lookup against the index.
        # toVectors / extractResult are helper functions assumed to be
        # defined elsewhere in the project (not shown here).
        rs = self.index.search_by_vectors(toVectors(rows), self.TOPN)
        return extractResult(rs)

if __name__ == "__main__":
    goods_file = sys.argv[1]
    query_file = sys.argv[2]
    output_file = sys.argv[3]
    BATCH_SIZE = int(sys.argv[4])
    POOL_SIZE = int(sys.argv[5])

    goods_embed = load.load_embedding(goods_file)
    print('Load goods  ' + str(len(goods_embed)))
    query_embed = pd.read_csv(query_file, sep='\t', header=None, names=['query','embed']).to_dict("records")
    print('Load queries ' + str(len(query_embed)))

    print(f'BATCH_SIZE={BATCH_SIZE}, RAY_ACTOR_POOL_SIZE={POOL_SIZE}')

    actorPool = ActorPool([FaissQuery.remote(goods_embed) for i in range(POOL_SIZE)])

    index = 0
    while index < len(query_embed):
        rows = query_embed[index : index + BATCH_SIZE]

        # Pass the batch as the second lambda argument: submit() may defer
        # the call until an actor frees up, so closing over `rows` would
        # hand every deferred task the final batch instead of its own.
        actorPool.submit(lambda a, v: a.search.remote(v), rows)

        index += BATCH_SIZE

    with open(output_file, "a") as f:
        while actorPool.has_next():
            f.write(actorPool.get_next())

3. Testing

As the number of actors grows, wall-clock time drops: on this 10,000-query workload, 1 actor takes about 21.7 s, 3 actors 11.8 s, and 6 actors 8.7 s, roughly a 2.5× speedup.

$ time python3 faiss_query_actor_pool.py  goods.csv quries.csv result 1000 1

Load goods  10000
Load queries 10000
BATCH_SIZE=1000, RAY_ACTOR_POOL_SIZE=1

real    0m21.716s
user    0m4.979s
sys 0m1.908s
$ time python3 faiss_query_actor_pool.py  goods.csv quries.csv result 1000 3

Load goods  10000
Load queries 10000
BATCH_SIZE=1000, RAY_ACTOR_POOL_SIZE=3

real    0m11.756s
user    0m4.168s
sys 0m1.539s

$ time python3 faiss_query_actor_pool.py  goods.csv quries.csv result 1000 6

2020-06-18 17:01:27,406 INFO resource_spec.py:212 -- Starting Ray with 12.84 GiB memory available for workers and up to 6.44 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
Load goods  10000
Load queries 10000
BATCH_SIZE=1000, RAY_ACTOR_POOL_SIZE=6

real    0m8.698s
user    0m4.129s
sys 0m1.414s