機(jī)器學(xué)習(xí)筆記:GBDT的并行化訓(xùn)練

@Author: 机器学习算法 @迪吉老农

最近使用GBDT時(shí),想通過分布式進(jìn)行訓(xùn)練,嘗試了一些框架,但原理不太了解。有些東西與同事討論后,也還不甚明了,于是專心看了一下文檔,在此記錄一下。

1. Distributed Training Principles

常用分布式訓(xùn)練方式,應(yīng)該是參數(shù)服務(wù)器。worker把sample的統(tǒng)計(jì)結(jié)果推送到單臺參數(shù)服務(wù)器機(jī)器上,參數(shù)服務(wù)器匯總后,再推送到worker端。有點(diǎn)類似于單reducer的方式。

相比于參數(shù)服務(wù)器的中心化方案,這里提到的都是去中心化方案。

LightGBM與XGBoost的其中一個(gè)區(qū)別,是實(shí)現(xiàn)了真正的分布式,原因是LightGBM不依托hive,TaskManager之間可以暴露端口進(jìn)行通信,而Hivemall依托Hive,只能實(shí)現(xiàn)無通信的分布式。

把這幾種機(jī)制梳理一下,目前一共是4種方案,

  • Bagging
  • Feature Parallel
  • Data Parallel
  • Voting

Of these, Bagging involves no communication; the other three use communication to achieve genuinely distributed GBDT training (perhaps loosely analogous to data parallelism vs. model parallelism?).

LightGBM文檔建議,按照下面方式選擇并行方式,

                    #data is small      #data is large
#feature is small   Feature Parallel    Data Parallel
#feature is large   Feature Parallel    Voting Parallel

1.1 Bagging

完全非中心化的架構(gòu),通過Data Parallel的方式將數(shù)據(jù)打散為多份,分布式訓(xùn)練,最后訓(xùn)練多顆子樹,在最終階段進(jìn)行Bagging輸出。其實(shí)嚴(yán)格來說,這個(gè)不是GBDT分布式了,而是一個(gè)隨機(jī)森林了。在Hivemall封裝的XGB中,就是這樣實(shí)現(xiàn)的。XGB的文檔中,對于分布式的描述不多,沒有看到其他帶有通信機(jī)制的分布式的介紹。具體訓(xùn)練流程后面介紹。

1.2 Feature Parallel

特征并行的訓(xùn)練方式,流程如下,

  • 數(shù)據(jù)按列切分成不同的worker,
  • 每個(gè)結(jié)點(diǎn)保留全部的數(shù)據(jù),但分裂時(shí)只使用特定的特征,每個(gè)worker使用自己的feature進(jìn)行分裂,輸出候選的分裂集splits
  • worker之間通信,選取gain提升最多的split,然后統(tǒng)一執(zhí)行相同的split

[Figure: partitioning by feature]

性能層面,由于每次分裂都是用全量數(shù)據(jù),但是列變少了,所以數(shù)據(jù)量是瓶頸,時(shí)間復(fù)雜度O(#data)

1.3 Data Parallel

數(shù)據(jù)并行是類似于Bagging的方式,按照行進(jìn)行分裂,區(qū)別是在于分裂點(diǎn)的計(jì)算上。Bagging的方式中,樹與樹之間是獨(dú)立的,每棵樹的分裂點(diǎn)都是只使用了1/n的數(shù)據(jù)計(jì)算的,前期的分裂應(yīng)該還好,子樣本與總樣本之間統(tǒng)計(jì)差異不大,但是細(xì)分的時(shí)候可能就有點(diǎn)區(qū)別了。

數(shù)據(jù)并行下,每個(gè)worker統(tǒng)計(jì)自己的分裂點(diǎn)的數(shù)據(jù),比如在一種分裂下,worker1內(nèi)部:喜歡10人,不喜歡1人,worker2內(nèi)部:喜歡5人,不喜歡2人,在匯總階段就可以算出一種分裂下,整體是喜歡15人,不喜歡3人。注意,這還只是一種分裂,如果要考慮所有的分裂可能性,那這個(gè)計(jì)算和匯總工作就很大了。

按數(shù)據(jù)切分

具體說下,匯總的這個(gè)操作,是采用All-reduce的方式(將每臺機(jī)器的子樣本結(jié)果,發(fā)送給所有機(jī)器,然后在每臺機(jī)器上都reduce全部的子樣本;常規(guī)的reduce是把子樣本結(jié)果發(fā)送給一臺機(jī)器,在那臺機(jī)器上匯總)。

這樣的計(jì)算方式,通信成本的時(shí)間復(fù)雜度是O(2 * #feature * #bin)

LightGBM文檔中是這么說,也沒太看懂,估計(jì)會有常數(shù)倍的提升

Data Parallel in LightGBM

We reduce communication cost of data parallel in LightGBM:

  1. Instead of “Merge global histograms from all local histograms”, LightGBM uses “Reduce Scatter” to merge histograms of different (non-overlapping) features for different workers. Then workers find the local best split on local merged histograms and sync up the global best split.
  2. As aforementioned, LightGBM uses histogram subtraction to speed up training. Based on this, we can communicate histograms only for one leaf, and get its neighbor’s histograms by subtraction as well.

All things considered, data parallel in LightGBM has time complexity O(0.5 * #feature * #bin).
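The histogram subtraction mentioned in point 2 is easy to see in miniature: a parent's histogram is, bin by bin, the sum of its two children's, so only one child's histogram ever needs to be built or communicated (toy numbers of my own below):

# Histogram subtraction: right_child = parent - left_child, bin by bin.
parent_hist = [12, 30, 8, 50]  # per-bin statistics on the parent node
left_hist = [2, 10, 3, 20]     # built (or communicated) for one child only

right_hist = [p - l for p, l in zip(parent_hist, left_hist)]
print(right_hist)  # [10, 20, 5, 30] -- the sibling's histogram, for free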

1.4 Voting Parallel

This is an approximate algorithm: the assumption is that with large samples, the locally optimal split is close to the globally optimal one. Citing the PV-Tree paper, whose abstract follows:

Decision tree (and its extensions such as Gradient Boosting Decision Trees and Random Forest) is a widely used machine learning algorithm, due to its practical effectiveness and model interpretability. With the emergence of big data, there is an increasing need to parallelize the training process of decision tree. However, most existing attempts along this line suffer from high communication costs. In this paper, we propose a new algorithm, called Parallel Voting Decision Tree (PV-Tree), to tackle this challenge. After partitioning the training data onto a number of (e.g., M) machines, this algorithm performs both local voting and global voting in each iteration. For local voting, the top-k attributes are selected from each machine according to its local data. Then, the indices of these top attributes are aggregated by a server, and the globally top-2k attributes are determined by a majority voting among these local candidates. Finally, the full-grained histograms of the globally top-2k attributes are collected from local machines in order to identify the best (most informative) attribute and its split point. PV-Tree can achieve a very low communication cost (independent of the total number of attributes) and thus can scale out very well. Furthermore, theoretical analysis shows that this algorithm can learn a near optimal decision tree, since it can find the best attribute with a large probability. Our experiments on real-world datasets show that PV-Tree significantly outperforms the existing parallel decision tree algorithms in the tradeoff between accuracy and efficiency.
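Condensed into a toy Python version of the abstract's voting scheme (illustrative gains and names of my own; not the paper's actual code):

from collections import Counter

def local_top_k(feature_gains, k):
    # Each worker votes for its k locally most informative features.
    return [f for f, _ in sorted(feature_gains.items(), key=lambda kv: -kv[1])[:k]]

k = 1
local_votes = [
    local_top_k({"f0": 0.9, "f1": 0.5, "f2": 0.1, "f3": 0.05, "f4": 0.02}, k),
    local_top_k({"f0": 0.8, "f2": 0.6, "f1": 0.2, "f3": 0.10, "f4": 0.05}, k),
    local_top_k({"f1": 0.7, "f0": 0.6, "f2": 0.3, "f3": 0.20, "f4": 0.01}, k),
]
votes = Counter(f for top in local_votes for f in top)
global_top_2k = [f for f, _ in votes.most_common(2 * k)]
print(global_top_2k)  # ['f0', 'f1']: full histograms are gathered only for
                      # these, so communication no longer scales with #feature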

2、訓(xùn)練方式

2.1、Hivemall調(diào)用XGBoost

訓(xùn)練數(shù)據(jù)的格式是以hive表的方式保存,格式如下

function create_dataset_on_emr() {
  hive -e "
    USE cv_weibo;
    DROP TABLE IF EXISTS ${T_PRE}_xgb_input;
    CREATE EXTERNAL TABLE ${T_PRE}_xgb_input (
      rowid string,
      features array<string> comment 'feature array',
      label string comment 'class label'
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
    LOCATION '/yandi/iris/${T_PRE}_xgb_input'
    ;
  "
}

訓(xùn)練過程,調(diào)用Java封裝的UDAF函數(shù)

function train() {
  hive -e "
    use cv_weibo;
    set hive.execution.engine=mr;
    set mapred.reduce.tasks=10;
    set mapreduce.job.maps=20;
    set hivevar:shufflebuffersize=1000;
    set hivevar:xtimes=10;
    SET hivevar:hivemall_jar=hdfs://yandi/udf/hivemall-all-0.6.2-incubating.jar;
    CREATE TEMPORARY FUNCTION train_xgboost AS 'hivemall.xgboost.XGBoostTrainUDTF' USING JAR '\${hivemall_jar}';
    CREATE TEMPORARY FUNCTION rand_amplify as 'hivemall.ftvec.amplify.RandomAmplifierUDTF' USING JAR '\${hivemall_jar}';
    CREATE TEMPORARY FUNCTION amplify as 'hivemall.ftvec.amplify.AmplifierUDTF' USING JAR '\${hivemall_jar}';

    drop table if exists ${T_PRE}_xgb_softmax_model;
    create table ${T_PRE}_xgb_softmax_model 
    as
    select 
      train_xgboost(features, label, 
        '-objective binary:logistic -num_round 50 -num_early_stopping_rounds 30 
        -max_depth 5 -min_child_weight 2') 
        as (model_id, model)
    from (
      select 
        rowid, features, label
      from
        ${T_PRE}_xgb_input
      cluster by rand(43) 
    ) shuffled;
  "
}

訓(xùn)練出來的模型保存在一張hive表里,并且通過base64編碼成了一個(gè)加密字符串。

預(yù)測階段,將數(shù)據(jù)與模型表join在一起,使得每行都保存了完整的模型分裂信息。UDAF可以隨時(shí)加載模型參數(shù),實(shí)現(xiàn)數(shù)據(jù)并行化預(yù)測。如果在訓(xùn)練階段,分成了多個(gè)reducer,則會生成多個(gè)子模型,在預(yù)測時(shí)可以選擇其中一個(gè),也可以選擇所有結(jié)果進(jìn)行bagging(代碼里對應(yīng)majority_vote)。如果訓(xùn)練階段只設(shè)置了一個(gè)reducer,則這里只有一個(gè)單獨(dú)模型

function predict() {
  hive -e "
    use cv_weibo;
    set hive.execution.engine=mr;
    SET hivevar:hivemall_jar=hdfs://yandi/udf/hivemall-all-0.6.2-incubating.jar;
    CREATE TEMPORARY FUNCTION majority_vote as 'hivemall.tools.aggr.MajorityVoteUDAF' USING JAR '\${hivemall_jar}';
    CREATE TEMPORARY FUNCTION xgboost_predict_one AS 'hivemall.xgboost.XGBoostPredictOneUDTF' USING JAR '\${hivemall_jar}';

    drop table if exists ${T_PRE}_xgb_softmax_predicted;
    create table ${T_PRE}_xgb_softmax_predicted
    as
    with yandi_tmp_sample as (
      select * from ${T_PRE}_xgb_input
      where rand() < 0.0001
    )
    select
      rowid,
      majority_vote(cast(predicted as int)) as label
    from (
      select
        xgboost_predict_one(rowid, features, model_id, model) as (rowid, predicted)
      from
        ${T_PRE}_xgb_softmax_model l
        join yandi_tmp_sample r
        on model_id > ''
    ) t
    group by rowid;
  "
}

這里有個(gè)技巧,由于模型表只有1行(n個(gè)bagging的模型則是n行),在預(yù)測階段的worker數(shù)量會卡死在1,沒有按照數(shù)據(jù)行數(shù)進(jìn)行scale。經(jīng)過多次嘗試,通過join代替outer join,可以解決這個(gè)問題,join需要一個(gè)條件,于是加了一個(gè)一定會滿足的條件。

2.2 Distributed LightGBM

訓(xùn)練數(shù)據(jù)的格式是以hdfs文件的方式保存,格式需要是tsv,csv,或者空格分列的libsvm格式

dataset_lightgbm() {
  hive -e "
    INSERT OVERWRITE DIRECTORY 'viewfs://c9/yandi/iris/data'
    select concat_ws(' ',
      label,
      concat_ws(' ', features)
    )
    from ${T_PRE}_xgb_input 
  " 
}

訓(xùn)練過程,如果是本地單機(jī),則可以運(yùn)行命令行,

./LightGBM/lightgbm config=train.conf

The config file is as follows:

# task type, support train and predict
task = train
# boosting type, support gbdt for now, alias: boosting, boost
boosting_type = gbdt
# application type, support following application
# regression , regression task
# binary , binary classification task
# lambdarank , lambdarank task
# alias: application, app
objective = binary
# eval metrics, supports multiple metrics delimited by ',', from the following:
# l1
# l2 , default metric for regression
# ndcg , default metric for lambdarank
# auc
# binary_logloss , default metric for binary
# binary_error
metric = binary_logloss,auc
# frequency for metric output
metric_freq = 1
# true if metric output is needed for training data, alias: training_metric, train_metric
is_training_metric = true
# number of bins for feature bucketing; 255 is a recommended setting, it saves memory and still gives good accuracy
max_bin = 255
# training data
# if a weight file exists, it should be named "binary.train.weight"
# alias: train_data, train
data = data/000000_0
# validation data, support multi validation data, separated by ','
# if a weight file exists, it should be named "binary.test.weight"
# alias: valid, test, test_data,
valid_data = data/000001_0
# number of trees (iterations), alias: num_tree, num_iteration, num_iterations, num_round, num_rounds
num_trees = 5
# shrinkage rate , alias: shrinkage_rate
learning_rate = 0.1
# number of leaves for one tree, alias: num_leaf
num_leaves = 15
# type of tree learner, support following types:
# serial , single machine version
# feature , use feature parallel to train
# data , use data parallel to train
# voting , use voting based parallel to train
# alias: tree
tree_learner = feature
# number of threads for multi-threading; one thread uses one CPU, default is set to #cpu.
# num_threads = 8
# feature sub-sampling: randomly selects 80% of the features on each iteration
# alias: sub_feature
feature_fraction = 0.8
# supports bagging (data sub-sampling); bagging is performed every 5 iterations
bagging_freq = 5
# bagging fraction: randomly selects 80% of the data when bagging
# alias: sub_row
bagging_fraction = 0.8
# minimal number of data points in one leaf; use this to combat over-fitting
# alias : min_data_per_leaf, min_data
min_data_in_leaf = 50
# minimal sum of hessians in one leaf; use this to combat over-fitting
min_sum_hessian_in_leaf = 5.0
# save memory and faster speed for sparse feature, alias: is_sparse
is_enable_sparse = true
# when the data is bigger than memory, set this to true; otherwise false gives faster speed
# alias: two_round_loading, two_round
use_two_round_loading = false
# true if need to save data to binary file and application will auto load data from binary file next time
# alias: is_save_binary, save_binary
is_save_binary_file = false
# output model file
output_model = output/LightGBM_model.txt
# support continuous train from trained gbdt model
# input_model= trained_model.txt
# output prediction file for predict task
# output_result= prediction.txt
# support continuous train from initial score file
# input_init_score= init_score.txt
# machines list file for parallel training, alias: mlist
machine_list_filename  = lightGBMlist.txt

# max depth of tree
max_depth = 6
# early stopping
early_stopping_rounds = 5
# scale negative and positive weight
scale_pos_weight = 10  

這里面涉及分布式的選項(xiàng)是tree_learner,需要填寫 feature ,data, voting其中之一。

To submit a distributed job:

train_lightgbm() {
    bin/ml-submit \
    --app-type "distlightgbm" \
    --app-name "distLightGBM-yandi" \
    --files train.conf,lightGBM.sh \
    --worker-num 1 \
    --worker-memory 10G \
    --cacheArchive hdfs:/yandi/udf/LightGBM.zip#LightGBM \
    --input-strategy DOWNLOAD \
    --input viewfs://c9/yandi/iris/data#data \
    --output /yandi/iris/output#output \
    --launch-cmd "sh lightGBM.sh"
}

lightGBM.sh simply appends the machine-list settings to the config file above; these are configured when the framework is deployed and do not need changing:

cp train.conf train_real.conf
chmod 777 train_real.conf
echo "num_machines = $LIGHTGBM_NUM_MACHINE" >> train_real.conf
echo "local_listen_port = $LIGHTGBM_LOCAL_LISTEN_PORT" >> train_real.conf
./LightGBM/lightgbm config=train_real.conf

需要改的參數(shù)是

  • worker-num
  • worker-memory
  • input: declares a directory mapping; the given HDFS directory is downloaded to the worker nodes and renamed as a whole to the part after the #. It corresponds to data = data/000000_0 and valid_data = data/000001_0 in train.conf.
  • output: also a directory mapping; after training, the worker-side directory output (the part after the #) is copied back to the HDFS directory. It corresponds to output_model = output/LightGBM_model.txt in train.conf.

訓(xùn)練日志

21/06/22 14:15:10 INFO Container: Input path: data@[viewfs://c9/yandi/iris/data/000000_0, viewfs://c9/yandi/iris/data/000001_0, viewfs://c9/yandi/iris/data/000002_0, viewfs://c9/yandi/iris/data/000003_0, viewfs://c9/yandi/iris/data/000004_0]
21/06/22 14:15:10 INFO Container: Downloading input file from viewfs://c9/yandi/iris/data/000000_0 to data/000000_0
21/06/22 14:15:10 INFO Container: Downloading input file from viewfs://c9/yandi/iris/data/000001_0 to data/000001_0
21/06/22 14:15:10 INFO Container: Downloading input file from viewfs://c9/yandi/iris/data/000003_0 to data/000003_0
21/06/22 14:15:10 INFO Container: Downloading input file from viewfs://c9/yandi/iris/data/000004_0 to data/000004_0
21/06/22 14:15:31 INFO Container: PYTHONUNBUFFERED=1
21/06/22 14:15:31 INFO Container: INPUT_FILE_LIST=null
21/06/22 14:15:31 INFO Container: LIGHTGBM_NUM_MACHINE=1
21/06/22 14:15:31 INFO Container: LIGHTGBM_LOCAL_LISTEN_PORT=26628
21/06/22 14:15:31 INFO Container: TF_INDEX=0
21/06/22 14:15:41 INFO Container: Executing command:sh lightGBM.sh
21/06/22 14:15:41 INFO Container: [LightGBM] [Info] Finished loading parameters
21/06/22 14:15:41 INFO ContainerReporter: Starting thread to read cpu metrics
21/06/22 14:15:42 INFO ContainerReporter: Resource Utilization => Cpu: %0, Memory: 0.5G
21/06/22 14:15:51 INFO Container: [LightGBM] [Info] Finished loading data in 9.567071 seconds
21/06/22 14:15:51 INFO Container: [LightGBM] [Info] Number of positive: 53487, number of negative: 1228361
21/06/22 14:15:51 INFO Container: [LightGBM] [Info] Total Bins 4669
21/06/22 14:15:52 INFO Container: [LightGBM] [Info] Number of data: 1281848, number of used features: 22
21/06/22 14:15:54 INFO Container: [LightGBM] [Info] Finished initializing training
21/06/22 14:15:54 INFO Container: [LightGBM] [Info] Started training...
21/06/22 14:15:55 INFO Container: [LightGBM] [Info] [binary:BoostFromScore]: pavg=0.041726 -> initscore=-3.133997
21/06/22 14:15:55 INFO Container: [LightGBM] [Info] Start training from score -3.133997
21/06/22 14:16:02 INFO ContainerReporter: Resource Utilization => Cpu: %249, Memory: 0.8G
21/06/22 14:16:14 INFO Container: [LightGBM] [Info] Iteration:1, training auc : 0.999878
21/06/22 14:16:14 INFO Container: [LightGBM] [Info] Iteration:1, training binary_logloss : 0.0841326
21/06/22 14:16:15 INFO Container: [LightGBM] [Info] Iteration:1, valid_1 auc : 0.999866
21/06/22 14:16:15 INFO Container: [LightGBM] [Info] Iteration:1, valid_1 binary_logloss : 0.0843192
21/06/22 14:16:15 INFO Container: [LightGBM] [Info] 20.487200 seconds elapsed, finished iteration 1
21/06/22 14:16:22 INFO ContainerReporter: Resource Utilization => Cpu: %502, Memory: 0.8G
21/06/22 14:16:23 INFO Container: [LightGBM] [Warning] No further splits with positive gain, best gain: -inf
21/06/22 14:16:25 INFO Container: [LightGBM] [Info] Iteration:2, training auc : 0.999878
21/06/22 14:16:25 INFO Container: [LightGBM] [Info] Iteration:2, training binary_logloss : 0.0723086
21/06/22 14:16:27 INFO Container: [LightGBM] [Info] Iteration:2, valid_1 auc : 0.999866
21/06/22 14:16:27 INFO Container: [LightGBM] [Info] Iteration:2, valid_1 binary_logloss : 0.0724657
21/06/22 14:16:27 INFO Container: [LightGBM] [Info] 32.446888 seconds elapsed, finished iteration 2
21/06/22 14:16:34 INFO Container: [LightGBM] [Warning] No further splits with positive gain, best gain: -inf
21/06/22 14:16:35 INFO Container: [LightGBM] [Info] Iteration:3, training auc : 0.999878
21/06/22 14:16:35 INFO Container: [LightGBM] [Info] Iteration:3, training binary_logloss : 0.0632701
21/06/22 14:16:36 INFO Container: [LightGBM] [Info] Iteration:3, valid_1 auc : 0.999866
21/06/22 14:16:36 INFO Container: [LightGBM] [Info] Iteration:3, valid_1 binary_logloss : 0.0634079
21/06/22 14:16:36 INFO Container: [LightGBM] [Info] 41.907969 seconds elapsed, finished iteration 3
21/06/22 14:16:42 INFO ContainerReporter: Resource Utilization => Cpu: %598, Memory: 0.8G
21/06/22 14:16:47 INFO Container: [LightGBM] [Info] Iteration:4, training auc : 0.99994
21/06/22 14:16:47 INFO Container: [LightGBM] [Info] Iteration:4, training binary_logloss : 0.0561602
21/06/22 14:16:48 INFO Container: [LightGBM] [Info] Iteration:4, valid_1 auc : 0.999929
21/06/22 14:16:48 INFO Container: [LightGBM] [Info] Iteration:4, valid_1 binary_logloss : 0.056288
21/06/22 14:16:48 INFO Container: [LightGBM] [Info] 53.568802 seconds elapsed, finished iteration 4
21/06/22 14:16:56 INFO Container: [LightGBM] [Info] Iteration:5, training auc : 0.999942
21/06/22 14:16:56 INFO Container: [LightGBM] [Info] Iteration:5, training binary_logloss : 0.0500692
21/06/22 14:16:58 INFO Container: [LightGBM] [Info] Iteration:5, valid_1 auc : 0.99993
21/06/22 14:16:58 INFO Container: [LightGBM] [Info] Iteration:5, valid_1 binary_logloss : 0.0501897
21/06/22 14:16:58 INFO Container: [LightGBM] [Info] 63.627517 seconds elapsed, finished iteration 5
21/06/22 14:16:58 INFO Container: [LightGBM] [Info] Finished training
21/06/22 14:16:59 INFO Container: worker_0 ,exit code is : 0
21/06/22 14:16:59 INFO Container: Output path: output#viewfs://c9/yandi/iris/output
21/06/22 14:16:59 INFO Container: Upload output output to remote path viewfs://c9/yandi/iris/output/_temporary/container_e38_1622178278006_5469657_01_000003 finished.
21/06/22 14:16:59 INFO Container: Container container_e38_1622178278006_5469657_01_000003 finish successfully
21/06/22 14:17:02 INFO ContainerReporter: Resource Utilization => Cpu: %0, Memory: 0.41G
21/06/22 14:17:07 WARN Container: received kill signal
21/06/22 14:17:07 INFO Container: Container exit
21/06/22 14:17:07 INFO Container: clean worker process    

參考文獻(xiàn)

https://lightgbm.readthedocs.io/en/latest/Parallel-Learning-Guide.html

版權(quán)聲明

以上文章為本人@迪吉老農(nóng)原創(chuàng),首發(fā)于簡書,文責(zé)自負(fù)。文中如有引用他人內(nèi)容的部分(包括文字或圖片),均已明文指出,或做出明確的引用標(biāo)記。如需轉(zhuǎn)載,請聯(lián)系作者,并取得作者的明示同意。感謝。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容