目標(biāo)
1.spark從hive獲取數(shù)據(jù)對(duì)用戶特征進(jìn)行處理寫(xiě)入hbase
2.保留30天用戶特征數(shù)據(jù),用于模型訓(xùn)練(每天訓(xùn)練一次,離線預(yù)測(cè))
用戶畫(huà)像設(shè)計(jì)
用戶畫(huà)像通俗說(shuō)則是根據(jù)用戶的行為給用戶打標(biāo)簽,我們將用戶行為劃分為以下幾個(gè)維度,分別是:
基礎(chǔ)信息: 用戶年齡,性別,等級(jí)等相關(guān)信息
環(huán)境信息: 用戶使用app時(shí)的地域,設(shè)備,網(wǎng)絡(luò)情況(可分為統(tǒng)計(jì)日,最近7日,最近30日,累計(jì)四個(gè)時(shí)間維度對(duì)特征進(jìn)行統(tǒng)計(jì))
行為偏好: 用戶常玩的app類(lèi)型, 登錄時(shí)間,在線時(shí)長(zhǎng)等(可分為統(tǒng)計(jì)日,最近7日,最近30日,累計(jì)四個(gè)時(shí)間維度對(duì)特征進(jìn)行統(tǒng)計(jì))
付費(fèi)偏好: 用戶充值額度, 付費(fèi)次數(shù), 付費(fèi)商品類(lèi)型, 付費(fèi)時(shí)間段等等(可分為統(tǒng)計(jì)日,最近7日,最近30日,累計(jì)四個(gè)時(shí)間維度對(duì)特征進(jìn)行統(tǒng)計(jì))
社交特征: 如果存在社交系統(tǒng), 可以統(tǒng)計(jì)一下好友聊天, 好友開(kāi)局等信息(可分為統(tǒng)計(jì)日,最近7日,最近30日,累計(jì)四個(gè)時(shí)間維度對(duì)特征進(jìn)行統(tǒng)計(jì))
策略玩法特征: 具體游戲(或其他)應(yīng)用核心內(nèi)容的玩法, 使用等信息(可分為統(tǒng)計(jì)日,最近7日,最近30日,累計(jì)四個(gè)時(shí)間維度對(duì)特征進(jìn)行統(tǒng)計(jì))
調(diào)整數(shù)值特征: 可以通過(guò)該一系列特征來(lái)調(diào)整付費(fèi),留存的字段
用戶畫(huà)像Hbase表結(jié)構(gòu): user_profile
| 列簇 | 字段 | |||
|---|---|---|---|---|
| key (rowKey關(guān)鍵字段) | time 統(tǒng)計(jì)日 | game_id 游戲ID | uid用戶id | |
| info (基礎(chǔ)信息) | $game_sex 游戲性別[man, woman] | $vip vip等級(jí) | $level 等級(jí) | |
| env_tag (環(huán)境偏好特征) | $address 地址分布[.*] | $device 設(shè)備分布 [huawei, oppo, apple, ...] | $os 系統(tǒng)分布[android, ios] | $sim 運(yùn)營(yíng)商分布 [yidong, liantong, ...] |
| favor_tag (行為偏好特征) | $game 常玩游戲類(lèi)型[tcg, rpg, mmo, ...] | $login_time 登錄時(shí)段分布[00~23] | $online_time 在線時(shí)長(zhǎng)分布[d+] | |
| pay_tag (付費(fèi)偏好特征) | $pay_amount 充值額度分布[count、max、min、avg] | $pay_frequency 付費(fèi)次數(shù)分布[count、max、min、avg] | $pay_money 支付方式分布(基于金額) [wechat, applepay, ...] | $pay_count 支付方式分布(基于次數(shù)) [wechat, applepay, ...] |
| social_tag (社交偏好特征) | $new_friend_count 新增好友[count、max、min、avg] | $chat_count 好友聊天次數(shù)[count、max、min、avg] | $chat_count_friend 游戲好友聊天次數(shù)[d+] | $chat_count_wechat 微信好友聊天次數(shù)[d+] |
| strategy_tag (策略偏好特征) | $pvp_count 競(jìng)技次數(shù)[count、max、min、avg] | $pve_count 副本次數(shù)[count、max、min、avg] | $role_type 角色類(lèi)型使用 [ap, ad, other] | $fight_type 戰(zhàn)斗類(lèi)型 [pvp, pve] |
| adjust_tag (調(diào)整數(shù)值特征) | $allowance 破產(chǎn)補(bǔ)貼 |
[圖片上傳失敗...(image-5e7d98-1643355969865)]
Hive數(shù)據(jù)結(jié)構(gòu)
hive
show create table action (此為表名, 我這邊是action表)
輸入輸出格式為ORC, Presto針對(duì)這種格式的數(shù)據(jù)做了優(yōu)化查詢, 如果是impala查詢則使用parquet格式。
CREATE TABLE `action`(
`uid` string,
`uid_type` string,
`agent` string,
`ip` string,
`timestamp` timestamp,
`time` timestamp,
`year` string,
`month` string,
`week` string,
`hour` string,
`minute` string,
`properties` map<string,string>)
PARTITIONED BY (
`game_id` int,
`timezone` string,
`event` string,
`day` date)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
'colelction.delim'=',',
'field.delim'='\t',
'mapkey.delim'=':',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'hdfs://slaves01:8020/warehouse/tablespace/managed/hive/event.db/action'
TBLPROPERTIES (
'auto-compaction'='true',
'bucketing_version'='2',
'compaction.file-size'='128MB',
'sink.partition-commit.delay'='0s',
'sink.partition-commit.policy.kind'='metastore,success-file',
'sink.partition-commit.trigger'='process-time',
'sink.shuffle-by-partition.enable'='true',
'transient_lastDdlTime'='1642571371')
Spark2Hbase邏輯處理
1.獲取游戲列表和對(duì)應(yīng)時(shí)區(qū)(或者傳入字符串?dāng)?shù)組)
2.創(chuàng)建用戶畫(huà)像表user_profile(設(shè)置TTL為30天,壓縮方式為snappy)
3.計(jì)算不同特征tag, 寫(xiě)入用戶畫(huà)像表
├── pom.xml
└── src
└── main
├── resources
│ ├── application.conf # 默認(rèn)配置
│ ├── code2area.csv
│ ├── hive-site.xml # 將/usr/hdp/current/spark2-client/conf/hive-site.xml拷貝過(guò)來(lái)
│ └── reference.conf
└── scala
├── com
│ └── carol
│ └── bigdata
│ ├── App.scala # 程序主入口
│ ├── Config.scala # 配置參數(shù)
│ ├── Task.scala # 程序主要任務(wù)
│ ├── constant
│ │ └── KVConstant.scala
│ ├── task # 具體任務(wù)包
│ │ └── feature
│ │ ├── CalEnvTag.scala # 計(jì)算環(huán)境偏好特征
│ │ ├── CalFavorTag.scala # 計(jì)算行為偏好特征
│ │ ├── CalPayTag.scala # 計(jì)算付費(fèi)特征
│ │ └── cag
│ │ ├── calAdjustTag.scala # 計(jì)算調(diào)整特征(此處由用戶自行填寫(xiě))
│ │ ├── calSocialTag.scala # 計(jì)算社交特征
│ │ └── calStrategyTag.scala # 計(jì)算策略特征
│ │ └── label
│ │ └── calRetentionLabel.scala # 計(jì)算留存標(biāo)簽
│ │ └── model
│ │ ├── algo # 算法模型
│ │ │ ├── DT.scala
│ │ │ ├── LR.scala
│ │ │ ├── ModelTrait.scala # 定義算法模型接口
│ │ │ ├── ModelUtil.scala
│ │ │ ├── RF.scala
│ │ │ └── SVM.scala
│ │ ├── feature
│ │ │ └── FeatureUtil.scala # 特征工程
│ │ └── train
│ │ └── TrainRetention.scala # 留存預(yù)測(cè)訓(xùn)練Demo
│ └── utils
│ ├── FeatureUtil.scala # 計(jì)算特征公共函數(shù)
│ ├── Flag.scala # 命令行參數(shù)
│ ├── FuncUtil.scala
│ ├── HBaseFilter.scala # hbase建表,讀過(guò)濾操作
│ ├── HBaseUtil.scala # hbase建表,讀寫(xiě)操作
│ ├── RddReader.scala # 讀取hive/hdfs轉(zhuǎn)換為RDD
│ └── TimeUtil.scala # 時(shí)間處理
└── org
└── apache
└── hadoop
└── hive
└── shims
└── ShimLoader.java # hadoop3不支持,在源碼93行,加入對(duì)case 3 version的支持
└── spark
└── ml
└── feature
└── VectorDisassembler.scala # 將合并列拆分
樣本設(shè)計(jì)
標(biāo)簽設(shè)計(jì)
假設(shè)此處需要預(yù)測(cè)活躍留存, 以便調(diào)整相關(guān)特征提升用戶留存率。key為hbase表的rowkey相關(guān)字段, retention為標(biāo)簽字段, active_r1代表用戶在次日仍然活躍則為1, 否則為0, 因此我們將此模型抽象為一個(gè)二分類(lèi)模型。(TTL 30days)
| 列簇 | 字段 | |||||
|---|---|---|---|---|---|---|
| key | time 統(tǒng)計(jì)日 | game_id 游戲ID | uid 用戶ID | |||
| retention | active_r1 次日活躍留存[0,1] | active_r2 2日活躍留存[0,1] | active_r3 3日活躍留存[0,1] | active_r7 7日活躍留存[0,1] | active_r15 15日活躍留存[0,1] | active_r30 30日活躍留存[0,1] |
| prediction | pred_active_r1 次日活躍留存[0,1] | pred_active_r2 2日活躍留存[0,1] | pred_active_r3 3日活躍留存[0,1] | pred_active_r7 7日活躍留存[0,1] | pred_active_r15 15日活躍留存[0,1] | pred_active_r30 30日活躍留存[0,1] |

樣本標(biāo)簽表
根據(jù)用戶畫(huà)像和標(biāo)簽表, 合并成樣本標(biāo)簽表,進(jìn)行特征工程處理, 其中合并原則為以每天標(biāo)簽表為左表join用戶畫(huà)像表。(在特征工程中合并訓(xùn)練)
time統(tǒng)計(jì), game_id 游戲ID, uid 用戶ID, 用戶特征(env_tag, favor_tag,...), 實(shí)際標(biāo)簽(active_r1 0/1)
算法模型
特征工程
將特征列類(lèi)型劃分為int, double, map,string列, 分別對(duì)不同的列標(biāo)記tag或分段, 也可以對(duì)數(shù)值列進(jìn)行標(biāo)準(zhǔn)化。如圖所示, 最后一列是次日留存的標(biāo)簽, 前面是隨意摘取的特征列, 這里使用的是spark的hash特征向量化。

模型封裝
定義模型初始化, 訓(xùn)練, 預(yù)測(cè), 保存接口, 具體的算法實(shí)現(xiàn)接口即可。
package com.carol.bigdata.task.model.algo
import java.io.File
import org.apache.spark.ml.classification.{Classifier, OneVsRest}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.DataFrame
trait ModelTrait {
// 未實(shí)現(xiàn)函數(shù),子類(lèi)集成必須實(shí)現(xiàn),已實(shí)現(xiàn)函數(shù)子類(lèi)可以直接使用
def init(params: Map[String, Any]): Unit
// 構(gòu)建pipeline模型
def buildPipeline(featuresCol: String = "features",
labelCol: String = "label",
rawPredictionCol: String = "rawPrediction",
predictionCol: String = "prediction",
objective: String = "binary",
numClass: Int = 2): Pipeline
// 交叉驗(yàn)證
def buildValidator(pipeline: Pipeline,
seed: Int = 1,
numFolds: Int = 2,
parallelNum: Int = 2,
objective: String = "binary"): CrossValidator
// 基于二分類(lèi)器構(gòu)建任意分類(lèi)的PipeLine
def buildOvrTree(binaryModel: Classifier[_, _, _],
featuresCol: String = "features",
labelCol: String = "label",
rawPredictionCol: String = "rawPrediction",
predictionCol: String = "prediction",
objective: String = "binary"): OneVsRest = {
// 構(gòu)造多分類(lèi)器
val ovrTree: OneVsRest = new OneVsRest()
.setClassifier(binaryModel)
.setFeaturesCol(featuresCol)
.setLabelCol(labelCol)
.setPredictionCol(predictionCol)
ovrTree
}
// 基于pipeline和gridBuilder構(gòu)建交叉驗(yàn)證器
def buildValidatorFromGrid(pipeline: Pipeline,
gridBuilder: ParamGridBuilder,
seed: Int = 1,
numFolds: Int = 2,
parallelNum: Int = 2,
objective: String = "binary"): CrossValidator = {
// 交叉驗(yàn)證
val paramGrid: Array[ParamMap] = gridBuilder.build()
// 評(píng)估器
val evaluator = {
if (objective == "binary")
new BinaryClassificationEvaluator
else new MulticlassClassificationEvaluator
}
// 交叉驗(yàn)證模型
val cv: CrossValidator = new CrossValidator()
.setEstimator(pipeline)
.setEstimatorParamMaps(paramGrid)
.setEvaluator(evaluator)
.setSeed(seed)
.setNumFolds(numFolds) // Use 3+ in practice
.setParallelism(parallelNum) // Evaluate up to 2 parameter settings in parallel
//.setCollectSubModels(true) // specified to collect all validated models
cv
}
// 獲取最優(yōu)超參
def getBestParams(crossModel: CrossValidatorModel): Map[String, Any] = {
val bestParamsMap = crossModel.getEstimatorParamMaps.zip(crossModel.avgMetrics).maxBy(_._2)._1
bestParamsMap.toSeq.map(pair => (pair.param.name, pair.value)).toMap
}
// 保存模型
def saveModel(pipeline: PipelineModel, modelPath: String = "model"): Unit = {
// save到本地貨HDFS,供PipelineModel加載
println(s"pipeline model saving...")
pipeline.write.overwrite.save(modelPath)
println(s"pipeline model save success to $modelPath")
}
// 評(píng)估模型
def evalModel(evalData: DataFrame, objective: String = "binary"): Unit = {
val evaluator = {
if (objective == "binary")
new BinaryClassificationEvaluator
else new MulticlassClassificationEvaluator
}
val accuracy: Double = evaluator.evaluate(evalData)
println("accuracy:", accuracy)
}
// 更新微調(diào)參數(shù)
def updateTuneParams(bestParamsMap: ParamMap): Unit
def updateTuneParamsFromCV(crossModel: CrossValidatorModel,
maxBy: Boolean = true,
objective: String = "binary"): Unit
}
模型應(yīng)用
將調(diào)整數(shù)值特征劃分為一系列段位, 將當(dāng)日活躍用戶左連用戶畫(huà)像特征并按段位遍歷組成新的特征標(biāo)簽表, 使用訓(xùn)練好的模型進(jìn)行預(yù)測(cè), 選擇用戶留下來(lái)的分段區(qū)間, 從小到大進(jìn)行排列, 寫(xiě)入離線預(yù)測(cè)表。
系列文章
第一篇: Ambari自動(dòng)化部署
第二篇: 數(shù)據(jù)埋點(diǎn)設(shè)計(jì)和SDK源碼
第三篇: 數(shù)據(jù)采集和驗(yàn)證方案
第四篇: ETL實(shí)時(shí)方案: Kafka->Flink->Hive
第五篇: ETL用戶數(shù)據(jù)處理: kafka->spark->kudu
第六篇: Presto分析模型SQL和UDF函數(shù)
第七篇: 用戶畫(huà)像和留存預(yù)測(cè)